Coverage for functions \ flipdare \ service \ processor \ user_processor.py: 55%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from pathlib import Path 

16from typing import TYPE_CHECKING, Any 

17from google.cloud import firestore 

18from google.cloud.storage.bucket import Bucket as StorageBucket 

19from flipdare.result.outcome import Outcome 

20from flipdare.service.processor._processor_mixin import ProcessorMixin 

21from flipdare.app_globals import truncate_string 

22from flipdare.app_log import LOG 

23from flipdare.app_types import UserBridge 

24from flipdare.constants import DOWNLOAD_FILE_DIR 

25from flipdare.result.app_result import AppResult 

26from flipdare.result.job_result import JobResult 

27from flipdare.service.core.step_processor import ProcessingStep, StepProcessor 

28from flipdare.generated import AppErrorCode 

29from flipdare.generated.model.user_model import UserKeys 

30from flipdare.generated.shared.firestore_collections import FirestoreCollections 

31from flipdare.util.time_util import TimeUtil 

32from flipdare.wrapper import UserWrapper 

33 

34if TYPE_CHECKING: 

35 from flipdare.backend.indexer_service import IndexerService 

36 from flipdare.firestore.invite_db import InviteDb 

37 

38_K = UserKeys 

39 

40 

41class UserProcessor(ProcessorMixin): 

42 def __init__( 

43 self, 

44 bucket: StorageBucket, 

45 user_bridge: UserBridge, 

46 invite_db: InviteDb, 

47 indexer_service: IndexerService, 

48 local_path: Path = DOWNLOAD_FILE_DIR, 

49 ) -> None: 

50 super().__init__(bucket=bucket, local_path=local_path) 

51 self._indexer_service = indexer_service 

52 self._user_bridge = user_bridge 

53 self.invite_db = invite_db 

54 

55 @property 

56 def indexer_service(self) -> IndexerService: 

57 return self._indexer_service 

58 

59 @property 

60 def user_bridge(self) -> UserBridge: 

61 return self._user_bridge 

62 

63 def process_user( 

64 self, 

65 user: UserWrapper, 

66 is_update: bool, 

67 before: UserWrapper | None = None, 

68 ) -> JobResult[UserWrapper]: 

69 user_id = user.doc_id 

70 # Check if already complete 

71 if user.processing_complete: 

72 if is_update: 

73 # stats/detail changed, need to re-index 

74 LOG().info(f"User update detected for {user_id}, re-indexing in search") 

75 user.reindex() 

76 else: 

77 msg = f"User already processed for {user_id}" 

78 LOG().info(msg) 

79 return JobResult.skip_doc(doc_id=user_id, message=msg) 

80 

81 # Use StepProcessor for the workflow 

82 start = TimeUtil.get_current_utc_dt() 

83 

84 steps: list[ProcessingStep[_K, UserWrapper]] = [] 

85 

86 # Add atomic stats increment step if we have before data 

87 if before is not None and is_update: 

88 steps.append( 

89 ProcessingStep[Any, UserWrapper]( 

90 state_key="STATS_INCREMENT", # Dummy key, not tracked in model 

91 handler=lambda m: self._increment_stats_atomically(before, m), 

92 description="Increment stats atomically", 

93 required=False, 

94 ), 

95 ) 

96 

97 steps.extend( 

98 [ 

99 ProcessingStep[_K, UserWrapper]( 

100 state_key=_K.CONTEXT_CREATED, 

101 handler=lambda m: self._create_thumbnail_hash_step(m), 

102 description="Generate thumbnail hash", 

103 required=True, 

104 ), 

105 ProcessingStep[_K, UserWrapper]( 

106 state_key=_K.SEARCH_INDEXED, 

107 handler=lambda m: self._index_user_in_search_step(m, is_update=is_update), 

108 description="Index in search", 

109 required=True, 

110 ), 

111 ], 

112 ) 

113 

114 processor = StepProcessor( 

115 wrapper=user, 

116 steps=steps, 

117 save_handler=lambda m: self.user_bridge.update(m), 

118 process_name=f"process_user_{user_id}", 

119 ) 

120 

121 result = processor.execute() 

122 

123 end = TimeUtil.get_current_utc_dt() 

124 duration = TimeUtil.duration_in_seconds(start, end) 

125 

126 outcome = result.outcome 

127 match outcome: 

128 case Outcome.ERROR: 

129 LOG().error(f"Error processing user {user_id}: {result.message}") 

130 return JobResult.from_result( 

131 result, 

132 doc_id=user_id, 

133 data=user.to_json_dict(), 

134 duration=duration, 

135 ) 

136 case Outcome.SKIPPED: 

137 return JobResult.skip_doc( 

138 doc_id=user_id, 

139 message=result.message or "Already processed", 

140 duration=duration, 

141 ) 

142 case Outcome.WARNING: 

143 LOG().warning(f"Partial processing for user {user_id}: {result.message}") 

144 return JobResult.partial( 

145 doc_id=user_id, 

146 message=result.message or "Partial processing", 

147 error_code=result.main_error or AppErrorCode.PROCESSING_STEP, 

148 duration=duration, 

149 ) 

150 case Outcome.OK: 

151 return JobResult.ok(doc_id=user_id) 

