Coverage for functions \ flipdare \ service \ processor \ dare_processor.py: 82%

217 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14from pathlib import Path 

15from typing import TYPE_CHECKING 

16 

17from google.cloud.storage.bucket import Bucket as StorageBucket 

18 

19from flipdare.generated.model.internal.video_model import VideoModel 

20from flipdare.service.processor._processor_mixin import ProcessorMixin 

21from flipdare.service.user_summary_service import UserSummaryService 

22from flipdare.app_globals import truncate_string 

23from flipdare.app_log import LOG 

24from flipdare.app_types import DareBridge 

25from flipdare.constants import DOWNLOAD_FILE_DIR, IS_DEBUG, IS_TRACE 

26from flipdare.result import AppResult 

27from flipdare.service.core.step_processor import ProcessingStep, StepProcessor 

28from flipdare.result.job_result import JobResult 

29from flipdare.firestore.context.dare_context import ( 

30 DareContext, 

31 DareContextFactory, 

32 GroupDareContext, 

33 UserDareContext, 

34) 

35from flipdare.generated import AppErrorCode 

36from flipdare.generated.model.dare_model import DareKeys 

37from flipdare.wrapper import DareWrapper 

38from flipdare.wrapper.backend.user_summary_entry_wrapper import UserSummaryEntryWrapper 

39 

40if TYPE_CHECKING: 

41 from flipdare.backend.indexer_service import IndexerService 

42 

43_K = DareKeys 

44 

45 

46class DareProcessor(ProcessorMixin): 

47 

def __init__(
    self,
    bucket: StorageBucket,
    dare_bridge: DareBridge,
    indexer: IndexerService,
    summary_service: UserSummaryService,
    local_path: Path = DOWNLOAD_FILE_DIR,
) -> None:
    """Store the collaborators used by the dare processing steps.

    Args:
        bucket: Storage bucket forwarded to the ProcessorMixin initialiser.
        dare_bridge: Bridge used to fetch and update dare documents.
        indexer: Search indexer invoked by the indexing step.
        summary_service: Service used to create user summary entries.
        local_path: Local path forwarded to the ProcessorMixin initialiser;
            defaults to DOWNLOAD_FILE_DIR.
    """
    # The mixin is initialised first, exactly as before; the attribute
    # assignments below are independent of one another.
    super().__init__(bucket=bucket, local_path=local_path)
    self.summary_service = summary_service
    self.dare_bridge = dare_bridge
    self.indexer = indexer

60 

def process_complete_dare(self, dare: DareWrapper) -> JobResult[DareWrapper]:
    """Run the processing workflow with ``is_complete=True``.

    Args:
        dare: The dare whose completed-event video should be processed.

    Returns:
        Whatever ``_process`` returns for this dare.
    """
    outcome = self._process(dare, is_complete=True)
    return outcome

66 

def process_dare(self, dare: DareWrapper) -> JobResult[DareWrapper]:
    """Run the processing workflow with ``is_complete=False``.

    Args:
        dare: The dare whose own video should be processed.

    Returns:
        Whatever ``_process`` returns for this dare.
    """
    outcome = self._process(dare, is_complete=False)
    return outcome

72 

