Coverage for functions \ flipdare \ analysis \ data \ time_series_metric_data.py: 78%
139 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from typing import Any, override
16from dataclasses import dataclass, field
17from datetime import datetime
18from flipdare.analysis.data._time_series_protocol import TimeSeriesProtocol, TimeSeriesPlotInfo
19from flipdare.app_types import (
20 AnalysisDataType,
21 LabelledSeriesType,
22 StatSource,
23 ReportListType,
24 StatMetric,
25)
26from flipdare.generated.model.backend.metric.count_metric import CountMetric
27from flipdare.util.time_util import TimeUtil
28import pandas as pd
# Names exported by `from <module> import *`.
__all__ = ["TimeSeriesMetricData"]

# Column names shared by every row dict / DataFrame built in this module.
_DATE_COL = "Date"
_SOURCE_COL = "Source"
# The non-metric columns; excluded when deriving the metric (series) columns of a frame.
_BASE_COLS: frozenset[str] = frozenset({_DATE_COL, _SOURCE_COL})
def _to_row(dt: datetime, source: StatSource, metric: StatMetric) -> dict[str, Any]:
    """Flatten a single metric sample into a row dict keyed by column name.

    The row always starts with the date and the source's ``value``; the
    remaining keys depend on the metric kind. All metric values are stored
    as floats.
    """
    row: dict[str, Any] = {_DATE_COL: dt, _SOURCE_COL: source.value}

    if isinstance(metric, CountMetric):
        row.update(
            {
                "Success Ct": float(metric.success_ct),
                "Failed Ct": float(metric.failed_ct),
                "Skipped Ct": float(metric.skipped_ct),
                "Duration": float(metric.duration),
            }
        )
        return row

    # Anything that is not a CountMetric is handled as a SuccessMetric.
    row["Succeeded"] = float(1 if metric.succeeded else 0)
    row["Duration"] = float(metric.duration)
    return row
