Coverage for functions \ flipdare \ service \ core \ cron_processor.py: 84%
141 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13"""
14Generic cron processor to eliminate code duplication across admin cron methods.
15"""
17from __future__ import annotations
19from collections.abc import Callable
20from typing import TYPE_CHECKING, Any
21from collections.abc import Sequence
22from dataclasses import dataclass
23from flipdare.app_log import LOG
24from flipdare.app_types import CronResult
25from flipdare.constants import NO_DOC_ID
26from flipdare.result.app_result import AppResult
27from flipdare.core.cron_decorator import cron_decorator
28from flipdare.result.output_result import OutputResult
29from flipdare.result.job_result import JobResult
30from flipdare.generated.model.backend.metric.count_metric import CountMetric
31from flipdare.generated.shared.app_error_code import AppErrorCode
32from flipdare.generated.shared.backend.app_job_type import AppJobType
33from flipdare.result.outcome import Outcome
34from flipdare.util.time_util import TimeUtil
35from flipdare.wrapper import PersistedWrapper
37if TYPE_CHECKING:
38 from flipdare.backend.app_logger import AppLogger
# CronResultEntry is exported too: report_fn callbacks receive
# Sequence[CronResultEntry], so implementers need to import the type.
__all__ = ["CronProcessor", "CronConfig", "CronResultEntry"]
@dataclass(frozen=True)
class CronResultEntry:
    """Immutable record of one processed item, collected for report_fn."""

    doc_id: str  # persisted document id of the item, or NO_DOC_ID when absent
    outcome: Outcome  # OK or ERROR (skipped items are not recorded as entries)
    message: str  # human-readable summary of the processing result
class CronConfig[T: PersistedWrapper[Any]]:
    """Configuration for a generic cron processor."""

    def __init__(
        self,
        job_type: AppJobType,
        job_name: str,
        query_fn: Callable[[], list[T]],
        process_fn: Callable[[T], Outcome | AppResult[Any] | JobResult[Any]],
        error_type: AppErrorCode = AppErrorCode.DATABASE_EX,
        report_fn: (
            Callable[[AppJobType, Sequence[CronResultEntry]], OutputResult | None] | None
        ) = None,
        skip_empty_check: bool = False,
    ) -> None:
        """
        Initialize cron configuration.

        Args:
            job_type: The AppJobType for the perf decorator
            job_name: Human-readable name for logging (e.g., "cron_invite_unprocessed")
            query_fn: Function that returns the list of items to process
            process_fn: Function that processes one item and returns an
                Outcome, an AppResult, or a JobResult
            error_type: Default error type recorded when an exception is raised
            report_fn: Optional reporting hook called after processing with
                (job_type, Sequence[CronResultEntry]); its return value is ignored
            skip_empty_check: If True, don't return early when query returns an empty list
        """
        self.job_type = job_type
        self.job_name = job_name
        self.query_fn = query_fn
        self.process_fn = process_fn
        self.error_type = error_type
        self.report_fn = report_fn
        self.skip_empty_check = skip_empty_check