def _process(
    self,
    dare: DareWrapper,
    is_complete: bool = False,
) -> JobResult[DareWrapper]:
    """
    Process dare video content with a multi-step workflow.

    Steps (each bound to its own state key):
        1. Create video thumbnail (required)
        2. Generate thumbnail hash (required)
        3. Create optimized video version (optional)
        4. Index in search (required)
        5. Create summary entry (optional)

    Args:
        dare: The dare to process.
        is_complete: When True the completed-event video is processed
            instead of the dare's own video.

    Returns:
        A skip result when the dare is already flagged as processed, the
        converted StepProcessor result when execution reports an error,
        otherwise an ok result for the dare id.
    """
    dare_id = dare.doc_id

    # Check if already complete
    if dare.processing_complete:
        msg = f"Dare already processed for {dare_id}"
        if IS_DEBUG:
            LOG().debug(msg)

        return JobResult.skip_doc(doc_id=dare_id, message=msg)

    debug_label = "complete" if is_complete else "creation"

    def _summary_step(m: DareWrapper) -> AppResult[DareWrapper]:
        # The steps are declared as ProcessingStep[_K, DareWrapper], but
        # _create_summary_entry yields AppResult[UserSummaryEntryWrapper].
        # Adapt it so this handler reports the dare, like every other step.
        # NOTE(review): StepProcessor is not visible from this file; confirm
        # how it consumes `generated` from a step handler.
        step_result = AppResult[DareWrapper](task_name="CreateSummaryEntry")
        entry_result = self._create_summary_entry(m)
        if entry_result.is_error:
            step_result.merge(entry_result)
        else:
            step_result.generated = m
        return step_result

    # Use StepProcessor for the workflow
    steps: list[ProcessingStep[_K, DareWrapper]] = [
        ProcessingStep[_K, DareWrapper](
            state_key=_K.THUMBNAIL_CREATED,
            handler=lambda m: self._create_video_thumbnail(m, is_complete=is_complete),
            description=f"Create '{debug_label}' video thumbnail",
            required=True,
        ),
        ProcessingStep[_K, DareWrapper](
            state_key=_K.HASH_CREATED,
            handler=lambda m: self._create_thumbnail_hash(m, is_complete=is_complete),
            description=f"Generate '{debug_label}' thumbnail hash",
            required=True,
        ),
        ProcessingStep[_K, DareWrapper](
            state_key=_K.OPTIMIZED_VIDEO,
            handler=lambda m: self._create_optimized_video(m, is_complete=is_complete),
            description=f"Create '{debug_label}' optimized video",
            required=False,  # Optional - don't fail if optimization fails
        ),
        ProcessingStep[_K, DareWrapper](
            state_key=_K.SEARCH_INDEXED,
            handler=lambda m: self._index_dare(m),
            description=f"Index '{debug_label}' in search",
            required=True,
        ),
        # summary entry examines DareStatus, so no need to explicitly set is_complete.
        ProcessingStep[_K, DareWrapper](
            state_key=_K.EMAIL_SENT,
            handler=_summary_step,
            description=f"Create '{debug_label}' summary entry",
            required=False,  # Optional - don't fail if summary entry creation fails
        ),
    ]

    processor = StepProcessor(
        wrapper=dare,
        steps=steps,
        save_handler=lambda m: self.dare_bridge.update(m),
        process_name=f"process_dare_{dare_id}",
    )

    result = processor.execute()
    if result.is_error:
        # Attach the dare payload so the failure can be diagnosed later.
        return JobResult.from_result(result, doc_id=dare_id, data=dare.to_json_dict())
    return JobResult.ok(doc_id=dare_id)

145 

def _create_video_thumbnail(
    self, dare: DareWrapper, is_complete: bool = False
) -> AppResult[DareWrapper]:
    """Create a thumbnail image for the selected video when it has none.

    Args:
        dare: The dare being processed.
        is_complete: When True the completed-event video is used instead of
            the dare's own video.

    Returns:
        A result whose ``generated`` is the dare on success (including the
        case where a thumbnail already exists). On failure the result
        carries MISSING_DATA (no video) or THUMBNAIL_GENERATION.
    """
    main_result = AppResult[DareWrapper](task_name="CreateVideoThumbnail")
    dare_id = dare.doc_id
    video = self._get_video_to_process(dare, is_complete)
    if video is None:
        # This message previously said "thumbnail hash", copied from the
        # hash step, which made failures of this step look like hash failures.
        main_result.add_error(
            AppErrorCode.MISSING_DATA,
            "No video found to create thumbnail for",
        )
        return main_result

    thumbnail = video.thumbnail

    # Skip if thumbnail already exists
    if thumbnail is not None:
        if IS_DEBUG:
            msg = f"DareWrapper: {dare_id} already has thumbnail. Skipping thumbnail creation."
            LOG().debug(msg)
        main_result.generated = dare
        return main_result

    # Generate thumbnail
    LOG().debug(f"Adding thumbnail image for DareWrapper: {dare_id}")
    thumbnail = self.generate_thumbnail(
        video.source.uid,
        video.source.url,
        video.w,
        video.h,
    )
    if thumbnail is None:
        main_result.add_error(
            AppErrorCode.THUMBNAIL_GENERATION,
            "Failed to generate thumbnail",
        )
        return main_result

    video.thumbnail = thumbnail
    # Write the updated video back to wherever it was read from.
    if is_complete:
        dare.set_completed_event_video(video)
    else:
        dare.set_video(video)

    main_result.generated = dare
    return main_result

193 

