Coverage for functions \ flipdare \ service \ compliance_service.py: 99%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from collections.abc import Callable 

16from typing import TYPE_CHECKING, Any 

17 

18from flipdare.app_log import LOG 

19from flipdare.constants import IS_DEBUG 

20from flipdare.result.app_result import AppResult 

21from flipdare.core.job_type_decorator import job_type_decorator 

22from flipdare.result.job_result import JobResult 

23from flipdare.core.trigger_decorator import trigger_decorator 

24from flipdare.core.tokenizer import Tokenizer 

25from flipdare.generated import AppErrorCode, AppJobType 

26from flipdare.generated.model.backend.compliance_model import ComplianceModel 

27from flipdare.generated.shared.firestore_collections import FirestoreCollections 

28from flipdare.service._service_provider import ServiceProvider 

29from flipdare.service.processor.compliance_processor import AnonymizeOptions, ComplianceProcessor 

30from flipdare.wrapper import ( 

31 AppJobWrapper, 

32 ChatWrapper, 

33 ContentWrapper, 

34 DareWrapper, 

35 PersistedWrapper, 

36 PledgeWrapper, 

37 UserWrapper, 

38) 

39 

40if TYPE_CHECKING: 

41 from flipdare.manager.db_manager import DbManager 

42 from flipdare.manager.backend_manager import BackendManager 

43 

44__all__ = ["AnonymizeOptions", "ComplianceService"] 

45 

46_JT = AppJobType 

47_COL = FirestoreCollections.COMPLIANCE 

48 

49 

50class ComplianceService(ServiceProvider): 

51 def __init__( 

52 self, 

53 db_manager: DbManager | None = None, 

54 backend_manager: BackendManager | None = None, 

55 tokenize: Tokenizer | None = None, 

56 processor: ComplianceProcessor | None = None, 

57 ) -> None: 

58 super().__init__( 

59 backend_manager=backend_manager, 

60 db_manager=db_manager, 

61 ) 

62 self.tokenizer = tokenize or Tokenizer.instance() 

63 self._processor = processor 

64 

65 @property 

66 def processor(self) -> ComplianceProcessor: 

67 if self._processor is None: 

68 self._processor = ComplianceProcessor( 

69 compliance_bridge=self.compliance_bridge, 

70 indexer_service=self.indexer, 

71 tokenizer=self.tokenizer, 

72 ) 

73 return self._processor 

74 

75 # ======================================================================== 

76 # Triggers 

77 # ======================================================================== 

78 

79 @job_type_decorator(_JT.TR_USER_ANONYMIZE) 

80 @trigger_decorator(job_type=_JT.TR_USER_ANONYMIZE, collection=_COL, wrapper_class=UserWrapper) 

81 def trigger_user_anonymize( 

82 self, 

83 job: AppJobWrapper, 

84 *, 

85 wrapper: UserWrapper, 

86 ) -> JobResult[UserWrapper]: 

87 # Users are anonymized instead of deleted to preserve referential integrity. 

88 return self._run_anonymize(job=job, wrapper=wrapper) 

89 

90 @job_type_decorator(_JT.TR_CONTENT_DELETE) 

91 @trigger_decorator( 

92 job_type=_JT.TR_CONTENT_DELETE, collection=_COL, wrapper_class=ContentWrapper 

93 ) 

94 def trigger_content_delete( 

95 self, 

96 job: AppJobWrapper, 

97 *, 

98 wrapper: ContentWrapper, 

99 ) -> JobResult[ContentWrapper]: 

100 return self._run_delete( 

101 job=job, 

102 wrapper=wrapper, 

103 delete_handler=self.content_bridge.delete, 

104 delete_search_index=True, 

105 ) 

106 

107 @job_type_decorator(_JT.TR_CHAT_DELETE) 

108 @trigger_decorator(job_type=_JT.TR_CHAT_DELETE, collection=_COL, wrapper_class=ChatWrapper) 

109 def trigger_chat_delete( 

110 self, 

111 job: AppJobWrapper, 

112 *, 

113 wrapper: ChatWrapper, 

114 ) -> JobResult[ChatWrapper]: 

115 return self._run_delete( 

116 job=job, 

117 wrapper=wrapper, 

118 delete_handler=self.chat_bridge.delete, 

119 delete_search_index=False, 

120 ) 

121 

122 @job_type_decorator(_JT.TR_DARE_DELETE) 

