Coverage for functions \ flipdare \ firestore \ group_db.py: 56%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from google.cloud.firestore import And 

15from google.cloud.firestore import Client as FirestoreClient 

16from google.cloud.firestore_v1.base_document import DocumentSnapshot 

17from google.cloud.firestore_v1.stream_generator import StreamGenerator 

18 

19from flipdare.app_log import LOG 

20from flipdare.error.app_error import AppErrorCode, DatabaseError 

21from flipdare.firestore import DbQuery, FieldOp, WhereField 

22from flipdare.firestore._app_db import AppDb 

23from flipdare.firestore._app_sub_db import AppSubDb 

24from flipdare.generated import AppJobType, GroupMemberKeys 

25from flipdare.generated.model.group_member_model import GroupMemberInternalKeys, GroupMemberModel 

26from flipdare.generated.model.group_model import GroupInternalKeys, GroupKeys, GroupModel 

27from flipdare.generated.shared.model.user.request_status import RequestStatus 

28from flipdare.util import TimeUtil 

29from flipdare.generated.shared.firestore_collections import FirestoreCollections 

30from flipdare.wrapper import ( 

31 GroupMemberWrapper, 

32 GroupWrapper, 

33 UserWrapper, 

34) 

35 

# Firestore collection names as plain strings, used for collection-group
# queries and for the collection_name reported in DatabaseError.
36_GROUP: str = FirestoreCollections.GROUP.value 

37_GROUP_MEMBER: str = FirestoreCollections.GROUP_MEMBER.value 

38 

# Only GroupDb is part of this module's public API.
39__all__ = ["GroupDb"] 

40 

41 

# Short aliases for the generated key enums, used when building WhereField
# filters below (_K: model keys, _I: internal keys).
42_GRP_K = GroupKeys 

43_GRP_I = GroupInternalKeys 

44_MEM_I = GroupMemberInternalKeys 

45_MEM_K = GroupMemberKeys 

46 

# Short alias for the query comparison operators.
47_OP = FieldOp 

48 

49 