def _create_thumbnail_hash(
    self, dare: DareWrapper, is_complete: bool = False
) -> AppResult[DareWrapper]:
    """Compute and store the image hash for the selected video's thumbnail.

    Args:
        dare: The dare being processed.
        is_complete: When True the completed-event video is used instead of
            the dare's own video.

    Returns:
        A result whose ``generated`` is the dare on success (including the
        case where the hash is already present). On failure the result
        carries MISSING_DATA (no video or no thumbnail) or HASH.
    """
    outcome = AppResult[DareWrapper](task_name="CreateThumbnailHash")

    dare_id = dare.doc_id
    video = self._get_video_to_process(dare, is_complete)
    if video is None:
        outcome.add_error(
            AppErrorCode.MISSING_DATA,
            "No video found to create thumbnail hash for",
        )
        return outcome

    thumb = video.thumbnail
    if thumb is None:
        outcome.add_error(
            AppErrorCode.MISSING_DATA,
            f"Thumbnail missing for dare {dare_id}",
        )
        return outcome

    if thumb.blur_hash is None:
        # No hash yet: generate one from the thumbnail.
        LOG().debug(f"Generating hash for thumbnail of DareWrapper: {dare_id}")
        digest = self.get_image_hash(thumb, width=video.w, height=video.h)
        if digest is None:
            outcome.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail")
            return outcome

        LOG().debug(
            f"Generated thumbnail hash for DareWrapper: {dare_id} hash: {truncate_string(digest)}",
        )

        thumb.blur_hash = digest
        video.thumbnail = thumb

        # Write the updated video back to wherever it was read from.
        store_video = dare.set_completed_event_video if is_complete else dare.set_video
        store_video(video)
    else:
        # Hash already present: nothing to generate.
        LOG().debug(
            f"DareWrapper: {dare_id} already has thumbnail hash. Skipping hash generation.",
        )

    outcome.generated = dare
    return outcome

247 

def _create_optimized_video(
    self, dare_model: DareWrapper, is_complete: bool = False
) -> AppResult[DareWrapper]:
    """Attach an optimized rendition (``video.low``) to the selected video.

    Args:
        dare_model: The dare being processed.
        is_complete: When True the completed-event video is used instead of
            the dare's own video.

    Returns:
        A result whose ``generated`` is the dare on success (including the
        case where ``video.low`` is already set). On failure the result
        carries MISSING_DATA (no video) or OPTIMIZE_VIDEO.
    """
    outcome = AppResult[DareWrapper](task_name="CreateOptimizedVideo")
    dare_id = dare_model.doc_id
    video = self._get_video_to_process(dare_model, is_complete)
    if video is None:
        outcome.add_error(
            AppErrorCode.MISSING_DATA,
            "No video found to create optimized version for",
        )
        return outcome

    if video.low is None:
        # No optimized rendition yet: create one.
        if IS_DEBUG:
            LOG().debug(f"Creating optimized video for DareWrapper: {dare_id}")

        low_file = self.optimize_video(video)
        if low_file is None:
            outcome.add_error(
                AppErrorCode.OPTIMIZE_VIDEO,
                f"Failed to optimize video for DareWrapper: {dare_id}",
            )
            return outcome

        video.low = low_file
        # Write the updated video back to wherever it was read from.
        store_video = dare_model.set_completed_event_video if is_complete else dare_model.set_video
        store_video(video)
    elif IS_DEBUG:
        LOG().debug(
            f"DareWrapper: {dare_id} video already has optimized version. Skipping optimization."
        )

    outcome.generated = dare_model
    return outcome

288 

289 def _create_summary_entry( 

290 self, 

291 dare_obj: DareWrapper | str, 

292 ) -> AppResult[UserSummaryEntryWrapper]: 

293 """Create email summary entry for dare action.""" 

294 doc_id = dare_obj.doc_id if isinstance(dare_obj, DareWrapper) else dare_obj 

295 main_result = AppResult[UserSummaryEntryWrapper](doc_id=doc_id) 

296 

297 dare_result = self._get_dare(dare_obj) 

298 if dare_result.is_error: 

299 main_result.merge(dare_result) 

300 return main_result 

301 

302 dare = dare_result.generated 

303 assert dare 

304 assert dare.doc_id 

305 

306 context_result = self._get_dare_context(dare) 

307 if context_result.is_error: 

308 main_result.merge(context_result) 

309 return main_result 

310 

311 context = context_result.generated 

312 assert context 

313 

314 try: 

315 match context: 

316 case GroupDareContext(): 

317 summary_result = self.summary_service.create_group_dare_entry(context) 

318 case UserDareContext(): 

319 summary_result = self.summary_service.create_dare_entry(context) 

320 case _: 

321 msg = f"Unsupported context type for summary entry: {type(context)}" 

322 LOG().error(msg) 

