Coverage for functions \ flipdare \ service \ user_service.py: 44%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14from typing import TYPE_CHECKING, Any 

15 

16from flipdare.service._service_provider import ServiceProvider 

17from flipdare.service.processor.user_processor import UserProcessor 

18from flipdare.app_log import LOG 

19from flipdare.constants import IS_DEBUG 

20from flipdare.result.app_result import AppResult 

21from flipdare.core.cron_decorator import cron_decorator 

22from flipdare.core.job_type_decorator import job_type_decorator 

23from flipdare.result.job_result import JobResult 

24from flipdare.core.trigger_decorator import trigger_decorator 

25from flipdare.generated import AppErrorCode, AppJobType 

26from flipdare.result.outcome import Outcome 

27from flipdare.generated.shared.firestore_collections import FirestoreCollections 

28from flipdare.wrapper import AppJobWrapper, UserWrapper 

29from flipdare.mailer.user.user_summary_email import UserSummaryEmail 

30from flipdare.generated.model.backend.metric.count_metric import CountMetric 

31from flipdare.service.core.cron_processor import CronConfig, CronProcessor 

32from flipdare.app_types import CronResult 

33from flipdare.util.time_util import TimeUtil 

34from flipdare.wrapper.backend.user_summary_wrapper import UserSummaryWrapper 

35 

36if TYPE_CHECKING: 

37 from flipdare.manager.db_manager import DbManager 

38 from flipdare.manager.backend_manager import BackendManager 

39 

40__all__ = ["UserService"] 

41 

42 

