Coverage for functions \ flipdare \ firestore \ backend \ app_stat_db.py: 88%
102 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from datetime import datetime, timedelta
14from typing import Any, cast, override
15from google.cloud.firestore import Client as FirestoreClient, DocumentSnapshot
16from flipdare.app_log import LOG
17from flipdare.app_types import DatabaseDict, StatMetric
18from flipdare.constants import IS_TRACE
19from flipdare.analysis.data.time_series_metric_data import TimeSeriesMetricData
20from flipdare.error.app_error import DatabaseError
21from flipdare.firestore._app_db import AppDb
22from flipdare.firestore._app_sub_db import AppSubDb
23from flipdare.generated.model.backend.app_stat_metric_model import AppStatMetricModel
24from flipdare.generated.model.backend.app_stat_model import AppStatKeys, AppStatModel
25from flipdare.generated.model.backend.metric.count_metric import CountMetric
26from flipdare.generated.model.backend.metric.outcome_metric import OutcomeMetric
27from flipdare.generated.shared.app_error_code import AppErrorCode
28from flipdare.generated.shared.firestore_collections import FirestoreCollections
29from flipdare.util.time_util import TimeUtil
30from flipdare.wrapper.backend.app_stat_metric_wrapper import (
31 AppStatMetricWrapper,
32)
33from flipdare.wrapper.backend.app_stat_wrapper import AppStatWrapper
# Only the database class is part of this module's public API.
__all__ = ["AppStatDb"]

# Short alias for the generated AppStatModel field-key constants.
_K = AppStatKeys

