Coverage for functions \ flipdare \ firestore \ user_summary_db.py: 53%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from dataclasses import dataclass 

15 

16from google.cloud.firestore import Client as FirestoreClient 

17 

18from flipdare.app_log import LOG 

19from flipdare.constants import IS_DEBUG, NO_DOC_ID 

20from flipdare.firestore._app_sub_db import AppSubDb 

21from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField 

22from flipdare.generated import AppErrorCode, AppJobType, SystemLogType, UserSummaryKeys 

23from flipdare.generated.model.backend.user_summary_entry_model import UserSummaryEntryModel 

24from flipdare.generated.model.backend.user_summary_model import ( 

25 UserSummaryInternalKeys, 

26 UserSummaryModel, 

27) 

28from flipdare.util import FirestoreTime, TimeUtil 

29from flipdare.generated.shared.firestore_collections import FirestoreCollections 

30from flipdare.wrapper import ( 

31 UserSummaryEntryWrapper, 

32 UserSummaryWrapper, 

33) 

34 

35__all__ = ["UserSummaryDb"] 

36 

37_SUMMARY: str = FirestoreCollections.USER_SUMMARY.value 

38 

39_K = UserSummaryKeys 

40_I = UserSummaryInternalKeys 

41 

42 

43@dataclass 

44class SummaryResult: 

45 summary: UserSummaryWrapper 

46 entry: UserSummaryEntryWrapper 

47 

48 

49class UserSummaryDb(AppSubDb[UserSummaryWrapper, UserSummaryModel]): 

50 

51 def __init__(self, client: FirestoreClient) -> None: 

52 super().__init__( 

53 client=client, 

54 collection_name=FirestoreCollections.USER_SUMMARY, 

55 model_class=UserSummaryModel, 

56 wrapper_class=UserSummaryWrapper, 

57 sub_collection_name=FirestoreCollections.USER_SUMMARY_ENTRY, 

58 ) 

59 

60 self.entries = AppSubDb[UserSummaryEntryWrapper, UserSummaryEntryModel]( 

61 client=client, 

62 collection_name=FirestoreCollections.USER_SUMMARY, 

63 model_class=UserSummaryEntryModel, 

64 wrapper_class=UserSummaryEntryWrapper, 

65 sub_collection_name=FirestoreCollections.USER_SUMMARY_ENTRY, 

66 ) 

67 

68 def get_user_report(self, user_id: str, is_sent: bool = False) -> UserSummaryWrapper | None: 

69 query = DbQuery.and_( 

70 where_fields=[ 

71 WhereField[_K](_K.UID, FieldOp.EQUAL, user_id), 

72 WhereField[_I](_I.SUMMARY_SENT, FieldOp.EQUAL, is_sent), 

73 WhereField( 

74 _K.CREATED_AT, 

75 FieldOp.GREATER_THAN_OR_EQUAL, 

76 TimeUtil.get_start_of_day_utc(), 

77 ), 

78 ], 

79 limit=1, 

80 ) 

81 

82 results = query.get_query(self.client, _SUMMARY).get() 

83 if not results or len(results) == 0: 

84 return None 

85 

86 return self._cvt_snap_to_model(results[0]) 

87 

88 def get_user_reports(self) -> list[UserSummaryWrapper]: 

89 query = DbQuery.and_( 

90 where_fields=[ 

91 WhereField[_I](_I.SUMMARY_SENT, FieldOp.EQUAL, False), 

92 WhereField[_K]( 

93 _K.CREATED_AT, 

94 FieldOp.GREATER_THAN_OR_EQUAL, 

95 TimeUtil.get_start_of_day_utc(), 

96 ), 

97 ], 

98 ) 

99 

100 results = query.get_query(self.client, _SUMMARY).get() 

101 

102 if not results or len(results) == 0: 

103 return [] 

104 

105 return self._process_results(results) 

106 

107 def get_report_entries(self, parent_id: str) -> list[UserSummaryEntryWrapper]: 

108 return self.entries.get_all_sub(parent_id=parent_id) 

109 

110 def mark_sent(self, user_ids: list[str]) -> dict[str, UserSummaryWrapper]: 

111 if not user_ids: 

112 return {} 

113 

