Coverage for functions \ flipdare \ firestore \ backend \ app_job_db.py: 95%
44 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from typing import Any
14from google.cloud.firestore import Client as FirestoreClient
15from flipdare.app_log import LOG
16from flipdare.constants import IS_TRACE
17from flipdare.firestore._app_db import AppDb
18from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField
19from flipdare.generated import AppJobModel
20from flipdare.generated.model.backend.app_job_model import AppJobInternalKeys, AppJobKeys
21from flipdare.generated.shared.backend.app_job_type import AppJobType
22from flipdare.generated.shared.firestore_collections import FirestoreCollections
23from flipdare.util.time_util import FirestoreTime
24from flipdare.wrapper import AppJobWrapper
26__all__ = ["AppJobDb"]
28_JOB: str = FirestoreCollections.APP_JOB.value
30_K = AppJobKeys
31_I = AppJobInternalKeys
34type _KeyType = AppJobKeys | AppJobInternalKeys # for type checking.
class AppJobDb(AppDb[AppJobWrapper, AppJobModel]):
    """Firestore access layer for the app-job collection.

    Provides queries for jobs whose ``PROCESSED`` flag is still ``False`` and
    a batched update that marks jobs as processed.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind this DB helper to *client* and the app-job collection."""
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.APP_JOB,
            model_class=AppJobModel,
            wrapper_class=AppJobWrapper,
        )

    def get_jobs_to_process(self, job_types: list[AppJobType]) -> list[AppJobWrapper]:
        """Return unprocessed jobs whose type is one of *job_types*.

        The query ANDs ``PROCESSED == False`` with an OR over one
        ``JOB_TYPE == <type>`` clause per entry in *job_types*.

        Args:
            job_types: Job types to match; each contributes one OR clause.

        Returns:
            The matching jobs as converted by ``self._process_results``, or
            an empty list when the query yields nothing.
        """
        and_filters = [
            WhereField[_KeyType](_I.PROCESSED, FieldOp.EQUAL, False),
        ]
        or_filters = [
            WhereField[_KeyType](_K.JOB_TYPE, FieldOp.EQUAL, job_type.value)
            for job_type in job_types
        ]

        # Guarded so the filter reprs are only formatted when tracing is on.
        if IS_TRACE:
            LOG().trace(
                f"Getting jobs to process by type [{job_types}] with and filter:"
                f"\n\t{and_filters}"
                f"\n\tor_filters: {or_filters}"
            )

        query = DbQuery.complex_and_or_(and_filters, or_filters)
        results = query.get_query(self.client, _JOB).get()
        if not results:
            return []

        LOG().debug(f"Found {len(results)} jobs to process for types: [{job_types}]")
        return self._process_results(results)

    def get_all_jobs_to_process(self) -> list[AppJobWrapper]:
        """Return every job whose ``PROCESSED`` flag is ``False``.

        Returns:
            The matching jobs as converted by ``self._process_results``, or
            an empty list when the query yields nothing.
        """
        query = DbQuery.where(WhereField[_KeyType](_I.PROCESSED, FieldOp.EQUAL, False))
        results = query.get_query(self.client, _JOB).get()
        if not results:
            return []

        return self._process_results(results)

    def resolve_jobs(self, job_ids: list[str]) -> None:
        """Mark the given jobs as processed in a single write batch.

        Each job document gets ``PROCESSED`` set to ``True`` and
        ``UPDATED_AT`` set to ``FirestoreTime.server_timestamp()``.

        Args:
            job_ids: Document ids of the jobs to mark as processed.
        """
        if not job_ids:
            # Nothing to write; skip the round-trip of committing an empty batch.
            return

        batch = self.client.batch()
        for job_id in job_ids:
            doc_ref = self.client.collection(_JOB).document(job_id)
            # Both keys use ``.value`` so the payload is keyed by plain
            # strings, matching the ``dict[str, Any]`` annotation.
            data: dict[str, Any] = {
                _I.PROCESSED.value: True,
                _I.UPDATED_AT.value: FirestoreTime.server_timestamp(),
            }
            batch.update(doc_ref, data)
        batch.commit()