123 @trigger_decorator(job_type=_JT.TR_DARE_DELETE, collection=_COL, wrapper_class=DareWrapper) 

124 def trigger_dare_delete( 

125 self, 

126 job: AppJobWrapper, 

127 *, 

128 wrapper: DareWrapper, 

129 ) -> JobResult[DareWrapper]: 

130 return self._run_delete( 

131 job=job, 

132 wrapper=wrapper, 

133 delete_handler=self.dare_bridge.delete, 

134 delete_search_index=True, 

135 ) 

136 

137 @job_type_decorator(_JT.TR_PLEDGE_DELETE) 

138 @trigger_decorator(job_type=_JT.TR_PLEDGE_DELETE, collection=_COL, wrapper_class=PledgeWrapper) 

139 def trigger_pledge_delete( 

140 self, 

141 job: AppJobWrapper, 

142 *, 

143 wrapper: PledgeWrapper, 

144 ) -> JobResult[PledgeWrapper]: 

145 return self._run_delete( 

146 job=job, 

147 wrapper=wrapper, 

148 delete_handler=self.pledge_bridge.delete, 

149 delete_search_index=False, 

150 ) 

151 

152 # ======================================================================== 

153 # Core 

154 # ======================================================================== 

155 

156 def _run_delete( 

157 self, 

158 job: AppJobWrapper, 

159 wrapper: PersistedWrapper[Any], 

160 delete_handler: Callable[[str], AppResult[None]], 

161 delete_search_index: bool, 

162 ) -> JobResult[Any]: 

163 doc_id = job.doc_id 

164 debug_label = f"{type(wrapper).__name__} {doc_id}" 

165 

166 compliance, error_output = self._create_compliance_record(job, wrapper, debug_label) 

167 if error_output is not None: 

168 return error_output 

169 

170 assert compliance 

171 result = self.processor.process_delete( 

172 compliance=compliance, 

173 delete_handler=delete_handler, 

174 delete_search_index=delete_search_index, 

175 ) 

176 return self._build_output(job, result, debug_label) 

177 

178 def _run_anonymize( 

179 self, 

180 job: AppJobWrapper, 

181 wrapper: UserWrapper, 

182 ) -> JobResult[UserWrapper]: 

183 doc_id = job.doc_id 

184 debug_label = f"User {doc_id}" 

185 

186 compliance, error_output = self._create_compliance_record(job, wrapper, debug_label) 

187 if error_output is not None: 

188 return error_output 

189 

190 assert compliance 

191 result = self.processor.process_anonymize( 

192 compliance=compliance, 

193 user=wrapper, 

194 update_handler=self.user_bridge.update, 

195 ) 

196 return self._build_output(job, result, debug_label) 

197 

198 def _create_compliance_record( 

199 self, 

200 job: AppJobWrapper, 

201 wrapper: PersistedWrapper[Any], 

202 debug_label: str, 

203 ) -> tuple[Any, JobResult[Any] | None]: 

204 """ 

205 Create the compliance archive record. 

206 Returns (compliance_wrapper, None) on success, or (None, error_output) on failure. 

207 """ 

208 create_result = self.compliance_bridge.create(ComplianceModel.from_wrapper(wrapper)) 

209 if create_result.is_error: 

210 msg = f"Failed to create compliance record for {debug_label}" 

211 LOG().error(f"{msg}\n{create_result.formatted}", include_stack=True) 

212 output = JobResult[Any].ok(doc_id=job.doc_id, job_type=job.job_type, collection=_COL) 

213 output.set_error( 

214 app_result=create_result, 

215 error_code=AppErrorCode.COMP_CREATE_FAILED, 

216 message=msg, 

217 ) 

218 return None, output 

219 

220 return create_result.generated, None 

221 

222 def _build_output( 

223 self, 

224 job: AppJobWrapper, 

225 result: AppResult[Any], 

226 debug_label: str, 

227 ) -> JobResult[Any]: 

228 output = JobResult[Any].ok(doc_id=job.doc_id, job_type=job.job_type, collection=_COL) 

229 if result.is_error: 

230 msg = f"Compliance processing failed for {debug_label}\n{result.formatted}" 

231 output.set_error( 

232 app_result=result, 

233 error_code=AppErrorCode.COMP_PROCESSING_FAILED, 

234 message=msg, 

235 ) 

236 else: 

237 msg = f"Successfully processed compliance for {debug_label}" 

238 if IS_DEBUG: 

239 LOG().debug(msg) 

240 output.set_ok(message=msg) 

241 return output