Coverage for functions \ flipdare \ service \ processor \ group_processor.py: 79%

193 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from pathlib import Path 

16from typing import TYPE_CHECKING, Any 

17 

18from google.cloud import firestore 

19from google.cloud.storage.bucket import Bucket as StorageBucket 

20from flipdare.service.processor._processor_mixin import ProcessorMixin 

21from flipdare.backend.avatar_loader import AvatarLoader 

22from flipdare.app_globals import truncate_string 

23from flipdare.app_log import LOG 

24from flipdare.constants import DOWNLOAD_FILE_DIR, IS_DEBUG, NO_DOC_ID 

25from flipdare.result.app_result import AppResult 

26from flipdare.service.core.step_processor import ProcessingStep, StepProcessor 

27from flipdare.generated import AppErrorCode, GroupMemberType, RequestStatus 

28from flipdare.generated.model.group_member_model import GroupMemberModel 

29from flipdare.generated.model.group_model import GroupKeys 

30from flipdare.generated.model.internal.view_stats_model import ViewStatsKeys 

31from flipdare.generated.shared.firestore_collections import FirestoreCollections 

32from flipdare.wrapper import GroupMemberWrapper, GroupWrapper 

33 

34if TYPE_CHECKING: 

35 

36 from flipdare.backend.indexer_service import IndexerService 

37 from flipdare.firestore.group_db import GroupDb 

38 

39 

40_K = GroupKeys 

41_V = ViewStatsKeys 

42 

43 

44class GroupProcessor(ProcessorMixin): 

45 

46 def __init__( 

47 self, 

48 bucket: StorageBucket, 

49 group_db: GroupDb, 

50 indexer_service: IndexerService, 

51 local_path: Path = DOWNLOAD_FILE_DIR, 

52 ) -> None: 

53 super().__init__(bucket=bucket, local_path=local_path) 

54 self._indexer_service = indexer_service 

55 self._group_db = group_db 

56 

57 @property 

58 def indexer_service(self) -> IndexerService: 

59 return self._indexer_service 

60 

61 @property 

62 def group_db(self) -> GroupDb: 

63 return self._group_db 

64 

65 def process_group( 

66 self, 

67 group: GroupWrapper, 

68 is_update: bool, 

69 before: GroupWrapper | None = None, 

70 ) -> AppResult[GroupWrapper]: 

71 group_id = group.doc_id 

72 main_result = AppResult[GroupWrapper]( 

73 task_name=f"ProcessGroup for {group_id or NO_DOC_ID}", 

74 ) 

75 

76 # Check if already complete 

77 if group.processing_complete: 

78 if is_update: 

79 # stats/detail changed, need to re-index 

80 LOG().info(f"Group update detected for {group_id}, re-indexing in search") 

81 group.reindex() 

82 else: 

83 msg = f"Group already processed for {group_id}" 

84 LOG().info(msg) 

85 return AppResult[GroupWrapper].ok(doc_id=group_id, message=msg) 

86 

87 # Use StepProcessor for the workflow 

88 processor = self._group_processor(group, is_update=is_update, before=before) 

89 if processor is None: 

90 main_result.add_error( 

91 AppErrorCode.PROCESSING_STEP, 

92 f"Failed to build processor for Group {group_id}", 

93 ) 

94 return main_result 

95 

96 result = processor.execute() 

97 if result.is_error: 

98 main_result.merge(result) 

99 

100 return main_result 

101 

102 def _group_processor( 

103 self, 

104 group: GroupWrapper, 

105 is_update: bool, 

106 before: GroupWrapper | None = None, 

107 ) -> StepProcessor[GroupWrapper] | None: 

108 group_id = group.doc_id 

109 steps = [] 

110 

111 # Add atomic stats increment step if we have before data 

112 if before is not None and is_update: 

113 steps.append( 

114 ProcessingStep[Any, GroupWrapper]( 

115 state_key="STATS_INCREMENT", # Dummy key, not tracked in model 

116 handler=lambda m: self._increment_stats_atomically(before, m), 

117 description="Increment stats atomically", 

118 required=False, 

119 ), 

120 ) 

121 

122 steps.extend( 

123 [ 

124 ProcessingStep[_K, GroupWrapper]( 

125 state_key=_K.HASH_GENERATED, 

126 handler=lambda m: self._create_thumbnail_hash(m), 

127 description="Generate thumbnail hash", 

128 required=True, 

129 ), 

130 ProcessingStep[_K, GroupWrapper]( 

131 state_key=_K.MEMBER_CREATED, 

132 handler=lambda m: self._add_owner_member_step(m), 

133 description="Add owner as group member", 

134 required=True, 

135 ), 

136 ProcessingStep[_K, GroupWrapper]( 

137 state_key=_K.SEARCH_INDEXED, 

138 handler=lambda m: self._index_group_in_search(m, is_update=is_update), 

139 description="Index in search", 

140 required=True, 

141 ), 

142 ], 

143 ) 