class UserService(ServiceProvider):
    """Service that runs the user cron jobs and the user document trigger.

    Entry points:
        * ``cron_user_unprocessed`` - re-runs processing for recent users that
          have not completed processing.
        * ``cron_user_summary`` - emails each pending daily summary and flags
          the delivered ones as sent.
        * ``trigger_user`` - handles a job raised for the ``USER`` collection.
    """

    def __init__(
        self,
        db_manager: DbManager | None = None,
        backend_manager: BackendManager | None = None,
    ) -> None:
        """Initialise the service.

        Args:
            db_manager: Optional database manager, forwarded to ``ServiceProvider``.
            backend_manager: Optional backend manager, forwarded to ``ServiceProvider``.
        """
        super().__init__(
            backend_manager=backend_manager,
            db_manager=db_manager,
        )
        # Built on first access by the ``user_processor`` property.
        self._user_processor: UserProcessor | None = None

    @property
    def user_processor(self) -> UserProcessor:
        """Return the ``UserProcessor``, creating and caching it on first access."""
        if self._user_processor is None:
            self._user_processor = UserProcessor(
                bucket=self.storage_bucket,
                user_bridge=self.user_bridge,
                indexer_service=self.indexer,
                invite_db=self.invite_db,
            )
        return self._user_processor

    @cron_decorator(job_type=AppJobType.CR_USER_UNPROCESSED)
    def cron_user_unprocessed(self) -> CronResult:
        """Process recent users whose ``processing_complete`` flag is not set.

        Returns:
            The result produced by ``CronProcessor.process_result()``.
        """
        # NOTE(review): the processor is reached through ``self.user_service``
        # rather than ``self`` - presumably the provider's shared instance;
        # confirm before simplifying to ``self.user_processor``.
        user_service = self.user_service
        user_db = self.user_db

        config = CronConfig(
            job_type=AppJobType.CR_USER_UNPROCESSED,
            job_name="cron_user_unprocessed",
            query_fn=lambda: [
                u for u in user_db.get_recent_unprocessed() if not u.processing_complete
            ],
            process_fn=lambda user: user_service.user_processor.process_user(
                user, is_update=False
            ),
        )
        return CronProcessor(config).process_result()

    @cron_decorator(job_type=AppJobType.CR_USER_DAILY_SUMMARY)
    def cron_user_summary(self) -> CronResult:
        """Send the daily summary email for every pending user summary.

        Each summary returned by ``summary_db.get_user_reports()`` is sent via
        ``_send_daily_summary``. Summaries whose send result is OK are flagged
        ``summary_sent`` and persisted together in one ``batch_update`` call.

        Returns:
            A ``JobResult`` built from the accumulated errors when a user
            lookup failed or the batch update count did not match; otherwise a
            ``CountMetric`` with the success / failure counts and the duration.
        """
        summary_db = self.summary_db
        updated_summaries: list[UserSummaryWrapper] = []

        main_result = AppResult(task_name="cron_user_summary")
        failed_ct = 0
        success_ct = 0
        start = TimeUtil.get_current_utc_dt()

        for summary in summary_db.get_user_reports():
            summary_id = summary.doc_id
            user_id = summary.uid
            user_result = self.user_bridge.get(user_id)
            if user_result.is_error:
                msg = f"No email for userID {user_id},summaryID {summary_id}"
                main_result.add_error(
                    AppErrorCode.NOT_FOUND,
                    msg,
                    extra=summary.to_json_dict(),
                )
                failed_ct += 1
                continue

            user = user_result.generated
            assert user is not None  # narrowing

            debug_str = f"user={user_id}, summary_id={summary_id}"
            LOG().info(f"Processing daily report for {debug_str}.")

            result = self._send_daily_summary(summary=summary, user=user)
            if result.is_ok:
                success_ct += 1
                summary.summary_sent = True
                updated_summaries.append(summary)
            else:
                failed_ct += 1
                # A failed send only affects the counters, so surface the cause
                # here instead of silently dropping the per-summary result.
                LOG().warning(f"Daily summary not sent for {debug_str}: {result.formatted}")

        msg = (
            f"Succeeded summaries: {success_ct}, Failed summaries: {failed_ct} "
            f"updates={len(updated_summaries)}"
        )
        LOG().info(msg)
        ct = summary_db.batch_update(updated_summaries)

        # The value returned by batch_update is expected to equal the number of
        # summaries submitted; anything else is reported as an error.
        if ct != len(updated_summaries):
            cause = f"Failed to resolve user summaries:\n{msg}"
            main_result.add_error(AppErrorCode.USER_SUMMARY, cause)

        end = TimeUtil.get_current_utc_dt()
        duration = TimeUtil.duration_in_seconds(start, end)

        if main_result.is_error:
            return JobResult.from_result(
                result=main_result,
                duration=duration,
            )

        return CountMetric(
            success_ct=success_ct,
            failed_ct=failed_ct,
            skipped_ct=0,
            duration=duration,
        )

    def _send_daily_summary(self, summary: UserSummaryWrapper, user: UserWrapper) -> AppResult:
        """Build, validate and send the daily summary email for one user.

        Args:
            summary: The summary document to report on.
            user: The user that owns the summary; supplies the recipient
                address and the time zone (``"UTC"`` when ``tz_str`` is unset).

        Returns:
            An ``AppResult`` with no errors when the email was sent or when the
            summary has no entries (the caller then marks it as sent); an
            ``AppResult`` carrying ``EMAIL_TEMPLATE`` when validation raised,
            or ``SERVER_EX`` when rendering / sending raised.
        """
        summary_db = self.summary_db
        storage_util = self.storage_util
        summary_id = summary.doc_id
        assert summary_id  # narrowing

        user_email = user.email
        user_tz_str = user.tz_str or "UTC"

        debug_str = f"user={user_email}, summary_id={summary_id}"
        result: AppResult = AppResult(
            doc_id=summary_id,
            task_name=f"SendDailySummary for {debug_str}",
        )

        summary_entries = summary_db.get_report_entries(parent_id=summary_id)
        if not summary_entries:
            # Nothing to report: return the clean result so the caller resolves
            # (marks as sent) this summary without emailing anything.
            LOG().info(f"No summary entries found for {debug_str}, resolving.")
            return result

        daily_summary = UserSummaryEmail(
            summary=summary,
            summaries=UserSummaryEmail.build_summaries(summary_entries),
            storage_util=storage_util,
            user_tz_str=user_tz_str,
        )

        try:
            daily_summary.validate()
        except Exception as ex:
            msg = f"Daily summary email for {debug_str} is invalid: {ex}"
            result.add_error(AppErrorCode.EMAIL_TEMPLATE, msg, extra={"exception": str(ex)})
            return result

        try:
            # Rendering happens inside the try so template failures are
            # recorded on the result rather than propagating to the cron loop.
            self.user_mailer._send_raw(
                email=user_email,
                subject=daily_summary.subject,
                html_str=daily_summary.render_html(),
                text_str=daily_summary.render_text(),
                should_minify=True,  # should minify user emails, faster!
                no_reply=True,
            )
            LOG().info(f"Sent daily summary email for {debug_str}.")
            return result
        except Exception as ex:
            msg = f"Failed to send daily summary email to {debug_str}: {ex}"
            result.add_error(AppErrorCode.SERVER_EX, msg, extra={"exception": str(ex)})
            return result

    @job_type_decorator(AppJobType.TR_USER)
    @trigger_decorator(job_type=AppJobType.TR_USER, collection=FirestoreCollections.USER)
    def trigger_user(self, job: AppJobWrapper) -> JobResult[Any]:
        """Handle a user trigger job: invite handling, then user processing.

        Args:
            job: The trigger job; ``obj_id`` is the user ID and ``has_changes``
                marks the job as an update.

        Returns:
            ``JobResult.ok`` on success, ``JobResult.skip_doc`` when processing
            was skipped, or a ``JobResult`` built from the collected errors
            when the user lookup fails, processing fails, or an exception is
            raised. An invite failure is recorded as a warning only.
        """
        user_id = job.obj_id
        job_id = job.doc_id
        is_update = job.has_changes

        main_result = AppResult[UserWrapper](doc_id=user_id)
        try:
            if IS_DEBUG:
                LOG().debug(f"Processing new user job {job_id} for user ID: {user_id}")

            result = self.user_bridge.get(user_id)
            if result.is_error:
                main_result.merge(result)
                return JobResult.from_result(
                    result=main_result,
                    doc_id=user_id,
                    data=job.to_json_dict(),
                )

            user = result.generated
            assert user is not None  # NOTE: type narrowing

            invite_result = self._check_user_invited(user, is_update)
            if invite_result.is_error:
                # just log a warning and continue
                cause = f"Failed to process invite for user {user_id}: {invite_result.formatted}"
                LOG().warning(cause)
                main_result.add_warning(cause)

            process_ok = self.user_processor.process_user(user, is_update=is_update)
            if process_ok.is_error:
                main_result.add_error(
                    AppErrorCode.CONTENT,
                    f"User processing failed for user ID {user_id}.",
                    extra=user.to_json_dict(),
                )
                return JobResult.from_result(
                    result=main_result,
                    doc_id=user_id,
                    data=job.to_json_dict(),
                )
            if process_ok.is_skipped:
                msg = f"User processing skipped for user ID {user_id}."
                LOG().info(msg)
                return JobResult.skip_doc(doc_id=user_id, message=msg)

            return JobResult.ok(
                message=f"User {user_id} processed successfully.",
                doc_id=user_id,
            )

        except Exception as error:
            # Trigger boundary: convert any unexpected failure into a JobResult.
            cause = f"Error processing user job ID {job_id} for user ID {user_id}: {error}"
            main_result.add_error(AppErrorCode.UNEXPECTED_CODE_PATH, cause)
            return JobResult.from_result(
                result=main_result,
                doc_id=user_id,
                data=job.to_json_dict(),
            )

    def _check_user_invited(
        self,
        user: UserWrapper,
        is_update: bool,
    ) -> AppResult[UserWrapper]:
        """Run the invite-signup flow for a newly created, invited user.

        Args:
            user: The user being processed.
            is_update: True when the triggering job is an update; invites are
                only handled for new users.

        Returns:
            A skipped result when the user has no ``invite_id``; an OK result
            when the job is an update or the invite signup succeeded; an error
            result when the invite lookup fails or the signup outcome is not
            ``Outcome.OK``.
        """
        user_id = user.doc_id

        if user.invite_id is None:
            msg = f"User ID {user_id} has no inviteId, skipping invite processing."

            if IS_DEBUG:
                LOG().debug(msg)

            return AppResult[UserWrapper].skip(doc_id=user_id, message=msg)

        if is_update:
            msg = f"User ID {user_id} is an update, skipping invite processing."

            if IS_DEBUG:
                LOG().debug(msg)

            return AppResult[UserWrapper].ok(doc_id=user_id, message=msg)

        invite_result = self.invite_bridge.get(user.invite_id)
        if invite_result.is_error:
            lookup_failure: AppResult[UserWrapper] = AppResult(doc_id=user_id)
            lookup_failure.merge(invite_result)
            return lookup_failure

        invite = invite_result.generated
        assert invite  # narrowing

        outcome = self.friend_service.trigger_invite_signup(invite)
        if outcome != Outcome.OK:
            cause = (
                f"Failed to process invited user signup for user ID {user_id} "
                f"with invite ID {user.invite_id}"
            )
            signup_failure: AppResult[UserWrapper] = AppResult(doc_id=user_id)
            signup_failure.add_error(AppErrorCode.UNEXPECTED_CODE_PATH, cause)
            return signup_failure

        return AppResult[UserWrapper].ok(
            doc_id=user_id,
            message="User invite processed successfully.",
        )

304 doc_id=user_id, 

305 message="User invite processed successfully.", 

306 )