Coverage for functions \ flipdare \ task \ trigger_task_handler.py: 58%
31 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from typing import TYPE_CHECKING
16from collections.abc import Callable
17from flipdare.manager.service_manager import ServiceManager
18from flipdare.result.outcome import Outcome
19from flipdare.generated.shared.app_error_code import AppErrorCode
20from flipdare.generated.shared.backend.app_job_type import AppJobType
21from flipdare.job_types import TriggerJobType
22from flipdare.service._service_provider import ServiceProvider
23from flipdare.wrapper.backend.app_job_wrapper import AppJobWrapper
25if TYPE_CHECKING:
26 from flipdare.manager.db_manager import DbManager
27 from flipdare.manager.backend_manager import BackendManager
__all__ = ["TriggerTaskHandler"]

# Shape of the lookup table: trigger job type -> callable returning an Outcome.
type _DispatchType = dict[TriggerJobType, Callable[..., Outcome]]
# The same table, or None; None is the initial "not built yet" value of the
# handler's cached table.
type _DispatchTypeOpt = _DispatchType | None
class TriggerTaskHandler(ServiceProvider):
    """Route trigger jobs to the service method registered for their job type.

    The job-type -> handler table is created the first time ``dispatcher`` is
    read and is reused for every later lookup.
    """

    def __init__(
        self,
        db_manager: DbManager | None = None,
        system_manager: BackendManager | None = None,
        admin_manager: ServiceManager | None = None,
    ) -> None:
        """Forward the managers to ``ServiceProvider`` and clear the table cache.

        Args:
            db_manager: Passed to the base class as ``db_manager``.
            system_manager: Passed to the base class as ``backend_manager``.
            admin_manager: Passed to the base class as ``service_manager``.

        ``None`` values are forwarded unchanged; how the base class treats them
        is not visible from this module.
        """
        super().__init__(
            db_manager=db_manager,
            backend_manager=system_manager,
            service_manager=admin_manager,
        )

        # Filled in lazily by the ``dispatcher`` property.
        self._dispatcher: _DispatchTypeOpt = None

    @property
    def dispatcher(self) -> _DispatchType:
        """Return the job-type -> handler table, building it on first access."""
        # A dict lookup is preferred over a match/case because this is read on
        # every trigger, so the table is built once and then served from cache.
        table = self._dispatcher
        if table is None:
            services = self.service_manager
            table = {
                AppJobType.TR_USER: services.user.trigger_user,
                AppJobType.TR_USER_ANONYMIZE: services.compliance.trigger_user_anonymize,
                AppJobType.TR_CONTENT: services.content.trigger_content,
                AppJobType.TR_CONTENT_DELETE: services.compliance.trigger_content_delete,
                AppJobType.TR_INVITE: services.friend.trigger_invite,
                AppJobType.TR_FRIEND: services.friend.trigger_friend,
                # TR_CHAT has no handler registered; its entry is disabled:
                # AppJobType.TR_CHAT: services.chat.trigger_chat,
                AppJobType.TR_CHAT_DELETE: services.compliance.trigger_chat_delete,
                AppJobType.TR_DARE: services.dare.trigger_dare,
                AppJobType.TR_DARE_DELETE: services.compliance.trigger_dare_delete,
                AppJobType.TR_GROUP: services.group.trigger_group,
                AppJobType.TR_GROUP_MEMBER: services.group.trigger_group_member,
                AppJobType.TR_FLAG: services.flag.trigger_flag,
                AppJobType.TR_PLEDGE: services.pledge.trigger_pledge,
                AppJobType.TR_PLEDGE_DELETE: services.compliance.trigger_pledge_delete,
            }
            self._dispatcher = table

        return table

    def run_trigger(
        self,
        job_type: TriggerJobType,
        job: AppJobWrapper,
    ) -> Outcome:
        """Run the handler registered for ``job_type`` against ``job``.

        Args:
            job_type: Key used to look up the handler in ``dispatcher``.
            job: Job wrapper handed to the handler as its only argument.

        Returns:
            The handler's own return value. When no handler is registered, a
            system error is logged (including ``job.to_dict()``) and
            ``Outcome.ERROR`` is returned instead.
        """
        handler = self.dispatcher.get(job_type)
        if handler is not None:
            return handler(job)

        # Unknown job type: report it through the app logger and signal failure.
        error_text = f"No trigger handler found for job type: {job_type}"
        self.app_logger.system_error(
            error_code=AppErrorCode.TRIGGER,
            job_type=job_type,
            message=error_text,
            data=job.to_dict(),
        )
        return Outcome.ERROR