Coverage for functions \ flipdare \ firestore \ pledge_db.py: 32%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12from typing import Any 

13from google.cloud.firestore import Client as FirestoreClient, FieldFilter 

14from flipdare.app_log import LOG 

15from flipdare.constants import IS_DEBUG, PAYMENT_MAX_RETRIES 

16from flipdare.error.app_error import DatabaseError 

17from flipdare.firestore._app_db import AppDb 

18from flipdare.firestore._app_sub_db import AppSubDb 

19from flipdare.firestore.core.db_query import DbQuery, WhereField, FieldOp 

20from flipdare.firestore.core.pledge_event_transaction import PledgeEventTransaction 

21from flipdare.generated.model.payment.payment_event_model import PaymentEventModel 

22from flipdare.generated.model.payment.payment_model import PaymentKeys 

23from flipdare.generated.model.payment.payment_result_model import PaymentResultModel 

24from flipdare.generated.model.payment.payment_schedule_model import PaymentScheduleKeys 

25from flipdare.generated.model.payment.pledge_model import ( 

26 PledgeInternalKeys, 

27 PledgeKeys, 

28 PledgeModel, 

29) 

30from flipdare.generated.shared.app_error_code import AppErrorCode 

31from flipdare.generated.shared.firestore_collections import FirestoreCollections 

32from flipdare.generated.shared.model.pledge_status import PledgeStatus 

33from flipdare.wrapper.payment.payment_event_wrapper import PaymentEventWrapper 

34from flipdare.wrapper.payment.pledge_wrapper import PledgeWrapper 

35from google.cloud.firestore import And, Or 

36from collections import defaultdict 

37from datetime import datetime 

38from flipdare.analysis.data.nested.time_series_payment_data import ( 

39 PaymentStat, 

40 TimeSeriesPaymentData, 

41) 

42from flipdare.firestore.core.collection_stat_query import CollectionStatQuery 

43from flipdare.generated.shared.backend.app_job_type import AppJobType 

44from flipdare.generated.shared.payment.payment_status import PaymentStatus 

45from flipdare.util.time_util import TimeUtil 

46 

47_PLEDGE: str = FirestoreCollections.PLEDGE.value 

48 

49__all__ = ["PledgeDb"] 

50 

51_PledgeK = PledgeKeys 

52_OP = FieldOp 

53_I = PledgeInternalKeys 

54 

55 

56class PledgeDb(AppDb[PledgeWrapper, PledgeModel]): 

57 """Class for managing pledge-related database operations.""" 

58 

59 def __init__(self, client: FirestoreClient) -> None: 

60 super().__init__( 

61 client=client, 

62 collection_name=FirestoreCollections.PLEDGE, 

63 model_class=PledgeModel, 

64 wrapper_class=PledgeWrapper, 

65 ) 

66 

67 self.events = AppSubDb[PaymentEventWrapper, PaymentEventModel]( 

68 client=client, 

69 collection_name=FirestoreCollections.PLEDGE, 

70 sub_collection_name=FirestoreCollections.PLEDGE_PAYMENT_EVENTS, 

71 wrapper_class=PaymentEventWrapper, 

72 model_class=PaymentEventModel, 

73 ) 

74 

75 def get_payment_events(self, pledge_id: str) -> list[PaymentEventWrapper]: 

76 """Get all payment events for a pledge.""" 

77 if IS_DEBUG: 

78 LOG().debug(f"Getting payment events for pledge: {pledge_id}") 

79 

80 return self.events.get_all_sub(pledge_id) 

81 

82 def add_payment_event( 

83 self, 

84 pledge: PledgeWrapper, 

85 event: PaymentEventModel, 

86 payment_status: PaymentStatus, 

87 result: PaymentResultModel, 

88 ) -> PaymentEventWrapper: 

89 """Add a payment event to a pledge.""" 

90 pledge_id = pledge.doc_id 

91 if IS_DEBUG: 

92 LOG().debug(f"Adding payment event for pledge: {pledge_id}, event: {event}") 

93 

94 trans = PledgeEventTransaction(self.events) 

95 return trans.add_event( 

96 pledge=pledge, 

97 event=event, 

98 payment_status=payment_status, 

99 result=result, 

100 ) 

