Coverage for functions \ flipdare \ task \ trigger_task_handler.py: 58%

31 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from typing import TYPE_CHECKING 

16from collections.abc import Callable 

17from flipdare.manager.service_manager import ServiceManager 

18from flipdare.result.outcome import Outcome 

19from flipdare.generated.shared.app_error_code import AppErrorCode 

20from flipdare.generated.shared.backend.app_job_type import AppJobType 

21from flipdare.job_types import TriggerJobType 

22from flipdare.service._service_provider import ServiceProvider 

23from flipdare.wrapper.backend.app_job_wrapper import AppJobWrapper 

24 

25if TYPE_CHECKING: 

26 from flipdare.manager.db_manager import DbManager 

27 from flipdare.manager.backend_manager import BackendManager 

28 

29 

30__all__ = ["TriggerTaskHandler"] 

31 

32 

33type _DispatchType = dict[TriggerJobType, Callable[..., Outcome]] 

34type _DispatchTypeOpt = _DispatchType | None 

35 

36 

37class TriggerTaskHandler(ServiceProvider): 

38 def __init__( 

39 self, 

40 db_manager: DbManager | None = None, 

41 system_manager: BackendManager | None = None, 

42 admin_manager: ServiceManager | None = None, 

43 ) -> None: 

44 super().__init__( 

45 db_manager=db_manager, 

46 backend_manager=system_manager, 

47 service_manager=admin_manager, 

48 ) 

49 

50 self._dispatcher: _DispatchTypeOpt = None 

51 

52 @property 

53 def dispatcher(self) -> _DispatchType: 

54 # this is faster than a case, and will be called many times 

55 # so performance is an issue 

56 if self._dispatcher is not None: 

57 return self._dispatcher 

58 

59 service_manager = self.service_manager 

60 self._dispatcher = { 

61 AppJobType.TR_USER: service_manager.user.trigger_user, 

62 AppJobType.TR_USER_ANONYMIZE: service_manager.compliance.trigger_user_anonymize, 

63 AppJobType.TR_CONTENT: service_manager.content.trigger_content, 

64 AppJobType.TR_CONTENT_DELETE: service_manager.compliance.trigger_content_delete, 

65 AppJobType.TR_INVITE: service_manager.friend.trigger_invite, 

66 AppJobType.TR_FRIEND: service_manager.friend.trigger_friend, 

67 # AppJobType.TR_CHAT: service_manager.chat.trigger_chat, 

68 AppJobType.TR_CHAT_DELETE: service_manager.compliance.trigger_chat_delete, 

69 AppJobType.TR_DARE: service_manager.dare.trigger_dare, 

70 AppJobType.TR_DARE_DELETE: service_manager.compliance.trigger_dare_delete, 

71 AppJobType.TR_GROUP: service_manager.group.trigger_group, 

72 AppJobType.TR_GROUP_MEMBER: service_manager.group.trigger_group_member, 

73 AppJobType.TR_FLAG: service_manager.flag.trigger_flag, 

74 AppJobType.TR_PLEDGE: service_manager.pledge.trigger_pledge, 

75 AppJobType.TR_PLEDGE_DELETE: service_manager.compliance.trigger_pledge_delete, 

76 } 

77 

78 return self._dispatcher 

79 

80 def run_trigger( 

81 self, 

82 job_type: TriggerJobType, 

83 job: AppJobWrapper, 

84 ) -> Outcome: 

85 handler_fn = self.dispatcher.get(job_type) 

86 if handler_fn is None: 

87 msg = f"No trigger handler found for job type: {job_type}" 

88 self.app_logger.system_error( 

89 error_code=AppErrorCode.TRIGGER, 

90 job_type=job_type, 

91 message=msg, 

92 data=job.to_dict(), 

93 ) 

94 return Outcome.ERROR 

95 

96 return handler_fn(job)