Coverage for functions \ flipdare \ service \ processor \ compliance_processor.py: 84%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13""" 

14ComplianceProcessor - Handles compliance delete/anonymize processing. 

15Uses StepProcessor for automatic state tracking and retry/resume capability. 

16Step state is persisted on the ComplianceWrapper after each successful step. 

17""" 

18 

19from __future__ import annotations 

20 

21from collections.abc import Callable 

22from dataclasses import dataclass 

23from enum import StrEnum 

24 

25from flipdare.app_defaults import get_fallback_avatar 

26from flipdare.app_log import LOG 

27from flipdare.app_types import ComplianceBridge 

28from flipdare.backend.indexer_service import IndexerService 

29from flipdare.constants import ( 

30 DELETED_USER_EMAIL_DOMAIN, 

31 DELETED_USER_MIN_ID_LENGTH, 

32 DELETED_USER_NAME_PREFIX, 

33 IS_DEBUG, 

34) 

35from flipdare.result.app_result import AppResult 

36from flipdare.core.tokenizer import Tokenizer 

37from flipdare.generated import AppErrorCode 

38from flipdare.service.core.step_processor import ProcessingStep, StepProcessor 

39from flipdare.wrapper.backend.compliance_wrapper import ComplianceWrapper 

40from flipdare.wrapper.user_wrapper import UserWrapper 

41 

42__all__ = ["AnonymizeOptions", "ComplianceProcessor"] 

43 

44 

45class _Step(StrEnum): 

46 DELETED_FROM_SEARCH = "deleted_from_search" 

47 FINALIZED = "anonymized" # tracks both delete and anonymize completion 

48 

49 

@dataclass
class AnonymizeOptions:
    """Settings that control how placeholder values for an anonymized user are built."""

    # Number of leading characters of the user's document id kept in the placeholder.
    length: int = DELETED_USER_MIN_ID_LENGTH

    # Domain placed after "@" in the placeholder email address.
    email_domain: str = DELETED_USER_EMAIL_DOMAIN

    # Text placed before the shortened id in the placeholder name; also used on
    # its own as the replacement for description / display name.
    name_prefix: str = DELETED_USER_NAME_PREFIX

55 

56 

