Coverage for functions \ flipdare \ analysis \ data \ nested \ time_series_collection_data.py: 92%
53 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from typing import override
14from datetime import datetime
15from dataclasses import dataclass
16from flipdare.analysis.data._time_series_protocol import TimeSeriesPlotInfo
17from flipdare.analysis.data.nested._time_series_nested_data import TimeSeriesNestedData
18from flipdare.app_types import AnalysisDataType, ReportListType
19from flipdare.generated.shared.firestore_collections import FirestoreCollections
20from flipdare.util.time_util import TimeUtil
22__all__ = ["CollectionStat", "TimeSeriesCollectionData"]
25@dataclass(frozen=True, slots=True)
26class CollectionStat: # type: ignore[misc]
27 total_ct: float
28 unprocessed_ct: float
29 processed_ct: float
30 error_ct: float
32 def accumulate(self, other: "CollectionStat") -> "CollectionStat":
33 return CollectionStat(
34 total_ct=self.total_ct + other.total_ct,
35 processed_ct=self.processed_ct + other.processed_ct,
36 error_ct=self.error_ct + other.error_ct,
37 unprocessed_ct=self.unprocessed_ct + other.unprocessed_ct,
38 )
# All-zero stat, used in this module as the default when a collection has no
# entry for a given date.
_EMPTY_STAT = CollectionStat(
    total_ct=0.0,
    unprocessed_ct=0.0,
    processed_ct=0.0,
    error_ct=0.0,
)
@dataclass
class TimeSeriesCollectionData(TimeSeriesNestedData[FirestoreCollections, CollectionStat]):
    """Per-date, per-collection ``CollectionStat`` series with table and plot views.

    Uses ``_rows``, ``dates`` and ``_to_day``, none of which are defined in
    this class (presumably inherited from ``TimeSeriesNestedData`` — confirm
    there).
    """

    @property
    @override
    def headers(self) -> list[str]:
        """Column titles, in the same order as each row built by ``table_data``."""
        columns = (
            "Date",
            "Collection",
            "TotalCount",
            "ProcessedCount",
            "UnprocessedCount",
            "ErrorCount",
        )
        # A new list on every call, so callers cannot mutate shared state.
        return list(columns)

    def add(self, dt: datetime, collection: FirestoreCollections, stat: CollectionStat) -> None:
        """Fold ``stat`` into the running stat stored for ``dt``'s day and ``collection``.

        Args:
            dt: Timestamp of the sample; mapped to a row key via ``_to_day``.
            collection: Collection the counters belong to.
            stat: Counters to add on top of whatever is already stored.
        """
        day_key = self._to_day(dt)
        # A collection not yet seen on this day starts from the all-zero stat.
        running = self._rows[day_key].get(collection, _EMPTY_STAT)
        self._rows[day_key][collection] = running.accumulate(stat)

    def merge(self, other: "TimeSeriesCollectionData") -> None:
        """Accumulate every stat held by ``other`` into this instance.

        NOTE(review): the row keys of ``other`` are passed back through
        ``add`` and therefore through ``_to_day`` a second time — this assumes
        ``_to_day`` accepts keys it has already produced; confirm.
        """
        for row_key, per_collection in other._rows.items():
            for source_collection, source_stat in per_collection.items():
                self.add(row_key, source_collection, source_stat)

    @override
    def table_data(self) -> ReportListType:
        """Build one table row per (date, collection) pair.

        Rows follow the order of ``self.dates``; within a date, collections
        are ordered by their ``str()`` form. Cell order matches ``headers``.

        NOTE(review): the UnprocessedCount cell is derived as
        total - processed - error; the stat's own ``unprocessed_ct`` field is
        not read here.
        """
        table: ReportListType = []
        for day in self.dates:
            day_label = TimeUtil.formatted_short(day)
            by_name = sorted(self._rows[day].items(), key=lambda entry: str(entry[0]))
            table.extend(
                [
                    day_label,
                    str(col),
                    st.total_ct,
                    st.processed_ct,
                    st.total_ct - st.processed_ct - st.error_ct,
                    st.error_ct,
                ]
                for col, st in by_name
            )
        return table

    @override
    def plot_info(self) -> list[TimeSeriesPlotInfo]:
        """Describe one plot per collection, each with three series.

        The series are, in order, total, processed and error counts, aligned
        with ``self.dates``. A date on which a collection has no entry
        contributes zeros (the all-zero stat is substituted).
        """
        days = self.dates
        # These two label lists are shared by every plot description returned.
        x_axis_labels = [TimeUtil.formatted_short(day) for day in days]
        series_names = ["TotalCount", "ProcessedCount", "ErrorCount"]

        # Every collection seen on any date, ordered by its str() form.
        seen = {name for per_day in self._rows.values() for name in per_day}
        collections: list[FirestoreCollections] = sorted(seen, key=str)

        plots: list[TimeSeriesPlotInfo] = []
        for collection in collections:
            totals: list[float] = []
            processed: list[float] = []
            errors: list[float] = []
            for day in days:
                point = self._rows[day].get(collection, _EMPTY_STAT)
                totals.append(point.total_ct)
                processed.append(point.processed_ct)
                errors.append(point.error_ct)
            data: AnalysisDataType = [totals, processed, errors]
            plots.append(
                TimeSeriesPlotInfo(
                    label=str(collection),
                    x_title="Date",
                    y_title="Count",
                    x_labels=x_axis_labels,
                    legend_labels=series_names,
                    data=data,
                )
            )
        return plots