Coverage for functions \ flipdare \ firestore \ backend \ app_stat_db.py: 88%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from datetime import datetime, timedelta 

14from typing import Any, cast, override 

15from google.cloud.firestore import Client as FirestoreClient, DocumentSnapshot 

16from flipdare.app_log import LOG 

17from flipdare.app_types import DatabaseDict, StatMetric 

18from flipdare.constants import IS_TRACE 

19from flipdare.analysis.data.time_series_metric_data import TimeSeriesMetricData 

20from flipdare.error.app_error import DatabaseError 

21from flipdare.firestore._app_db import AppDb 

22from flipdare.firestore._app_sub_db import AppSubDb 

23from flipdare.generated.model.backend.app_stat_metric_model import AppStatMetricModel 

24from flipdare.generated.model.backend.app_stat_model import AppStatKeys, AppStatModel 

25from flipdare.generated.model.backend.metric.count_metric import CountMetric 

26from flipdare.generated.model.backend.metric.outcome_metric import OutcomeMetric 

27from flipdare.generated.shared.app_error_code import AppErrorCode 

28from flipdare.generated.shared.firestore_collections import FirestoreCollections 

29from flipdare.util.time_util import TimeUtil 

30from flipdare.wrapper.backend.app_stat_metric_wrapper import ( 

31 AppStatMetricWrapper, 

32) 

33from flipdare.wrapper.backend.app_stat_wrapper import AppStatWrapper 

34 

35__all__ = ["AppStatDb"] 

36 

37_STAT: str = FirestoreCollections.APP_STAT.value 

38_STAT_METRIC: str = FirestoreCollections.APP_STAT_METRIC.value 

39 

40_K = AppStatKeys 

41 

42 

43class AppStatDb(AppDb[AppStatWrapper, AppStatModel]): 

44 """Class for managing app performance database operations.""" 

45 

46 def __init__(self, client: FirestoreClient) -> None: 

47 # /stats/AppStatsModel/AppStatsMetricModel 

48 super().__init__( 

49 client=client, 

50 collection_name=FirestoreCollections.APP_STAT, 

51 model_class=AppStatModel, 

52 wrapper_class=AppStatWrapper, 

53 ) 

54 

55 self.metrics = AppSubDb[AppStatMetricWrapper, AppStatMetricModel]( 

56 client=client, 

57 collection_name=FirestoreCollections.APP_STAT, 

58 sub_collection_name=FirestoreCollections.APP_STAT_METRIC, 

59 wrapper_class=AppStatMetricWrapper, 

60 model_class=AppStatMetricModel, 

61 ) 

62 

63 def add(self, metric: AppStatMetricModel, day: datetime | None = None) -> AppStatMetricWrapper: 

64 day = TimeUtil.get_start_of_day_utc() if day is None else day 

65 doc_id = TimeUtil.formatted_short(day) 

66 

67 if IS_TRACE: 

68 LOG().trace(f"Adding metric to AppStatDb with doc_id {doc_id} and metric {metric}") 

69 

70 try: 

71 data = AppStatModel(id=None, date_str=doc_id).to_dict() 

72 data["id"] = doc_id 

73 # call our overridden create 

74 self.create(data=data, merge=True) 

75 

76 return self.metrics.create_sub( 

77 parent_id=doc_id, 

78 data=metric, 

79 ) 

80 except Exception as e: 

81 LOG().error(f"Error adding metric to AppStatsDb: {e}") 

82 raise 

83 

84 @override 

85 def create(self, data: AppStatModel | dict[str, Any], merge: bool = True) -> AppStatWrapper: 

86 # NOTE: we have to override create, since we are explictly setting the doc_id 

87 day = TimeUtil.get_start_of_day_utc() 

88 

89 doc_data = data.to_dict() if isinstance(data, AppStatModel) else data 

90 key = _K.DATE_STR.value 

91 doc_id = doc_data[key] if key in doc_data else TimeUtil.formatted_short(day) 

92 

93 if IS_TRACE: 

94 LOG().trace(f"Creating AppStatModel with doc_id {doc_id} and data {doc_data}") 

95 

96 col_name = self.collection_name 

97 saved_data: DatabaseDict | None = None 

98 try: 

99 doc_ref = self.client.collection(col_name).document(doc_id) 

100 doc_ref.set(doc_data, merge=merge) 

101 snap = doc_ref.get() 

102 snap = cast("DocumentSnapshot", snap) 

103 saved_data = self._cvt_snap_to_data(snap) 

104 

105 except Exception as error: 

106 msg = f"Error creating stat for collection {col_name}/{doc_id}" 

107 LOG().error(f"{msg}: {error}\n\t{data}") 

108 raise DatabaseError( 

109 f"Failed to create document for collection {col_name}", 

110 error_code=AppErrorCode.DATABASE, 

111 collection_name=col_name, 

112 error=error, 

113 ) from error 

114 

115 if saved_data is not None: 

116 return self.wrapper_class.from_dict(saved_data) 

117 

118 raise DatabaseError( 

119 f"Failed to create document for collection {col_name}", 

120 error_code=AppErrorCode.DATABASE, 

121 collection_name=col_name, 

122 document_id=doc_ref.id, 

123 ) 

124 

125 def get_stats(self, days: int = 7) -> TimeSeriesMetricData: 

126 count = self._get_metric_from_last_week(days, CountMetric) 

127 success = self._get_metric_from_last_week(days, OutcomeMetric) 

128 count.merge(success) 

129 

130 return count 

131 

132 def _get_metric_from_last_week( 

133 self, days: int, model_class: type[StatMetric] 

134 ) -> TimeSeriesMetricData: 

135 stats: TimeSeriesMetricData = TimeSeriesMetricData() 

136 today = TimeUtil.get_start_of_day_utc() 

137 error_ct = 0 

138 

139 for i in range(days): 

140 day = today - timedelta(days=i) 

141 day_id = TimeUtil.formatted_short(day) 

142 dt = TimeUtil.short_to_utc_dt(day_id) 

143 

144 # Query the sub-collection for this specific day 

145 metrics_ref = self.client.collection(_STAT).document(day_id).collection(_STAT_METRIC) 

146 docs = metrics_ref.stream() 

147 count = 0 

148 

149 for doc in docs: 

150 metric = self.metrics._cvt_snap_to_model(doc) 

151 count += 1 

152 

153 if metric is None: 

154 msg = f"Failed to convert document {doc.id} to AppStatMetricModel: {doc.to_dict()}" 

155 LOG().warning(msg) 

156 error_ct += 1 

157 continue 

158 

159 # get the actual metric .. 

160 metric_model = metric.model.metric 

161 

162 if not isinstance(metric_model, model_class): 

163 if IS_TRACE: 

164 actual = type(metric_model).__name__ 

165 msg = f"Skipping metric {day_id}/{doc.id}, since metric {actual} not required" 

166 LOG().trace(msg) 

167 continue 

168 

169 if IS_TRACE: 

170 msg = f"Adding metric {day_id}/{doc.id} to stats, metric: {metric_model}" 

171 LOG().trace(msg) 

172 

173 stats.add_metric(dt, metric.job_type, metric_model) 

174 

175 if IS_TRACE: 

176 LOG().trace(f"Processed {count} documents for day {day_id}") 

177 

178 stats.error_ct = error_ct 

179 return stats