Coverage for functions \ flipdare \ firestore \ pledge_db.py: 32%
222 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
12from typing import Any
13from google.cloud.firestore import Client as FirestoreClient, FieldFilter
14from flipdare.app_log import LOG
15from flipdare.constants import IS_DEBUG, PAYMENT_MAX_RETRIES
16from flipdare.error.app_error import DatabaseError
17from flipdare.firestore._app_db import AppDb
18from flipdare.firestore._app_sub_db import AppSubDb
19from flipdare.firestore.core.db_query import DbQuery, WhereField, FieldOp
20from flipdare.firestore.core.pledge_event_transaction import PledgeEventTransaction
21from flipdare.generated.model.payment.payment_event_model import PaymentEventModel
22from flipdare.generated.model.payment.payment_model import PaymentKeys
23from flipdare.generated.model.payment.payment_result_model import PaymentResultModel
24from flipdare.generated.model.payment.payment_schedule_model import PaymentScheduleKeys
25from flipdare.generated.model.payment.pledge_model import (
26 PledgeInternalKeys,
27 PledgeKeys,
28 PledgeModel,
29)
30from flipdare.generated.shared.app_error_code import AppErrorCode
31from flipdare.generated.shared.firestore_collections import FirestoreCollections
32from flipdare.generated.shared.model.pledge_status import PledgeStatus
33from flipdare.wrapper.payment.payment_event_wrapper import PaymentEventWrapper
34from flipdare.wrapper.payment.pledge_wrapper import PledgeWrapper
35from google.cloud.firestore import And, Or
36from collections import defaultdict
37from datetime import datetime
38from flipdare.analysis.data.nested.time_series_payment_data import (
39 PaymentStat,
40 TimeSeriesPaymentData,
41)
42from flipdare.firestore.core.collection_stat_query import CollectionStatQuery
43from flipdare.generated.shared.backend.app_job_type import AppJobType
44from flipdare.generated.shared.payment.payment_status import PaymentStatus
45from flipdare.util.time_util import TimeUtil
# Collection name (string value of the enum member) passed to ``client.collection(...)``
# and used as ``collection_name`` in raised ``DatabaseError``s.
_PLEDGE: str = FirestoreCollections.PLEDGE.value

# Only the database class is public; the helpers below are module-private.
__all__ = ["PledgeDb"]

