Coverage for functions \ flipdare \ job \ job_config.py: 85%
169 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13"""
14Efficient job configuration loader using dataclasses and caching.
15Loads job_config.yaml once and provides fast access to job metadata.
16"""
18from collections.abc import Iterator
19from dataclasses import dataclass
20from functools import lru_cache
21from pathlib import Path
22from typing import IO, Any, Optional
24from flipdare.app_log import LOG
25from flipdare.constants import JOB_CONFIG_PATH
26from flipdare.generated.shared.backend.app_job_group import AppJobGroup
27from flipdare.generated.shared.backend.app_job_type import AppJobType
28from flipdare.job.app_job_schedule import AppJobSchedule
29from flipdare.job.job_config_option import JobConfigOption
30from flipdare.util.yaml_loader import YamlLoader
__all__ = ["JobConfig"]  # public API: _SavedState is an internal helper and is not exported
@dataclass(frozen=True, slots=True)
class _SavedState:
    """Immutable snapshot of JobConfig's internal maps.

    Captured by ``JobConfig.try_reload`` before a reload attempt so the
    previous state can be fully restored on failure. ``slots=True`` drops the
    per-instance ``__dict__`` (the file already targets Python 3.10+, which
    supports slotted dataclasses).
    """

    # Shallow copies of the loader's dicts; values are the live
    # JobConfigOption objects, which are themselves treated as immutable.
    jobs: dict[str, JobConfigOption]
    triggers: dict[str, JobConfigOption]
    tasks: dict[str, JobConfigOption]
    by_interval: dict[AppJobSchedule, list[JobConfigOption]]
    by_job_group: dict[AppJobGroup, list[JobConfigOption]]
