Coverage for functions \ flipdare \ service \ processor \ dare_processor.py: 82%
217 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
14from pathlib import Path
15from typing import TYPE_CHECKING
17from google.cloud.storage.bucket import Bucket as StorageBucket
19from flipdare.generated.model.internal.video_model import VideoModel
20from flipdare.service.processor._processor_mixin import ProcessorMixin
21from flipdare.service.user_summary_service import UserSummaryService
22from flipdare.app_globals import truncate_string
23from flipdare.app_log import LOG
24from flipdare.app_types import DareBridge
25from flipdare.constants import DOWNLOAD_FILE_DIR, IS_DEBUG, IS_TRACE
26from flipdare.result import AppResult
27from flipdare.service.core.step_processor import ProcessingStep, StepProcessor
28from flipdare.result.job_result import JobResult
29from flipdare.firestore.context.dare_context import (
30 DareContext,
31 DareContextFactory,
32 GroupDareContext,
33 UserDareContext,
34)
35from flipdare.generated import AppErrorCode
36from flipdare.generated.model.dare_model import DareKeys
37from flipdare.wrapper import DareWrapper
38from flipdare.wrapper.backend.user_summary_entry_wrapper import UserSummaryEntryWrapper
40if TYPE_CHECKING:
41 from flipdare.backend.indexer_service import IndexerService
# Short alias for DareKeys; its members are used as the ``state_key`` of each
# ProcessingStep declared in this module.
_K = DareKeys
46class DareProcessor(ProcessorMixin):
48 def __init__(
49 self,
50 bucket: StorageBucket,
51 dare_bridge: DareBridge,
52 indexer: IndexerService,
53 summary_service: UserSummaryService,
54 local_path: Path = DOWNLOAD_FILE_DIR,
55 ) -> None:
56 super().__init__(bucket=bucket, local_path=local_path)
57 self.indexer = indexer
58 self.dare_bridge = dare_bridge
59 self.summary_service = summary_service
61 def process_complete_dare(
62 self,
63 dare: DareWrapper,
64 ) -> JobResult[DareWrapper]:
65 return self._process(dare, is_complete=True)
67 def process_dare(
68 self,
69 dare: DareWrapper,
70 ) -> JobResult[DareWrapper]:
71 return self._process(dare, is_complete=False)
73 def _process(
74 self,
75 dare: DareWrapper,
76 is_complete: bool = False,
77 ) -> JobResult[DareWrapper]:
78 """
79 Process dare video content with multi-step workflow.
81 Steps:
82 0. Increment stats atomically if before/after differ (prevents race conditions)
83 1. Create video thumbnail
84 2. Generate thumbnail hash
85 3. Create optimized video version
86 4. Index in search
87 """
88 dare_id = dare.doc_id
89 # Check if already complete
90 if dare.processing_complete:
91 msg = f"Dare already processed for {dare_id}"
92 if IS_DEBUG:
93 LOG().debug(msg)
95 return JobResult.skip_doc(doc_id=dare_id, message=msg)
97 # Use StepProcessor for the workflow
98 debug_label = f'{"complete" if is_complete else "creation"}'
100 steps: list[ProcessingStep[_K, DareWrapper]] = [
101 ProcessingStep[_K, DareWrapper](
102 state_key=_K.THUMBNAIL_CREATED,
103 handler=lambda m: self._create_video_thumbnail(m, is_complete=is_complete),
104 description=f"Create '{debug_label}' video thumbnail",
105 required=True,
106 ),
107 ProcessingStep[_K, DareWrapper](
108 state_key=_K.HASH_CREATED,
109 handler=lambda m: self._create_thumbnail_hash(m, is_complete=is_complete),
110 description=f"Generate '{debug_label}' thumbnail hash",
111 required=True,
112 ),
113 ProcessingStep[_K, DareWrapper](
114 state_key=_K.OPTIMIZED_VIDEO,
115 handler=lambda m: self._create_optimized_video(m, is_complete=is_complete),
116 description=f"Create '{debug_label}' optimized video",
117 required=False, # Optional - don't fail if optimization fails
118 ),
119 ProcessingStep[_K, DareWrapper](
120 state_key=_K.SEARCH_INDEXED,
121 handler=lambda m: self._index_dare(m),
122 description=f"Index '{debug_label}' in search",
123 required=True,
124 ),
125 # summary entry examines DareStatus, so no need to explicity set is_complete.
126 ProcessingStep[_K, DareWrapper](
127 state_key=_K.EMAIL_SENT,
128 handler=lambda m: self._create_summary_entry(m),
129 description=f"Create '{debug_label}' summary entry",
130 required=False, # Optional - don't fail if summary entry creation fails
131 ),
132 ]
134 processor = StepProcessor(
135 wrapper=dare,
136 steps=steps,
137 save_handler=lambda m: self.dare_bridge.update(m),
138 process_name=f"process_dare_{dare_id}",
139 )
141 result = processor.execute()
142 if result.is_error:
143 return JobResult.from_result(result, doc_id=dare_id, data=dare.to_json_dict())
144 return JobResult.ok(doc_id=dare_id)
146 def _create_video_thumbnail(
147 self, dare: DareWrapper, is_complete: bool = False
148 ) -> AppResult[DareWrapper]:
149 """Create thumbnail image from video."""
150 main_result = AppResult[DareWrapper](task_name="CreateVideoThumbnail")
151 dare_id = dare.doc_id
152 video = self._get_video_to_process(dare, is_complete)
153 if video is None:
154 main_result.add_error(
155 AppErrorCode.MISSING_DATA,
156 "No video found to create thumbnail hash for",
157 )
158 return main_result
160 thumbnail = video.thumbnail
162 # Skip if thumbnail already exists
163 if thumbnail is not None:
164 if IS_DEBUG:
165 msg = f"DareWrapper: {dare_id} already has thumbnail. Skipping thumbnail creation."
166 LOG().debug(msg)
167 main_result.generated = dare
168 return main_result
170 # Generate thumbnail
171 LOG().debug(f"Adding thumbnail image for DareWrapper: {dare_id}")
172 thumbnail = self.generate_thumbnail(
173 video.source.uid,
174 video.source.url,
175 video.w,
176 video.h,
177 )
178 if thumbnail is None:
179 main_result.add_error(
180 AppErrorCode.THUMBNAIL_GENERATION,
181 "Failed to generate thumbnail",
182 )
183 return main_result
185 video.thumbnail = thumbnail
186 if is_complete:
187 dare.set_completed_event_video(video)
188 else:
189 dare.set_video(video)
191 main_result.generated = dare
192 return main_result
194 def _create_thumbnail_hash(
195 self, dare: DareWrapper, is_complete: bool = False
196 ) -> AppResult[DareWrapper]:
197 """Generate perceptual hash for thumbnail image."""
198 main_result = AppResult[DareWrapper](task_name="CreateThumbnailHash")
200 dare_id = dare.doc_id
201 video = self._get_video_to_process(dare, is_complete)
202 if video is None:
203 main_result.add_error(
204 AppErrorCode.MISSING_DATA,
205 "No video found to create thumbnail hash for",
206 )
207 return main_result
209 thumbnail = video.thumbnail
211 if thumbnail is None:
212 main_result.add_error(
213 AppErrorCode.MISSING_DATA,
214 f"Thumbnail missing for dare {dare_id}",
215 )
216 return main_result
218 # Skip if hash already exists
219 if thumbnail.blur_hash is not None:
220 LOG().debug(
221 f"DareWrapper: {dare_id} already has thumbnail hash. Skipping hash generation.",
222 )
223 main_result.generated = dare
224 return main_result
226 # Generate hash
227 LOG().debug(f"Generating hash for thumbnail of DareWrapper: {dare_id}")
228 hash_code = self.get_image_hash(thumbnail, width=video.w, height=video.h)
229 if hash_code is None:
230 main_result.add_error(AppErrorCode.HASH, "Failed to generate hash for thumbnail")
231 return main_result
233 LOG().debug(
234 f"Generated thumbnail hash for DareWrapper: {dare_id} hash: {truncate_string(hash_code)}",
235 )
237 thumbnail.blur_hash = hash_code
238 video.thumbnail = thumbnail
240 if is_complete:
241 dare.set_completed_event_video(video)
242 else:
243 dare.set_video(video)
245 main_result.generated = dare
246 return main_result
248 def _create_optimized_video(
249 self, dare_model: DareWrapper, is_complete: bool = False
250 ) -> AppResult[DareWrapper]:
251 """Create optimized/compressed version of video."""
252 main_result = AppResult[DareWrapper](task_name="CreateOptimizedVideo")
253 dare_id = dare_model.doc_id
254 video = self._get_video_to_process(dare_model, is_complete)
255 if video is None:
256 main_result.add_error(
257 AppErrorCode.MISSING_DATA,
258 "No video found to create optimized version for",
259 )
260 return main_result
262 # Skip if optimized version already exists
263 if video.low is not None:
264 if IS_DEBUG:
265 msg = f"DareWrapper: {dare_id} video already has optimized version. Skipping optimization."
266 LOG().debug(msg)
267 main_result.generated = dare_model
268 return main_result
270 # Create optimized version
271 if IS_DEBUG:
272 LOG().debug(f"Creating optimized video for DareWrapper: {dare_id}")
274 optimized_stored_file = self.optimize_video(video)
275 if optimized_stored_file is None:
276 cause = f"Failed to optimize video for DareWrapper: {dare_id}"
277 main_result.add_error(AppErrorCode.OPTIMIZE_VIDEO, cause)
278 return main_result
280 video.low = optimized_stored_file
281 if is_complete:
282 dare_model.set_completed_event_video(video)
283 else:
284 dare_model.set_video(video)
286 main_result.generated = dare_model
287 return main_result
289 def _create_summary_entry(
290 self,
291 dare_obj: DareWrapper | str,
292 ) -> AppResult[UserSummaryEntryWrapper]:
293 """Create email summary entry for dare action."""
294 doc_id = dare_obj.doc_id if isinstance(dare_obj, DareWrapper) else dare_obj
295 main_result = AppResult[UserSummaryEntryWrapper](doc_id=doc_id)
297 dare_result = self._get_dare(dare_obj)
298 if dare_result.is_error:
299 main_result.merge(dare_result)
300 return main_result
302 dare = dare_result.generated
303 assert dare
304 assert dare.doc_id
306 context_result = self._get_dare_context(dare)
307 if context_result.is_error:
308 main_result.merge(context_result)
309 return main_result
311 context = context_result.generated
312 assert context
314 try:
315 match context:
316 case GroupDareContext():
317 summary_result = self.summary_service.create_group_dare_entry(context)
318 case UserDareContext():
319 summary_result = self.summary_service.create_dare_entry(context)
320 case _:
321 msg = f"Unsupported context type for summary entry: {type(context)}"
322 LOG().error(msg)
323 main_result.add_error(AppErrorCode.CONTEXT, msg)
324 return main_result
326 if summary_result.is_error:
327 main_result.merge(summary_result)
328 else:
329 assert summary_result.generated
330 main_result.generated = summary_result.generated
331 except Exception as e:
332 msg = f"Exception creating summary entry for dare {dare.doc_id}: {e}"
333 LOG().error(msg)
334 main_result.add_error(AppErrorCode.DATABASE_EX, msg)
336 return main_result
338 def _index_dare(
339 self,
340 dare_model: DareWrapper,
341 is_update: bool = False,
342 ) -> AppResult[DareWrapper]:
343 """
344 Add dare to search index (or remove if private).
345 The indexer.process_dare method will check whether the dare needs to be:
346 - added to the index (if new)
347 - updated in the index (if existing and significant changes)
348 - removed from the index (if now private)
349 """
350 main_result = AppResult[DareWrapper](task_name="IndexVideoInSearch")
352 if IS_TRACE:
353 LOG().trace(f"Checking dare {dare_model.doc_id} in search index")
355 index_result = self.indexer.process_dare(dare_model, is_update=is_update)
356 if index_result.is_error:
357 main_result.merge(index_result)
358 return main_result
360 main_result.generated = dare_model
361 return main_result
363 # ========================================================================
364 # Helper Methods
365 # ========================================================================
367 def _get_video_to_process(
368 self,
369 dare: DareWrapper,
370 is_complete: bool = False,
371 ) -> VideoModel | None:
372 """Get the video model to process (either from the dare or the completed event)."""
373 if not is_complete:
374 return dare.video
376 completed_event = dare.completed_event
377 if completed_event is None:
378 LOG().error(f"Dare {dare.doc_id} marked as complete but has no completed event.")
379 return None
381 video = completed_event.video
382 if video is None:
383 LOG().error(f"Dare {dare.doc_id} marked as complete but completed event has no video.")
384 return None
386 return video
388 def _get_dare(self, dare_obj: DareWrapper | str) -> AppResult[DareWrapper]:
389 """Get DareModel from object or ID."""
390 main_result: AppResult[DareWrapper]
392 match dare_obj:
393 case DareWrapper():
394 if IS_TRACE:
395 LOG().trace(f"DareWrapper provided directly for dare {dare_obj.doc_id}")
397 doc_id = dare_obj.doc_id
398 main_result = AppResult[DareWrapper](doc_id=doc_id)
399 main_result.generated = dare_obj
400 return main_result
401 case str():
402 doc_id = dare_obj
403 if IS_TRACE:
404 LOG().trace(f"Dare ID provided for dare {doc_id}")
406 main_result = AppResult[DareWrapper](doc_id=doc_id)
407 dare_id = dare_obj
408 get_result = self.dare_bridge.get(dare_id)
409 if get_result.is_error:
410 LOG().error(f"Error fetching dare {dare_id}: Error:\n{get_result.error_str}")
411 main_result.merge(get_result)
412 else:
413 result = get_result.generated
414 assert result is not None # narrowing, since not error
415 main_result.generated = result
417 return main_result
419 def _get_dare_context(self, dare: DareWrapper) -> AppResult[DareContext]:
420 """Create DareContext from DareModel."""
421 doc_id = dare.doc_id
422 main_result: AppResult[DareContext] = AppResult(doc_id=doc_id)
424 try:
425 dare_context = DareContextFactory().create(dare)
426 if dare_context is not None:
427 main_result.generated = dare_context
428 else:
429 cause = f"Failed to build DareContext for {doc_id}"
430 main_result.add_error(
431 AppErrorCode.CONTEXT,
432 cause,
433 extra=dare.to_json_dict(),
434 )
435 except Exception as e:
436 cause = f"Exception creating DareContext for {doc_id}: {e}"
437 main_result.add_error(
438 AppErrorCode.CONTEXT,
439 cause,
440 extra=dare.to_json_dict(),
441 )
443 return main_result