Coverage for functions/flipdare/service/processor/group_processor.py: 79%
193 statements
« prev ^ index » next — coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from pathlib import Path
16from typing import TYPE_CHECKING, Any
18from google.cloud import firestore
19from google.cloud.storage.bucket import Bucket as StorageBucket
20from flipdare.service.processor._processor_mixin import ProcessorMixin
21from flipdare.backend.avatar_loader import AvatarLoader
22from flipdare.app_globals import truncate_string
23from flipdare.app_log import LOG
24from flipdare.constants import DOWNLOAD_FILE_DIR, IS_DEBUG, NO_DOC_ID
25from flipdare.result.app_result import AppResult
26from flipdare.service.core.step_processor import ProcessingStep, StepProcessor
27from flipdare.generated import AppErrorCode, GroupMemberType, RequestStatus
28from flipdare.generated.model.group_member_model import GroupMemberModel
29from flipdare.generated.model.group_model import GroupKeys
30from flipdare.generated.model.internal.view_stats_model import ViewStatsKeys
31from flipdare.generated.shared.firestore_collections import FirestoreCollections
32from flipdare.wrapper import GroupMemberWrapper, GroupWrapper
34if TYPE_CHECKING:
36 from flipdare.backend.indexer_service import IndexerService
37 from flipdare.firestore.group_db import GroupDb
# Short aliases for the generated key enums used by the step definitions below.
_K = GroupKeys
_V = ViewStatsKeys
class GroupProcessor(ProcessorMixin):
    """Runs the post-write processing pipeline for Group documents."""

    def __init__(
        self,
        bucket: StorageBucket,
        group_db: GroupDb,
        indexer_service: IndexerService,
        local_path: Path = DOWNLOAD_FILE_DIR,
    ) -> None:
        """Store collaborators and initialise the shared processor mixin."""
        super().__init__(bucket=bucket, local_path=local_path)
        self._group_db = group_db
        self._indexer_service = indexer_service
    @property
    def indexer_service(self) -> IndexerService:
        """Search indexer used to (re)index groups."""
        return self._indexer_service
    @property
    def group_db(self) -> GroupDb:
        """Firestore-backed database accessor for group documents."""
        return self._group_db