# Short aliases to keep the query-building expressions readable.
_PledgeK = PledgeKeys  # top-level pledge document keys
_OP = FieldOp  # query comparison operators
_I = PledgeInternalKeys  # internal pledge keys (e.g. ERROR_COUNT)
class PledgeDb(AppDb[PledgeWrapper, PledgeModel]):
    """Database access for the pledge collection and its payment-event sub-collection."""

    def __init__(self, client: FirestoreClient) -> None:
        """Bind the pledge collection and its payment-event sub-collection to ``client``."""
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.PLEDGE,
            model_class=PledgeModel,
            wrapper_class=PledgeWrapper,
        )

        # Payment events live in a sub-collection underneath each pledge document.
        self.events = AppSubDb[PaymentEventWrapper, PaymentEventModel](
            client=client,
            collection_name=FirestoreCollections.PLEDGE,
            sub_collection_name=FirestoreCollections.PLEDGE_PAYMENT_EVENTS,
            wrapper_class=PaymentEventWrapper,
            model_class=PaymentEventModel,
        )

    def _wrap_pledge_docs(self, docs: Any) -> list[PledgeWrapper]:
        """
        Convert an iterable of document snapshots into pledge wrappers.

        Snapshots for which ``_cvt_snap_to_model`` returns ``None`` are dropped.
        """
        return [pledge for doc in docs if (pledge := self._cvt_snap_to_model(doc)) is not None]

    def get_payment_events(self, pledge_id: str) -> list[PaymentEventWrapper]:
        """Get all payment events for a pledge."""
        if IS_DEBUG:
            LOG().debug(f"Getting payment events for pledge: {pledge_id}")

        return self.events.get_all_sub(pledge_id)

    def add_payment_event(
        self,
        pledge: PledgeWrapper,
        event: PaymentEventModel,
        payment_status: PaymentStatus,
        result: PaymentResultModel,
    ) -> PaymentEventWrapper:
        """Add a payment event to a pledge via a ``PledgeEventTransaction``."""
        pledge_id = pledge.doc_id
        if IS_DEBUG:
            LOG().debug(f"Adding payment event for pledge: {pledge_id}, event: {event}")

        trans = PledgeEventTransaction(self.events)
        return trans.add_event(
            pledge=pledge,
            event=event,
            payment_status=payment_status,
            result=result,
        )

    def get_pledges_for_dare(self, dare_id: str) -> list[PledgeWrapper]:
        """Get all pledges for a specific dare from Firestore."""
        query = self.client.collection(_PLEDGE).where(
            filter=WhereField[_PledgeK](_PledgeK.DARE_ID, _OP.EQUAL, dare_id).filter,
        )
        results = query.get()
        if not results:
            return []

        entries = self._wrap_pledge_docs(results)
        if IS_DEBUG:
            LOG().debug(f"Retrieved {len(entries)} pledges for dare {dare_id}.")

        return entries

    def get_by_payment_intent_id(self, payment_intent_id: str) -> PledgeWrapper | None:
        """
        Get a pledge by its associated Stripe payment intent ID.

        Returns the first matching document that converts to a wrapper, or ``None``.
        """
        query = self.client.collection(_PLEDGE).where(
            filter=WhereField[Any](
                _Nested.payment_key(PaymentKeys.PAYMENT_INTENT_ID), _OP.EQUAL, payment_intent_id
            ).filter,
        )
        results = query.get()
        if not results:
            return None

        # Assuming payment_intent_id is unique, we can return the first match
        for doc in results:
            pledge = self._cvt_snap_to_model(doc)
            if pledge is not None:
                if IS_DEBUG:
                    LOG().debug(
                        f"Found pledge with payment_intent_id {payment_intent_id}: {pledge.doc_id}"
                    )
                return pledge

        if IS_DEBUG:
            LOG().debug(f"No pledge found with payment_intent_id {payment_intent_id}.")
        return None

    # ========================================================================
    # CRONS
    # ========================================================================

    def get_pledges_to_reauthorize(self, expire_in_hours: int = 24) -> list[PledgeWrapper]:
        """
        Get pledges on HOLD whose ``capture_before`` falls within the next ``expire_in_hours`` hours.

        These intents must be refreshed to prevent them from expiring and being
        automatically cancelled by Stripe.

        Raises:
            DatabaseError: if the query or the document conversion fails.
        """
        now = TimeUtil.get_current_utc_dt()
        expire_in = TimeUtil.get_utc_time_future_hours(now, expire_in_hours)

        # stripe stores expires_at as epoch ints, so we need to convert our datetime to epoch seconds for the query
        expire_in_epoch = TimeUtil.dt_to_simple_epoch(expire_in)

        if IS_DEBUG:
            msg = (
                f"Getting captured charges that are expiring on or before {TimeUtil.formatted_dt(expire_in)} ({expire_in_epoch} sec)"
                f" (in the next {expire_in_hours} hours) and have not yet been captured"
            )
            LOG().debug(msg)

        try:
            and_fields = [
                WhereField[Any](
                    _Nested.payment_key(PaymentKeys.CAPTURE_BEFORE),
                    _OP.LESS_THAN_OR_EQUAL,
                    expire_in_epoch,
                ),
                WhereField[Any](
                    _Nested.payment_key(PaymentKeys.STATUS),
                    _OP.EQUAL,
                    PaymentStatus.HOLD.value,
                ),
            ]
            query = DbQuery.and_(and_fields)

            charge_docs = query.get_query(self.client, _PLEDGE).stream()
            charges = self._wrap_pledge_docs(charge_docs)
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(charges)} expiring charges.")
            return charges
        except Exception as e:
            msg = f"Failed to get expiring charges: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_PLEDGE,
                document_id=None,
            ) from e

    def get_pledges_to_capture(self) -> list[PledgeWrapper]:
        """
        Get pledges that need to be captured.
        """
        return self._get_pledges(
            PaymentStatus.CAPTURE, _Nested.schedule_key(PaymentScheduleKeys.CAPTURE_ON)
        )

    def get_pledges_to_transfer(self) -> list[PledgeWrapper]:
        """
        Get pledges that need to be transferred.
        """
        return self._get_pledges(
            PaymentStatus.TRANSFER, _Nested.schedule_key(PaymentScheduleKeys.TRANSFER_ON)
        )

    def get_pledges_to_refund(self) -> list[PledgeWrapper]:
        """
        Get pledges that need to be refunded.
        """
        return self._get_pledges(
            PaymentStatus.REFUND, _Nested.schedule_key(PaymentScheduleKeys.REFUND_ON)
        )

    def _get_pledges(
        self,
        status: PaymentStatus,
        capture_key: Any,
    ) -> list[PledgeWrapper]:
        """
        Get pledges whose payment status equals ``status`` and whose schedule field is set.

        Args:
            status: payment status to match against the nested ``payment.status`` field.
            capture_key: dotted path of the schedule field that must be non-null.

        Raises:
            DatabaseError: if the query or the document conversion fails.
        """
        msg = f"Getting captured pledges with status {status} and non-null {capture_key} for processing"
        LOG().info(msg)

        try:
            and_fields = [
                # ``status`` is a PaymentStatus, so it must be compared with the nested
                # payment status (as every other payment-status query in this module
                # does), not with the top-level pledge status. The raw enum value is
                # sent, matching the other queries.
                WhereField[Any](_Nested.payment_key(PaymentKeys.STATUS), _OP.EQUAL, status.value),
                WhereField[Any](capture_key, _OP.NOT_EQUAL, None),
            ]
            query = DbQuery.and_(and_fields)

            charge_docs = query.get_query(self.client, _PLEDGE).stream()
            charges = self._wrap_pledge_docs(charge_docs)
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(charges)} charges ready for {status}.")
            return charges
        except Exception as e:
            msg = f"Failed to get pledges ready for {status}: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_PLEDGE,
                document_id=None,
            ) from e

    # ========================================================================
    # REPORTS
    # ========================================================================

    def get_unprocessed_payments(self) -> list[PledgeWrapper]:
        """
        This finds pledges that require:
            1. "processing"
            2. Dont have a "StripeCharge" but are not in an error state (since we want to retry those)

        NOTE: we dont use a date range to ensure we get all unprocessed pledges,
        NOTE: but we could consider adding one in the future if performance becomes an issue.

        Raises:
            DatabaseError: if the query or the document conversion fails.
        """
        status_filter = [
            FieldFilter(_PledgeK.STATUS.value, _OP.EQUAL.value, PledgeStatus.PROCESSING.value),
            FieldFilter(_I.ERROR_COUNT.value, _OP.LESS_THAN.value, PAYMENT_MAX_RETRIES),
        ]

        statuses = [
            PledgeStatus.REFUND_ERROR.value,
            PledgeStatus.ERROR.value,
        ]

        and_filter = [
            FieldFilter(_PledgeK.STATUS.value, _OP.NOT_IN.value, statuses),
            FieldFilter(_Nested.payment_key(PaymentKeys.LAST_EVENT), _OP.NOT_EQUAL.value, None),
        ]
        try:
            # The previous code built ``Or(And(and_filter))``. ``Or`` expects a *list* of
            # filters, so serialising it fails when the filters are iterated. An OR with a
            # single branch is just that branch, so all conditions are flattened into
            # one AND, which is logically the same filter tree.
            # NOTE(review): the docstring reads like two alternative conditions, and the
            # NOT_IN is redundant next to ``status == PROCESSING``. Confirm the intended
            # semantics, and that Firestore accepts NOT_IN together with != in one query.
            composite = And([*status_filter, *and_filter])
            query = DbQuery(filter_value=composite)  # type: ignore[arg-type]
            results = query.get_query(self.client, _PLEDGE).stream()

            entries = self._wrap_pledge_docs(results)
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(entries)} pledges ready for payment processing.")

            return entries

        except Exception as e:
            msg = f"Failed to get pledges ready for payment processing: {e}"
            raise DatabaseError(
                message=msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=self.collection_name,
            ) from e

    def get_unprocessed_refunds(self) -> list[PledgeWrapper]:
        """
        This finds pledges that require:
            1. "REFUND_REQUESTED" or "REFUND_ERROR"

        NOTE: we dont use a date range to ensure we get all unprocessed refunds.
        NOTE: BECAUSE NOT PROCESSING REFUNDS CAN COST US MONEY.
        NOTE: via CHARGE_BACK or STRIPE automatically retrying refunds that errored.

        Only pledges with an error count below ``PAYMENT_MAX_RETRIES`` are returned.

        Raises:
            DatabaseError: if the query or the document conversion fails.
        """
        statuses = [
            PledgeStatus.REFUND_REQUESTED.value,
            PledgeStatus.REFUND_ERROR.value,
        ]

        where_filter = [
            WhereField[Any](_PledgeK.STATUS, _OP.IN, statuses),
            WhereField[Any](_I.ERROR_COUNT, _OP.LESS_THAN, PAYMENT_MAX_RETRIES),
        ]
        try:
            query = DbQuery.and_(where_filter)
            results = query.get_query(self.client, _PLEDGE).stream()

            entries = self._wrap_pledge_docs(results)
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(entries)} pledges requiring refund processing.")

            return entries
        except Exception as e:
            msg = f"Failed to get pledges requiring refund processing: {e}"
            raise DatabaseError(
                message=msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=self.collection_name,
            ) from e

    # ========================================================================
    # STATS
    # ========================================================================

    def get_stats(self, days: int = 7) -> TimeSeriesPaymentData:
        """
        Collect per-status payment stats for each date range from ``TimeUtil.get_date_range``.

        Args:
            days: number of days passed to ``TimeUtil.get_date_range``, starting at the
                start of the current UTC day, in reverse order.

        Returns:
            The aggregated time series. Empty stats are skipped. Failures of individual
            stat queries do not abort the run; they are counted and reported once
            through ``log_error``.
        """
        date_ranges = TimeUtil.get_date_range(
            days, start=TimeUtil.get_start_of_day_utc(), reverse=True
        )

        agg_stats = TimeSeriesPaymentData()
        error_ct = 0
        for date_range in date_ranges:
            from_date = date_range.from_date
            to_date = date_range.to_date

            err_ct, counts = self._get_capture_stats(from_date, to_date)
            error_ct += err_ct

            for stat_type, stat_ct in counts.items():
                # only get stats not equal to 0 .
                if not stat_ct.is_empty:
                    agg_stats.add(from_date, stat_type, stat_ct)

        if error_ct > 0:
            msg = f"Encountered {error_ct} errors while getting payment stats"
            self.log_error(
                job_type=AppJobType.REPORT_PAYMENT_STATS,
                message=msg,
                error_code=AppErrorCode.DATABASE,
            )

        return agg_stats

    def _get_capture_stats(
        self,
        from_date: datetime,
        to_date: datetime,
    ) -> tuple[int, dict[PaymentStatus, PaymentStat]]:
        """
        Count error and non-error pledges per payment status within one date range.

        Returns:
            ``(error_ct, counts)``: ``error_ct`` is the number of stat queries that
            failed or returned no usable value; ``counts`` maps each payment status to
            its accumulated ``PaymentStat``.
        """
        counts: dict[PaymentStatus, PaymentStat] = defaultdict(lambda: PaymentStat(0.0, 0.0))
        error_ct = 0

        client = self.client
        col_name = self.collection_name

        debug_label = f"{TimeUtil.formatted_user(from_date)} - {TimeUtil.formatted_user(to_date)}"
        for status in PaymentStatus:
            for is_error in (True, False):  # error and non error.
                where_fields = _Nested.where_fields(status, is_error)
                if where_fields is None:
                    # No error query is defined for this status.
                    continue
                try:
                    db_query = CollectionStatQuery.custom(
                        from_date=from_date,
                        to_date=to_date,
                        where_fields=where_fields,
                    )
                    query = db_query.get_query(client, col_name)
                    agg_value = self._get_agg_value(query=query)
                    if agg_value is None:
                        msg = f"No count value returned for {debug_label} - {status}"
                        LOG().warning(msg)
                        error_ct += 1
                        continue
                    if agg_value.is_error:
                        msg = f"Error getting count for {debug_label} - {status}"
                        LOG().error(msg)
                        error_ct += 1
                        continue

                    # The two assignments used to be swapped: the error query's result
                    # was stored as ``count`` and the regular query's as ``error_count``.
                    if is_error:
                        # error stat, so the aggregate is the error_count
                        charge_stat = PaymentStat(count=0.0, error_count=agg_value.count)
                    else:
                        # non error stat, so the aggregate is the count and error_count is 0.
                        charge_stat = PaymentStat(count=agg_value.count, error_count=0.0)

                    counts[status].accumulate(charge_stat)

                except Exception as e:
                    # Best effort: one failed stat must not abort the whole report.
                    LOG().error(f"Error getting aggregate stats {debug_label}: {status}: {e}")
                    error_ct += 1
                    continue

        if IS_DEBUG:
            total_ct = sum((stat.count + stat.error_count) for stat in counts.values())
            msg = f"Total count for {debug_label}: {total_ct } (errors: {error_ct})"
            LOG().debug(msg)

        return error_ct, counts
