Coverage for functions \ flipdare \ firestore \ backend \ run_config_group_db.py: 84%
45 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from google.cloud.firestore import Client as FirestoreClient
15from flipdare.app_log import LOG
16from flipdare.firestore._app_db import AppDb
17from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField
18from flipdare.generated.model.backend.run_config_group_model import (
19 RunConfigGroupKeys,
20 RunConfigGroupModel,
21)
22from flipdare.generated.shared.backend.app_job_group import AppJobGroup
23from flipdare.generated.shared.backend.app_job_type import AppJobType
24from flipdare.generated.shared.firestore_collections import FirestoreCollections
25from flipdare.wrapper.backend.run_config_group_wrapper import RunConfigGroupWrapper
27__all__ = ["RunConfigGroupDb"]
29_RUNTIME_GROUP: str = FirestoreCollections.RUNTIME_GROUP.value
30_K = RunConfigGroupKeys
31_OP = FieldOp
34class RunConfigGroupDb(AppDb[RunConfigGroupWrapper, RunConfigGroupModel]):
35 def __init__(self, client: FirestoreClient) -> None:
36 super().__init__(
37 client=client,
38 collection_name=FirestoreCollections.RUNTIME_GROUP,
39 model_class=RunConfigGroupModel,
40 wrapper_class=RunConfigGroupWrapper,
41 )
43 def enable(self, job_group: AppJobGroup) -> None:
44 """Enable all jobs in *job_group*."""
45 LOG().info(f"Enabling group '{job_group.value}'...")
46 self._set_group_config(job_group=job_group, enabled=True)
48 def disable(self, job_group: AppJobGroup) -> None:
49 """Disable all jobs in *job_group*."""
50 LOG().info(f"Disabling group '{job_group.value}'...")
51 self._set_group_config(job_group=job_group, enabled=False)
53 def get_group_config(self, job_group: AppJobGroup) -> RunConfigGroupWrapper | None:
54 """Return the group-level enable/disable config, or ``None`` if not yet set."""
55 doc_id = job_group.value
56 data = self._get(doc_id)
57 if data is None:
58 return None
59 return RunConfigGroupWrapper.from_dict(data=data)
61 def get_config(
62 self,
63 job_group: AppJobGroup,
64 job_type: AppJobType,
65 ) -> RunConfigGroupWrapper | None:
66 """Return the running-job marker for a specific ``(job_group, job_type)`` pair."""
67 LOG().debug(f"Getting running-job config for '{job_group.value}/{job_type.value}'...")
68 query = DbQuery.and_(
69 [
70 WhereField[_K](_K.JOB_GROUP, _OP.EQUAL, job_group.value),
71 WhereField[_K](_K.JOB_TYPE, _OP.EQUAL, job_type.value),
72 ],
73 limit=1,
74 )
75 results = query.get_query(self.client, _RUNTIME_GROUP).get()
76 if not results:
77 LOG().debug(f"No running-job config found for '{job_group.value}/{job_type.value}'")
78 return None
79 return self._cvt_snap_to_model(results[0])
81 def _set_group_config(self, job_group: AppJobGroup, enabled: bool) -> RunConfigGroupWrapper:
82 """Upsert the group-level config using a predictable document ID."""
83 doc_id = job_group.value
84 model = RunConfigGroupModel(
85 id=doc_id,
86 enabled=enabled,
87 job_group=job_group,
88 )
89 data = model.to_dict()
90 data["id"] = doc_id # Ensure the ID is included in the data for the wrapper
92 cfg = RunConfigGroupWrapper.from_dict(data=data)
94 payload = cfg.to_dict()
95 self.client.collection(_RUNTIME_GROUP).document(doc_id).set(payload)
96 return RunConfigGroupWrapper.from_dict(data={"id": doc_id, **payload})