class JobConfig:
    """
    Singleton loader for job configuration with efficient caching.

    Features:
    - Loads YAML once and caches in memory
    - Uses dataclasses for minimal memory overhead
    - Supports both flat and hierarchical YAML formats
    - Fast lookups with dict-based indexing
    """

    _instance: Optional["JobConfig"] = None
    _config_path: Path

    def __init__(self, config_path: Path = JOB_CONFIG_PATH) -> None:
        """Load the job configuration from *config_path* and build all indexes.

        Raises:
            ValueError: if any job entry has no AppJobType mapping or an
                invalid/missing ``schedule`` field (see ``_create_job_config``).
        """
        LOG().info(f"Loading job configuration from {config_path}...")

        self._config_path = Path(config_path)
        # Primary store plus role-specific views; all three share the same
        # JobConfigOption instances.
        self._jobs: dict[str, JobConfigOption] = {}
        self._triggers: dict[str, JobConfigOption] = {}
        self._tasks: dict[str, JobConfigOption] = {}
        # Secondary indexes for grouped lookups, built by _build_indexes().
        self._by_interval: dict[AppJobSchedule, list[JobConfigOption]] = {}
        self._by_job_group: dict[AppJobGroup, list[JobConfigOption]] = {}

        self._load_config()

    @classmethod
    def instance(cls, config_path: Path = JOB_CONFIG_PATH) -> "JobConfig":
        """Get the singleton instance.

        NOTE: ``config_path`` is only honoured on the first call; later calls
        return the already-created instance regardless of the path passed.
        """
        if cls._instance is None:
            cls._instance = cls(config_path)
        return cls._instance

    def try_reload(self, cfg: IO[str]) -> bool:
        """
        Reload job configuration from a YAML stream.

        Snapshots the current state before attempting the reload. On success the
        LRU caches for ``is_trigger`` / ``is_task`` are invalidated. On failure
        the previous state is fully restored.

        Returns:
            True if the reload succeeded, False otherwise.
        """
        # Snapshot current state so we can restore on failure.
        saved = _SavedState(
            self._jobs.copy(),
            self._triggers.copy(),
            self._tasks.copy(),
            {k: list(v) for k, v in self._by_interval.items()},
            {k: list(v) for k, v in self._by_job_group.items()},
        )
        LOG().info("Attempting to reload JobConfig from provided YAML stream...")
        try:
            self._jobs.clear()
            self._triggers.clear()
            self._tasks.clear()
            self._by_interval.clear()
            self._by_job_group.clear()
            self._load_config(cfg=cfg)
            if not self._jobs:
                # An empty config is treated as a failed reload, not a wipe.
                msg = "Reloaded config contains no job entries, can't reload."
                LOG().error(msg)
                self._reload_from_saved(saved)
                return False

            # Invalidate stale caches
            self.is_trigger.cache_clear()
            self.is_task.cache_clear()
        except Exception as e:
            LOG().error(f"JobConfigLoader reload failed, restoring previous config: {e}")
            self._reload_from_saved(saved)
            return False
        else:
            # Runs only when the try block succeeded — the single success log
            # line (previously this was also logged inside the try block,
            # producing a duplicate message on every successful reload).
            LOG().info(f"JobConfigLoader reloaded: {len(self._jobs)} jobs.")
            return True

    def _reload_from_saved(self, saved: _SavedState) -> None:
        """Restore all internal maps and indexes from a pre-reload snapshot."""
        self._jobs = saved.jobs
        self._triggers = saved.triggers
        self._tasks = saved.tasks
        self._by_interval = saved.by_interval
        self._by_job_group = saved.by_job_group

    # === Query Methods ===

    def get(self, job_name: str) -> JobConfigOption | None:
        """Get job config by name, or None if unknown."""
        return self._jobs.get(job_name)

    def all(self) -> list[JobConfigOption]:
        """Get all job configs."""
        return list(self._jobs.values())

    def triggers(self) -> list[JobConfigOption]:
        """Get all trigger jobs."""
        return list(self._triggers.values())

    def tasks(self) -> list[JobConfigOption]:
        """Get all scheduled tasks."""
        return list(self._tasks.values())

    def by_interval(self, interval: AppJobSchedule) -> list[JobConfigOption]:
        """Get all jobs for a specific interval.

        NOTE(review): returns the internal index list, not a copy — callers
        must not mutate it.
        """
        return self._by_interval.get(interval, [])

    def by_job_group(self, app_job_group: AppJobGroup) -> list[JobConfigOption]:
        """Get all jobs for a specific AppJobGroup.

        NOTE(review): returns the internal index list, not a copy — callers
        must not mutate it.
        """
        return self._by_job_group.get(app_job_group, [])

    def intervals(self) -> list[AppJobSchedule]:
        """Get all unique intervals used in config."""
        return list(self._by_interval.keys())

    def job_groups(self) -> list[AppJobGroup]:
        """Get all unique job groups referenced in config."""
        return list(self._by_job_group.keys())

    # === Convenience Methods ===

    def get_by_job_type(self, app_job_type: AppJobType) -> JobConfigOption | None:
        """Get job config by AppJobType enum (jobs are keyed by the enum value)."""
        return self.get(app_job_type.value)

    def get_by_job_group(self, app_job_group: AppJobGroup) -> list[JobConfigOption]:
        """Get all jobs belonging to a specific AppJobGroup.

        Unlike ``by_job_group`` this scans all jobs and tolerates entries whose
        group accessor raises.
        """
        result = []
        for job in self._jobs.values():
            try:
                if job.job_group == app_job_group:
                    result.append(job)
            except ValueError:
                # Presumably job.job_group raises ValueError when the YAML
                # entry has no valid AppJobGroup mapping — skip such jobs.
                pass
        return result

    @lru_cache(maxsize=128)  # noqa: B019 - global singleton cache, not per-instance
    def is_trigger(self, job_type: AppJobType) -> bool:
        """Check if a job is a trigger (cached; unknown jobs are False)."""
        job = self.get(job_type.value)
        return job.is_trigger if job else False

    @lru_cache(maxsize=128)  # noqa: B019 - global singleton cache, not per-instance
    def is_task(self, job_type: AppJobType) -> bool:
        """Check if a job is a task (cached; unknown jobs are False)."""
        job = self.get(job_type.value)
        return job.is_task if job else False

    def validate_enums(self) -> list[str]:
        """
        Validate that all jobs have valid enum mappings.

        Returns:
            A list of job-type names with problems: loaded jobs whose
            AppJobType could not be resolved, plus every non-scheduled
            AppJobType value that has no entry in the config.
            (The previous docstring incorrectly described a tuple return.)
        """
        jobs = self._jobs.values()

        # 1. Jobs loaded without a resolvable AppJobType.
        missing_job_types = [j.name for j in jobs if j._job_type is None]

        # 2. Enum values with no config entry. Scheduled jobs are excluded
        #    because they have no config here and are scheduled by Firebase.
        missing_job_types.extend(
            jt.value for jt in AppJobType if not jt.is_scheduled and jt.value not in self._jobs
        )

        return missing_job_types

    def __len__(self) -> int:
        """Total number of jobs."""
        return len(self._jobs)

    def __contains__(self, job_name: str) -> bool:
        """Check if a job with this name exists."""
        return job_name in self._jobs

    def __iter__(self) -> Iterator[JobConfigOption]:
        """Iterate over all job configs."""
        return iter(self._jobs.values())

    #
    # internal
    #

    def _load_config(self, cfg: IO[str] | None = None) -> None:
        """Load and parse YAML configuration from *cfg*, or from the config path."""
        # Explicit None check: a provided stream must always win, and IO
        # objects should not be subjected to truthiness tests.
        source: IO[str] | Path = cfg if cfg is not None else self._config_path
        data = YamlLoader(source=source).load()

        if "triggers" in data or "tasks" in data:
            # Hierarchical format (triggers/tasks sections)
            self._load_hierarchical(data)
        else:
            # Flat format (one top-level entry per job)
            self._load_flat(data)

        # Build indexes for fast lookups
        self._build_indexes()

    def _create_job_config(self, name: str, config_dict: dict[str, Any]) -> JobConfigOption:
        """Build a JobConfigOption from one YAML entry.

        Raises:
            ValueError: if *name* has no AppJobType mapping, or the entry is
                missing / has an unknown ``schedule`` value.
        """
        # --- AppJobType (derived from the job name) ---
        try:
            app_job_type = AppJobType(name)
        except Exception as ex:
            valid = [e.value for e in AppJobType]
            msg = (
                f"Job '{name}' has no AppJobType mapping. "
                f"Add '{name}' to AppJobType enum. Valid values: {valid}"
            )
            raise ValueError(msg) from ex

        # --- JobInterval (required YAML field 'schedule') ---
        schedule_str: str | None = config_dict.get("schedule")
        if not schedule_str:
            msg = f"Job '{name}' is missing required 'schedule' field in job_config.yaml."
            raise ValueError(msg)
        try:
            job_schedule = AppJobSchedule.from_string(schedule_str)
        except (ValueError, AttributeError) as ex:
            valid_intervals = [e.value for e in AppJobSchedule]
            msg = (
                f"Job '{name}' has unknown schedule '{schedule_str}'. "
                f"Valid values: {valid_intervals}"
            )
            raise ValueError(msg) from ex

        # Strip loader-managed keys; remaining fields pass through to JobConfig
        config_copy = {
            k: v for k, v in config_dict.items() if k not in ("app_job_group", "schedule")
        }

        return JobConfigOption(
            name=name,
            _job_type=app_job_type,
            _schedule=job_schedule,
            **config_copy,
        )

    def _load_hierarchical(self, data: dict[str, Any]) -> None:
        """Load hierarchical YAML format with explicit triggers/tasks sections."""
        # Load triggers
        for trigger_name, trigger_config in data.get("triggers", {}).items():
            job = self._create_job_config(trigger_name, trigger_config)
            self._jobs[trigger_name] = job
            self._triggers[trigger_name] = job

        # Load tasks
        for task_name, task_config in data.get("tasks", {}).items():
            job = self._create_job_config(task_name, task_config)
            self._jobs[task_name] = job
            self._tasks[task_name] = job

    def _load_flat(self, data: dict[str, Any]) -> None:
        """Load flat YAML format (one top-level entry per job)."""
        for job_name, job_config in data.items():
            if job_name.startswith("_"):  # Skip template definitions
                continue

            job = self._create_job_config(job_name, job_config)
            self._jobs[job_name] = job

            # Role is derived from the option itself in the flat format.
            if job.is_trigger:
                self._triggers[job_name] = job
            else:
                self._tasks[job_name] = job

    def _build_indexes(self) -> None:
        """Build interval and job-group lookup indexes for efficient queries."""
        for job in self._jobs.values():
            # Index by interval
            self._by_interval.setdefault(job.schedule, []).append(job)

            # Index by job group.
            # NOTE(review): unlike get_by_job_group(), this does not guard
            # against job.job_group raising ValueError — confirm that all
            # loaded jobs are guaranteed a valid group mapping.
            self._by_job_group.setdefault(job.job_group, []).append(job)