Coverage for functions \ flipdare \ service \ content_service.py: 44%
72 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
14from pathlib import Path
15from typing import TYPE_CHECKING
16from flipdare.service._service_provider import ServiceProvider
17from flipdare.service.processor.content_processor import ContentProcessor
18from flipdare.app_log import LOG
19from flipdare.constants import DOWNLOAD_FILE_DIR
20from flipdare.result.app_result import AppResult
21from flipdare.core.job_type_decorator import job_type_decorator
22from flipdare.result.job_result import JobResult
23from flipdare.core.trigger_decorator import trigger_decorator
24from flipdare.generated.shared.app_error_code import AppErrorCode
25from flipdare.generated.shared.backend.app_job_type import AppJobType
26from flipdare.generated.shared.firestore_collections import FirestoreCollections
27from flipdare.wrapper import AppJobWrapper, ContentWrapper
28from flipdare.app_types import CronResult
29from flipdare.constants import IS_DEBUG
30from flipdare.core.cron_decorator import cron_decorator
31from flipdare.generated.model.backend.metric.count_metric import CountMetric
32from flipdare.util.time_util import TimeUtil
34if TYPE_CHECKING:
35 from flipdare.manager.db_manager import DbManager
36 from flipdare.manager.backend_manager import BackendManager
38__all__ = ["ContentService"]
41_JT = AppJobType
42_COL = FirestoreCollections.CONTENT
45class ContentService(ServiceProvider):
47 def __init__(
48 self,
49 db_manager: DbManager | None = None,
50 backend_manager: BackendManager | None = None,
51 local_path: Path = DOWNLOAD_FILE_DIR,
52 ) -> None:
53 super().__init__(
54 backend_manager=backend_manager,
55 db_manager=db_manager,
56 )
58 self._local_path = local_path
59 self._content_processor: ContentProcessor | None = None
61 @property
62 def content_processor(self) -> ContentProcessor:
63 if self._content_processor is None:
64 self._content_processor = ContentProcessor(
65 bucket=self.storage_bucket,
66 content_db=self.content_db,
67 indexer_service=self.indexer,
68 local_path=self._local_path,
69 )
70 return self._content_processor
72 # ========================================================================
73 # CRONS
74 # ========================================================================
76 @cron_decorator(job_type=_JT.CR_CONTENT_UNPROCESSED)
77 def cron_content_unprocessed(self) -> CronResult:
78 content_processor = self.content_processor
79 main_result = AppResult()
80 success_ct = 0
81 failed_ct = 0
82 skipped_ct = 0
83 start = TimeUtil.get_current_utc_dt()
85 try:
86 items = self.content_db.get_recent_unprocessed()
87 for content in items:
88 if content.processing_complete:
89 skipped_ct += 1
90 continue
91 log_result = content_processor.process_content(content, is_update=False)
92 result = log_result.app_result
93 if result.is_ok:
94 success_ct += 1
95 elif result.is_skipped:
96 skipped_ct += 1
97 else:
98 failed_ct += 1
99 main_result.merge(result)
100 except Exception as e:
101 msg = f"Exception during cron_content_unprocessed: {e}"
102 LOG().error(msg)
103 main_result.add_error(AppErrorCode.DATABASE_EX, msg)
105 end = TimeUtil.get_current_utc_dt()
106 duration = TimeUtil.duration_in_seconds(start, end)
108 if IS_DEBUG:
109 LOG().debug(
110 f"cron_content_unprocessed: {success_ct} ok, "
111 f"{failed_ct} failed, {skipped_ct} skipped."
112 )
114 if main_result.is_ok:
115 return CountMetric(
116 success_ct=success_ct,
117 failed_ct=failed_ct,
118 skipped_ct=skipped_ct,
119 duration=duration,
120 )
122 LOG().error(f"cron_content_unprocessed errors:\n{main_result.formatted}")
123 return CountMetric.error(duration=duration)
125 # ========================================================================
126 # TRIGGERS - Delegate to processors
127 # ========================================================================
129 @job_type_decorator(_JT.TR_CONTENT)
130 @trigger_decorator(job_type=_JT.TR_CONTENT, collection=_COL, wrapper_class=ContentWrapper)
131 def trigger_content(
132 self,
133 job: AppJobWrapper,
134 *,
135 wrapper: ContentWrapper,
136 ) -> JobResult[ContentWrapper]:
137 is_update = job.has_changes
138 return self.content_processor.process_content(wrapper, is_update=is_update)