Coverage for functions \ flipdare \ service \ group_service.py: 52%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14from typing import TYPE_CHECKING 

15from flipdare.app_types import CronResult 

16from flipdare.core.cron_decorator import cron_decorator 

17from flipdare.service._service_provider import ServiceProvider 

18from flipdare.service.core.cron_processor import CronConfig, CronProcessor 

19from flipdare.service.processor.group_member_processor import GroupMemberProcessor 

20from flipdare.service.processor.group_processor import GroupProcessor 

21from flipdare.result.app_result import AppResult 

22from flipdare.core.job_type_decorator import job_type_decorator 

23from flipdare.result.job_result import JobResult 

24from flipdare.core.trigger_decorator import trigger_decorator 

25from flipdare.generated import AppJobType 

26from flipdare.generated.shared.firestore_collections import FirestoreCollections 

27from flipdare.wrapper import ( 

28 AppJobWrapper, 

29 GroupMemberWrapper, 

30 GroupWrapper, 

31) 

32 

33if TYPE_CHECKING: 

34 from flipdare.manager.db_manager import DbManager 

35 from flipdare.manager.backend_manager import BackendManager 

36 

__all__ = ["GroupService"]


# Short module-private aliases; they keep the decorator arguments and
# CronConfig calls in GroupService on a single line.
_JT = AppJobType
_GROUP = FirestoreCollections.GROUP
_MBR = FirestoreCollections.GROUP_MEMBER

43 

44 