101 

102 def get_pledges_for_dare(self, dare_id: str) -> list[PledgeWrapper]: 

103 """Get all pledges for a specific dare from Firestore.""" 

104 query = self.client.collection(_PLEDGE).where( 

105 filter=WhereField[_PledgeK](_PledgeK.DARE_ID, _OP.EQUAL, dare_id).filter, 

106 ) 

107 results = query.get() 

108 if not results or len(results) == 0: 

109 return [] 

110 

111 entries = [ 

112 pledge for doc in results if (pledge := self._cvt_snap_to_model(doc)) is not None 

113 ] 

114 if IS_DEBUG: 

115 LOG().debug(f"Retrieved {len(entries)} pledges for dare {dare_id}.") 

116 

117 return entries 

118 

119 def get_by_payment_intent_id(self, payment_intent_id: str) -> PledgeWrapper | None: 

120 """Get a pledge by its associated Stripe payment intent ID.""" 

121 query = self.client.collection(_PLEDGE).where( 

122 filter=WhereField[Any]( 

123 _Nested.payment_key(PaymentKeys.PAYMENT_INTENT_ID), _OP.EQUAL, payment_intent_id 

124 ).filter, 

125 ) 

126 results = query.get() 

127 if not results or len(results) == 0: 

128 return None 

129 

130 # Assuming payment_intent_id is unique, we can return the first match 

131 for doc in results: 

132 pledge = self._cvt_snap_to_model(doc) 

133 if pledge is not None: 

134 if IS_DEBUG: 

135 LOG().debug( 

136 f"Found pledge with payment_intent_id {payment_intent_id}: {pledge.doc_id}" 

137 ) 

138 return pledge 

139 

140 if IS_DEBUG: 

141 LOG().debug(f"No pledge found with payment_intent_id {payment_intent_id}.") 

142 return None 

143 

144 # ======================================================================== 

145 # CRONS 

146 # ======================================================================== 

147 

148 def get_pledges_to_reauthorize(self, expire_in_hours: int = 24) -> list[PledgeWrapper]: 

149 """ 

150 Gets charges that are expiring within the next 24 hours and have not yet been captured. 

151 These intents must be refreshed to prevent them from expiring and being automatically cancelled by Stripe. 

152 """ 

153 now = TimeUtil.get_current_utc_dt() 

154 expire_in = TimeUtil.get_utc_time_future_hours(now, expire_in_hours) 

155 

156 # stripe stores expires_at as epoch ints, so we need to convert our datetime to epoch seconds for the query 

157 expire_in_epoch = TimeUtil.dt_to_simple_epoch(expire_in) 

158 

159 if IS_DEBUG: 

160 msg = ( 

161 f"Getting captured charges that are expiring on or before {TimeUtil.formatted_dt(expire_in)} ({expire_in_epoch} sec)" 

162 f" (in the next {expire_in_hours} hours) and have not yet been captured" 

163 ) 

164 LOG().debug(msg) 

165 

166 try: 

167 and_fields = [ 

168 WhereField[Any]( 

169 _Nested.payment_key(PaymentKeys.CAPTURE_BEFORE), 

170 _OP.LESS_THAN_OR_EQUAL, 

171 expire_in_epoch, 

172 ), 

173 WhereField[Any]( 

174 _Nested.payment_key(PaymentKeys.STATUS), 

175 _OP.EQUAL, 

176 PaymentStatus.HOLD.value, 

177 ), 

178 ] 

179 query = DbQuery.and_(and_fields) 

180 

181 charge_docs = query.get_query(self.client, _PLEDGE).stream() 

182 charges = [ 

183 charge 

184 for doc in charge_docs 

185 if (charge := self._cvt_snap_to_model(doc)) is not None 

186 ] 

187 if IS_DEBUG: 

188 LOG().debug(f"Retrieved {len(charges)} expiring charges.") 

189 return charges 

190 except Exception as e: 

191 msg = f"Failed to get expiring charges: {e}" 

192 raise DatabaseError( 

193 msg, 

194 error_code=AppErrorCode.DATABASE, 

195 collection_name=_PLEDGE, 

196 document_id=None, 

197 ) from e 