class _Nested:
    """Builders for dotted field paths and filters on a pledge's nested payment data."""

    @staticmethod
    def payment_key(key: PaymentKeys) -> str:
        """Return the dotted path ``<payment>.<key>``."""
        return f"{_PledgeK.PAYMENT.value}.{key.value}"

    @staticmethod
    def schedule_key(key: PaymentScheduleKeys) -> str:
        """Return the dotted path ``<payment>.<schedule>.<key>``."""
        return f"{_PledgeK.PAYMENT.value}.{PaymentKeys.SCHEDULE.value}.{key.value}"

    @staticmethod
    def _where_payment(
        key: PaymentKeys,
        op: FieldOp,
        value: Any,
    ) -> WhereField[Any]:
        """Build a filter on a field nested under the payment object."""
        return WhereField[Any](_Nested.payment_key(key), op, value)

    @staticmethod
    def _where_schedule(
        key: PaymentScheduleKeys,
        op: FieldOp,
        value: Any,
    ) -> WhereField[Any]:
        """Build a filter on a field nested under the payment schedule object."""
        return WhereField[Any](_Nested.schedule_key(key), op, value)

    @staticmethod
    def where_fields(status: PaymentStatus, is_error: bool) -> list[WhereField[Any]] | None:
        """
        Return the filters that select pledges for ``status``.

        Args:
            status: payment status to build filters for.
            is_error: when true, build the error-state filters instead of the
                plain status filter.

        Returns:
            The filter list, or ``None`` when no error filters exist for ``status``.
        """
        if is_error:
            return _Nested._where_error_fields(status)
        return _Nested._where_info_fields(status)

    @staticmethod
    def _where_info_fields(status: PaymentStatus) -> list[WhereField[Any]] | None:
        """Return the single filter ``payment.status == status``."""
        return [_Nested._where_payment(PaymentKeys.STATUS, _OP.EQUAL, status.value)]

    @staticmethod
    def _where_error_fields(status: PaymentStatus) -> list[WhereField[Any]] | None:
        """
        Return the filters that identify pledges in an error state for ``status``.

        HOLD is treated as erroneous when the error count is non-zero. CAPTURE,
        TRANSFER and REFUND are treated as erroneous when the payment has that status
        but the matching schedule field is null. Any other status yields ``None``.
        """
        if status == PaymentStatus.HOLD:
            # NOTE(review): PledgeDb queries the error count through
            # PledgeInternalKeys.ERROR_COUNT; confirm PledgeKeys.ERROR_COUNT names the
            # same stored field.
            return [WhereField[Any](_PledgeK.ERROR_COUNT, _OP.NOT_EQUAL, 0)]

        # Status -> schedule field that must be populated for that status to be healthy.
        schedule_keys = {
            PaymentStatus.CAPTURE: PaymentScheduleKeys.CAPTURE_ON,
            PaymentStatus.TRANSFER: PaymentScheduleKeys.TRANSFER_ON,
            PaymentStatus.REFUND: PaymentScheduleKeys.REFUND_ON,
        }
        schedule_key = schedule_keys.get(status)
        if schedule_key is None:
            if IS_DEBUG:
                LOG().debug(f"Not collecting error stats for status {status}")
            return None

        return [
            # Send the raw enum value, consistent with ``_where_info_fields``.
            _Nested._where_payment(PaymentKeys.STATUS, _OP.EQUAL, status.value),
            _Nested._where_schedule(schedule_key, _OP.EQUAL, None),
        ]