Coverage for functions \ flipdare \ job \ job_admin.py: 100%

0 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python3 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from typing import TYPE_CHECKING, Any 

16from flipdare.app_config import AppConfig 

17from flipdare.app_log import LOG 

18from flipdare.app_types import DatabaseDict 

19from flipdare.constants import IS_DEBUG 

20from flipdare.core.singleton import Singleton 

21from flipdare.generated.shared.app_error_code import AppErrorCode 

22from flipdare.generated.shared.backend.app_job_type import AppJobType 

23from flipdare.job.app_job_schedule import AppJobSchedule 

24from flipdare.job.cron_validator import CronValidator 

25from flipdare.job.trigger_data import TriggerData, UpdateTriggerData 

26from flipdare.service._error_mixin import ErrorMixin 

27 

28if TYPE_CHECKING: 

29 from flipdare.backend.app_scheduler import AppScheduler 

30 from flipdare.backend.app_logger import AppLogger 

31 from flipdare.backend.job_logger import JobLogger 

32 from flipdare.backend.runtime_config_admin import RuntimeConfigAdmin 

33 

34 

35class JobAdmin(ErrorMixin, Singleton): 

def __init__(
    self,
    runtime_config: RuntimeConfigAdmin | None = None,
    app_scheduler: AppScheduler | None = None,
    app_config: AppConfig | None = None,
    job_logger: JobLogger | None = None,
    app_logger: AppLogger | None = None,
) -> None:
    """Store the optional collaborators on the instance.

    Every argument may be omitted. A collaborator left as ``None`` is
    resolved later by the property of the same name, the first time that
    property is read.
    """
    super().__init__()

    # Each slot stays ``None`` until injected here or filled by its property.
    self._runtime_config: RuntimeConfigAdmin | None = runtime_config
    self._app_scheduler: AppScheduler | None = app_scheduler
    self._app_config: AppConfig | None = app_config
    self._job_logger: JobLogger | None = job_logger
    self._app_logger: AppLogger | None = app_logger

50 

@property
def app_scheduler(self) -> AppScheduler:
    """Return the scheduler, resolving and caching it on first use.

    A scheduler supplied to the constructor (or cached by an earlier call)
    is returned as-is. Otherwise it is obtained from
    ``flipdare.services.get_app_scheduler`` and stored on the instance.
    """
    if self._app_scheduler is None:
        # Imported only when a lookup is really needed, so an injected or
        # already-cached scheduler never touches ``flipdare.services``.
        # NOTE(review): the import is function-local, presumably to avoid an
        # import cycle - confirm before hoisting it to module level.
        from flipdare.services import get_app_scheduler

        self._app_scheduler = get_app_scheduler()
    return self._app_scheduler

58 

@property
def runtime_config(self) -> RuntimeConfigAdmin:
    """Return the runtime config admin, resolving and caching it on first use.

    An instance supplied to the constructor (or cached by an earlier call)
    is returned as-is. Otherwise it is obtained from
    ``flipdare.services.get_runtime_config`` and stored on the instance.
    """
    if self._runtime_config is None:
        # Imported only when a lookup is really needed, so an injected or
        # already-cached instance never touches ``flipdare.services``.
        # NOTE(review): the import is function-local, presumably to avoid an
        # import cycle - confirm before hoisting it to module level.
        from flipdare.services import get_runtime_config

        self._runtime_config = get_runtime_config()
    return self._runtime_config

66 

@property
def app_config(self) -> AppConfig:
    """Return the application config, resolving and caching it on first use.

    A config supplied to the constructor (or cached by an earlier call) is
    returned as-is. Otherwise it is obtained from
    ``flipdare.app_config.get_app_config`` and stored on the instance.
    """
    if self._app_config is None:
        # Imported only when a lookup is really needed, so an injected or
        # already-cached config skips the import statement entirely.
        from flipdare.app_config import get_app_config

        self._app_config = get_app_config()
    return self._app_config

74 