323 main_result.add_error(AppErrorCode.CONTEXT, msg) 

324 return main_result 

325 

326 if summary_result.is_error: 

327 main_result.merge(summary_result) 

328 else: 

329 assert summary_result.generated 

330 main_result.generated = summary_result.generated 

331 except Exception as e: 

332 msg = f"Exception creating summary entry for dare {dare.doc_id}: {e}" 

333 LOG().error(msg) 

334 main_result.add_error(AppErrorCode.DATABASE_EX, msg) 

335 

336 return main_result 

337 

def _index_dare(
    self,
    dare_model: DareWrapper,
    is_update: bool = False,
) -> AppResult[DareWrapper]:
    """
    Hand the dare to the search indexer.

    Per the existing notes for this step, ``indexer.process_dare`` decides
    whether the dare is added to, updated in, or removed from the index
    (removal when the dare is private). That logic is not visible here.

    Args:
        dare_model: The dare to hand to the indexer.
        is_update: Forwarded unchanged to ``indexer.process_dare``.

    Returns:
        A result whose ``generated`` is the dare when indexing succeeded,
        otherwise the merged errors reported by the indexer.
    """
    outcome = AppResult[DareWrapper](task_name="IndexVideoInSearch")

    if IS_TRACE:
        LOG().trace(f"Checking dare {dare_model.doc_id} in search index")

    index_result = self.indexer.process_dare(dare_model, is_update=is_update)
    if not index_result.is_error:
        outcome.generated = dare_model
    else:
        outcome.merge(index_result)

    return outcome

362 

363 # ======================================================================== 

364 # Helper Methods 

365 # ======================================================================== 

366 

367 def _get_video_to_process( 

368 self, 

369 dare: DareWrapper, 

370 is_complete: bool = False, 

371 ) -> VideoModel | None: 

372 """Get the video model to process (either from the dare or the completed event).""" 

373 if not is_complete: 

374 return dare.video 

375 

376 completed_event = dare.completed_event 

377 if completed_event is None: 

378 LOG().error(f"Dare {dare.doc_id} marked as complete but has no completed event.") 

379 return None 

380 

381 video = completed_event.video 

382 if video is None: 

383 LOG().error(f"Dare {dare.doc_id} marked as complete but completed event has no video.") 

384 return None 

385 

386 return video 

387 

388 def _get_dare(self, dare_obj: DareWrapper | str) -> AppResult[DareWrapper]: 

389 """Get DareModel from object or ID.""" 

390 main_result: AppResult[DareWrapper] 

391 

392 match dare_obj: 

393 case DareWrapper(): 

394 if IS_TRACE: 

395 LOG().trace(f"DareWrapper provided directly for dare {dare_obj.doc_id}") 

396 

397 doc_id = dare_obj.doc_id 

398 main_result = AppResult[DareWrapper](doc_id=doc_id) 

399 main_result.generated = dare_obj 

400 return main_result 

401 case str(): 

402 doc_id = dare_obj 

403 if IS_TRACE: 

404 LOG().trace(f"Dare ID provided for dare {doc_id}") 

405 

406 main_result = AppResult[DareWrapper](doc_id=doc_id) 

407 dare_id = dare_obj 

408 get_result = self.dare_bridge.get(dare_id) 

409 if get_result.is_error: 

410 LOG().error(f"Error fetching dare {dare_id}: Error:\n{get_result.error_str}") 

411 main_result.merge(get_result) 

412 else: 

413 result = get_result.generated 

414 assert result is not None # narrowing, since not error 

415 main_result.generated = result 

416 

417 return main_result 

418 

def _get_dare_context(self, dare: DareWrapper) -> AppResult[DareContext]:
    """Build the DareContext for a dare via DareContextFactory.

    Args:
        dare: The dare to build a context for.

    Returns:
        A result whose ``generated`` is the context. When the factory returns
        None or raises, the result carries a CONTEXT error with the dare's
        JSON dict attached as ``extra``.
    """
    doc_id = dare.doc_id
    outcome: AppResult[DareContext] = AppResult(doc_id=doc_id)

    try:
        built = DareContextFactory().create(dare)
        if built is None:
            outcome.add_error(
                AppErrorCode.CONTEXT,
                f"Failed to build DareContext for {doc_id}",
                extra=dare.to_json_dict(),
            )
        else:
            outcome.generated = built
    except Exception as e:
        outcome.add_error(
            AppErrorCode.CONTEXT,
            f"Exception creating DareContext for {doc_id}: {e}",
            extra=dare.to_json_dict(),
        )

    return outcome