Coverage for functions \ flipdare \ firestore \ backend \ app_log_db.py: 86%
95 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from collections import defaultdict
14from datetime import datetime
15from typing import Any
16from google.cloud.firestore import Client as FirestoreClient
17from flipdare.analysis.data.nested.time_series_log_data import TimeSeriesLogData
18from flipdare.app_log import LOG
19from flipdare.constants import IS_DEBUG
20from flipdare.error.app_error import DatabaseError
21from flipdare.error.app_error_protocol import AppErrorProtocol
22from flipdare.firestore._app_db import AppDb
23from flipdare.firestore.core.collection_stat_query import CollectionStatQuery
24from flipdare.firestore.core.db_query import DbQuery, FieldOp, OrderByField, WhereField
25from flipdare.generated import AppErrorCode, AppLogKeys, AppLogModel, AppPaymentErrorCode
26from flipdare.generated.shared.backend.app_job_type import AppJobType
27from flipdare.generated.shared.backend.system_log_type import SystemLogType
28from flipdare.generated.shared.firestore_collections import FirestoreCollections
29from flipdare.util.time_util import TimeUtil
30from flipdare.wrapper.backend.app_log_wrapper import AppLogWrapper
__all__ = ["AppLogDb"]

# Firestore collection path for application log documents.
_LOG = FirestoreCollections.APP_LOG.value

# Short aliases to keep query construction below readable.
_K = AppLogKeys
_OP = FieldOp
class AppLogDb(AppDb[AppLogWrapper, AppLogModel]):
    """Firestore data access for application log documents.

    Provides aggregate per-day log statistics for charting and a query for
    recent payment-critical issues that are awaiting admin review.
    """

    def __init__(self, client: FirestoreClient) -> None:
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.APP_LOG,
            model_class=AppLogModel,
            wrapper_class=AppLogWrapper,
        )

    def get_log_stats(self, days: int = 7) -> TimeSeriesLogData:
        """Aggregate daily log counts per log type and error category.

        Args:
            days: Number of days to aggregate, counting back from the start
                of the current UTC day.

        Returns:
            TimeSeriesLogData populated with non-zero counts keyed by date,
            log type, and error-code category. INFO logs are excluded.
        """
        date_ranges = TimeUtil.get_date_range(
            days, start=TimeUtil.get_start_of_day_utc(), reverse=True
        )
        agg_stats = TimeSeriesLogData()
        error_ct = 0
        for log_type in SystemLogType:
            if log_type == SystemLogType.INFO:
                continue  # skip info logs — not used in charts
            for date_range in date_ranges:
                from_date = date_range.from_date
                to_date = date_range.to_date
                err_ct, counts = self._get_log_counts(log_type, from_date, to_date)
                error_ct += err_ct
                for error_code, count in counts.items():
                    # Only record non-zero counts — keeps the series sparse.
                    if count > 0:
                        agg_stats.add(from_date, log_type, error_code.category, count)
        if error_ct > 0:
            # Surface query failures as a single aggregated error log entry
            # rather than failing the whole stats run.
            msg = f"Encountered {error_ct} errors while getting log stats"
            self.log_error(
                job_type=AppJobType.REPORT_LOG_STATS,
                message=msg,
                error_code=AppErrorCode.DATABASE,
            )
        return agg_stats

    def _get_log_counts(
        self,
        log_type: SystemLogType,
        from_date: datetime,
        to_date: datetime,
    ) -> tuple[int, dict[AppErrorProtocol, float]]:
        """Count logs of `log_type` per error code within a date window.

        Runs one aggregate count query per known error code (app and
        payment codes). Individual query failures are logged and tallied
        rather than raised, so one bad code does not abort the sweep.

        Returns:
            Tuple of (number of failed queries, counts per error code).
        """
        log_label = f"{TimeUtil.formatted_user(from_date)} - {TimeUtil.formatted_user(to_date)}"
        counts: dict[AppErrorProtocol, float] = defaultdict(float)
        error_ct = 0
        error_codes = [*AppErrorCode, *AppPaymentErrorCode]
        # Hoist attribute lookups out of the per-code loop.
        client = self.client
        col_name = self.collection_name
        for error_code in error_codes:
            where_fields = [
                WhereField[Any](_K.LOG_TYPE, _OP.EQUAL, log_type),
                WhereField[Any](_K.ERROR_CODE, _OP.EQUAL, error_code),
            ]
            try:
                db_query = CollectionStatQuery.custom(
                    from_date=from_date,
                    to_date=to_date,
                    where_fields=where_fields,
                )
                query = db_query.get_query(client, col_name)
                agg_value = self._get_agg_value(query=query)
                if agg_value is None:
                    msg = f"No count value returned for {log_label} - {log_type}/{error_code}"
                    LOG().warning(msg)
                    error_ct += 1
                    continue
                if agg_value.is_error:
                    msg = f"Error getting count for {log_label} - {log_type}/{error_code}"
                    LOG().error(msg)
                    error_ct += 1
                    continue
                counts[error_code] = agg_value.count
            except Exception as e:
                # Best-effort sweep: record the failure and move on.
                LOG().error(
                    f"Error getting aggregate stats {log_label}: {log_type}/{error_code}: {e}"
                )
                error_ct += 1
                continue
        if IS_DEBUG:
            msg = f"Total count for {log_label} - {log_type}: {sum(counts.values())} (errors: {error_ct})"
            LOG().debug(msg)
        return error_ct, counts

    def get_recent_payment_critical_issues(self, hours: int = 24) -> list[AppLogWrapper]:
        """Get payment-critical issues awaiting admin review.

        Args:
            hours: Look-back window in hours (default 24).

        Returns:
            Matching log entries ordered by creation time, oldest first.

        Raises:
            DatabaseError: If the Firestore query fails.
        """
        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        if IS_DEBUG:
            debug_str = (
                f"Getting issues waiting for admin review since "
                f"{TimeUtil.formatted_dt(hours_ago)}"
            )
            LOG().debug(debug_str)
        # Error codes considered critical enough to need manual review.
        error_codes = [
            AppPaymentErrorCode.CANCEL_INTENT_FAILED.value,
            AppPaymentErrorCode.LOOSING_MONEY.value,
            AppPaymentErrorCode.AMOUNT_TOO_SMALL.value,
            AppPaymentErrorCode.FX_ACCOUNT_ESTIMATE_ERROR.value,
        ]
        try:
            where_fields = [
                WhereField[Any](_K.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
                WhereField[Any](_K.ERROR_CODE, _OP.IN, error_codes),
            ]
            order_by = OrderByField.created_at(descending=False)
            query = DbQuery.and_(where_fields, order_by=order_by)
            results = query.get_query(self.client, _LOG).stream()
            issues = [
                issue for doc in results if (issue := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(issues)} issues waiting for admin review.")
            return issues
        except Exception as e:
            msg = f"Failed to get issues waiting for admin review: {e}"
            raise DatabaseError(
                message=msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=self.collection_name,
            ) from e