# Firestore names of the metric sub-collection and of its parent stat collection.
_STAT_METRIC: str = FirestoreCollections.APP_STAT_METRIC.value
_STAT: str = FirestoreCollections.APP_STAT.value
class AppStatDb(AppDb[AppStatWrapper, AppStatModel]):
    """Firestore access layer for per-day app statistics.

    Every day gets one document in the APP_STAT collection whose id is the
    short-formatted date; the metrics recorded for that day are stored in the
    APP_STAT_METRIC sub-collection underneath that document.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind the stat collection and its metric sub-collection to *client*."""
        # Layout: /stats/AppStatsModel/AppStatsMetricModel
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.APP_STAT,
            model_class=AppStatModel,
            wrapper_class=AppStatWrapper,
        )

        # Accessor for the per-day metric documents.
        self.metrics = AppSubDb[AppStatMetricWrapper, AppStatMetricModel](
            client=client,
            collection_name=FirestoreCollections.APP_STAT,
            sub_collection_name=FirestoreCollections.APP_STAT_METRIC,
            wrapper_class=AppStatMetricWrapper,
            model_class=AppStatMetricModel,
        )

    def add(self, metric: AppStatMetricModel, day: datetime | None = None) -> AppStatMetricWrapper:
        """Store *metric* beneath the stat document for *day*.

        The day document is created (or merged into, if it already exists)
        before the metric is written to its sub-collection.

        Args:
            metric: Metric model to store.
            day: Day the metric belongs to. Defaults to
                ``TimeUtil.get_start_of_day_utc()`` when omitted.

        Returns:
            The result of ``self.metrics.create_sub`` for the new metric.

        Raises:
            Exception: Any failure while writing the day document or the
                metric is logged and re-raised unchanged.
        """
        if day is None:
            day = TimeUtil.get_start_of_day_utc()
        doc_id = TimeUtil.formatted_short(day)

        if IS_TRACE:
            LOG().trace(f"Adding metric to AppStatDb with doc_id {doc_id} and metric {metric}")

        try:
            parent_data = AppStatModel(id=None, date_str=doc_id).to_dict()
            parent_data["id"] = doc_id
            # Goes through the overridden create() below, which derives the
            # document id from the date string.
            self.create(data=parent_data, merge=True)

            return self.metrics.create_sub(parent_id=doc_id, data=metric)
        except Exception as e:
            LOG().error(f"Error adding metric to AppStatsDb: {e}")
            raise

    @override
    def create(self, data: AppStatModel | dict[str, Any], merge: bool = True) -> AppStatWrapper:
        """Write the stat document for a day and return it as re-read from Firestore.

        This overrides the base ``create`` because the document id is set
        explicitly instead of being generated.

        Args:
            data: Stat model or raw dict to store. The document id is taken
                from its date-string entry; when that entry is missing or
                empty, today's short-formatted date is used instead.
            merge: Forwarded to ``DocumentReference.set``.

        Returns:
            A wrapper built from the document data read back after the write.

        Raises:
            DatabaseError: If the write or read-back fails, or if the
                read-back snapshot cannot be converted to data.
        """
        doc_data = data.to_dict() if isinstance(data, AppStatModel) else data

        # Fall back for a missing *or empty* date string: passing None to
        # ``collection.document()`` would make Firestore assign a random id
        # rather than the per-day id this class relies on. The start-of-day
        # lookup is only performed when the fallback is actually needed.
        doc_id = doc_data.get(_K.DATE_STR.value) or TimeUtil.formatted_short(
            TimeUtil.get_start_of_day_utc()
        )

        if IS_TRACE:
            LOG().trace(f"Creating AppStatModel with doc_id {doc_id} and data {doc_data}")

        col_name = self.collection_name
        try:
            doc_ref = self.client.collection(col_name).document(doc_id)
            doc_ref.set(doc_data, merge=merge)
            # Read the document back so the caller sees the merged result.
            snap = cast("DocumentSnapshot", doc_ref.get())
            saved_data: DatabaseDict | None = self._cvt_snap_to_data(snap)
        except Exception as error:
            msg = f"Error creating stat for collection {col_name}/{doc_id}"
            LOG().error(f"{msg}: {error}\n\t{data}")
            raise DatabaseError(
                f"Failed to create document for collection {col_name}",
                error_code=AppErrorCode.DATABASE,
                collection_name=col_name,
                document_id=doc_id,
                error=error,
            ) from error

        if saved_data is None:
            raise DatabaseError(
                f"Failed to create document for collection {col_name}",
                error_code=AppErrorCode.DATABASE,
                collection_name=col_name,
                document_id=doc_id,
            )

        return self.wrapper_class.from_dict(saved_data)

    def get_stats(self, days: int = 7) -> TimeSeriesMetricData:
        """Return count and outcome metrics for the most recent *days* days.

        Args:
            days: Number of days to include, counting back from today.

        Returns:
            The count-metric series with the outcome-metric series merged in.
        """
        # NOTE: each call streams every day's sub-collection, so the
        # documents are read once per metric type.
        combined = self._get_metric_from_last_week(days, CountMetric)
        outcomes = self._get_metric_from_last_week(days, OutcomeMetric)
        combined.merge(outcomes)

        return combined

    def _get_metric_from_last_week(
        self, days: int, model_class: type[StatMetric]
    ) -> TimeSeriesMetricData:
        """Gather the metrics of one type recorded over the last *days* days.

        Despite the name, the window is *days* long: today plus the
        ``days - 1`` days before it.

        Args:
            days: Number of days to scan; zero or less scans nothing.
            model_class: Metric type to keep; other metric types are skipped.

        Returns:
            The collected series. Its ``error_ct`` is the number of documents
            that could not be converted.
        """
        stats: TimeSeriesMetricData = TimeSeriesMetricData()
        today = TimeUtil.get_start_of_day_utc()
        error_ct = 0

        for offset in range(days):
            day_id = TimeUtil.formatted_short(today - timedelta(days=offset))
            dt = TimeUtil.short_to_utc_dt(day_id)

            # Stream the metric sub-collection for this specific day.
            metrics_ref = self.client.collection(_STAT).document(day_id).collection(_STAT_METRIC)
            count = 0

            for doc in metrics_ref.stream():
                metric = self.metrics._cvt_snap_to_model(doc)
                # Every streamed document is counted, convertible or not.
                count += 1

                if metric is None:
                    msg = f"Failed to convert document {doc.id} to AppStatMetricModel: {doc.to_dict()}"
                    LOG().warning(msg)
                    error_ct += 1
                    continue

                # The payload we filter on is nested inside the stored model.
                metric_model = metric.model.metric

                if not isinstance(metric_model, model_class):
                    if IS_TRACE:
                        actual = type(metric_model).__name__
                        msg = f"Skipping metric {day_id}/{doc.id}, since metric {actual} not required"
                        LOG().trace(msg)
                    continue

                if IS_TRACE:
                    msg = f"Adding metric {day_id}/{doc.id} to stats, metric: {metric_model}"
                    LOG().trace(msg)

                stats.add_metric(dt, metric.job_type, metric_model)

            if IS_TRACE:
                LOG().trace(f"Processed {count} documents for day {day_id}")

        stats.error_ct = error_ct
        return stats