Coverage for functions \ flipdare \ service \ processor \ compliance_processor.py: 84%
117 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13"""
14ComplianceProcessor - Handles compliance delete/anonymize processing.
15Uses StepProcessor for automatic state tracking and retry/resume capability.
16Step state is persisted on the ComplianceWrapper after each successful step.
17"""
19from __future__ import annotations
21from collections.abc import Callable
22from dataclasses import dataclass
23from enum import StrEnum
25from flipdare.app_defaults import get_fallback_avatar
26from flipdare.app_log import LOG
27from flipdare.app_types import ComplianceBridge
28from flipdare.backend.indexer_service import IndexerService
29from flipdare.constants import (
30 DELETED_USER_EMAIL_DOMAIN,
31 DELETED_USER_MIN_ID_LENGTH,
32 DELETED_USER_NAME_PREFIX,
33 IS_DEBUG,
34)
35from flipdare.result.app_result import AppResult
36from flipdare.core.tokenizer import Tokenizer
37from flipdare.generated import AppErrorCode
38from flipdare.service.core.step_processor import ProcessingStep, StepProcessor
39from flipdare.wrapper.backend.compliance_wrapper import ComplianceWrapper
40from flipdare.wrapper.user_wrapper import UserWrapper
42__all__ = ["AnonymizeOptions", "ComplianceProcessor"]
45class _Step(StrEnum):
46 DELETED_FROM_SEARCH = "deleted_from_search"
47 FINALIZED = "anonymized" # tracks both delete and anonymize completion
50@dataclass
51class AnonymizeOptions:
52 length: int = DELETED_USER_MIN_ID_LENGTH
53 email_domain: str = DELETED_USER_EMAIL_DOMAIN
54 name_prefix: str = DELETED_USER_NAME_PREFIX
57class ComplianceProcessor:
58 """
59 Handles compliance delete/anonymize workflows with StepProcessor state tracking.
61 Each step result is persisted on the ComplianceWrapper so retries resume from the
62 last successful checkpoint instead of repeating completed work.
64 Steps (delete): DELETED_FROM_SEARCH → FINALIZED
65 Steps (anonymize): DELETED_FROM_SEARCH → FINALIZED
66 """
68 def __init__(
69 self,
70 compliance_bridge: ComplianceBridge,
71 indexer_service: IndexerService,
72 tokenizer: Tokenizer,
73 options: AnonymizeOptions | None = None,
74 ) -> None:
75 self.compliance_bridge = compliance_bridge
76 self.indexer_service = indexer_service
77 self.tokenizer = tokenizer
78 self._options = options or AnonymizeOptions()
80 def process_delete(
81 self,
82 compliance: ComplianceWrapper,
83 delete_handler: Callable[[str], AppResult[None]],
84 delete_search_index: bool,
85 ) -> AppResult[ComplianceWrapper]:
86 """Execute a delete compliance job with step tracking."""
87 steps: list[ProcessingStep[_Step, ComplianceWrapper]] = []
89 if delete_search_index:
90 steps.append(
91 ProcessingStep(
92 state_key=_Step.DELETED_FROM_SEARCH,
93 handler=lambda m: self._step_delete_from_search(m),
94 description="Delete from search index",
95 required=True,
96 )
97 )
99 steps.append(
100 ProcessingStep(
101 state_key=_Step.FINALIZED,
102 handler=lambda m: self._step_delete_object(m, delete_handler),
103 description="Delete from source collection",
104 required=True,
105 )
106 )
108 return StepProcessor(
109 wrapper=compliance,
110 steps=steps,
111 save_handler=lambda m: self.compliance_bridge.update(m),
112 process_name=f"compliance_delete_{compliance.obj_id}",
113 ).execute()
115 def process_anonymize(
116 self,
117 compliance: ComplianceWrapper,
118 user: UserWrapper,
119 update_handler: Callable[[UserWrapper], AppResult[UserWrapper]],
120 ) -> AppResult[ComplianceWrapper]:
121 """Execute a user anonymization compliance job with step tracking."""
122 steps: list[ProcessingStep[_Step, ComplianceWrapper]] = [
123 ProcessingStep(
124 state_key=_Step.DELETED_FROM_SEARCH,
125 handler=lambda m: self._step_delete_from_search(m),
126 description="Delete user from search index",
127 required=True,
128 ),
129 ProcessingStep(
130 state_key=_Step.FINALIZED,
131 handler=lambda m: self._step_anonymize_object(m, user, update_handler),
132 description="Anonymize user record",
133 required=True,
134 ),
135 ]
137 return StepProcessor(
138 wrapper=compliance,
139 steps=steps,
140 save_handler=lambda m: self.compliance_bridge.update(m),
141 process_name=f"compliance_anonymize_{compliance.obj_id}",
142 ).execute()
144 # ========================================================================
145 # Step Handlers
146 # ========================================================================
148 def _step_delete_from_search(
149 self, compliance: ComplianceWrapper
150 ) -> AppResult[ComplianceWrapper]:
151 uid = compliance.uid
152 obj_id = compliance.obj_id
153 debug_msg = f"({obj_id}/{uid})"
154 main_result = AppResult[ComplianceWrapper](doc_id=obj_id)
156 try:
157 general_result = self.indexer_service.delete_general(uid=uid, obj_id=obj_id)
158 if general_result.is_error:
159 msg = f"Failed to delete from search for {debug_msg}\n{general_result.formatted}" # noqa: S608
160 LOG().error(msg)
161 main_result.merge(general_result)
162 return main_result
164 friend_result = self.indexer_service.delete_all_friends(uid=uid)
165 if friend_result.is_error:
166 msg = f"Failed to delete friends from search for {debug_msg}\n{friend_result.formatted}"
167 LOG().error(msg)
168 main_result.merge(friend_result)
170 except Exception as error:
171 cause = f"Error deleting {debug_msg} from search: {error}"
172 LOG().error(cause)
173 main_result.add_error(AppErrorCode.COMP_DELETE_FAILED, cause)
175 return main_result
177 def _step_delete_object(
178 self,
179 compliance: ComplianceWrapper,
180 delete_handler: Callable[[str], AppResult[None]],
181 ) -> AppResult[ComplianceWrapper]:
182 obj_id = compliance.obj_id
183 debug_msg = f"({obj_id})"
184 main_result = AppResult[ComplianceWrapper](doc_id=obj_id)
186 if IS_DEBUG:
187 LOG().debug(f"Deleting {debug_msg} from source collection")
189 delete_result = delete_handler(obj_id)
190 if delete_result.is_error:
191 msg = f"Failed to delete {debug_msg}.\n{delete_result.formatted}"
192 LOG().error(msg, include_stack=True)
193 main_result.add_error(AppErrorCode.COMP_DELETE_FAILED, msg)
194 else:
195 main_result.ok(doc_id=obj_id, message=f"Deleted {debug_msg}")
197 return main_result
199 def _step_anonymize_object(
200 self,
201 compliance: ComplianceWrapper,
202 user: UserWrapper,
203 update_handler: Callable[[UserWrapper], AppResult[UserWrapper]],
204 ) -> AppResult[ComplianceWrapper]:
205 obj_id = compliance.obj_id
206 debug_msg = f"({obj_id})"
207 main_result = AppResult[ComplianceWrapper](doc_id=obj_id)
209 self._anonymize_user(user)
210 updates = user.get_updates()
211 if not updates:
212 msg = f"No updates generated when anonymizing user {debug_msg}"
213 LOG().error(msg, include_stack=True)
214 main_result.add_error(AppErrorCode.COMP_ANONYMIZE_FAILED, msg)
215 return main_result
217 if IS_DEBUG:
218 LOG().debug(f"Anonymizing user {debug_msg} with {len(updates)} field updates")
220 result = update_handler(user)
221 if result.is_error or result.is_warning:
222 msg = f"Failed to persist anonymized user {debug_msg}\n{result.formatted}"
223 LOG().error(msg, include_stack=True)
224 main_result.add_error(AppErrorCode.COMP_ANONYMIZE_FAILED, msg)
225 else:
226 if IS_DEBUG:
227 LOG().debug(f"Successfully anonymized user {debug_msg}")
228 main_result.ok(doc_id=obj_id, message=f"Anonymized user {debug_msg}")
230 return main_result
232 # ========================================================================
233 # Anonymize Helpers
234 # ========================================================================
236 def _anonymize_user(self, user: UserWrapper) -> None:
237 """
238 When deleting a user (for compliance) we:
239 1. move the user data to the compliance collection (with a compliance model that includes the
240 deleted_by_uid and obj_type for search indexing)
241 2. anonymise the existing user record in the user collection (instead of deleting, to
242 preserve referential integrity for other records that reference the user_id)
243 3. delete the user from the search index
244 """
245 user_id = user.doc_id
246 opts = self._options
247 anon_uid = user_id[: opts.length] if len(user_id) >= opts.length else user_id
248 anon_name = f"{opts.name_prefix} {anon_uid}"
249 anon_email = f"{anon_name}@{opts.email_domain}"
251 user.avatar = get_fallback_avatar()
252 user.email = self._anonymize_value(anon_email, user.email)
253 user.name = (
254 self._anonymize_value(anon_name, user.name) if user.name is not None else user.name
255 )
257 if user.description is not None:
258 user.description = self._anonymize_value(opts.name_prefix, user.description)
259 if user.display_name is not None:
260 user.display_name = self._anonymize_value(opts.name_prefix, user.display_name)
262 def _anonymize_value(self, anon_value: str, value: str) -> str:
263 """
264 Anonymize a value if it appears to be a person's name.
266 Args:
267 anon_value: The anonymized value to use
268 value: The original value to check
270 Returns:
271 The anonymized value if it's a person name, otherwise the original value
273 """
274 try:
275 if self.tokenizer.is_person_name(value):
276 return anon_value
277 return value
278 except Exception as e:
279 LOG().warning(f"Error tokenizing value for anonymization: {e}")
280 return anon_value