152 

153 def _increment_stats_atomically( 

154 self, 

155 before: UserWrapper, 

156 after: UserWrapper, 

157 ) -> AppResult[UserWrapper]: 

158 """Increment stats atomically using Firestore increment to prevent race conditions.""" 

159 main_result = AppResult[UserWrapper](task_name="IncrementStatsAtomically") 

160 user_id = after.doc_id 

161 # Compare viewStats and calculate deltas 

162 updates = {} 

163 before_views = before.view_stats 

164 after_views = after.view_stats 

165 

166 if after_views.views > before_views.views: 

167 delta = after_views.views - before_views.views 

168 updates["viewStats.views"] = firestore.Increment(delta) 

169 LOG().debug(f"User {user_id}: incrementing views by {delta}") 

170 

171 if after_views.bookmarks > before_views.bookmarks: 

172 delta = after_views.bookmarks - before_views.bookmarks 

173 updates["viewStats.bookmarks"] = firestore.Increment(delta) 

174 LOG().debug(f"User {user_id}: incrementing bookmarks by {delta}") 

175 

176 if after_views.flags > before_views.flags: 

177 delta = after_views.flags - before_views.flags 

178 updates["viewStats.flags"] = firestore.Increment(delta) 

179 LOG().debug(f"User {user_id}: incrementing flags by {delta}") 

180 

181 if after_views.likes > before_views.likes: 

182 delta = after_views.likes - before_views.likes 

183 updates["viewStats.likes"] = firestore.Increment(delta) 

184 LOG().debug(f"User {user_id}: incrementing likes by {delta}") 

185 

186 if after_views.dislikes > before_views.dislikes: 

187 delta = after_views.dislikes - before_views.dislikes 

188 updates["viewStats.dislikes"] = firestore.Increment(delta) 

189 LOG().debug(f"User {user_id}: incrementing dislikes by {delta}") 

190 

191 # Apply atomic increments if any deltas exist 

192 if updates: 

193 try: 

194 self.user_bridge.db.client.collection(FirestoreCollections.USER.value).document( 

195 user_id, 

196 ).update(updates) 

197 LOG().info(f"User {user_id}: applied {len(updates)} atomic stat increments") 

198 except Exception as e: 

199 main_result.add_error( 

200 AppErrorCode.DATABASE_EX, 

201 f"Failed to apply atomic increments: {e}", 

202 ) 

203 return main_result 

204 

205 main_result.generated = after 

206 return main_result 

207 

208 def _create_thumbnail_hash_step(self, user: UserWrapper) -> AppResult[UserWrapper]: 

209 """Generate thumbnail hash for UserWrapper avatar image.""" 

210 doc_id = user.doc_id 

211 main_result = AppResult[UserWrapper](task_name=f"CreateThumbnailHash for User {doc_id}") 

212 

213 avatar_image = user.avatar 

214 if avatar_image is None: 

215 # thi can happen if no avatar set 

216 msg = f"No avatar image for UserWrapper: {doc_id}" 

217 return AppResult[UserWrapper].skip(doc_id=doc_id, message=msg) 

218 

219 # Generate hash 

220 LOG().debug(f"Generating hash for thumbnail of UserWrapper: {doc_id}") 

221 hash_code = self.get_image_hash(avatar_image, width=avatar_image.w, height=avatar_image.h) 

222 if hash_code is None: 

223 main_result.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail") 

224 return main_result 

225 

226 LOG().debug( 

227 f"Generated thumbnail hash for UserWrapper: {doc_id} hash: {truncate_string(hash_code)}", 

228 ) 

229 

230 avatar_image.blur_hash = hash_code 

231 user.avatar = avatar_image 

232 main_result.generated = user 

233 return main_result 

234 

235 def _mark_invite_signup_step(self, user: UserWrapper) -> AppResult[UserWrapper]: 

236 """Mark invites as processed when user signs up.""" 

237 doc_id = user.doc_id 

238 main_result = AppResult[UserWrapper](task_name=f"MarkInviteSignup for User {doc_id}") 

239 

240 to_email = user.email 

241 LOG().debug(f"Marking invites as processed for email: {to_email}") 

242 try: 

243 self.invite_db.mark_invite_signup(to_email) 

244 except Exception as error: 

245 main_result.add_exception( 

246 AppErrorCode.DATABASE_EX, 

247 ex=error, 

248 extra=f"Error marking invites as processed for email: {to_email}", 

249 ) 

250 return main_result 

251 

252 main_result.generated = user 

253 return main_result 

254 

255 def _index_user_in_search_step( 

256 self, 

257 user: UserWrapper, 

258 is_update: bool, 

259 ) -> AppResult[UserWrapper]: 

260 """Add user to search index (or remove if private).""" 

261 main_result = AppResult[UserWrapper](task_name="IndexUserInSearch") 

262 

263 # NOTE: we pass to index irrespective of whether the dare is private, 

264 # NOTE: because it may have changed from public to private or vice versa. 

265 # NOTE: and will need to be deleted. 

266 index_result = self.indexer_service.process_user(user, is_update=is_update) 

267 if index_result.is_error: 

268 main_result.merge(index_result) 

269 return main_result 

270 

271 main_result.generated = user 

272 return main_result