@property
def job_logger(self) -> JobLogger:
    """Return the job logger, resolving and caching it on first use.

    A logger supplied to the constructor (or cached by an earlier call) is
    returned as-is. Otherwise it is obtained from
    ``flipdare.services.get_job_logger`` and stored on the instance.
    """
    if self._job_logger is None:
        # Imported only when a lookup is really needed, so an injected or
        # already-cached logger never touches ``flipdare.services``.
        # NOTE(review): the import is function-local, presumably to avoid an
        # import cycle - confirm before hoisting it to module level.
        from flipdare.services import get_job_logger

        self._job_logger = get_job_logger()
    return self._job_logger

83 

@property
def app_logger(self) -> AppLogger:
    """Return the app logger, resolving and caching it on first use.

    A logger supplied to the constructor (or cached by an earlier call) is
    returned as-is. Otherwise it is obtained from
    ``flipdare.services.get_app_logger`` and stored on the instance.
    """
    if self._app_logger is None:
        # Imported only when a lookup is really needed, so an injected or
        # already-cached logger never touches ``flipdare.services``.
        # NOTE(review): the import is function-local, presumably to avoid an
        # import cycle - confirm before hoisting it to module level.
        from flipdare.services import get_app_logger

        self._app_logger = get_app_logger()
    return self._app_logger

92 

def cron_job(self, cron_name: str, validator: CronValidator, interval: AppJobSchedule) -> None:
    """Run the scheduler for ``interval`` on behalf of a cron invocation.

    When ``validator.valid()`` is falsy the failure is recorded through
    ``validator_error`` and the scheduler is not run. Any exception raised
    while running the scheduler is reported through ``job_error`` (with
    ``notify_admin=False``) instead of propagating to the caller.

    Args:
        cron_name: Name used in the log/error messages.
        validator: Validator for this cron invocation; also supplies the
            ``job_type`` attached to a reported error.
        interval: Schedule passed through to ``app_scheduler.run``.
    """
    if validator.valid():
        try:
            self.app_scheduler.run(interval=interval)
        except Exception as exc:
            # Reported rather than re-raised; the admin is not notified.
            self.job_error(
                message=f"Error running cron job {cron_name}: {exc}",
                job_type=validator.job_type,
                error_code=AppErrorCode.TRIGGER,
                error=exc,
                notify_admin=False,
            )
        return

    self.validator_error(
        message=f"Cron job validation failed for {cron_name}",
        validator=validator,
    )

110 

def trigger_job(self, job_type: AppJobType, validated: TriggerData[Any, Any]) -> None:
    """
    Creates a job in the database from trigger data.

    Flow:
    1. Firestore trigger fires (e.g., user document created/updated)
    2. TriggerData validates and creates PersistedWrapper[TModel] from the event
    3. This method stores the model as a dict in the jobs collection
    4. AppScheduler later processes these jobs, fetching fresh data from Firestore

    NOTE: This method does not check whether the job type is enabled or
    NOTE: already running, and never cancels anything. The job is always
    NOTE: recorded and picked up in a later cycle; that is the
    NOTE: responsibility of the scheduler.

    FIXME: We should add low/high priority jobs..
    - then when the queue is empty, we can process jobs with fewer changes.
    - but still ensure the data is fresh for the users.

    Failures:
    - A failed validation is reported through ``validator_error`` and no
      job is created.
    - Any exception raised while creating the job is reported through
      ``job_error`` (``notify_admin=False``) and is not re-raised.

    Args:
        job_type: The type of job (e.g., TR_USER, TR_DARE)
        validated: TriggerData containing PersistedWrapper[TModel] from the trigger event

    """
    job_name = validated.job_type
    valid_result = validated.valid()
    if valid_result.is_error:
        msg = f"Trigger validation failed for {job_name}: {valid_result.error}"
        self.validator_error(message=msg, validator=validated)
        return

    # NOTE: deliberately no `runtime_config.is_job_enabled(job_type)` check:
    # NOTE: if we aren't processing jobs, we still add it and process it
    # NOTE: when we are back online.

    doc_id = validated.doc_id
    assert doc_id is not None  # narrowing

    debug_str = f"[{job_name}, doc_id={doc_id}, job_type={job_type.value}]"

    # NOTE: deliberately no `runtime_config.is_job_running(job_type)` check:
    # NOTE: we still need to add the job even if one is running, and
    # NOTE: process it in the next cycle.

    try:
        if isinstance(validated, UpdateTriggerData):
            self._handle_update(doc_id, job_type, validated)
        else:
            # creation trigger, therefore no need for version check
            model = validated.wrapper
            assert model  # narrowing, validated above
            self.job_logger.create_new(
                doc_id,
                job_type,
                model=model.to_dict(),
            )
            # Log only after the job has been stored: a logging failure can
            # no longer prevent the job from being created, and a failed
            # creation no longer leaves a misleading "Created" record.
            self.info(f"Created trigger job ({debug_str}).")

    except Exception as e:
        # these could be superfluous, so we log, but don't notify admin
        msg = f"Error creating trigger job for {debug_str}: {e}"
        self.job_error(
            job_type=job_type,
            error_code=AppErrorCode.TRIGGER,
            doc_id=doc_id,
            message=msg,
            error=e,
            notify_admin=False,
        )

