Coverage for functions \ flipdare \ analysis \ data \ nested \ time_series_log_data.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from typing import override 

14from datetime import datetime 

15from collections import defaultdict 

16from dataclasses import dataclass, field 

17from flipdare.analysis.data._time_series_protocol import TimeSeriesPlotInfo 

18from flipdare.analysis.data.nested._time_series_nested_data import TimeSeriesNestedData 

19from flipdare.app_types import AnalysisArrayType, AnalysisDataType, ReportListType 

20from flipdare.generated.shared.app_log_category import AppLogCategory 

21from flipdare.generated.shared.backend.system_log_type import SystemLogType 

22from flipdare.util.time_util import TimeUtil 

23 

24 

25@dataclass(frozen=True, slots=True) 

26class LogStat: # type: ignore[misc] 

27 _stats: defaultdict[AppLogCategory, float] = field( 

28 default_factory=lambda: defaultdict(float), 

29 init=False, 

30 repr=False, 

31 ) 

32 

33 def accumulate(self, other: "LogStat") -> "LogStat": 

34 new_stat = LogStat() 

35 for category in set(self._stats.keys()) | set(other._stats.keys()): 

36 new_stat._stats[category] = self._stats.get(category, 0.0) + other._stats.get( 

37 category, 0.0 

38 ) 

39 return new_stat 

40 

41 

# Shared empty LogStat used as the ``.get(...)`` fallback when a day has no
# entry for a log type. Within this module it is only read from:
# ``accumulate`` builds a fresh instance instead of mutating its operands.
_EMPTY_STAT: LogStat = LogStat()

43 

44 

@dataclass
class TimeSeriesLogData(TimeSeriesNestedData[SystemLogType, LogStat]):
    """Nested time series storing one ``LogStat`` per (day, ``SystemLogType``)."""

    @property
    @override
    def headers(self) -> list[str]:
        """Column titles: "Date", "LogType", then one column per known category."""
        category_columns = [category.value for category in self.categories]
        return ["Date", "LogType", *category_columns]

    @property
    def categories(self) -> list[AppLogCategory]:
        """Every category found in any stored stat, ordered by its ``str()`` form."""
        found: set[AppLogCategory] = set().union(
            *(
                stat._stats.keys()
                for per_log_type in self._rows.values()
                for stat in per_log_type.values()
            )
        )
        return sorted(found, key=str)

    @property
    def log_types(self) -> list[SystemLogType]:
        """Every log type found on any day, ordered by its ``str()`` form."""
        found: set[SystemLogType] = set().union(
            *(per_log_type.keys() for per_log_type in self._rows.values())
        )
        return sorted(found, key=str)

    def add(
        self,
        dt: datetime,
        log_type: SystemLogType,
        category: AppLogCategory,
        value: float,
    ) -> None:
        """Record ``value`` for the day of ``dt`` under ``log_type`` and ``category``.

        The value is summed with whatever is already stored for that day and
        log type; the stored stat is replaced by the combined one.
        """
        day_rows = self._rows[self._to_day(dt)]
        increment = LogStat()
        increment._stats[category] = value
        day_rows[log_type] = day_rows.get(log_type, _EMPTY_STAT).accumulate(increment)

    def get_category_values(
        self,
        log_type: SystemLogType,
        category: AppLogCategory,
    ) -> AnalysisArrayType:
        """Return the values for one log type and category, one per entry of ``self.dates``.

        Days lacking that log type or category contribute 0.0.
        """
        values: AnalysisArrayType = []
        for day in self.dates:
            stat = self._rows[day].get(log_type, _EMPTY_STAT)
            values.append(stat._stats.get(category, 0.0))
        return values

    @override
    def table_data(self) -> ReportListType:
        """Build the report rows, laid out to match ``headers``.

        Rows are grouped by log type and, within a log type, follow the order
        of ``self.dates``. Each row is the formatted date, the log type as a
        string, then one value per category (0.0 where nothing was recorded).
        """
        categories = self.categories
        rows: ReportListType = []
        for log_type in self.log_types:
            type_label = str(log_type)
            for dt in self.dates:
                stats = self._rows[dt].get(log_type, _EMPTY_STAT)._stats
                counts = [stats.get(cat, 0.0) for cat in categories]
                rows.append([TimeUtil.formatted_short(dt), type_label, *counts])
        return rows

    @override
    def plot_info(self) -> list[TimeSeriesPlotInfo]:
        """Return one plot description per log type present in the data.

        Each plot carries one series per category, and every series has one
        value per entry of ``self.dates`` (0.0 where nothing was recorded).
        """
        dates = self.dates
        x_labels = [TimeUtil.formatted_short(dt) for dt in dates]
        categories = self.categories
        legend = [str(cat) for cat in categories]

        plots: list[TimeSeriesPlotInfo] = []
        for log_type in self.log_types:
            # Resolve each day's mapping once, then slice it per category.
            per_day = [self._rows[dt].get(log_type, _EMPTY_STAT)._stats for dt in dates]
            series: AnalysisDataType = [
                [stats.get(cat, 0.0) for stats in per_day] for cat in categories
            ]
            plots.append(
                TimeSeriesPlotInfo(
                    label=str(log_type),
                    x_title="Date",
                    y_title="Count",
                    x_labels=x_labels,
                    legend_labels=legend,
                    data=series,
                )
            )
        return plots