class GroupDb(AppDb[GroupWrapper, GroupModel]):
    """Database access for groups and their member sub-collection.

    Group documents are handled through the inherited ``AppDb`` behaviour;
    member documents are handled through ``self.members``, an ``AppSubDb``
    bound to the group-member sub-collection of the group collection.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind the group collection and its member sub-collection to *client*."""
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.GROUP,
            model_class=GroupModel,
            wrapper_class=GroupWrapper,
        )

        self.members = AppSubDb[GroupMemberWrapper, GroupMemberModel](
            client=client,
            collection_name=FirestoreCollections.GROUP,
            sub_collection_name=FirestoreCollections.GROUP_MEMBER,
            wrapper_class=GroupMemberWrapper,
            model_class=GroupMemberModel,
        )

    def groups(self, user_id: str) -> list[str] | None:
        """Return the IDs of group documents whose ``uid`` equals *user_id*.

        Returns:
            The matching document IDs, or ``None`` when there are no matches
            or the lookup fails (the failure is logged, not raised).
        """
        LOG().debug(f"Getting groups for user: {user_id}")
        try:
            query = DbQuery.where(WhereField("uid", FieldOp.EQUAL, user_id))
            results = query.get_query(self.client, _GROUP).get()
            group_ids = [doc.id for doc in results]
        except Exception as error:
            # Best-effort lookup: callers treat None as "no groups".
            LOG().error(f"Error searching for groups for user {user_id}: {error}")
            return None

        return group_ids or None

    def create_member(
        self,
        group_id: str,
        member_data: GroupMemberModel,
    ) -> GroupMemberWrapper:
        """Add a new member to the group *group_id*.

        Delegates to ``self.members.create_sub``.
        """
        return self.members.create_sub(group_id, member_data)

    def update_member(
        self,
        group_id: str,
        member_id: str,
        member_data: GroupMemberWrapper,
    ) -> GroupMemberWrapper | None:
        """Update member *member_id* of group *group_id*.

        Only the values returned by ``member_data.get_updates()`` are passed
        to ``self.members.update_sub``.
        """
        return self.members.update_sub(group_id, member_id, member_data.get_updates())

    def get_member(self, group_id: str, member_id: str) -> GroupMemberWrapper | None:
        """Return member *member_id* of group *group_id* via ``self.members.get_sub``."""
        return self.members.get_sub(group_id, member_id)

    def get_members(self, group_id: str, limit: int | None = None) -> list[GroupMemberWrapper]:
        """Return the members of group *group_id*, optionally capped at *limit*.

        Returns:
            The member wrappers, or an empty list when the lookup fails
            (the failure is logged, not raised).
        """
        LOG().debug(f"Getting members for group: {group_id}")
        try:
            return self.members.get_all_sub(group_id, limit=limit)
        except Exception as error:
            LOG().error(f"Error searching for members for group {group_id}: {error}")
            return []

    def get_users(self, group_id: str, limit: int | None = None) -> list[UserWrapper]:
        """Return the users behind the members of group *group_id*.

        Each member's ``uid`` is looked up in the user database; members whose
        user cannot be found are skipped. *limit* caps the number of members
        read, not the number of users returned.

        Returns:
            The resolved user wrappers, or an empty list when any lookup
            raises (the failure is logged, not raised).
        """
        # Imported here rather than at module level, presumably to avoid a
        # circular import with flipdare.services - TODO confirm.
        from flipdare.services import get_user_db

        user_db = get_user_db()

        LOG().debug(f"Getting users for group: {group_id}")

        try:
            members = self.members.get_all_sub(group_id, limit=limit)
            candidates = (user_db.get(member.uid) for member in members)
            return [user for user in candidates if user is not None]
        except Exception as error:
            LOG().error(f"Error searching for users for group {group_id}: {error}")
            return []

    def get_recent_group_unprocessed(self, hours: int | None = None) -> list[GroupWrapper]:
        """Return recently updated groups that are not yet marked processed.

        Args:
            hours: Size of the look-back window in hours. Defaults to
                ``self.def_window_hours`` when ``None``.

        Returns:
            Wrappers for every matching document that could be converted.
            Documents that fail conversion are skipped and reported once,
            collectively, through ``LOG`` and ``self.log_creator``.

        Raises:
            DatabaseError: If building or reading the query fails.
        """
        job_type = AppJobType.CR_GROUP_UNPROCESSED
        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        LOG().debug(
            f"Getting unprocessed groups updated since {TimeUtil.formatted_dt(hours_ago)} "
            f"(last {hours} hours)",
        )

        try:
            query = self.client.collection_group(_GROUP).where(
                filter=And(
                    [
                        WhereField[_GRP_I](_GRP_I.PROCESSED, _OP.EQUAL, False).filter,
                        WhereField[_GRP_K](
                            _GRP_K.UPDATED_AT,
                            _OP.GREATER_THAN_OR_EQUAL,
                            hours_ago,
                        ).filter,
                    ],
                ),
            )
            # stream() yields documents lazily, so consume it inside the try
            # to make sure read failures are wrapped as DatabaseError too.
            docs = list(query.stream())
        except Exception as e:
            msg = f"Failed to get groups with unprocessed status: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_GROUP,
                document_id=None,
            ) from e

        groups: list[GroupWrapper] = []
        errors: list[str] = []
        for doc in docs:
            try:
                group = self._cvt_snap_to_model(doc)
                if group is None:
                    errors.append(f"Could not convert group doc {doc.id} to model")
                    continue

                groups.append(group)
                LOG().info(f"added group {group.doc_id}")
            except Exception as e:
                # One bad document must not abort the whole batch.
                errors.append(f"Error processing group doc {doc.id}: {e}")

        if errors:
            error_summary = "\n".join(errors)
            error_msg = f"Errors processing group:\n{error_summary}"
            LOG().error(error_msg)
            self.log_creator.db_error(
                error_code=AppErrorCode.INVALID_DATA,
                job_type=job_type,
                collection=FirestoreCollections.GROUP,
                message=error_msg,
            )

        LOG().debug(f"Retrieved {len(groups)} groups that require processing.")
        return groups

    def get_recent_member_unprocessed_status(
        self,
        hours: int | None = None,
    ) -> list[GroupMemberWrapper]:
        """Return recent members in a complete status with no notification sent.

        Matches group-member documents (across all groups) whose
        request-notification-sent flag is ``False``, whose status is one of
        ``RequestStatus.complete()``, and which were updated within the window.

        Args:
            hours: Size of the look-back window in hours. Defaults to
                ``self.def_window_hours`` when ``None``.

        Returns:
            Wrappers for every matching document that could be converted.

        Raises:
            DatabaseError: If building or reading the query fails.
        """
        job_type = AppJobType.CR_GROUP_MEMBER_UNPROCESSED

        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        LOG().debug(
            f"Getting unprocessed group member statuses updated since "
            f"{TimeUtil.formatted_dt(hours_ago)} (last {hours} hours)",
        )

        try:
            complete_values = [status.value for status in RequestStatus.complete()]

            # Members live in a sub-collection, so a collection-group query is
            # needed to search across every group at once.
            query = self.client.collection_group(_GROUP_MEMBER).where(
                filter=And(
                    [
                        WhereField[_MEM_I](
                            _MEM_I.REQUEST_NOTIFICATION_SENT,
                            _OP.EQUAL,
                            False,
                        ).filter,
                        WhereField[_MEM_K](_MEM_K.STATUS, _OP.IN, complete_values).filter,
                        WhereField[_MEM_K](
                            _MEM_K.UPDATED_AT,
                            _OP.GREATER_THAN_OR_EQUAL,
                            hours_ago,
                        ).filter,
                    ],
                ),
            )
            # stream() yields documents lazily, so consume it inside the try
            # to make sure read failures are wrapped as DatabaseError too.
            docs = list(query.stream())
        except Exception as e:
            msg = f"Failed to get group members with unprocessed status: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_GROUP_MEMBER,
                document_id=None,
            ) from e

        return self._process_member_stream(job_type, docs)

    def get_recent_member_unprocessed(
        self,
        hours: int | None = None,
    ) -> list[GroupMemberWrapper]:
        """Return recent members with no notification sent, in any status.

        Same as ``get_recent_member_unprocessed_status`` but without the
        status filter.

        Args:
            hours: Size of the look-back window in hours. Defaults to
                ``self.def_window_hours`` when ``None``.

        Returns:
            Wrappers for every matching document that could be converted.

        Raises:
            DatabaseError: If building or reading the query fails.
        """
        job_type = AppJobType.CR_GROUP_MEMBER_UNPROCESSED

        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        LOG().debug(
            f"Getting recent group members updated since "
            f"{TimeUtil.formatted_dt(hours_ago)} (last {hours} hours)",
        )

        try:
            # Members live in a sub-collection, so a collection-group query is
            # needed to search across every group at once.
            query = self.client.collection_group(_GROUP_MEMBER).where(
                filter=And(
                    [
                        WhereField[_MEM_I](
                            _MEM_I.REQUEST_NOTIFICATION_SENT,
                            _OP.EQUAL,
                            False,
                        ).filter,
                        WhereField[_MEM_K](
                            _MEM_K.UPDATED_AT,
                            _OP.GREATER_THAN_OR_EQUAL,
                            hours_ago,
                        ).filter,
                    ],
                ),
            )
            # stream() yields documents lazily, so consume it inside the try
            # to make sure read failures are wrapped as DatabaseError too.
            docs = list(query.stream())
        except Exception as e:
            msg = f"Failed to get group members with unprocessed status: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_GROUP_MEMBER,
                document_id=None,
            ) from e

        return self._process_member_stream(job_type, docs)

    def _process_member_stream(
        self,
        job_type: AppJobType,
        query_stream: StreamGenerator[DocumentSnapshot] | list[DocumentSnapshot] | None,
    ) -> list[GroupMemberWrapper]:
        """Convert member snapshots to wrappers, collecting per-document errors.

        Args:
            job_type: Job type recorded with any conversion errors.
            query_stream: Snapshots to convert; ``None`` yields an empty list.

        Returns:
            The successfully converted members. Documents that fail conversion
            are skipped and reported once, collectively, through ``LOG`` and
            ``self.log_creator``.
        """
        if query_stream is None:
            return []

        members: list[GroupMemberWrapper] = []
        errors: list[str] = []

        for doc in query_stream:
            try:
                member = self.members._cvt_sub_snap_to_model(doc)
                if member is None:
                    errors.append(f"Could not convert group member doc {doc.id} to model")
                    continue

                members.append(member)
                LOG().info(f"added member {member.doc_id} from group {member.gid}")
            except Exception as e:
                # One bad document must not abort the whole batch.
                errors.append(f"Error processing group member doc {doc.id}: {e}")

        if errors:
            error_summary = "\n".join(errors)
            error_msg = f"Errors processing group members:\n{error_summary}"
            LOG().error(error_msg)
            self.log_creator.db_error(
                error_code=AppErrorCode.INVALID_DATA,
                job_type=job_type,
                collection=FirestoreCollections.GROUP_MEMBER,
                message=error_msg,
            )

        LOG().debug(f"Retrieved {len(members)} group members waiting for admin review.")
        return members