Coverage for functions \ flipdare \ firestore \ group_db.py: 56%
156 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from google.cloud.firestore import And
15from google.cloud.firestore import Client as FirestoreClient
16from google.cloud.firestore_v1.base_document import DocumentSnapshot
17from google.cloud.firestore_v1.stream_generator import StreamGenerator
19from flipdare.app_log import LOG
20from flipdare.error.app_error import AppErrorCode, DatabaseError
21from flipdare.firestore import DbQuery, FieldOp, WhereField
22from flipdare.firestore._app_db import AppDb
23from flipdare.firestore._app_sub_db import AppSubDb
24from flipdare.generated import AppJobType, GroupMemberKeys
25from flipdare.generated.model.group_member_model import GroupMemberInternalKeys, GroupMemberModel
26from flipdare.generated.model.group_model import GroupInternalKeys, GroupKeys, GroupModel
27from flipdare.generated.shared.model.user.request_status import RequestStatus
28from flipdare.util import TimeUtil
29from flipdare.generated.shared.firestore_collections import FirestoreCollections
30from flipdare.wrapper import (
31 GroupMemberWrapper,
32 GroupWrapper,
33 UserWrapper,
34)
# Public API of this module.
__all__ = ["GroupDb"]

# String values of the collection enum members, used where a plain
# collection name is passed (collection-group queries, error metadata).
_GROUP_MEMBER: str = FirestoreCollections.GROUP_MEMBER.value
_GROUP: str = FirestoreCollections.GROUP.value