178 

def _handle_update(
    self,
    doc_id: str,
    job_type: AppJobType,
    validated: UpdateTriggerData[Any],
) -> None:
    """Record an update job when the document changed enough to matter.

    Nothing is stored when the ``after`` wrapper reports no version change
    relative to ``before``, or when the computed change score is below
    ``app_config.change_score_threshold``. Otherwise both snapshots (as
    dicts) and ``validated.updates()`` are passed to
    ``job_logger.create_update``.

    Diagnostic messages are emitted only when ``IS_DEBUG`` is set.

    Args:
        doc_id: Id of the document the trigger fired for.
        job_type: Job type to record the update under.
        validated: Update trigger data carrying the before/after wrappers.
    """
    tag = f"[{validated.job_type}, doc_id={doc_id}, job_type={job_type.value}]"

    previous, current = validated.before_wrapper, validated.wrapper
    assert previous  # narrowing, validated above
    assert current  # narrowing, validated above

    # Gate 1: the version must have changed.
    if not current.version_changed(previous):
        if IS_DEBUG:
            LOG().debug(f"No version change detected. Skipping. ({tag}).")
        return

    # Gate 2: the change must reach the configured threshold.
    change_score = previous.calculate_change_score(current)
    minimum = self.app_config.change_score_threshold
    if IS_DEBUG:
        LOG().debug(
            f"Calculated change score: {change_score:.4f} (threshold: {minimum}) for {tag}"
        )

    if change_score < minimum:
        if IS_DEBUG:
            LOG().debug(f"Insignificant change. Skipping ({tag}).")
        return

    if IS_DEBUG:
        LOG().debug(
            f"Update Trigger: {tag} - Change score={change_score:.4f} Threshold={minimum}\n"
            f"-- BEFORE -- \n"
            f"{previous.to_dict()}\n"
            f"-- AFTER -- \n"
            f"{current.to_dict()}"
        )

    self.job_logger.create_update(
        doc_id,
        job_type,
        before_model=previous.to_dict(),
        after_model=current.to_dict(),
        updated_data=validated.updates(),
    )

230 

def validator_error(
    self,
    message: str,
    validator: CronValidator | TriggerData[Any, Any],
) -> None:  # pragma: no cover
    """Report a failed validation.

    ``message`` is logged at error level through ``LOG()`` and then handed,
    together with the validator and its ``job_type``, to
    ``app_logger.validation_error`` under ``AppErrorCode.VALIDATION``.

    Args:
        message: Description of the validation failure.
        validator: The validator that failed; supplies ``job_type``.
    """
    LOG().error(message)
    self.app_logger.validation_error(
        message=message,
        validator=validator,
        error_code=AppErrorCode.VALIDATION,
        job_type=validator.job_type,
    )

244 

def info(self, message: str, data: DatabaseDict | None = None) -> None:
    """Log ``message`` at info level and forward it to the app logger.

    Args:
        message: Text to log.
        data: Optional payload passed through to ``app_logger.info``.
    """
    log = LOG()
    log.info(message)
    self.app_logger.info(data=data, message=message)