Coverage for functions \ flipdare \ firestore \ backend \ run_config_job_db.py: 93%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from google.cloud.firestore import Client as FirestoreClient 

15from flipdare.app_log import LOG 

16from flipdare.firestore._app_db import AppDb 

17from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField 

18from flipdare.generated.model.backend.run_config_job_model import ( 

19 RunConfigJobKeys, 

20 RunConfigJobModel, 

21) 

22from flipdare.generated.shared.backend.app_job_type import AppJobType 

23from flipdare.generated.shared.firestore_collections import FirestoreCollections 

24from flipdare.util.time_util import TimeUtil 

25from flipdare.wrapper.backend.run_config_job_wrapper import RunConfigJobWrapper 

26 

# String name of the Firestore collection that stores running-job markers.
# Passed as the collection argument to ``get_query`` when RunConfigJobDb
# queries the collection directly.
27_RUNTIME_CONFIG: str = FirestoreCollections.RUNTIME_JOB.value 

28 

# Public API of this module: only the DB class is exported.
29__all__ = ["RunConfigJobDb"] 

30 

# Short aliases that keep the ``WhereField`` filter expressions compact:
# ``_K`` names the model's field keys, ``_OP`` the comparison operators.
31_K = RunConfigJobKeys 

32_OP = FieldOp 

33 

34 

class RunConfigJobDb(AppDb[RunConfigJobWrapper, RunConfigJobModel]):
    """Firestore access layer for running-job markers.

    A marker is a document recording that a ``(job_group, job_type)`` job has
    been started. ``start_job`` creates one, ``get_job`` / ``is_job_running``
    look one up, and ``cancel_job`` deletes one.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind this helper to the runtime-job collection reachable via *client*."""
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.RUNTIME_JOB,
            model_class=RunConfigJobModel,
            wrapper_class=RunConfigJobWrapper,
        )

    @staticmethod
    def _job_label(job_type: AppJobType) -> str:
        """Return the ``'<group>/<type>'`` label used in this class's log messages."""
        return f"{job_type.job_group.value}/{job_type.value}"

    @staticmethod
    def _marker_query(job_type: AppJobType) -> DbQuery:
        """Build the query that matches the marker for *job_type*.

        Shared by ``get_job`` and ``cancel_job`` so both always filter on the
        same ``(job_group, job_type)`` pair with the same ``limit=1``.
        """
        # NOTE(review): the ``DbQuery`` return annotation assumes ``DbQuery.and_``
        # returns a ``DbQuery`` instance — confirm against db_query.
        return DbQuery.and_(
            [
                WhereField[_K](_K.JOB_GROUP, _OP.EQUAL, job_type.job_group.value),
                WhereField[_K](_K.JOB_TYPE, _OP.EQUAL, job_type.value),
            ],
            limit=1,
        )

    def start_job(
        self,
        job_type: AppJobType,
    ) -> RunConfigJobWrapper | None:
        """Create a running-job marker so other instances know the job is active.

        Args:
            job_type: The job to mark as started; its ``job_group`` is stored
                alongside it.

        Returns:
            A wrapper built from the created document data, or ``None`` when
            creating the marker (or wrapping the result) raised. The failure
            is reported through ``log_error`` rather than propagated.
        """
        cfg = RunConfigJobModel(
            id=None,
            job_group=job_type.job_group,
            job_type=job_type,
            started_at=TimeUtil.get_current_utc_float_time(),
        )
        label = self._job_label(job_type)

        try:
            created_data = self._create(cfg.to_dict())
            LOG().info(f"Started job '{label}'...")
            return RunConfigJobWrapper.from_dict(data=created_data)
        except Exception as e:
            # Deliberate best-effort: callers get ``None`` instead of an exception.
            self.log_error(
                message=f"Failed to start job '{label}': {e}",
                job_type=job_type,
            )
            return None

    def is_job_running(self, job_type: AppJobType) -> bool:
        """Return ``True`` if a running-job marker exists for this job."""
        return self.get_job(job_type) is not None

    def cancel_job(self, job_type: AppJobType) -> None:
        """Delete the running-job marker when a job finishes.

        Does nothing when no marker matches *job_type*.
        """
        LOG().info(f"Cancelling job '{self._job_label(job_type)}'...")
        results = self._marker_query(job_type).get_query(self.client, _RUNTIME_CONFIG).get()
        # NOTE(review): the query is limited to one result, so at most one marker
        # is deleted per call. If duplicate markers can exist for the same job,
        # the remaining ones would still make ``is_job_running`` return True — confirm.
        if results:
            self.delete(results[0].id)

    def get_job(
        self,
        job_type: AppJobType,
    ) -> RunConfigJobWrapper | None:
        """Return the running-job marker for a specific ``(job_group, job_type)`` pair.

        Returns:
            The first matching marker converted via ``_cvt_snap_to_model``, or
            ``None`` when the query yields no results.
        """
        label = self._job_label(job_type)
        LOG().debug(f"Getting running-job config for '{label}'...")
        results = self._marker_query(job_type).get_query(self.client, _RUNTIME_CONFIG).get()
        if not results:
            LOG().debug(f"No running-job config found for '{label}'")
            return None
        return self._cvt_snap_to_model(results[0])