# Short aliases that keep the Firestore filter expressions in this module compact.
_OP = FieldOp
_GRP_I = GroupInternalKeys
_GRP_K = GroupKeys
_MEM_K = GroupMemberKeys
_MEM_I = GroupMemberInternalKeys
class GroupDb(AppDb[GroupWrapper, GroupModel]):
    """Firestore access layer for groups and their member sub-collection.

    Group documents are handled through the inherited ``AppDb`` behaviour;
    member documents are delegated to ``self.members``, an ``AppSubDb`` bound
    to the group-member sub-collection.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind the group collection and its member sub-collection to ``client``."""
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.GROUP,
            model_class=GroupModel,
            wrapper_class=GroupWrapper,
        )

        self.members = AppSubDb[GroupMemberWrapper, GroupMemberModel](
            client=client,
            collection_name=FirestoreCollections.GROUP,
            sub_collection_name=FirestoreCollections.GROUP_MEMBER,
            wrapper_class=GroupMemberWrapper,
            model_class=GroupMemberModel,
        )

    def groups(self, user_id: str) -> list[str] | None:
        """Return the IDs of the group documents whose ``uid`` equals ``user_id``.

        Returns:
            The matching document IDs, or ``None`` when nothing matches or the
            lookup fails (failures are logged, not raised).
        """
        LOG().debug(f"Getting groups for user: {user_id}")
        try:
            query = DbQuery.where(WhereField("uid", FieldOp.EQUAL, user_id))
            results = query.get_query(self.client, _GROUP).get()
            group_ids = [doc.id for doc in results]
        except Exception as error:
            LOG().error(f"Error searching for groups for user {user_id}: {error}")
            return None
        return group_ids or None

    def create_member(
        self,
        group_id: str,
        member_data: GroupMemberModel,
    ) -> GroupMemberWrapper:
        """Create a member document under group ``group_id`` from ``member_data``."""
        return self.members.create_sub(group_id, member_data)

    def update_member(
        self,
        group_id: str,
        member_id: str,
        member_data: GroupMemberWrapper,
    ) -> GroupMemberWrapper | None:
        """Apply the pending updates held by ``member_data`` to a group member.

        Only ``member_data.get_updates()`` is sent to the sub-collection store.
        """
        return self.members.update_sub(group_id, member_id, member_data.get_updates())

    def get_member(self, group_id: str, member_id: str) -> GroupMemberWrapper | None:
        """Fetch member ``member_id`` of group ``group_id`` via the member store."""
        return self.members.get_sub(group_id, member_id)

    def get_members(self, group_id: str, limit: int | None = None) -> list[GroupMemberWrapper]:
        """Return the members of group ``group_id``.

        Args:
            group_id: ID of the parent group document.
            limit: Passed through to the member store's ``get_all_sub``.

        Returns:
            The member wrappers, or an empty list if the lookup fails
            (failures are logged, not raised).
        """
        LOG().debug(f"Getting members for group: {group_id}")
        try:
            return self.members.get_all_sub(group_id, limit=limit)
        except Exception as error:
            LOG().error(f"Error searching for members for group {group_id}: {error}")
            return []

    def get_users(self, group_id: str, limit: int | None = None) -> list[UserWrapper]:
        """Return the user records for the members of group ``group_id``.

        Each member's ``uid`` is looked up in the user database; members whose
        user cannot be found are skipped.

        Args:
            group_id: ID of the parent group document.
            limit: Passed through to the member store's ``get_all_sub``.

        Returns:
            The resolved user wrappers, or an empty list if any lookup raises
            (failures are logged, not raised).
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid an import cycle with the services package).
        from flipdare.services import get_user_db

        user_db = get_user_db()

        LOG().debug(f"Getting users for group: {group_id}")
        users: list[UserWrapper] = []

        try:
            for member in self.members.get_all_sub(group_id, limit=limit):
                user = user_db.get(member.uid)
                if user is not None:
                    users.append(user)
        except Exception as error:
            LOG().error(f"Error searching for users for group {group_id}: {error}")
            return []
        return users

    def get_recent_group_unprocessed(self, hours: int | None = None) -> list[GroupWrapper]:
        """Return recently updated groups that have not been processed yet.

        Args:
            hours: Size of the look-back window in hours; defaults to
                ``self.def_window_hours``.

        Returns:
            Wrappers for every matching document that converted successfully.
            Conversion failures are logged and reported through
            ``self.log_creator`` instead of being raised.

        Raises:
            DatabaseError: If building or executing the query fails.
        """
        job_type = AppJobType.CR_GROUP_UNPROCESSED
        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        LOG().debug(
            f"Getting unprocessed groups updated within the last {hours} hours "
            f"(since {TimeUtil.formatted_dt(hours_ago)})",
        )

        try:
            query = self.client.collection_group(_GROUP).where(
                filter=And(
                    [
                        WhereField[_GRP_I](_GRP_I.PROCESSED, _OP.EQUAL, False).filter,
                        WhereField[_GRP_K](
                            _GRP_K.UPDATED_AT,
                            _OP.GREATER_THAN_OR_EQUAL,
                            hours_ago,
                        ).filter,
                    ],
                ),
            )
            # stream() is lazy: the request only runs once the generator is
            # consumed, so consume it here to have failures wrapped below.
            docs = list(query.stream())
        except Exception as e:
            raise self._query_error("groups", _GROUP, e) from e

        return self._process_group_stream(job_type, docs)

    def get_recent_member_unprocessed_status(
        self,
        hours: int | None = None,
    ) -> list[GroupMemberWrapper]:
        """Return recently updated members in a complete status with no notification sent.

        Matches member documents across all groups whose request-notification
        flag is ``False``, whose status is one of ``RequestStatus.complete()``
        and whose update time falls inside the look-back window.

        Args:
            hours: Size of the look-back window in hours; defaults to
                ``self.def_window_hours``.

        Returns:
            Wrappers for every matching document that converted successfully.

        Raises:
            DatabaseError: If building or executing the query fails.
        """
        job_type = AppJobType.CR_GROUP_MEMBER_UNPROCESSED

        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        LOG().debug(
            f"Getting unprocessed group members with a complete status updated within "
            f"the last {hours} hours (since {TimeUtil.formatted_dt(hours_ago)})",
        )

        try:
            # Members live in a sub-collection, so query the collection group.
            complete_values = [status.value for status in RequestStatus.complete()]

            query = self.client.collection_group(_GROUP_MEMBER).where(
                filter=And(
                    [
                        WhereField[_MEM_I](
                            _MEM_I.REQUEST_NOTIFICATION_SENT,
                            _OP.EQUAL,
                            False,
                        ).filter,
                        WhereField[_MEM_K](_MEM_K.STATUS, _OP.IN, complete_values).filter,
                        WhereField[_MEM_K](
                            _MEM_K.UPDATED_AT,
                            _OP.GREATER_THAN_OR_EQUAL,
                            hours_ago,
                        ).filter,
                    ],
                ),
            )
            # stream() is lazy: consume it here so failures are wrapped below.
            docs = list(query.stream())
        except Exception as e:
            raise self._query_error("group members", _GROUP_MEMBER, e) from e

        return self._process_member_stream(job_type, docs)

    def get_recent_member_unprocessed(
        self,
        hours: int | None = None,
    ) -> list[GroupMemberWrapper]:
        """Return recently updated members for which no notification has been sent.

        Same as ``get_recent_member_unprocessed_status`` but without the
        status restriction.

        Args:
            hours: Size of the look-back window in hours; defaults to
                ``self.def_window_hours``.

        Returns:
            Wrappers for every matching document that converted successfully.

        Raises:
            DatabaseError: If building or executing the query fails.
        """
        job_type = AppJobType.CR_GROUP_MEMBER_UNPROCESSED

        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        LOG().debug(
            f"Getting recent group members updated within the last {hours} hours "
            f"(since {TimeUtil.formatted_dt(hours_ago)})",
        )

        try:
            # Members live in a sub-collection, so query the collection group.
            query = self.client.collection_group(_GROUP_MEMBER).where(
                filter=And(
                    [
                        WhereField[_MEM_I](
                            _MEM_I.REQUEST_NOTIFICATION_SENT,
                            _OP.EQUAL,
                            False,
                        ).filter,
                        WhereField[_MEM_K](
                            _MEM_K.UPDATED_AT,
                            _OP.GREATER_THAN_OR_EQUAL,
                            hours_ago,
                        ).filter,
                    ],
                ),
            )
            # stream() is lazy: consume it here so failures are wrapped below.
            docs = list(query.stream())
        except Exception as e:
            raise self._query_error("group members", _GROUP_MEMBER, e) from e

        return self._process_member_stream(job_type, docs)

    @staticmethod
    def _query_error(subject: str, collection_name: str, cause: Exception) -> DatabaseError:
        """Build the ``DatabaseError`` raised when an unprocessed-items query fails."""
        return DatabaseError(
            f"Failed to get {subject} with unprocessed status: {cause}",
            error_code=AppErrorCode.DATABASE,
            collection_name=collection_name,
            document_id=None,
        )

    def _process_group_stream(
        self,
        job_type: AppJobType,
        query_stream: StreamGenerator[DocumentSnapshot] | list[DocumentSnapshot] | None,
    ) -> list[GroupWrapper]:
        """Convert group snapshots to wrappers, collecting per-document failures.

        Documents that fail to convert are skipped; their errors are logged
        and reported once through ``self.log_creator`` under ``job_type``.
        """
        groups: list[GroupWrapper] = []
        errors: list[str] = []
        if query_stream is None:
            return []

        for doc in query_stream:
            try:
                group = self._cvt_snap_to_model(doc)
                if group is None:
                    errors.append(f"Could not convert group doc {doc.id} to model")
                    continue

                groups.append(group)
                LOG().info(f"added group {group.doc_id}")
            except Exception as e:
                errors.append(f"Error processing group doc {doc.id}: {e}")

        if errors:
            error_summary = "\n".join(errors)
            LOG().error(f"Errors processing group:\n{error_summary}")
            self.log_creator.db_error(
                error_code=AppErrorCode.INVALID_DATA,
                job_type=job_type,
                collection=FirestoreCollections.GROUP,
                message=f"Errors processing group:\n{error_summary}",
            )

        LOG().debug(f"Retrieved {len(groups)} groups that require processing.")
        return groups

    def _process_member_stream(
        self,
        job_type: AppJobType,
        query_stream: StreamGenerator[DocumentSnapshot] | list[DocumentSnapshot] | None,
    ) -> list[GroupMemberWrapper]:
        """Convert member snapshots to wrappers, collecting per-document failures.

        Documents that fail to convert are skipped; their errors are logged
        and reported once through ``self.log_creator`` under ``job_type``.
        A ``None`` stream yields an empty list.
        """
        members: list[GroupMemberWrapper] = []
        errors: list[str] = []
        if query_stream is None:
            return []

        for doc in query_stream:
            try:
                member = self.members._cvt_sub_snap_to_model(doc)
                if member is None:
                    errors.append(f"Could not convert group member doc {doc.id} to model")
                    continue

                members.append(member)
                LOG().info(f"added member {member.doc_id} from group {member.gid}")
            except Exception as e:
                errors.append(f"Error processing group member doc {doc.id}: {e}")

        if errors:
            error_summary = "\n".join(errors)
            LOG().error(f"Errors processing group members:\n{error_summary}")
            self.log_creator.db_error(
                error_code=AppErrorCode.INVALID_DATA,
                job_type=job_type,
                collection=FirestoreCollections.GROUP_MEMBER,
                message=f"Errors processing group members:\n{error_summary}",
            )

        LOG().debug(f"Retrieved {len(members)} group members waiting for admin review.")
        return members