87class CronProcessor[T: PersistedWrapper[Any]]:
88 """
89 Generic processor for cron jobs that follow the pattern:
90 1. Query for items
91 2. Process each item
92 3. Track success/failure/skip counts
93 4. Handle exceptions
94 5. Return performance results
96 Example usage:
97 config = CronConfig(
98 job_type=AppJobType.CR_INVITE_UNPROCESSED,
99 job_name="cron_invite_unprocessed",
100 query_fn=lambda: self.invite_db.get_unprocessed_invites_last_week(),
101 process_fn=lambda invite: self._invite_processor.process_invite_signup(invite)
102 )
103 return CronProcessor.process(config)
104 """
106 def __init__(
107 self,
108 config: CronConfig[T],
109 log_creator: AppLogger | None = None,
110 ) -> None:
111 self._log_creator = log_creator
112 self._config = config
114 @property
115 def log_creator(self) -> AppLogger:
116 from flipdare.services import get_app_logger
118 if self._log_creator is None:
119 self._log_creator = get_app_logger()
120 return self._log_creator
122 def process_result(self) -> CronResult:
123 """
124 Execute a generic cron processing job.
126 Args:
127 config: CronConfig containing all necessary functions and metadata
129 Returns:
130 CronResult from the stat decorator (which wraps CronResult)
132 """
133 config = self._config
135 # Apply the perf decorator dynamically
136 @cron_decorator(job_type=config.job_type)
137 def _execute() -> CronResult:
138 return self._process_result_items(config)
140 return _execute()
142 def _process_result_items( # noqa: PLR0912, PLR0915
143 self,
144 config: CronConfig[T],
145 ) -> CronResult:
146 """Internal implementation of the cron processing logic."""
147 main_result = AppResult(task_name=config.job_name)
148 passed_ct = 0
149 failed_ct = 0
150 skipped_ct = 0
151 processing_error = False
152 processed: list[CronResultEntry] = []
153 start = TimeUtil.get_current_utc_dt()
155 try:
156 items = config.query_fn()
157 # Check for empty results unless skip_empty_check is True
158 if not config.skip_empty_check and not items:
159 msg = f"No items found for {config.job_name} processing."
160 LOG().info(msg)
161 return CountMetric.empty()
163 item_type = config.job_name.replace("cron_", "").replace("_unprocessed", "")
164 msg = f"Found {len(items)} {item_type}(s) for {config.job_name} processing."
165 LOG().info(msg)
167 for item in items:
168 try:
169 result = config.process_fn(item)
170 doc_id = item.doc_id or NO_DOC_ID
172 if result.is_ok:
173 passed_ct += 1
174 processed.append(
175 CronResultEntry(doc_id, Outcome.OK, f"Processed {item_type} {doc_id}")
176 )
177 continue
178 if result.is_skipped:
179 skipped_ct += 1
180 continue
182 # Handles, Outcome, AppResult and OutputAppResult
183 error: str | None = None
184 match result:
185 case AppResult():
186 match result.outcome:
187 case Outcome.ERROR:
188 error = f"Error processing {item_type} {doc_id}"
189 main_result.merge(result)
190 case Outcome.SKIPPED | Outcome.WARNING:
191 skipped_ct += 1
192 case Outcome.OK:
193 processed.append(
194 CronResultEntry(
195 doc_id, Outcome.OK, f"Processed {item_type} {doc_id}"
196 )
197 )
198 passed_ct += 1
199 case JobResult():
200 match result.app_result.outcome:
201 case Outcome.ERROR:
202 error = f"Error processing {item_type} {doc_id}"
203 main_result.merge(result.app_result)
204 case Outcome.SKIPPED | Outcome.WARNING:
205 skipped_ct += 1
206 case Outcome.OK:
207 processed.append(
208 CronResultEntry(
209 doc_id, Outcome.OK, f"Processed {item_type} {doc_id}"
210 )
211 )
212 passed_ct += 1
213 case Outcome():
214 match result:
215 case Outcome.ERROR:
216 error = f"Error processing {item_type} {doc_id}"
217 main_result.add_error(config.error_type, error)
218 case Outcome.SKIPPED | Outcome.WARNING:
219 skipped_ct += 1
220 case Outcome.OK:
221 processed.append(
222 CronResultEntry(
223 doc_id, Outcome.OK, f"Processed {item_type} {doc_id}"
224 )
225 )
226 passed_ct += 1
228 if error is not None:
229 failed_ct += 1
230 LOG().error(error)
231 processed.append(CronResultEntry(doc_id, Outcome.ERROR, error))
233 except Exception as item_ex:
234 doc_id = getattr(item, "doc_id", NO_DOC_ID) or NO_DOC_ID
235 msg = f"Exception processing {item_type} {doc_id}: {item_ex}"
236 LOG().error(msg)
237 main_result.add_error(config.error_type, msg)
238 processed.append(CronResultEntry(doc_id, Outcome.ERROR, msg))
239 failed_ct += 1
240 processing_error = True
242 except Exception as e:
243 msg = f"Exception during {config.job_name} processing: {e}"
244 LOG().error(msg)
245 main_result.add_error(config.error_type, msg)
246 processing_error = True
248 # Call report function if provided
249 if config.report_fn is not None:
250 try:
251 config.report_fn(config.job_type, processed)
252 except Exception as report_ex:
253 LOG().warning(f"Failed to generate report for {config.job_name}: {report_ex}")
255 end = TimeUtil.get_current_utc_dt()
256 duration = TimeUtil.duration_in_seconds(start, end)
258 msg = (
259 f"{config.job_name} completed: {passed_ct} passed, "
260 f"{failed_ct} failed, {skipped_ct} skipped."
261 )
262 LOG().info(msg)
264 if not processing_error and not main_result.is_error:
265 LOG().info(f"Cron job {config.job_name} completed successfully.")
266 # we have a least one result so return counts
267 return CountMetric(
268 success_ct=passed_ct,
269 failed_ct=failed_ct,
270 skipped_ct=skipped_ct,
271 duration=duration,
272 )
274 # error occurred during processing items
275 LOG().error(f"Cron job {config.job_name} completed with errors\n{main_result.formatted}")
276 self.log_creator.system_error(
277 job_type=config.job_type,
278 message=f"Cron job {config.job_name} encountered errors.",
279 error_code=config.error_type,
280 result=main_result,
281 )
282 return JobResult.from_result(
283 main_result,
284 duration=duration,
285 data={
286 "success_ct": passed_ct,
287 "failed_ct": failed_ct,
288 "skipped_ct": skipped_ct,
289 },
290 )