144 

145 return StepProcessor( 

146 wrapper=group, 

147 steps=steps, 

148 save_handler=lambda m: self._update_member(m), 

149 process_name=f"process_group_{group_id}", 

150 ) 

151 

152 def _update_member(self, model: GroupWrapper) -> AppResult[GroupWrapper]: 

153 result = AppResult[GroupWrapper]( 

154 task_name=f"UpdateGroup {model.doc_id or NO_DOC_ID} in DB", 

155 ) 

156 

157 updated_group: GroupWrapper | None = None 

158 try: 

159 doc_id = model.doc_id 

160 updated_group = self.group_db.update(doc_id=doc_id, updates=model.get_updates()) 

161 

162 except Exception as e: 

163 msg = f"Exception updating Group {model.doc_id}: {e}" 

164 LOG().error(msg) 

165 result.add_error(AppErrorCode.DATABASE_EX, msg) 

166 return result 

167 

168 if updated_group is None: 

169 msg = f"Failed to update Group {model.doc_id} - no data returned" 

170 LOG().error(msg) 

171 result.add_error(AppErrorCode.NOT_FOUND, msg) 

172 return result 

173 

174 result.generated = updated_group 

175 return result 

176 

177 def _increment_stats_atomically( # noqa: PLR0915, PLR0912 

178 self, 

179 before: GroupWrapper, 

180 after: GroupWrapper, 

181 ) -> AppResult[GroupWrapper]: 

182 """Increment stats atomically using Firestore increment to prevent race conditions.""" 

183 main_result = AppResult[GroupWrapper](task_name="IncrementStatsAtomically") 

184 group_id = after.doc_id 

185 

186 # Compare viewStats and calculate deltas 

187 updates = {} 

188 v_before = before.view_stats 

189 v_after = after.view_stats 

190 

191 debug_msg = "" 

192 delta_views: int | None = None 

193 delta_bookmarks: int | None = None 

194 delta_flags: int | None = None 

195 delta_likes: int | None = None 

196 delta_dislikes: int | None = None 

197 

198 if v_after.views > v_before.views: 

199 delta_views = v_after.views - v_before.views 

200 if v_after.bookmarks > v_before.bookmarks: 

201 delta_bookmarks = v_after.bookmarks - v_before.bookmarks 

202 if v_after.flags > v_before.flags: 

203 delta_flags = v_after.flags - v_before.flags 

204 if v_after.likes > v_before.likes: 

205 delta_likes = v_after.likes - v_before.likes 

206 if v_after.dislikes > v_before.dislikes: 

207 delta_dislikes = v_after.dislikes - v_before.dislikes 

208 

209 if ( 

210 delta_views is None 

211 and delta_bookmarks is None 

212 and delta_flags is None 

213 and delta_likes is None 

214 and delta_dislikes is None 

215 ): 

216 if IS_DEBUG: 

217 msg = f"No view stat increases detected for Group {group_id}, skipping atomic increment." 

218 LOG().debug(msg) 

219 

220 return main_result 

221 

222 views_key = f"view_stats.{_V.VIEWS.value}" 

223 bookmarks_key = f"view_stats.{_V.BOOKMARKS.value}" 

224 flags_key = f"view_stats.{_V.FLAGS.value}" 

225 likes_key = f"view_stats.{_V.LIKES.value}" 

226 dislikes_key = f"view_stats.{_V.DISLIKES.value}" 

227 

228 if delta_views is not None: 

229 updates[views_key] = firestore.Increment(delta_views) 

230 debug_msg += f"views: {v_before.views} -> {v_after.views} (delta {delta_views})\n" 

231 if delta_bookmarks is not None: 

232 updates[bookmarks_key] = firestore.Increment(delta_bookmarks) 

233 debug_msg += f"bookmarks: {v_before.bookmarks} -> {v_after.bookmarks} (delta {delta_bookmarks})\n" 

234 if delta_flags is not None: 

235 updates[flags_key] = firestore.Increment(delta_flags) 

236 debug_msg += f"flags: {v_before.flags} -> {v_after.flags} (delta {delta_flags})\n" 

237 if delta_likes is not None: 

238 updates[likes_key] = firestore.Increment(delta_likes) 

239 debug_msg += f"likes: {v_before.likes} -> {v_after.likes} (delta {delta_likes})\n" 

240 if delta_dislikes is not None: 

241 updates[dislikes_key] = firestore.Increment(delta_dislikes) 

242 debug_msg += ( 

243 f"dislikes: {v_before.dislikes} -> {v_after.dislikes} (delta {delta_dislikes})\n" 

244 ) 

245 

