Coverage for functions \ flipdare \ task \ report \ stats_reporter.py: 61%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from typing import TYPE_CHECKING 

16from collections.abc import Callable 

17from flipdare.service._service_provider import ServiceProvider 

18from flipdare.task.report.core.time_series_report import ( 

19 TimeSeriesReport, 

20 TimeSeriesReportType, 

21) 

22from flipdare.analysis.data._time_series_protocol import TimeSeriesProtocol 

23from flipdare.analysis.data.nested.time_series_collection_data import TimeSeriesCollectionData 

24from flipdare.analysis.plot.time_series_plotter import TimeSeriesPlotter 

25from flipdare.app_log import LOG 

26from flipdare.constants import ( 

27 IS_DEBUG, 

28 REPORT_CHARGE_STATS_DAYS, 

29 REPORT_ERROR_STATS_DAYS, 

30 REPORT_JOB_STATS_DAYS, 

31 RPORT_LOG_STATS_DAYS, 

32) 

33from flipdare.result.output_result import OutputResult 

34from flipdare.generated.shared.backend.app_job_type import AppJobType 

35from flipdare.generated.shared.firestore_collections import FirestoreCollections 

36 

37if TYPE_CHECKING: 

38 from flipdare.manager.db_manager import DbManager 

39 from flipdare.manager.backend_manager import BackendManager 

40 

41__all__ = ["StatsReporter"] 

42 

43 

class StatsReporter(ServiceProvider):
    """Runs the time-series statistics reports (payment, job, log and error).

    Each public method pairs an ``AppJobType`` with a zero-argument query
    callable and hands both to ``_run_report``, which wraps them in a
    ``TimeSeriesReport`` and returns the result of ``create_and_send()``.

    NOTE(review): ``pledge_db``, ``stat_db``, ``log_db``, ``db_manager``,
    ``app_logger`` and ``admin_mailer`` are not defined in this class; they
    are presumably supplied by ``ServiceProvider`` -- confirm.
    """

    def __init__(
        self,
        db_manager: DbManager,
        backend_manager: BackendManager,
    ) -> None:
        """Forward both managers unchanged to the ``ServiceProvider`` initialiser."""
        super().__init__(db_manager=db_manager, backend_manager=backend_manager)

    def payment_stats(self) -> OutputResult:
        """Run the ``REPORT_PAYMENT_STATS`` report.

        Data comes from ``pledge_db.get_stats(REPORT_CHARGE_STATS_DAYS)``.
        """

        def _fetch() -> TimeSeriesProtocol:
            return self.pledge_db.get_stats(REPORT_CHARGE_STATS_DAYS)

        return self._run_report(
            job_type=AppJobType.REPORT_PAYMENT_STATS,
            query_fn=_fetch,
            x_label="Date",
            y_label="Count",
        )

    def job_stats(self) -> OutputResult:
        """Run the ``REPORT_JOB_TYPE_STATS`` report.

        Data comes from ``stat_db.get_stats(REPORT_JOB_STATS_DAYS)``.
        """

        def _fetch() -> TimeSeriesProtocol:
            return self.stat_db.get_stats(REPORT_JOB_STATS_DAYS)

        return self._run_report(
            job_type=AppJobType.REPORT_JOB_TYPE_STATS,
            query_fn=_fetch,
            x_label="Date",
            y_label="Count",
        )

    def log_stats(self) -> OutputResult:
        """Run the ``REPORT_LOG_STATS`` report.

        Data comes from ``log_db.get_log_stats(RPORT_LOG_STATS_DAYS)``.
        """

        def _fetch() -> TimeSeriesProtocol:
            return self.log_db.get_log_stats(RPORT_LOG_STATS_DAYS)

        return self._run_report(
            job_type=AppJobType.REPORT_LOG_STATS,
            query_fn=_fetch,
            x_label="Date",
            y_label="Count",
        )

    def error_stats(self) -> OutputResult:
        """Run the ``REPORT_ERROR_STATS`` report.

        Data comes from ``_error_stats_query(REPORT_ERROR_STATS_DAYS)``, which
        merges the per-collection statistics into one data set.
        """

        def _fetch() -> TimeSeriesProtocol:
            return self._error_stats_query(REPORT_ERROR_STATS_DAYS)

        return self._run_report(
            job_type=AppJobType.REPORT_ERROR_STATS,
            query_fn=_fetch,
            x_label="Date",
            y_label="Count",
        )

    def _run_report(
        self,
        job_type: TimeSeriesReportType,
        query_fn: Callable[..., TimeSeriesProtocol],
        x_label: str,
        y_label: str,
    ) -> OutputResult:
        """Build a ``TimeSeriesReport`` for ``job_type`` and send it.

        Args:
            job_type: Report type; its ``label`` and ``description`` are
                logged, and its ``short_description`` becomes the plot title.
            query_fn: Callable passed through to ``TimeSeriesReport`` as the
                data source.
            x_label: X-axis label given to the plotter.
            y_label: Y-axis label given to the plotter.

        Returns:
            Whatever ``TimeSeriesReport.create_and_send()`` returns.
        """
        LOG().info(f"Running {job_type.label} report : {job_type.description}")

        def _make_plotter(data):
            # The title is read from job_type only when the plotter is
            # actually built, not when the report object is constructed.
            return TimeSeriesPlotter(
                plot_title=job_type.short_description,
                x_label=x_label,
                y_label=y_label,
                data=data,
            )

        time_series_report = TimeSeriesReport(
            job_type=job_type,
            query_fn=query_fn,
            plot_fn=_make_plotter,
            app_logger=self.app_logger,
            mailer=self.admin_mailer,
        )
        return time_series_report.create_and_send()

    def _error_stats_query(self, days: int) -> TimeSeriesCollectionData:
        """Merge ``get_collection_stats(days=days)`` across all Firestore collections.

        Iterates every member of ``FirestoreCollections``. A collection with
        no database is skipped with a warning. Any exception raised while
        handling a collection is logged as an error and the loop moves on, so
        one failing collection does not abort the whole report.

        Args:
            days: Passed as ``days`` to each database's ``get_collection_stats``.

        Returns:
            A ``TimeSeriesCollectionData`` holding everything merged so far.
        """
        merged = TimeSeriesCollectionData()

        for coll in FirestoreCollections:
            if IS_DEBUG:
                LOG().debug(f"Getting error stats for collection {coll.value}")

            try:
                coll_db = self.db_manager.get_db_for_collection(collection=coll)
                if coll_db is None:
                    LOG().warning(f"No database found for collection {coll.value}")
                    continue
                merged.merge(coll_db.get_collection_stats(days=days))
            except Exception as ex:
                # Best-effort: record the failure and carry on with the rest.
                LOG().error(f"Error getting error stats for collection {coll.value}: {ex}")

        return merged