Coverage for functions/flipdare/service/processor/user_processor.py: 55% (134 statements)
coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

#!/usr/bin/env python
# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
#
# This file is part of Flipdare's proprietary software and contains
# confidential and copyrighted material. Unauthorised copying,
# modification, distribution, or use of this file is strictly
# prohibited without prior written permission from Flipdare Pty Ltd.
#
# This software includes third-party components licensed under MIT,
# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
#

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any

from google.cloud import firestore
from google.cloud.storage.bucket import Bucket as StorageBucket

from flipdare.app_globals import truncate_string
from flipdare.app_log import LOG
from flipdare.app_types import UserBridge
from flipdare.constants import DOWNLOAD_FILE_DIR
from flipdare.generated import AppErrorCode
from flipdare.generated.model.user_model import UserKeys
from flipdare.generated.shared.firestore_collections import FirestoreCollections
from flipdare.result.app_result import AppResult
from flipdare.result.job_result import JobResult
from flipdare.result.outcome import Outcome
from flipdare.service.core.step_processor import ProcessingStep, StepProcessor
from flipdare.service.processor._processor_mixin import ProcessorMixin
from flipdare.util.time_util import TimeUtil
from flipdare.wrapper import UserWrapper

if TYPE_CHECKING:
    from flipdare.backend.indexer_service import IndexerService
    from flipdare.firestore.invite_db import InviteDb

_K = UserKeys


class UserProcessor(ProcessorMixin):
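    """Processes user documents through a step pipeline: atomic view-stat
    increments (on update), avatar thumbnail hashing, and search indexing.
    """
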
    def __init__(
        self,
        bucket: StorageBucket,
        user_bridge: UserBridge,
        invite_db: InviteDb,
        indexer_service: IndexerService,
        local_path: Path = DOWNLOAD_FILE_DIR,
    ) -> None:
        super().__init__(bucket=bucket, local_path=local_path)
        self._indexer_service = indexer_service
        self._user_bridge = user_bridge
        self.invite_db = invite_db

    @property
    def indexer_service(self) -> IndexerService:
        return self._indexer_service

    @property
    def user_bridge(self) -> UserBridge:
        return self._user_bridge

    def process_user(
        self,
        user: UserWrapper,
        is_update: bool,
        before: UserWrapper | None = None,
    ) -> JobResult[UserWrapper]:
        """Run the user through the processing pipeline and return a JobResult."""
        user_id = user.doc_id

        # Check whether processing is already complete.
        if user.processing_complete:
            if is_update:
                # Stats or detail changed, so the search index needs refreshing.
                LOG().info(f"User update detected for {user_id}, re-indexing in search")
                user.reindex()
            else:
                msg = f"User already processed for {user_id}"
                LOG().info(msg)
                return JobResult.skip_doc(doc_id=user_id, message=msg)

        # Use StepProcessor for the workflow.
        start = TimeUtil.get_current_utc_dt()

        steps: list[ProcessingStep[_K, UserWrapper]] = []

        # Add the atomic stats-increment step only when before data is available.
        if before is not None and is_update:
            steps.append(
                ProcessingStep[Any, UserWrapper](
                    state_key="STATS_INCREMENT",  # Dummy key; not tracked in the model.
                    handler=lambda m: self._increment_stats_atomically(before, m),
                    description="Increment stats atomically",
                    required=False,
                ),
            )

        steps.extend(
            [
                ProcessingStep[_K, UserWrapper](
                    state_key=_K.CONTEXT_CREATED,
                    handler=lambda m: self._create_thumbnail_hash_step(m),
                    description="Generate thumbnail hash",
                    required=True,
                ),
                ProcessingStep[_K, UserWrapper](
                    state_key=_K.SEARCH_INDEXED,
                    handler=lambda m: self._index_user_in_search_step(m, is_update=is_update),
                    description="Index in search",
                    required=True,
                ),
            ],
        )
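
        # Assumption (inferred from the dummy-key comment above): a step's
        # state_key is persisted on the user model once the step succeeds, so
        # re-running the pipeline skips completed steps, while the untracked
        # "STATS_INCREMENT" key opts out of that bookkeeping.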
        processor = StepProcessor(
            wrapper=user,
            steps=steps,
            save_handler=lambda m: self.user_bridge.update(m),
            process_name=f"process_user_{user_id}",
        )

        result = processor.execute()

        end = TimeUtil.get_current_utc_dt()
        duration = TimeUtil.duration_in_seconds(start, end)

        outcome = result.outcome
        match outcome:
            case Outcome.ERROR:
                LOG().error(f"Error processing user {user_id}: {result.message}")
                return JobResult.from_result(
                    result,
                    doc_id=user_id,
                    data=user.to_json_dict(),
                    duration=duration,
                )
            case Outcome.SKIPPED:
                return JobResult.skip_doc(
                    doc_id=user_id,
                    message=result.message or "Already processed",
                    duration=duration,
                )
            case Outcome.WARNING:
                LOG().warning(f"Partial processing for user {user_id}: {result.message}")
                return JobResult.partial(
                    doc_id=user_id,
                    message=result.message or "Partial processing",
                    error_code=result.main_error or AppErrorCode.PROCESSING_STEP,
                    duration=duration,
                )
            case Outcome.OK:
                return JobResult.ok(doc_id=user_id)
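
    # Illustrative usage (hypothetical wiring; the real caller lives outside
    # this file). A Firestore change handler might do something like:
    #
    #   processor = UserProcessor(bucket, user_bridge, invite_db, indexer_service)
    #   job = processor.process_user(after, is_update=True, before=before)
    #
    # and retry or dead-letter the event when job reports an error outcome.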

    def _increment_stats_atomically(
        self,
        before: UserWrapper,
        after: UserWrapper,
    ) -> AppResult[UserWrapper]:
        """Increment stats atomically using Firestore increments to prevent race conditions."""
        main_result = AppResult[UserWrapper](task_name="IncrementStatsAtomically")
        user_id = after.doc_id

        # Compare viewStats fields and collect any positive deltas as atomic increments.
        updates: dict[str, Any] = {}
        before_views = before.view_stats
        after_views = after.view_stats

        for field in ("views", "bookmarks", "flags", "likes", "dislikes"):
            before_value = getattr(before_views, field)
            after_value = getattr(after_views, field)
            if after_value > before_value:
                delta = after_value - before_value
                updates[f"viewStats.{field}"] = firestore.Increment(delta)
                LOG().debug(f"User {user_id}: incrementing {field} by {delta}")

        # Apply the atomic increments if any deltas exist.
        if updates:
            try:
                self.user_bridge.db.client.collection(FirestoreCollections.USER.value).document(
                    user_id,
                ).update(updates)
                LOG().info(f"User {user_id}: applied {len(updates)} atomic stat increments")
            except Exception as e:
                main_result.add_error(
                    AppErrorCode.DATABASE_EX,
                    f"Failed to apply atomic increments: {e}",
                )
                return main_result

        main_result.generated = after
        return main_result
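
    # Why firestore.Increment: the delta is applied server-side, so two
    # concurrent writers each sending Increment(1) net +2, whereas two
    # read-modify-write updates could race and both land on +1. For example:
    #
    #   doc_ref.update({"viewStats.views": firestore.Increment(3)})
    #
    # adds 3 to the stored value without reading it first.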

    def _create_thumbnail_hash_step(self, user: UserWrapper) -> AppResult[UserWrapper]:
        """Generate a thumbnail hash for the UserWrapper avatar image."""
        doc_id = user.doc_id
        main_result = AppResult[UserWrapper](task_name=f"CreateThumbnailHash for User {doc_id}")

        avatar_image = user.avatar
        if avatar_image is None:
            # This can happen when no avatar has been set.
            msg = f"No avatar image for UserWrapper: {doc_id}"
            return AppResult[UserWrapper].skip(doc_id=doc_id, message=msg)

        # Generate the hash.
        LOG().debug(f"Generating hash for thumbnail of UserWrapper: {doc_id}")
        hash_code = self.get_image_hash(avatar_image, width=avatar_image.w, height=avatar_image.h)
        if hash_code is None:
            main_result.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail")
            return main_result

        LOG().debug(
            f"Generated thumbnail hash for UserWrapper: {doc_id} hash: {truncate_string(hash_code)}",
        )

        avatar_image.blur_hash = hash_code
        user.avatar = avatar_image
        main_result.generated = user
        return main_result
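
    # The blur_hash field suggests a BlurHash-style placeholder: a short string
    # encoding of the image that clients decode into a blurred preview while
    # the full avatar downloads. get_image_hash is assumed to be provided by
    # ProcessorMixin.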

    def _mark_invite_signup_step(self, user: UserWrapper) -> AppResult[UserWrapper]:
        """Mark invites as processed when a user signs up."""
        doc_id = user.doc_id
        main_result = AppResult[UserWrapper](task_name=f"MarkInviteSignup for User {doc_id}")

        to_email = user.email
        LOG().debug(f"Marking invites as processed for email: {to_email}")
        try:
            self.invite_db.mark_invite_signup(to_email)
        except Exception as error:
            main_result.add_exception(
                AppErrorCode.DATABASE_EX,
                ex=error,
                extra=f"Error marking invites as processed for email: {to_email}",
            )
            return main_result

        main_result.generated = user
        return main_result
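
    # Note: this step is not registered in the process_user pipeline above;
    # presumably it is invoked from a separate signup flow (an assumption,
    # since the caller sits outside this file).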

    def _index_user_in_search_step(
        self,
        user: UserWrapper,
        is_update: bool,
    ) -> AppResult[UserWrapper]:
        """Add the user to the search index (or remove them if private)."""
        main_result = AppResult[UserWrapper](task_name="IndexUserInSearch")

        # NOTE: we always pass the user to the indexer, irrespective of whether
        # NOTE: the profile is private, because visibility may have changed from
        # NOTE: public to private (or vice versa), in which case the index entry
        # NOTE: must be deleted.
        index_result = self.indexer_service.process_user(user, is_update=is_update)
        if index_result.is_error:
            main_result.merge(index_result)
            return main_result

        main_result.generated = user
        return main_result