246 # Apply atomic increments if any deltas exist 

247 try: 

248 client = self.group_db.client 

249 client.collection(FirestoreCollections.GROUP.value).document(group_id).update(updates) 

250 if IS_DEBUG: 

251 msg = f"Group {group_id}: applied {len(updates)} stat increments\n{debug_msg}" 

252 LOG().debug(msg) 

253 

254 except Exception as e: 

255 msg = f"Exception applying atomic increments for Group {group_id}: {e}" 

256 LOG().error(msg) 

257 main_result.add_error(AppErrorCode.DATABASE_EX, msg) 

258 return main_result 

259 

260 main_result.generated = after 

261 return main_result 

262 

263 def _add_owner_member_step(self, group: GroupWrapper) -> AppResult[GroupMemberWrapper]: 

264 """Ensure the owner is added as a GroupMemberModel entry.""" 

265 doc_id = group.doc_id 

266 main_result = AppResult[GroupMemberWrapper]( 

267 task_name=f"AddOwnerGroupMember for Group {doc_id}", 

268 ) 

269 

270 owner_id = group.uid 

271 # Check if owner is already a member 

272 result = self.group_db.get_member(group_id=doc_id, member_id=owner_id) 

273 if result is not None: 

274 main_result.generated = result 

275 LOG().debug( 

276 f"Owner {owner_id} is already a member of Group {doc_id}, skipping member creation.", 

277 ) 

278 return main_result 

279 

280 # Create GroupMemberModel for owner 

281 LOG().debug(f"Adding owner {owner_id} as member of Group {doc_id}.") 

282 try: 

283 member = GroupMemberModel( 

284 id=None, 

285 gid=doc_id, 

286 uid=owner_id, 

287 member_type=GroupMemberType.OWNER, 

288 status=RequestStatus.ACCEPTED, 

289 ) 

290 except Exception as e: 

291 msg = f"Failed to create GroupMemberModel for owner {owner_id} of Group {doc_id}: {e}" 

292 LOG().error(msg) 

293 main_result.add_error(AppErrorCode.DATABASE_EX, msg) 

294 return main_result 

295 

296 try: 

297 saved_model = self.group_db.create_member(group_id=doc_id, member_data=member) 

298 main_result.generated = saved_model 

299 return main_result 

300 

301 except Exception as e: 

302 msg = f"Failed to save GroupMemberModel for owner {owner_id} of Group {doc_id}: {e}" 

303 LOG().error(msg) 

304 main_result.add_error(AppErrorCode.DATABASE_EX, msg) 

305 return main_result 

306 

307 def _create_thumbnail_hash(self, group: GroupWrapper) -> AppResult[GroupWrapper]: 

308 """Generate thumbnail hash for GroupWrapper avatar image.""" 

309 doc_id = group.doc_id 

310 main_result = AppResult[GroupWrapper]( 

311 task_name=f"CreateThumbnailHash for Group {doc_id}", 

312 ) 

313 

314 avatar_image = group.avatar 

315 if avatar_image is None: 

316 def_avatar = AvatarLoader().get_random_group_avatar() 

317 if IS_DEBUG: 

318 msg = f"No avatar for GroupModel {doc_id}, assigning default avatar {def_avatar.name}" 

319 LOG().debug(msg) 

320 

321 group.avatar = def_avatar.image_model 

322 main_result.generated = group 

323 return main_result 

324 

325 # Generate hash 

326 if IS_DEBUG: 

327 LOG().debug(f"Generating hash for thumbnail of GroupModel: {doc_id}") 

328 

329 hash_code = self.get_image_hash(avatar_image, width=avatar_image.w, height=avatar_image.h) 

330 if hash_code is None: 

331 main_result.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail") 

332 return main_result 

333 

334 if IS_DEBUG: 

335 msg = f"Generated thumbnail hash for UserWrapper: {doc_id} hash: {truncate_string(hash_code)}" 

336 LOG().debug(msg) 

337 

338 avatar_image.blur_hash = hash_code 

339 group.avatar = avatar_image 

340 main_result.generated = group 

341 return main_result 

342 

343 def _index_group_in_search( 

344 self, 

345 group: GroupWrapper, 

346 is_update: bool, 

347 ) -> AppResult[GroupWrapper]: 

348 """Add user to search index (or remove if private).""" 

349 main_result = AppResult[GroupWrapper](task_name="IndexGroup InSearch") 

350 

351 # NOTE: we pass to index irrespective of whether the dare is private, 

352 # NOTE: because it may have changed from public to private or vice versa. 

353 # NOTE: and will need to be deleted. 

354 index_result = self.indexer_service.process_group(group, is_update=is_update) 

355 if index_result.is_error: 

356 main_result.merge(index_result) 

357 return main_result 

358 

359 main_result.generated = group 

360 return main_result