Coverage for functions \ flipdare \ analysis \ data \ time_series_metric_data.py: 78%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from typing import Any, override 

16from dataclasses import dataclass, field 

17from datetime import datetime 

18from flipdare.analysis.data._time_series_protocol import TimeSeriesProtocol, TimeSeriesPlotInfo 

19from flipdare.app_types import ( 

20 AnalysisDataType, 

21 LabelledSeriesType, 

22 StatSource, 

23 ReportListType, 

24 StatMetric, 

25) 

26from flipdare.generated.model.backend.metric.count_metric import CountMetric 

27from flipdare.util.time_util import TimeUtil 

28import pandas as pd 

29 

# Names exported by ``from ... import *``.
__all__ = ["TimeSeriesMetricData"]

# Column names used for every row dict / DataFrame built in this module.
_DATE_COL = "Date"
_SOURCE_COL = "Source"
# Identifying (non-metric) columns; everything else in a frame is a metric field.
_BASE_COLS: frozenset[str] = frozenset((_DATE_COL, _SOURCE_COL))

35 

36 

def _to_row(dt: datetime, source: StatSource, metric: StatMetric) -> dict[str, Any]:
    """
    Flatten one metric sample into a row dict keyed by column name.

    The row always starts with the date and the source's ``value``; the
    remaining keys depend on the metric class. All metric fields are stored
    as floats.
    """
    row: dict[str, Any] = {_DATE_COL: dt, _SOURCE_COL: source.value}

    if isinstance(metric, CountMetric):
        row.update(
            {
                "Success Ct": float(metric.success_ct),
                "Failed Ct": float(metric.failed_ct),
                "Skipped Ct": float(metric.skipped_ct),
                "Duration": float(metric.duration),
            }
        )
        return row

    # Anything that is not a CountMetric is handled as a SuccessMetric.
    row["Succeeded"] = 1.0 if metric.succeeded else 0.0
    row["Duration"] = float(metric.duration)
    return row

51 

52 