198 

199 def get_pledges_to_capture(self) -> list[PledgeWrapper]: 

200 """ 

201 Get pledges that need to be captured. 

202 """ 

203 return self._get_pledges( 

204 PaymentStatus.CAPTURE, _Nested.schedule_key(PaymentScheduleKeys.CAPTURE_ON) 

205 ) 

206 

207 def get_pledges_to_transfer(self) -> list[PledgeWrapper]: 

208 """ 

209 Get pledges that need to be transferred. 

210 """ 

211 return self._get_pledges( 

212 PaymentStatus.TRANSFER, _Nested.schedule_key(PaymentScheduleKeys.TRANSFER_ON) 

213 ) 

214 

215 def get_pledges_to_refund(self) -> list[PledgeWrapper]: 

216 """ 

217 Get pledges that need to be refunded. 

218 """ 

219 return self._get_pledges( 

220 PaymentStatus.REFUND, _Nested.schedule_key(PaymentScheduleKeys.REFUND_ON) 

221 ) 

222 

223 def _get_pledges( 

224 self, 

225 status: PaymentStatus, 

226 capture_key: Any, 

227 ) -> list[PledgeWrapper]: 

228 msg = f"Getting captured pledges with status {status} and non-null {capture_key} for processing" 

229 LOG().info(msg) 

230 

231 try: 

232 and_fields = [ 

233 WhereField[_PledgeK](_PledgeK.STATUS, _OP.EQUAL, status), 

234 WhereField[_PledgeK](capture_key, _OP.NOT_EQUAL, None), 

235 ] 

236 query = DbQuery.and_(and_fields) 

237 

238 charge_docs = query.get_query(self.client, _PLEDGE).stream() 

239 charges = [ 

240 charge 

241 for doc in charge_docs 

242 if (charge := self._cvt_snap_to_model(doc)) is not None 

243 ] 

244 if IS_DEBUG: 

245 LOG().debug(f"Retrieved {len(charges)} charges ready for {status}.") 

246 return charges 

247 except Exception as e: 

248 msg = f"Failed to get pledges ready for {status}: {e}" 

249 raise DatabaseError( 

250 msg, 

251 error_code=AppErrorCode.DATABASE, 

252 collection_name=_PLEDGE, 

253 document_id=None, 

254 ) from e 

255 

256 # ======================================================================== 

257 # REPORTS 

258 # ======================================================================== 

259 

260 def get_unprocessed_payments(self) -> list[PledgeWrapper]: 

261 """ 

262 This finds pledges that require: 

263 1. "processing" 

264 2. Dont have a "StripeCharge" but are not in an error state (since we want to retry those) 

265 

266 NOTE: we dont use a date range to ensure we get all unprocessed pledges, 

267 NOTE: but we could consider adding one in the future if performance becomes an issue. 

268 

269 """ 

270 status_filter = [ 

271 FieldFilter(_PledgeK.STATUS.value, _OP.EQUAL.value, PledgeStatus.PROCESSING.value), 

272 FieldFilter(_I.ERROR_COUNT.value, _OP.LESS_THAN.value, PAYMENT_MAX_RETRIES), 

273 ] 

274 

275 statuses = [ 

276 PledgeStatus.REFUND_ERROR.value, 

277 PledgeStatus.ERROR.value, 

278 ] 

279 

280 and_filter = [ 

281 FieldFilter(_PledgeK.STATUS.value, _OP.NOT_IN.value, statuses), 

282 FieldFilter(_Nested.payment_key(PaymentKeys.LAST_EVENT), _OP.NOT_EQUAL.value, None), 

283 ] 

284 try: 

285 query = DbQuery(filter_value=And([*status_filter, Or(And(and_filter))])) # type: ignore[arg-type] 

286 results = query.get_query(self.client, _PLEDGE).stream() 

287 

288 entries = [ 

289 pledge for doc in results if (pledge := self._cvt_snap_to_model(doc)) is not None 

290 ] 

291 if IS_DEBUG: 

292 LOG().debug(f"Retrieved {len(entries)} pledges ready for payment processing.") 

293 

294 return entries 

295 

296 except Exception as e: 