114 query = DbQuery.and_( 

115 where_fields=[ 

116 WhereField[_K](_K.UID, FieldOp.IN, user_ids), 

117 WhereField[_I](_I.SUMMARY_SENT, FieldOp.EQUAL, False), 

118 WhereField[_I]( 

119 _I.CREATED_AT, 

120 FieldOp.GREATER_THAN_OR_EQUAL, 

121 TimeUtil.get_start_of_day_utc(), 

122 ), 

123 ], 

124 ) 

125 

126 results = query.get_query(self.client, _SUMMARY).get() 

127 

128 if not results or len(results) == 0: 

129 return {} 

130 

131 models = self._process_results(results) 

132 return {model.doc_id: model for model in models} 

133 

134 def create_report_entry( 

135 self, 

136 user_id: str, 

137 entry: UserSummaryEntryModel, 

138 ) -> SummaryResult | None: 

139 from flipdare.services import get_app_logger 

140 

141 existing_report = self.get_user_report(user_id) 

142 if existing_report is None: 

143 return self._create_new_report(user_id, entry) 

144 

145 existing_start = FirestoreTime.from_firestore(existing_report.created_at_db) 

146 start_of_day = TimeUtil.get_start_of_day_utc() 

147 if existing_start is None or existing_start < start_of_day: 

148 if existing_start is None: 

149 msg = f"Existing report for user_id: {user_id} has no creation time" 

150 LOG().warning(msg) 

151 get_app_logger().system_error( 

152 job_type=AppJobType.CR_USER_DAILY_SUMMARY, 

153 log_type=SystemLogType.WARNING, 

154 error_code=AppErrorCode.INVALID_DATA, 

155 message=msg, 

156 doc_id=existing_report.doc_id or NO_DOC_ID, 

157 data=existing_report.to_dict(), 

158 ) 

159 

160 if IS_DEBUG: 

161 LOG().debug( 

162 f"Existing report for user_id: {user_id} is from previous day " 

163 f"({existing_start}), creating new report.", 

164 ) 

165 return self._create_new_report(user_id, entry) 

166 

167 if IS_DEBUG: 

168 LOG().debug( 

169 f"Existing report found for user_id: {user_id} " 

170 f"({existing_report.report_count}), updating report count.", 

171 ) 

172 return self._update_report(existing_report, entry) 

173 

174 def _update_report( 

175 self, 

176 existing_report: UserSummaryWrapper, 

177 entry: UserSummaryEntryModel, 

178 ) -> SummaryResult | None: 

179 updated_model = self._update_entry_count(existing_report, existing_report.report_count + 1) 

180 if updated_model is None: 

181 return None 

182 

183 user_id = existing_report.doc_id 

184 parent_id = updated_model.doc_id 

185 entry_model: UserSummaryEntryWrapper | None = None 

186 try: 

187 entry_model = self.entries.create_sub(parent_id=parent_id, data=entry) 

188 except Exception as e: 

189 LOG().error(f"Failed to add report entry for user_id: {user_id}: {e}") 

190 return None 

191 

192 if IS_DEBUG: 

193 LOG().debug(f"Updated report count and added entry for user_id: {user_id}.") 

194 

195 return SummaryResult(summary=updated_model, entry=entry_model) 

196 

197 def _update_entry_count( 

198 self, 

199 existing_report: UserSummaryWrapper, 

200 new_count: int, 

201 ) -> UserSummaryWrapper | None: 

202 user_id = existing_report.doc_id 

203 existing_report.report_count = new_count 

204 updated_model = self.update_model(existing_report) 

205 if updated_model is None: 

206 LOG().error(f"Failed to update report count for user_id: {user_id}.") 

207 return None 

208 LOG().debug(f"Updated report count to {new_count} for user_id: {user_id}.") 

209 return updated_model 

210 

211 def _create_new_report( 

212 self, 

213 user_id: str, 

214 entry: UserSummaryEntryModel, 

215 ) -> SummaryResult | None: 

216 

217 try: 

218 summary = UserSummaryModel( 

219 id=None, 

220 uid=user_id, 

221 report_count=1, 

222 ) 

223 summary_model = self.create(summary) 

224 

225 parent_id = summary_model.doc_id 

226 entry_model = self.entries.create_sub(parent_id=parent_id, data=entry) 

227 LOG().debug(f"Created new user report and added entry for user_id: {user_id}.") 

228 return SummaryResult(summary=summary_model, entry=entry_model) 

229 except Exception as e: 

230 LOG().error(f"Failed to create new report for user_id: {user_id}: {e}") 

231 return None