@dataclass
class TimeSeriesMetricData(TimeSeriesProtocol):
    """Collects dated metric samples per source and exposes them as tables and plot series.

    Samples are stored as flat row dicts, bucketed by the concrete metric class
    they were created from. DataFrames are rebuilt from those rows on every
    access; nothing is cached.
    """

    # Running error counter; only ever modified by ``merge`` inside this class.
    error_ct: int = 0
    # Row dicts (see ``_to_row``) grouped by the concrete metric class.
    _rows: dict[type[StatMetric], list[dict[str, Any]]] = field(
        default_factory=dict,
        init=False,
        repr=False,
    )
    # Every source seen via ``add_metric`` / ``merge``.
    _sources: set[StatSource] = field(
        default_factory=set,
        init=False,
        repr=False,
    )

    @property
    @override
    def has_data(self) -> bool:
        """True once at least one metric type has been recorded."""
        return bool(self._rows)

    @property
    @override
    def count(self) -> int:
        """Total number of stored rows across all metric types."""
        return sum(len(rows) for rows in self._rows.values())

    @property
    @override
    def dates(self) -> list[datetime]:
        """Sorted unique dates across all metric types."""
        return sorted({row[_DATE_COL] for rows in self._rows.values() for row in rows})

    @property
    @override
    def headers(self) -> list[str]:
        """Column names of the merged DataFrame (see ``df``)."""
        return self.df.columns.tolist()

    @property
    def metric_types(self) -> list[type[StatMetric]]:
        """Metric classes that have at least one row, in first-seen order."""
        return list(self._rows.keys())

    @property
    def sources(self) -> list[StatSource]:
        """Unique sources sorted by value."""
        return sorted(self._sources, key=lambda s: s.value)

    @property
    def df(self) -> pd.DataFrame:
        """Merged DataFrame across all metric types, sorted by date. Use for report tables."""
        frames = [self.df_for(m_type) for m_type in self._rows]
        if not frames:
            return pd.DataFrame(columns=[_DATE_COL, _SOURCE_COL])
        merged = pd.concat(frames, ignore_index=True)
        # Stable sort: rows sharing a date keep a deterministic relative order.
        return merged.sort_values(_DATE_COL, kind="stable").reset_index(drop=True)

    def add_metric(self, dt: datetime, source: StatSource, metric: StatMetric) -> None:
        """Add a metric data point for a given datetime and source."""
        self._sources.add(source)
        self._rows.setdefault(type(metric), []).append(_to_row(dt, source, metric))

    def merge(self, other: TimeSeriesMetricData) -> None:
        """Merge another TimeSeriesMetricData into this one.

        Rows are appended after the existing ones, sources are unioned and
        ``error_ct`` values are added together. ``other`` is not modified.
        """
        for m_type, rows in other._rows.items():
            self._rows.setdefault(m_type, []).extend(rows)
        self._sources.update(other._sources)
        self.error_ct += other.error_ct

    def df_for(self, metric_type: type[StatMetric]) -> pd.DataFrame:
        """DataFrame for a single metric type, sorted by date.

        Returns an empty DataFrame (no columns) when the type has no rows.
        """
        rows = self._rows.get(metric_type, [])
        if not rows:
            return pd.DataFrame()
        # Stable sort keeps insertion order among rows with the same date, which
        # makes "last added" well defined when duplicates are collapsed later.
        return pd.DataFrame(rows).sort_values(_DATE_COL, kind="stable").reset_index(drop=True)

    def dates_for(self, metric_type: type[StatMetric]) -> list[datetime]:
        """Sorted unique dates for a single metric type."""
        frame = self.df_for(metric_type)
        if frame.empty:
            return []
        return self._sorted_dates(frame)

    def series_labels(self, metric_type: type[StatMetric]) -> list[str]:
        """Metric field names — use as graph legend labels aligned to series_data."""
        frame = self.df_for(metric_type)
        if frame.empty:
            return []
        return self._metric_columns(frame)

    def date_labels_for(self, metric_type: type[StatMetric]) -> list[str]:
        """Formatted date labels aligned to dates_for(metric_type). Use as x-axis values."""
        return [TimeUtil.formatted_short(dt) for dt in self.dates_for(metric_type)]

    @property
    def all_date_labels(self) -> list[str]:
        """Formatted labels for every date across all metric types."""
        # NOTE: this returns all date labels, which may not line up with the
        # dates of one particular metric type.
        # e.g. if CountMetric has 12/4,13/4 and SuccessMetric has 14/4,15/4,
        # all labels will be 12/4,13/4,14/4,15/4.
        return [TimeUtil.formatted_short(dt) for dt in self.dates]

    def source_data(self, metric_type: type[StatMetric], source: StatSource) -> LabelledSeriesType:
        """Labelled series for a single source and metric type, with None for missing dates.

        Returns an empty list when the metric type has no rows at all. A source
        without rows for this metric type yields all-None series.
        """
        frame = self.df_for(metric_type)
        if frame.empty:
            return []
        return self._aligned_series(
            frame,
            source.value,
            self._sorted_dates(frame),
            self._metric_columns(frame),
        )

    @override
    def table_data(self) -> ReportListType:
        """Rows of the merged DataFrame as plain nested lists."""
        return self.df.to_numpy().tolist()

    @override
    def plot_info(self) -> list[TimeSeriesPlotInfo]:
        """
        One TimeSeriesPlotInfo per metric type.

        Each plot holds one series per (source, metric field) pair; the legend
        entry for a series is "<SOURCE> - <field>" and is index-aligned with ``data``.
        """
        result: list[TimeSeriesPlotInfo] = []
        for m_type in self.metric_types:
            src_data = self._plot_data(m_type)
            date_labels = self.date_labels_for(m_type)
            legend: list[str] = []
            plot_data: AnalysisDataType = []
            plot_label = m_type.__name__

            for source, labelled_series in src_data.items():
                legend.extend(
                    f"{source.value.upper()} - {col_label}" for col_label, _ in labelled_series
                )
                plot_data.extend([values for _, values in labelled_series])

            result.append(
                TimeSeriesPlotInfo(
                    label=plot_label,
                    x_title="Date",
                    y_title="Count" if "Count" in plot_label else "Value",
                    x_labels=date_labels,
                    legend_labels=legend,
                    data=plot_data,
                )
            )
        return result

    def _plot_data(self, metric_type: type[StatMetric]) -> dict[StatSource, LabelledSeriesType]:
        """
        Per-source labelled series aligned to the full date range for the metric type.

        Each source maps to a list of (label, values) pairs — one per metric field —
        with None where the source has no data for a date.
        """
        frame = self.df_for(metric_type)
        if frame.empty:
            return {}
        all_dates = self._sorted_dates(frame)
        metric_cols = self._metric_columns(frame)
        # The frame stores raw source values; map them back to the source objects.
        src_by_val = {s.value: s for s in self._sources}

        return {
            src_by_val[src_val]: self._aligned_series(frame, src_val, all_dates, metric_cols)
            for src_val in sorted(frame[_SOURCE_COL].unique())
        }

    @staticmethod
    def _metric_columns(frame: pd.DataFrame) -> list[str]:
        """Columns of ``frame`` that hold metric values (everything except date/source)."""
        return [c for c in frame.columns if c not in _BASE_COLS]

    @staticmethod
    def _sorted_dates(frame: pd.DataFrame) -> list[datetime]:
        """Sorted unique values of the date column of a non-empty frame."""
        return sorted(frame[_DATE_COL].unique().tolist())

    @staticmethod
    def _aligned_series(
        frame: pd.DataFrame,
        src_val: Any,
        all_dates: list[datetime],
        metric_cols: list[str],
    ) -> LabelledSeriesType:
        """Build (column, values) pairs for one source, aligned to ``all_dates``.

        ``values`` has one entry per date in ``all_dates``: the float value for
        that date, or None when the source has no row on that date.
        """
        src_frame = frame[frame[_SOURCE_COL] == src_val].set_index(_DATE_COL)
        # If the same source was recorded more than once for a date, a scalar
        # lookup would return a Series that float() cannot convert. Keep only
        # the last-added row per date (frame order is stable, see df_for).
        # NOTE(review): "last wins" is a policy choice — confirm it suits callers.
        src_frame = src_frame.loc[~src_frame.index.duplicated(keep="last")]

        # Membership does not depend on the column, so compute it once.
        present = [dt in src_frame.index for dt in all_dates]
        return [
            (
                col,
                [
                    (
                        float(src_frame.loc[pd.Timestamp(dt), col])  # type: ignore[arg-type]
                        if has_row
                        else None
                    )
                    for dt, has_row in zip(all_dates, present)
                ],
            )
            for col in metric_cols
        ]

    @property
    @override
    def debug_str(self) -> str:
        """Multi-line human-readable dump of dates, sources, columns and every plotted series."""
        metric_types = self.metric_types
        parts: list[str] = [
            f"All Dates : {self.all_date_labels}\n"
            f"Sources : {[s.value for s in self.sources]}\n"
            f"Metric Types : {[t.__name__ for t in metric_types]}\n"
            f"Headers : {self.headers}\n"
        ]

        for metric_type in metric_types:
            frame = self.df_for(metric_type)
            date_labels = self.date_labels_for(metric_type)
            plot_data = self._plot_data(metric_type)

            parts.append(
                f"{metric_type.__name__}\n"
                f"\tRows : {len(frame)} rows\n"
                f"\tDate Labels : {date_labels}\n"
                f"\tColumns : {frame.columns.tolist()}\n"
            )

            for source, labelled_series in plot_data.items():
                src_name = source.value
                parts.extend(
                    f"\tSource {src_name}, Column {col_label}:\n\t\t{values}\n"
                    for col_label, values in labelled_series
                )

        return "".join(parts)