297 msg = f"Failed to get pledges ready for payment processing: {e}" 

298 raise DatabaseError( 

299 message=msg, 

300 error_code=AppErrorCode.DATABASE, 

301 collection_name=self.collection_name, 

302 ) from e 

303 

304 def get_unprocessed_refunds(self) -> list[PledgeWrapper]: 

305 """ 

306 This finds pledges that require: 

307 1. "REFUND_REQUESTED" or "REFUND_ERROR" 

308 

309 NOTE: we dont use a date range to ensure we get all unprocessed refunds. 

310 NOTE: BECAUSE NOT PROCESSING REFUNDS CAN COST US MONEY. 

311 NOTE: via CHARGE_BACK or STRIPE automatically retrying refunds that errored. 

312 """ 

313 statuses = [ 

314 PledgeStatus.REFUND_REQUESTED.value, 

315 PledgeStatus.REFUND_ERROR.value, 

316 ] 

317 

318 where_filter = [ 

319 WhereField[Any](_PledgeK.STATUS, _OP.IN, statuses), 

320 WhereField[Any](_I.ERROR_COUNT, _OP.LESS_THAN, PAYMENT_MAX_RETRIES), 

321 ] 

322 try: 

323 query = DbQuery.and_(where_filter) 

324 results = query.get_query(self.client, _PLEDGE).stream() 

325 

326 entries = [ 

327 pledge for doc in results if (pledge := self._cvt_snap_to_model(doc)) is not None 

328 ] 

329 if IS_DEBUG: 

330 LOG().debug(f"Retrieved {len(entries)} pledges requiring refund processing.") 

331 

332 return entries 

333 except Exception as e: 

334 msg = f"Failed to get pledges requiring refund processing: {e}" 

335 raise DatabaseError( 

336 message=msg, 

337 error_code=AppErrorCode.DATABASE, 

338 collection_name=self.collection_name, 

339 ) from e 

340 

341 # ======================================================================== 

342 # STATS 

343 # ======================================================================== 

344 

345 def get_stats(self, days: int = 7) -> TimeSeriesPaymentData: 

346 date_ranges = TimeUtil.get_date_range( 

347 days, start=TimeUtil.get_start_of_day_utc(), reverse=True 

348 ) 

349 

350 agg_stats = TimeSeriesPaymentData() 

351 error_ct = 0 

352 for date_range in date_ranges: 

353 from_date = date_range.from_date 

354 to_date = date_range.to_date 

355 

356 err_ct, counts = self._get_capture_stats(from_date, to_date) 

357 error_ct += err_ct 

358 

359 for stat_type, stat_ct in counts.items(): 

360 # only get stats not equal to 0 . 

361 if not stat_ct.is_empty: 

362 agg_stats.add(from_date, stat_type, stat_ct) 

363 

364 if error_ct > 0: 

365 msg = f"Encountered {error_ct} errors while getting payment stats" 

366 self.log_error( 

367 job_type=AppJobType.REPORT_PAYMENT_STATS, 

368 message=msg, 

369 error_code=AppErrorCode.DATABASE, 

370 ) 

371 

372 return agg_stats 

373 

374 def _get_capture_stats( 

375 self, 

376 from_date: datetime, 

377 to_date: datetime, 

378 ) -> tuple[int, dict[PaymentStatus, PaymentStat]]: 

379 

380 counts: dict[PaymentStatus, PaymentStat] = defaultdict(lambda: PaymentStat(0.0, 0.0)) 

381 error_ct = 0 

382 

383 client = self.client 

384 col_name = self.collection_name 

385 

386 debug_label = f"{TimeUtil.formatted_user(from_date)} - {TimeUtil.formatted_user(to_date)}" 

387 for status in PaymentStatus: 

388 

389 for status_type in [True, False]: # error and non error. 

390 where_fields = _Nested.where_fields(status, status_type) 

391 if where_fields is None: 

392 continue 

393 try: 

394 db_query = CollectionStatQuery.custom( 

395 from_date=from_date, 

396 to_date=to_date, 

397 where_fields=where_fields, 

398 ) 

399 query = db_query.get_query(client, col_name) 

400 agg_value = self._get_agg_value(query=query) 