class ComplianceProcessor:
    """
    Handles compliance delete/anonymize workflows with StepProcessor state tracking.

    Each step result is persisted on the ComplianceWrapper so retries resume from the
    last successful checkpoint instead of repeating completed work.

    Steps (delete): DELETED_FROM_SEARCH (only when requested) → FINALIZED
    Steps (anonymize): DELETED_FROM_SEARCH → FINALIZED
    """

    def __init__(
        self,
        compliance_bridge: ComplianceBridge,
        indexer_service: IndexerService,
        tokenizer: Tokenizer,
        options: AnonymizeOptions | None = None,
    ) -> None:
        """
        Store the collaborators used by the step handlers.

        Args:
            compliance_bridge: Its ``update`` method is used as the save handler
                for the compliance wrapper.
            indexer_service: Used to remove entries from the search index.
            tokenizer: Used to decide whether a value looks like a person's name.
            options: Anonymization settings; ``AnonymizeOptions()`` when omitted.

        """
        self.compliance_bridge = compliance_bridge
        self.indexer_service = indexer_service
        self.tokenizer = tokenizer
        self._options = options or AnonymizeOptions()

    def process_delete(
        self,
        compliance: ComplianceWrapper,
        delete_handler: Callable[[str], AppResult[None]],
        delete_search_index: bool,
    ) -> AppResult[ComplianceWrapper]:
        """
        Execute a delete compliance job with step tracking.

        Args:
            compliance: The compliance job being processed.
            delete_handler: Called with ``compliance.obj_id`` to delete the object
                from its source collection.
            delete_search_index: When true, a search-index removal step runs
                before the source deletion.

        Returns:
            The result of executing the steps through StepProcessor.

        """
        steps: list[ProcessingStep[_Step, ComplianceWrapper]] = []

        if delete_search_index:
            steps.append(
                ProcessingStep(
                    state_key=_Step.DELETED_FROM_SEARCH,
                    handler=self._step_delete_from_search,
                    description="Delete from search index",
                    required=True,
                )
            )

        steps.append(
            ProcessingStep(
                state_key=_Step.FINALIZED,
                # A lambda is needed here to bind the caller-supplied delete_handler.
                handler=lambda m: self._step_delete_object(m, delete_handler),
                description="Delete from source collection",
                required=True,
            )
        )

        return self._run_steps(
            compliance, steps, f"compliance_delete_{compliance.obj_id}"
        )

    def process_anonymize(
        self,
        compliance: ComplianceWrapper,
        user: UserWrapper,
        update_handler: Callable[[UserWrapper], AppResult[UserWrapper]],
    ) -> AppResult[ComplianceWrapper]:
        """
        Execute a user anonymization compliance job with step tracking.

        Args:
            compliance: The compliance job being processed.
            user: The user record to anonymize; it is modified in place.
            update_handler: Called with the modified user to persist it.

        Returns:
            The result of executing the steps through StepProcessor.

        """
        steps: list[ProcessingStep[_Step, ComplianceWrapper]] = [
            ProcessingStep(
                state_key=_Step.DELETED_FROM_SEARCH,
                handler=self._step_delete_from_search,
                description="Delete user from search index",
                required=True,
            ),
            ProcessingStep(
                state_key=_Step.FINALIZED,
                # A lambda is needed here to bind the user and update_handler.
                handler=lambda m: self._step_anonymize_object(m, user, update_handler),
                description="Anonymize user record",
                required=True,
            ),
        ]

        return self._run_steps(
            compliance, steps, f"compliance_anonymize_{compliance.obj_id}"
        )

    def _run_steps(
        self,
        compliance: ComplianceWrapper,
        steps: list[ProcessingStep[_Step, ComplianceWrapper]],
        process_name: str,
    ) -> AppResult[ComplianceWrapper]:
        """Run ``steps`` through a StepProcessor that saves via the compliance bridge."""
        return StepProcessor(
            wrapper=compliance,
            steps=steps,
            save_handler=self.compliance_bridge.update,
            process_name=process_name,
        ).execute()

    # ========================================================================
    # Step Handlers
    # ========================================================================

    def _step_delete_from_search(
        self, compliance: ComplianceWrapper
    ) -> AppResult[ComplianceWrapper]:
        """
        Remove the object, then all of the uid's friends, from the search index.

        A failed general delete returns immediately without attempting the
        friends delete. Any exception raised by the indexer is caught and
        recorded as COMP_DELETE_FAILED on the returned result.
        """
        uid = compliance.uid
        obj_id = compliance.obj_id
        debug_msg = f"({obj_id}/{uid})"
        main_result = AppResult[ComplianceWrapper](doc_id=obj_id)

        try:
            general_result = self.indexer_service.delete_general(uid=uid, obj_id=obj_id)
            if general_result.is_error:
                msg = f"Failed to delete from search for {debug_msg}\n{general_result.formatted}"  # noqa: S608
                LOG().error(msg)
                main_result.merge(general_result)
                return main_result

            friend_result = self.indexer_service.delete_all_friends(uid=uid)
            if friend_result.is_error:
                msg = f"Failed to delete friends from search for {debug_msg}\n{friend_result.formatted}"
                LOG().error(msg)
                main_result.merge(friend_result)

        except Exception as error:
            cause = f"Error deleting {debug_msg} from search: {error}"
            LOG().error(cause)
            main_result.add_error(AppErrorCode.COMP_DELETE_FAILED, cause)

        # NOTE(review): unlike the other step handlers, the success path never
        # calls main_result.ok(...). Presumably a fresh AppResult already counts
        # as success - confirm against AppResult.
        return main_result

    def _step_delete_object(
        self,
        compliance: ComplianceWrapper,
        delete_handler: Callable[[str], AppResult[None]],
    ) -> AppResult[ComplianceWrapper]:
        """
        Delete the object from its source collection via ``delete_handler``.

        Args:
            compliance: The compliance job; its ``obj_id`` is passed to the handler.
            delete_handler: Performs the actual deletion.

        Returns:
            A result marked ok on success, or carrying COMP_DELETE_FAILED when
            the handler reports an error.

        """
        obj_id = compliance.obj_id
        debug_msg = f"({obj_id})"
        main_result = AppResult[ComplianceWrapper](doc_id=obj_id)

        if IS_DEBUG:
            LOG().debug(f"Deleting {debug_msg} from source collection")

        delete_result = delete_handler(obj_id)
        if delete_result.is_error:
            msg = f"Failed to delete {debug_msg}.\n{delete_result.formatted}"
            LOG().error(msg, include_stack=True)
            main_result.add_error(AppErrorCode.COMP_DELETE_FAILED, msg)
        else:
            main_result.ok(doc_id=obj_id, message=f"Deleted {debug_msg}")

        return main_result

    def _step_anonymize_object(
        self,
        compliance: ComplianceWrapper,
        user: UserWrapper,
        update_handler: Callable[[UserWrapper], AppResult[UserWrapper]],
    ) -> AppResult[ComplianceWrapper]:
        """
        Anonymize ``user`` in place and persist it via ``update_handler``.

        Args:
            compliance: The compliance job; its ``obj_id`` is used for the result
                and log messages.
            user: The user record to anonymize.
            update_handler: Persists the modified user.

        Returns:
            A result marked ok on success, or carrying COMP_ANONYMIZE_FAILED when
            anonymizing produced no updates or the handler reports an error or a
            warning.

        """
        obj_id = compliance.obj_id
        debug_msg = f"({obj_id})"
        main_result = AppResult[ComplianceWrapper](doc_id=obj_id)

        self._anonymize_user(user)
        updates = user.get_updates()
        if not updates:
            msg = f"No updates generated when anonymizing user {debug_msg}"
            LOG().error(msg, include_stack=True)
            main_result.add_error(AppErrorCode.COMP_ANONYMIZE_FAILED, msg)
            return main_result

        if IS_DEBUG:
            LOG().debug(f"Anonymizing user {debug_msg} with {len(updates)} field updates")

        result = update_handler(user)
        # A warning from the update handler is treated as a failure as well.
        if result.is_error or result.is_warning:
            msg = f"Failed to persist anonymized user {debug_msg}\n{result.formatted}"
            LOG().error(msg, include_stack=True)
            main_result.add_error(AppErrorCode.COMP_ANONYMIZE_FAILED, msg)
        else:
            if IS_DEBUG:
                LOG().debug(f"Successfully anonymized user {debug_msg}")
            main_result.ok(doc_id=obj_id, message=f"Anonymized user {debug_msg}")

        return main_result

    # ========================================================================
    # Anonymize Helpers
    # ========================================================================

    def _anonymous_identity(self, user_id: str) -> tuple[str, str]:
        """
        Build the placeholder (name, email) pair for a user id.

        Args:
            user_id: The user's document id.

        Returns:
            ``(anon_name, anon_email)``. The name is ``"<name_prefix> <short id>"``.
            The email local part is the same text with every whitespace run
            replaced by ``_``, because an unquoted space is not allowed in the
            local part of an email address.

        """
        opts = self._options
        # Slicing already copes with ids shorter than opts.length.
        anon_uid = user_id[: opts.length]
        anon_name = f"{opts.name_prefix} {anon_uid}"
        local_part = "_".join(anon_name.split())
        anon_email = f"{local_part}@{opts.email_domain}"
        return anon_name, anon_email

    def _anonymize_user(self, user: UserWrapper) -> None:
        """
        Replace identifying fields on ``user`` in place with placeholder values.

        The avatar is always replaced with the fallback avatar. Email, name,
        description and display name are passed through ``_anonymize_value``,
        so each is replaced only when the tokenizer classifies the current
        value as a person's name (or fails while checking it).

        The record is anonymized rather than deleted to preserve referential
        integrity for other records that reference the user id.
        """
        anon_name, anon_email = self._anonymous_identity(user.doc_id)
        prefix = self._options.name_prefix

        user.avatar = get_fallback_avatar()
        # NOTE(review): the email is only replaced when the tokenizer classifies
        # it as a person name (or raises). Confirm this is intended for a field
        # that always identifies the user.
        user.email = self._anonymize_value(anon_email, user.email)
        user.name = (
            self._anonymize_value(anon_name, user.name) if user.name is not None else user.name
        )

        if user.description is not None:
            user.description = self._anonymize_value(prefix, user.description)
        if user.display_name is not None:
            user.display_name = self._anonymize_value(prefix, user.display_name)

    def _anonymize_value(self, anon_value: str, value: str) -> str:
        """
        Anonymize a value if it appears to be a person's name.

        Args:
            anon_value: The anonymized value to use
            value: The original value to check

        Returns:
            ``anon_value`` when the tokenizer reports a person name or raises
            while checking; otherwise the original ``value``.

        """
        try:
            if self.tokenizer.is_person_name(value):
                return anon_value
            return value
        except Exception as e:
            # Fail closed: if the check itself breaks, anonymize anyway.
            LOG().warning(f"Error tokenizing value for anonymization: {e}")
            return anon_value