65 def process_group(
66 self,
67 group: GroupWrapper,
68 is_update: bool,
69 before: GroupWrapper | None = None,
70 ) -> AppResult[GroupWrapper]:
71 group_id = group.doc_id
72 main_result = AppResult[GroupWrapper](
73 task_name=f"ProcessGroup for {group_id or NO_DOC_ID}",
74 )
76 # Check if already complete
77 if group.processing_complete:
78 if is_update:
79 # stats/detail changed, need to re-index
80 LOG().info(f"Group update detected for {group_id}, re-indexing in search")
81 group.reindex()
82 else:
83 msg = f"Group already processed for {group_id}"
84 LOG().info(msg)
85 return AppResult[GroupWrapper].ok(doc_id=group_id, message=msg)
87 # Use StepProcessor for the workflow
88 processor = self._group_processor(group, is_update=is_update, before=before)
89 if processor is None:
90 main_result.add_error(
91 AppErrorCode.PROCESSING_STEP,
92 f"Failed to build processor for Group {group_id}",
93 )
94 return main_result
96 result = processor.execute()
97 if result.is_error:
98 main_result.merge(result)
100 return main_result
102 def _group_processor(
103 self,
104 group: GroupWrapper,
105 is_update: bool,
106 before: GroupWrapper | None = None,
107 ) -> StepProcessor[GroupWrapper] | None:
108 group_id = group.doc_id
109 steps = []
111 # Add atomic stats increment step if we have before data
112 if before is not None and is_update:
113 steps.append(
114 ProcessingStep[Any, GroupWrapper](
115 state_key="STATS_INCREMENT", # Dummy key, not tracked in model
116 handler=lambda m: self._increment_stats_atomically(before, m),
117 description="Increment stats atomically",
118 required=False,
119 ),
120 )
122 steps.extend(
123 [
124 ProcessingStep[_K, GroupWrapper](
125 state_key=_K.HASH_GENERATED,
126 handler=lambda m: self._create_thumbnail_hash(m),
127 description="Generate thumbnail hash",
128 required=True,
129 ),
130 ProcessingStep[_K, GroupWrapper](
131 state_key=_K.MEMBER_CREATED,
132 handler=lambda m: self._add_owner_member_step(m),
133 description="Add owner as group member",
134 required=True,
135 ),
136 ProcessingStep[_K, GroupWrapper](
137 state_key=_K.SEARCH_INDEXED,
138 handler=lambda m: self._index_group_in_search(m, is_update=is_update),
139 description="Index in search",
140 required=True,
141 ),
142 ],
143 )
145 return StepProcessor(
146 wrapper=group,
147 steps=steps,
148 save_handler=lambda m: self._update_member(m),
149 process_name=f"process_group_{group_id}",
150 )
152 def _update_member(self, model: GroupWrapper) -> AppResult[GroupWrapper]:
153 result = AppResult[GroupWrapper](
154 task_name=f"UpdateGroup {model.doc_id or NO_DOC_ID} in DB",
155 )
157 updated_group: GroupWrapper | None = None
158 try:
159 doc_id = model.doc_id
160 updated_group = self.group_db.update(doc_id=doc_id, updates=model.get_updates())
162 except Exception as e:
163 msg = f"Exception updating Group {model.doc_id}: {e}"
164 LOG().error(msg)
165 result.add_error(AppErrorCode.DATABASE_EX, msg)
166 return result
168 if updated_group is None:
169 msg = f"Failed to update Group {model.doc_id} - no data returned"
170 LOG().error(msg)
171 result.add_error(AppErrorCode.NOT_FOUND, msg)
172 return result
174 result.generated = updated_group
175 return result
177 def _increment_stats_atomically( # noqa: PLR0915, PLR0912
178 self,
179 before: GroupWrapper,
180 after: GroupWrapper,
181 ) -> AppResult[GroupWrapper]:
182 """Increment stats atomically using Firestore increment to prevent race conditions."""
183 main_result = AppResult[GroupWrapper](task_name="IncrementStatsAtomically")
184 group_id = after.doc_id
186 # Compare viewStats and calculate deltas
187 updates = {}
188 v_before = before.view_stats
189 v_after = after.view_stats
191 debug_msg = ""
192 delta_views: int | None = None
193 delta_bookmarks: int | None = None
194 delta_flags: int | None = None
195 delta_likes: int | None = None
196 delta_dislikes: int | None = None
198 if v_after.views > v_before.views:
199 delta_views = v_after.views - v_before.views
200 if v_after.bookmarks > v_before.bookmarks:
201 delta_bookmarks = v_after.bookmarks - v_before.bookmarks
202 if v_after.flags > v_before.flags:
203 delta_flags = v_after.flags - v_before.flags
204 if v_after.likes > v_before.likes:
205 delta_likes = v_after.likes - v_before.likes
206 if v_after.dislikes > v_before.dislikes:
207 delta_dislikes = v_after.dislikes - v_before.dislikes
209 if (
210 delta_views is None
211 and delta_bookmarks is None
212 and delta_flags is None
213 and delta_likes is None
214 and delta_dislikes is None
215 ):
216 if IS_DEBUG:
217 msg = f"No view stat increases detected for Group {group_id}, skipping atomic increment."
218 LOG().debug(msg)
220 return main_result
222 views_key = f"view_stats.{_V.VIEWS.value}"
223 bookmarks_key = f"view_stats.{_V.BOOKMARKS.value}"
224 flags_key = f"view_stats.{_V.FLAGS.value}"
225 likes_key = f"view_stats.{_V.LIKES.value}"
226 dislikes_key = f"view_stats.{_V.DISLIKES.value}"
228 if delta_views is not None:
229 updates[views_key] = firestore.Increment(delta_views)
230 debug_msg += f"views: {v_before.views} -> {v_after.views} (delta {delta_views})\n"
231 if delta_bookmarks is not None:
232 updates[bookmarks_key] = firestore.Increment(delta_bookmarks)
233 debug_msg += f"bookmarks: {v_before.bookmarks} -> {v_after.bookmarks} (delta {delta_bookmarks})\n"
234 if delta_flags is not None:
235 updates[flags_key] = firestore.Increment(delta_flags)
236 debug_msg += f"flags: {v_before.flags} -> {v_after.flags} (delta {delta_flags})\n"
237 if delta_likes is not None:
238 updates[likes_key] = firestore.Increment(delta_likes)
239 debug_msg += f"likes: {v_before.likes} -> {v_after.likes} (delta {delta_likes})\n"
240 if delta_dislikes is not None:
241 updates[dislikes_key] = firestore.Increment(delta_dislikes)
242 debug_msg += (
243 f"dislikes: {v_before.dislikes} -> {v_after.dislikes} (delta {delta_dislikes})\n"
244 )
246 # Apply atomic increments if any deltas exist
247 try:
248 client = self.group_db.client
249 client.collection(FirestoreCollections.GROUP.value).document(group_id).update(updates)
250 if IS_DEBUG:
251 msg = f"Group {group_id}: applied {len(updates)} stat increments\n{debug_msg}"
252 LOG().debug(msg)
254 except Exception as e:
255 msg = f"Exception applying atomic increments for Group {group_id}: {e}"
256 LOG().error(msg)
257 main_result.add_error(AppErrorCode.DATABASE_EX, msg)
258 return main_result
260 main_result.generated = after
261 return main_result
263 def _add_owner_member_step(self, group: GroupWrapper) -> AppResult[GroupMemberWrapper]:
264 """Ensure the owner is added as a GroupMemberModel entry."""
265 doc_id = group.doc_id
266 main_result = AppResult[GroupMemberWrapper](
267 task_name=f"AddOwnerGroupMember for Group {doc_id}",
268 )
270 owner_id = group.uid
271 # Check if owner is already a member
272 result = self.group_db.get_member(group_id=doc_id, member_id=owner_id)
273 if result is not None:
274 main_result.generated = result
275 LOG().debug(
276 f"Owner {owner_id} is already a member of Group {doc_id}, skipping member creation.",
277 )
278 return main_result
280 # Create GroupMemberModel for owner
281 LOG().debug(f"Adding owner {owner_id} as member of Group {doc_id}.")
282 try:
283 member = GroupMemberModel(
284 id=None,
285 gid=doc_id,
286 uid=owner_id,
287 member_type=GroupMemberType.OWNER,
288 status=RequestStatus.ACCEPTED,
289 )
290 except Exception as e:
291 msg = f"Failed to create GroupMemberModel for owner {owner_id} of Group {doc_id}: {e}"
292 LOG().error(msg)
293 main_result.add_error(AppErrorCode.DATABASE_EX, msg)
294 return main_result
296 try:
297 saved_model = self.group_db.create_member(group_id=doc_id, member_data=member)
298 main_result.generated = saved_model
299 return main_result
301 except Exception as e:
302 msg = f"Failed to save GroupMemberModel for owner {owner_id} of Group {doc_id}: {e}"
303 LOG().error(msg)
304 main_result.add_error(AppErrorCode.DATABASE_EX, msg)
305 return main_result
307 def _create_thumbnail_hash(self, group: GroupWrapper) -> AppResult[GroupWrapper]:
308 """Generate thumbnail hash for GroupWrapper avatar image."""
309 doc_id = group.doc_id
310 main_result = AppResult[GroupWrapper](
311 task_name=f"CreateThumbnailHash for Group {doc_id}",
312 )
314 avatar_image = group.avatar
315 if avatar_image is None:
316 def_avatar = AvatarLoader().get_random_group_avatar()
317 if IS_DEBUG:
318 msg = f"No avatar for GroupModel {doc_id}, assigning default avatar {def_avatar.name}"
319 LOG().debug(msg)
321 group.avatar = def_avatar.image_model
322 main_result.generated = group
323 return main_result
325 # Generate hash
326 if IS_DEBUG:
327 LOG().debug(f"Generating hash for thumbnail of GroupModel: {doc_id}")
329 hash_code = self.get_image_hash(avatar_image, width=avatar_image.w, height=avatar_image.h)
330 if hash_code is None:
331 main_result.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail")
332 return main_result
334 if IS_DEBUG:
335 msg = f"Generated thumbnail hash for UserWrapper: {doc_id} hash: {truncate_string(hash_code)}"
336 LOG().debug(msg)
338 avatar_image.blur_hash = hash_code
339 group.avatar = avatar_image
340 main_result.generated = group
341 return main_result
343 def _index_group_in_search(
344 self,
345 group: GroupWrapper,
346 is_update: bool,
347 ) -> AppResult[GroupWrapper]:
348 """Add user to search index (or remove if private)."""
349 main_result = AppResult[GroupWrapper](task_name="IndexGroup InSearch")
351 # NOTE: we pass to index irrespective of whether the dare is private,
352 # NOTE: because it may have changed from public to private or vice versa.
353 # NOTE: and will need to be deleted.
354 index_result = self.indexer_service.process_group(group, is_update=is_update)
355 if index_result.is_error:
356 main_result.merge(index_result)
357 return main_result
359 main_result.generated = group
360 return main_result