Coverage for functions \ flipdare \ firestore \ backend \ app_job_db.py: 95%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from typing import Any 

14from google.cloud.firestore import Client as FirestoreClient 

15from flipdare.app_log import LOG 

16from flipdare.constants import IS_TRACE 

17from flipdare.firestore._app_db import AppDb 

18from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField 

19from flipdare.generated import AppJobModel 

20from flipdare.generated.model.backend.app_job_model import AppJobInternalKeys, AppJobKeys 

21from flipdare.generated.shared.backend.app_job_type import AppJobType 

22from flipdare.generated.shared.firestore_collections import FirestoreCollections 

23from flipdare.util.time_util import FirestoreTime 

24from flipdare.wrapper import AppJobWrapper 

25 

26__all__ = ["AppJobDb"] 

27 

28_JOB: str = FirestoreCollections.APP_JOB.value 

29 

30_K = AppJobKeys 

31_I = AppJobInternalKeys 

32 

33 

34type _KeyType = AppJobKeys | AppJobInternalKeys # for type checking. 

35 

36 

37class AppJobDb(AppDb[AppJobWrapper, AppJobModel]): 

38 """Class for future operations.""" 

39 

40 def __init__(self, client: FirestoreClient) -> None: 

41 super().__init__( 

42 client=client, 

43 collection_name=FirestoreCollections.APP_JOB, 

44 model_class=AppJobModel, 

45 wrapper_class=AppJobWrapper, 

46 ) 

47 

48 def get_jobs_to_process(self, job_types: list[AppJobType]) -> list[AppJobWrapper]: 

49 and_filters = [ 

50 WhereField[_KeyType](_I.PROCESSED, FieldOp.EQUAL, False), 

51 ] 

52 or_filters = [ 

53 WhereField[_KeyType](_K.JOB_TYPE, FieldOp.EQUAL, job_type.value) 

54 for job_type in job_types 

55 ] 

56 

57 if IS_TRACE: 

58 LOG().trace( 

59 f"Getting jobs to process by type [{job_types}] with and filter:" 

60 f"\n\t{and_filters}" 

61 f"\n\tor_filters: {or_filters}" 

62 ) 

63 

64 query = DbQuery.complex_and_or_(and_filters, or_filters) 

65 results = query.get_query(self.client, _JOB).get() 

66 if len(results) == 0: 

67 # LOG().debug(f"No jobs found to process for types: [{job_types}]") 

68 return [] 

69 

70 LOG().debug(f"Found {len(results)} jobs to process for types: [{job_types}]") 

71 return self._process_results(results) 

72 

73 def get_all_jobs_to_process(self) -> list[AppJobWrapper]: 

74 query = DbQuery.where(WhereField[_KeyType](_I.PROCESSED, FieldOp.EQUAL, False)) 

75 results = query.get_query(self.client, _JOB).get() 

76 if len(results) == 0: 

77 return [] 

78 

79 return self._process_results(results) 

80 

81 def resolve_jobs(self, job_ids: list[str]) -> None: 

82 batch = self.client.batch() 

83 for job_id in job_ids: 

84 doc_ref = self.client.collection(_JOB).document(job_id) 

85 data: dict[str, Any] = { 

86 _I.PROCESSED.value: True, 

87 _I.UPDATED_AT: FirestoreTime.server_timestamp(), 

88 } 

89 batch.update(doc_ref, data) 

90 batch.commit()