@dataclass
class TimeSeriesMetricData(TimeSeriesProtocol):
    """
    Accumulates metric samples as flat row dicts, grouped by metric class.

    Rows are built by ``_to_row`` and exposed either as pandas DataFrames
    (for report tables) or as per-source labelled series (for plots).
    """

    # Error tally supplied by the caller; merge() adds the other instance's value.
    error_ct: int = 0
    # Row dicts keyed by the concrete metric class they were built from.
    _rows: dict[type[StatMetric], list[dict[str, Any]]] = field(
        default_factory=dict,
        init=False,
        repr=False,
    )
    # Every source seen via add_metric() or absorbed via merge().
    _sources: set[StatSource] = field(
        default_factory=set,
        init=False,
        repr=False,
    )

    @property
    @override
    def has_data(self) -> bool:
        """True once at least one metric type has been registered."""
        return bool(self._rows)

    @property
    @override
    def count(self) -> int:
        """Total number of rows across all metric types."""
        return sum(len(rows) for rows in self._rows.values())

    @property
    @override
    def dates(self) -> list[datetime]:
        """Sorted unique dates across all metric types."""
        return sorted({row[_DATE_COL] for rows in self._rows.values() for row in rows})

    @property
    @override
    def headers(self) -> list[str]:
        """Column names of the merged DataFrame (see ``df``)."""
        return self.df.columns.tolist()

    @property
    def metric_types(self) -> list[type[StatMetric]]:
        """Metric classes that have been registered, in insertion order."""
        return list(self._rows.keys())

    @property
    def sources(self) -> list[StatSource]:
        """Unique sources sorted by value."""
        return sorted(self._sources, key=lambda s: s.value)

    @property
    def df(self) -> pd.DataFrame:
        """Merged DataFrame across all metric types, sorted by date. Use for report tables."""
        # Skip empty per-type frames: they have no date column to sort on.
        frames = [
            frame for frame in (self.df_for(m_type) for m_type in self._rows) if not frame.empty
        ]
        if not frames:
            return pd.DataFrame(columns=[_DATE_COL, _SOURCE_COL])
        # Stable sort keeps rows with identical dates in a deterministic order.
        return (
            pd.concat(frames, ignore_index=True)
            .sort_values(_DATE_COL, kind="stable")
            .reset_index(drop=True)
        )

    def add_metric(self, dt: datetime, source: StatSource, metric: StatMetric) -> None:
        """Add a metric data point for a given datetime and source."""
        # Build the row first so a failure in _to_row leaves no empty bucket
        # or orphan source behind.
        row = _to_row(dt, source, metric)
        self._sources.add(source)
        self._rows.setdefault(type(metric), []).append(row)

    def merge(self, other: TimeSeriesMetricData) -> None:
        """Merge another TimeSeriesMetricData into this one."""
        for m_type, rows in other._rows.items():
            self._rows.setdefault(m_type, []).extend(rows)
        self._sources.update(other._sources)
        self.error_ct += other.error_ct

    def df_for(self, metric_type: type[StatMetric]) -> pd.DataFrame:
        """DataFrame for a single metric type, sorted by date."""
        rows = self._rows.get(metric_type, [])
        if not rows:
            return pd.DataFrame()
        # Stable sort: rows sharing a date stay in insertion order, which is
        # what makes "most recent row" well defined when de-duplicating.
        return pd.DataFrame(rows).sort_values(_DATE_COL, kind="stable").reset_index(drop=True)

    def dates_for(self, metric_type: type[StatMetric]) -> list[datetime]:
        """Sorted unique dates for a single metric type."""
        frame = self.df_for(metric_type)
        if frame.empty:
            return []
        return sorted(frame[_DATE_COL].unique().tolist())

    def series_labels(self, metric_type: type[StatMetric]) -> list[str]:
        """Metric field names — use as graph legend labels aligned to series_data."""
        frame = self.df_for(metric_type)
        if frame.empty:
            return []
        return [c for c in frame.columns if c not in _BASE_COLS]

    def date_labels_for(self, metric_type: type[StatMetric]) -> list[str]:
        """Formatted date labels aligned to dates_for(metric_type). Use as x-axis values."""
        return [TimeUtil.formatted_short(dt) for dt in self.dates_for(metric_type)]

    @property
    def all_date_labels(self) -> list[str]:
        """Formatted labels for every date across all metric types."""
        # NOTE: this is the union of dates over all metric types, so it may
        # not line up with the dates of any single metric type.
        # e.g. if CountMetric has 12/4,13/4 and SuccessMetric has 14/4,15/4
        # the labels will be 12/4,13/4,14/4,15/4
        return [TimeUtil.formatted_short(dt) for dt in self.dates]

    @staticmethod
    def _series_for_source(
        frame: pd.DataFrame,
        all_dates: list[datetime],
        metric_cols: list[str],
        src_val: Any,
    ) -> LabelledSeriesType:
        """Build (label, values) pairs for one source, aligned to ``all_dates``."""
        # NOTE(review): when a (date, source) pair occurs more than once
        # (e.g. after merging overlapping data) the most recently added row
        # wins; previously this raised TypeError. Confirm last-wins is the
        # desired policy.
        src_frame = (
            frame[frame[_SOURCE_COL] == src_val]
            .drop_duplicates(subset=_DATE_COL, keep="last")
            .set_index(_DATE_COL)
        )
        # Membership does not depend on the column, so test it once per date.
        present = [dt in src_frame.index for dt in all_dates]
        return [
            (
                col,
                [
                    (
                        float(src_frame.loc[pd.Timestamp(dt), col])  # type: ignore[arg-type]
                        if has_row
                        else None
                    )
                    for dt, has_row in zip(all_dates, present)
                ],
            )
            for col in metric_cols
        ]

    def source_data(self, metric_type: type[StatMetric], source: StatSource) -> LabelledSeriesType:
        """Labelled series for a single source and metric type, with None for missing dates."""
        frame = self.df_for(metric_type)
        if frame.empty:
            return []

        metric_cols = [c for c in frame.columns if c not in _BASE_COLS]
        return self._series_for_source(
            frame,
            self.dates_for(metric_type),
            metric_cols,
            source.value,
        )

    @override
    def table_data(self) -> ReportListType:
        """Rows of the merged DataFrame as nested lists."""
        return self.df.to_numpy().tolist()

    @override
    def plot_info(self) -> list[TimeSeriesPlotInfo]:
        """
        One TimeSeriesPlotInfo per metric type.

        Each plot has one series per (source, metric field) pair; the legend
        entry for a series is "<SOURCE> - <field>".
        """
        result: list[TimeSeriesPlotInfo] = []
        for m_type in self.metric_types:
            src_data = self._plot_data(m_type)
            date_labels = self.date_labels_for(m_type)
            legend: list[str] = []
            plot_data: AnalysisDataType = []
            plot_label = m_type.__name__

            for source, labelled_series in src_data.items():
                legend.extend(
                    f"{source.value.upper()} - {col_label}" for col_label, _ in labelled_series
                )
                plot_data.extend([values for _, values in labelled_series])

            result.append(
                TimeSeriesPlotInfo(
                    label=plot_label,
                    x_title="Date",
                    y_title="Count" if "Count" in plot_label else "Value",
                    x_labels=date_labels,
                    legend_labels=legend,
                    data=plot_data,
                )
            )
        return result

    def _plot_data(self, metric_type: type[StatMetric]) -> dict[StatSource, LabelledSeriesType]:
        """
        Per-source labelled series aligned to the full date range for the metric type.

        Each source maps to a list of (label, values) pairs — one per metric field —
        with None where the source has no data for a date.
        """
        frame = self.df_for(metric_type)
        if frame.empty:
            return {}
        all_dates = self.dates_for(metric_type)
        metric_cols = [c for c in frame.columns if c not in _BASE_COLS]
        # The frame stores source values, so map them back to StatSource members.
        src_by_val = {s.value: s for s in self._sources}

        return {
            src_by_val[src_val]: self._series_for_source(frame, all_dates, metric_cols, src_val)
            for src_val in sorted(frame[_SOURCE_COL].unique())
        }

    @property
    @override
    def debug_str(self) -> str:
        """Multi-line dump of dates, sources, metric types and every series."""
        metric_types = self.metric_types
        parts: list[str] = [
            f"All Dates : {self.all_date_labels}\n"
            f"Sources : {[s.value for s in self.sources]}\n"
            f"Metric Types : {[t.__name__ for t in metric_types]}\n"
            f"Headers : {self.headers}\n"
        ]

        for metric_type in metric_types:
            frame = self.df_for(metric_type)
            date_labels = self.date_labels_for(metric_type)
            plot_data = self._plot_data(metric_type)

            parts.append(
                f"{metric_type.__name__}\n"
                f"\tRows : {len(frame)} rows\n"
                f"\tDate Labels : {date_labels}\n"
                f"\tColumns : {frame.columns.tolist()}\n"
            )

            for source, labelled_series in plot_data.items():
                src_name = source.value
                parts.extend(
                    f"\tSource {src_name}, Column {col_label}:\n\t\t{values}\n"
                    for col_label, values in labelled_series
                )

        return "".join(parts)