401 if agg_value is None: 

402 msg = f"No count value returned for {debug_label} - {status}" 

403 LOG().warning(msg) 

404 error_ct += 1 

405 continue 

406 if agg_value.is_error: 

407 msg = f"Error getting count for {debug_label} - {status}" 

408 LOG().error(msg) 

409 error_ct += 1 

410 continue 

411 

412 if status_type: 

413 # error stat, so count is actually error_count 

414 charge_stat = PaymentStat(count=agg_value.count, error_count=0.0) 

415 else: 

416 # non error stat, so count is actually count and error_count is 0. 

417 charge_stat = PaymentStat(count=0.0, error_count=agg_value.count) 

418 

419 counts[status].accumulate(charge_stat) 

420 

421 except Exception as e: 

422 LOG().error(f"Error getting aggregate stats {debug_label}: {status}: {e}") 

423 error_ct += 1 

424 continue 

425 

426 if IS_DEBUG: 

427 total_ct = sum((stat.count + stat.error_count) for stat in counts.values()) 

428 msg = f"Total count for {debug_label}: {total_ct } (errors: {error_ct})" 

429 LOG().debug(msg) 

430 

431 return error_ct, counts 

432 

433 

434class _Nested: 

435 @staticmethod 

436 def payment_key(key: PaymentKeys) -> str: 

437 return f"{_PledgeK.PAYMENT.value}.{key.value}" 

438 

439 @staticmethod 

440 def schedule_key(key: PaymentScheduleKeys) -> str: 

441 return f"{_PledgeK.PAYMENT.value}.{PaymentKeys.SCHEDULE.value}.{key.value}" 

442 

443 @staticmethod 

444 def _where_payment( 

445 key: PaymentKeys, 

446 op: FieldOp, 

447 value: Any, 

448 ) -> WhereField[Any]: 

449 return WhereField[Any](_Nested.payment_key(key), op, value) 

450 

451 @staticmethod 

452 def _where_schedule( 

453 key: PaymentScheduleKeys, 

454 op: FieldOp, 

455 value: Any, 

456 ) -> WhereField[Any]: 

457 return WhereField[Any](_Nested.schedule_key(key), op, value) 

458 

459 @staticmethod 

460 def where_fields(status: PaymentStatus, is_error: bool) -> list[WhereField[Any]] | None: 

461 return ( 

462 _Nested._where_info_fields(status) 

463 if not is_error 

464 else _Nested._where_error_fields(status) 

465 ) 

466 

467 @staticmethod 

468 def _where_info_fields(status: PaymentStatus) -> list[WhereField[Any]] | None: 

469 return [_Nested._where_payment(PaymentKeys.STATUS, _OP.EQUAL, status.value)] 

470 

471 @staticmethod 

472 def _where_error_fields(status: PaymentStatus) -> list[WhereField[Any]] | None: 

473 match status: 

474 case PaymentStatus.HOLD: 

475 return [WhereField[Any](_PledgeK.ERROR_COUNT, _OP.NOT_EQUAL, 0)] 

476 case PaymentStatus.CAPTURE: 

477 return [ 

478 _Nested._where_payment(PaymentKeys.STATUS, _OP.EQUAL, PaymentStatus.CAPTURE), 

479 _Nested._where_schedule(PaymentScheduleKeys.CAPTURE_ON, _OP.EQUAL, None), 

480 ] 

481 case PaymentStatus.TRANSFER: 

482 return [ 

483 _Nested._where_payment(PaymentKeys.STATUS, _OP.EQUAL, PaymentStatus.TRANSFER), 

484 _Nested._where_schedule(PaymentScheduleKeys.TRANSFER_ON, _OP.EQUAL, None), 

485 ] 

486 case PaymentStatus.REFUND: 

487 return [ 

488 _Nested._where_payment(PaymentKeys.STATUS, _OP.EQUAL, PaymentStatus.REFUND), 

489 _Nested._where_schedule(PaymentScheduleKeys.REFUND_ON, _OP.EQUAL, None), 

490 ] 

491 case _: 

492 if IS_DEBUG: 

493 LOG().debug(f"Not collecting error stats for status {status}") 

494 return None