class GroupService(ServiceProvider):
    """Cron and trigger entry points for groups and group members.

    The actual work is delegated to ``GroupProcessor`` and
    ``GroupMemberProcessor``; both are created lazily on first access and
    then reused for the lifetime of this service instance.
    """

    def __init__(
        self,
        db_manager: DbManager | None = None,
        backend_manager: BackendManager | None = None,
    ) -> None:
        """Forward the optional managers to ``ServiceProvider``.

        Args:
            db_manager: Optional database manager passed to the base class.
            backend_manager: Optional backend manager passed to the base class.
        """
        super().__init__(
            backend_manager=backend_manager,
            db_manager=db_manager,
        )

        # Populated on first use by the properties below.
        self._group_processor: GroupProcessor | None = None
        self._group_member_processor: GroupMemberProcessor | None = None

    @property
    def group_processor(self) -> GroupProcessor:
        """Return the ``GroupProcessor``, creating it on first access."""
        if self._group_processor is None:
            self._group_processor = GroupProcessor(
                bucket=self.storage_bucket,
                group_db=self.group_db,
                indexer_service=self.indexer,
            )
        return self._group_processor

    @property
    def group_member_processor(self) -> GroupMemberProcessor:
        """Return the ``GroupMemberProcessor``, creating it on first access."""
        if self._group_member_processor is None:
            # Store under the same attribute that is tested above so the
            # instance is actually cached instead of rebuilt on every access.
            self._group_member_processor = GroupMemberProcessor(
                bucket=self.storage_bucket,
                group_db=self.group_db,
                indexer_service=self.indexer,
                notification_service=self.notification_service,
            )
        return self._group_member_processor

    # ========================================================================
    # CRONS
    # ========================================================================

    @cron_decorator(job_type=_JT.CR_GROUP_UNPROCESSED)
    def cron_group_unprocessed(self) -> CronResult:
        """Process unprocessed groups.

        Queries ``group_db.get_recent_group_unprocessed()`` and runs each
        result through ``process_group(..., is_update=True)``.
        """
        # Processing goes through the service manager's group service rather
        # than this instance's own processor.
        group = self.service_manager.group
        config = CronConfig(
            job_type=_JT.CR_GROUP_UNPROCESSED,
            job_name="cron_group",
            query_fn=lambda: self.group_db.get_recent_group_unprocessed(),
            process_fn=lambda g: group.group_processor.process_group(g, is_update=True),
        )
        return CronProcessor(config).process_result()

    @cron_decorator(job_type=_JT.CR_GROUP_MEMBER_UNPROCESSED)
    def cron_group_member_unprocessed(self) -> CronResult:
        """Process unprocessed group members.

        Queries ``group_db.get_recent_member_unprocessed()`` and runs each
        result through ``process_group_member``.
        """
        group = self.service_manager.group
        config = CronConfig(
            job_type=_JT.CR_GROUP_MEMBER_UNPROCESSED,
            job_name="cron_group_member_unprocessed",
            query_fn=lambda: self.group_db.get_recent_member_unprocessed(),
            process_fn=lambda member: group.group_member_processor.process_group_member(member),
        )
        return CronProcessor(config).process_result()

    @cron_decorator(job_type=_JT.CR_GROUP_MEMBER_STATUS_UNPROCESSED)
    def cron_group_member_status_unprocessed(self) -> CronResult:
        """Process group member status changes.

        Queries ``group_db.get_recent_member_unprocessed_status()`` and runs
        each result through ``process_group_member``.
        """
        group = self.service_manager.group
        config = CronConfig(
            job_type=_JT.CR_GROUP_MEMBER_STATUS_UNPROCESSED,
            job_name="cron_group_member_status_unprocessed",
            query_fn=lambda: self.group_db.get_recent_member_unprocessed_status(),
            process_fn=lambda member: group.group_member_processor.process_group_member(member),
        )
        return CronProcessor(config).process_result()

    # ========================================================================
    # TRIGGERS - Delegate to processors
    # ========================================================================

    @job_type_decorator(_JT.TR_GROUP)
    @trigger_decorator(job_type=_JT.TR_GROUP, collection=_GROUP, wrapper_class=GroupWrapper)
    def trigger_group(
        self,
        job: AppJobWrapper,
        *,
        wrapper: GroupWrapper,
    ) -> JobResult[GroupWrapper]:
        """Handle a group trigger job by delegating to ``GroupProcessor``.

        Args:
            job: The job being handled; ``job.has_changes`` selects update mode.
            wrapper: The group document supplied for this job.

        Returns:
            ``JobResult.ok`` on success; otherwise a result built from the
            merged processor error plus the wrapper's JSON payload.
        """
        group_id = wrapper.doc_id
        is_update = job.has_changes

        main_result = AppResult[GroupWrapper](
            doc_id=job.doc_id, task_name=f" for group {group_id}"
        )
        process_result = self.group_processor.process_group(wrapper, is_update=is_update)
        if process_result.is_error:
            main_result.merge(process_result)
            return JobResult.from_result(
                main_result,
                doc_id=group_id,
                data=wrapper.to_json_dict(),
            )

        return JobResult.ok(doc_id=group_id)

    @job_type_decorator(_JT.TR_GROUP_MEMBER)
    @trigger_decorator(
        job_type=_JT.TR_GROUP_MEMBER, collection=_MBR, wrapper_class=GroupMemberWrapper
    )
    def trigger_group_member(
        self,
        job: AppJobWrapper,
        *,
        wrapper: GroupMemberWrapper,
    ) -> JobResult[GroupMemberWrapper]:
        """Handle a group-member trigger job via ``GroupMemberProcessor``.

        Args:
            job: The job being handled.
            wrapper: The group-member document supplied for this job.

        Returns:
            ``JobResult.ok`` on success; otherwise a result built from the
            merged processor error plus the wrapper's JSON payload.
        """
        # NOTE(review): trigger_group passes job.doc_id to AppResult, this
        # method passes job.obj_id - confirm the difference is intentional.
        doc_id = job.obj_id
        member_id = wrapper.doc_id

        main_result = AppResult[GroupMemberWrapper](
            task_name=f" for group member {member_id}", doc_id=doc_id
        )
        # Go through the lazy property: the backing attribute does not exist
        # until the processor has been created.
        process_result = self.group_member_processor.process_group_member(wrapper)
        if process_result.is_error:
            main_result.merge(process_result)
            return JobResult.from_result(
                main_result,
                doc_id=member_id,
                data=wrapper.to_json_dict(),
            )

        return JobResult.ok(doc_id=member_id)