Coverage for functions \ flipdare \ firestore \ flag_db.py: 91%
111 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from google.cloud.firestore import Client as FirestoreClient
15from google.cloud.firestore_v1.base_document import BaseDocumentReference, DocumentSnapshot
16from flipdare.app_log import LOG
17from flipdare.constants import IS_DEBUG
18from flipdare.error.app_error import CodePathError, DatabaseError
19from flipdare.firestore._app_db import AppDb
20from flipdare.firestore._app_sub_db import AppSubDb
21from flipdare.firestore.core.db_query import DbQuery, FieldOp, OrderByField, WhereField
22from flipdare.firestore.core.sub_comment_transaction import SubCommentTransaction
23from flipdare.generated import (
24 AppErrorCode,
25 DisputedProgress,
26 FlagKeys,
27 FlagModel,
28 FlagType,
29 IssueProgress,
30)
31from flipdare.generated.model.issue.flag_model import FlagInternalKeys
32from flipdare.generated.model.issue.issue_comment_model import IssueCommentModel
33from flipdare.generated.shared.firestore_collections import FirestoreCollections
34from flipdare.util.time_util import TimeUtil
35from flipdare.wrapper import FlagWrapper, IssueCommentWrapper
37_FLAG: str = FirestoreCollections.FLAG.value
39__all__ = ["FlagDb"]
41_K = FlagKeys
42_I = FlagInternalKeys
43_OP = FieldOp
45type _KeyType = FlagKeys | FlagInternalKeys # for type checking.
class FlagDb(AppDb[FlagWrapper, FlagModel]):
    """Database access for flags and their comment sub-collection.

    Attributes:
        comments: Sub-collection accessor for ``FLAG_COMMENTS`` under ``FLAG``.
        comments_tx: Transaction helper used to create comments under a flag.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind the flag collection and its comment sub-collection to ``client``.

        Args:
            client: Firestore client used for every read and write.
        """
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.FLAG,
            model_class=FlagModel,
            wrapper_class=FlagWrapper,
        )

        self.comments = AppSubDb[IssueCommentWrapper, IssueCommentModel](
            client=client,
            collection_name=FirestoreCollections.FLAG,
            sub_collection_name=FirestoreCollections.FLAG_COMMENTS,
            wrapper_class=IssueCommentWrapper,
            model_class=IssueCommentModel,
        )
        self.comments_tx = SubCommentTransaction[IssueCommentModel](
            self,
            FirestoreCollections.FLAG_COMMENTS,
        )

    def create_flag_comment(
        self,
        flag_id: str,
        comment: IssueCommentModel,
    ) -> IssueCommentWrapper:
        """Create a new comment under the specified flag and return it.

        Args:
            flag_id: ID of the parent flag document.
            comment: Comment model to store.

        Returns:
            The stored comment, re-read from Firestore after the write.

        Raises:
            DatabaseError: If the write or the follow-up read fails. Every
                exception raised inside the ``try`` block (including those from
                ``_complete_transaction``) is wrapped, with the original
                attached as ``__cause__``.
        """
        sub_col_name = self.comments.sub_collection_name

        try:
            # NOTE(review): presumably also bumps the parent's "comment_count"
            # in the same transaction -- confirm in SubCommentTransaction.
            comment_ref = self.comments_tx.create_with_increment(
                parent_id=flag_id,
                model=comment,
                count_field="comment_count",
            )
            # Read the document back so the caller gets the stored state.
            return self._complete_transaction(flag_id, comment_ref)
        except Exception as e:
            msg = f"Failed to create comment for flag {flag_id}: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=sub_col_name,
                document_id=flag_id,
            ) from e

    def _complete_transaction(
        self,
        flag_id: str,
        comment_ref: BaseDocumentReference,
    ) -> IssueCommentWrapper:
        """Fetch a just-written comment and convert it to its wrapper.

        Args:
            flag_id: ID of the parent flag; used only for error reporting.
            comment_ref: Reference to the comment document to read back.

        Returns:
            The wrapper built from the stored comment document.

        Raises:
            CodePathError: If ``comment_ref.get()`` did not return a
                ``DocumentSnapshot``.
            DatabaseError: If the snapshot could not be converted to a model.
        """
        saved_doc = comment_ref.get()
        if not isinstance(saved_doc, DocumentSnapshot):
            # The two literals are concatenated, so the separating space must
            # be part of the first one.
            msg = (
                f"Error retrieving flag {flag_id} comment after transaction: "
                "Are you sure you didn't call Awaitable version of get?"
            )
            raise CodePathError(message=msg)

        saved_model = self.comments._cvt_sub_snap_to_model(saved_doc)
        if saved_model is None:
            raise DatabaseError(
                f"Failed to retrieve saved comment for flag {flag_id}",
                error_code=AppErrorCode.DATABASE,
                collection_name=self.comments.sub_collection_name,
                document_id=flag_id,
            )
        return saved_model

    def get_most_recent_comment_for_flag(self, flag_id: str) -> IssueCommentWrapper | None:
        """Get the most recent comment for a given flag.

        Args:
            flag_id: ID of the parent flag document.

        Returns:
            The newest comment by creation time, or ``None`` when the lookup
            returns no comments.

        Raises:
            DatabaseError: If the lookup fails.
        """
        try:
            newest_first = OrderByField.created_at(descending=True)
            docs = self.comments.get_all_sub(parent_id=flag_id, order_by=newest_first, limit=1)
            # Routine trace, not a problem report: log it the same way the
            # other read methods do (debug level, only when IS_DEBUG is set).
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(docs)} comments for flag {flag_id}")
            return docs[0] if docs else None
        except Exception as e:
            msg = f"Failed to get most recent comment for flag {flag_id}: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=self.comments.sub_collection_name,
                document_id=flag_id,
            ) from e

    def get_recent_major_unprocessed(self, hours: int | None = None) -> list[FlagWrapper]:
        """Get recent major flags that are still in the OPEN state.

        Intended for cron use: such flags most likely failed processing.

        Args:
            hours: Size of the look-back window in hours. ``None`` falls back
                to ``self.def_window_hours``.

        Returns:
            Matching flags ordered by creation time, oldest first. Documents
            that cannot be converted to a model are skipped.

        Raises:
            DatabaseError: If building or running the query fails.
        """
        if hours is None:
            hours = self.def_window_hours

        cutoff = TimeUtil.get_utc_time_hours_ago(hours)
        major_type_values = [flag_type.value for flag_type in FlagType.all_major_types()]

        if IS_DEBUG:
            LOG().info(
                f"Getting flags with type '{major_type_values}' and progress "
                f"'{IssueProgress.OPEN}' created since {TimeUtil.formatted_dt(cutoff)}",
            )

        try:
            and_fields = [
                WhereField[_K](_K.PROGRESS, _OP.EQUAL, IssueProgress.OPEN.value),
                WhereField[_K](_K.FLAG_TYPE, _OP.IN, major_type_values),
                WhereField[_K](_K.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, cutoff),
            ]
            order_by = OrderByField.created_at(descending=False)
            query = DbQuery.and_(and_fields, order_by=order_by)

            flag_docs = query.get_query(self.client, _FLAG).stream()
            flags = [
                flag for doc in flag_docs if (flag := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(flags)} major unprocessed flags.")
            return flags
        except Exception as e:
            msg = f"Failed to get major unprocessed flags: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_FLAG,
                document_id=None,
            ) from e

    def get_recent_major_unacknowledged(self, hours: int | None = None) -> list[FlagWrapper]:
        """Get recent major flags whose progress is WAITING_ADMIN.

        These are major flags that have been auto acknowledged but not yet
        reviewed by an admin.

        Args:
            hours: Size of the look-back window in hours. ``None`` falls back
                to ``self.def_window_hours``.

        Returns:
            Matching flags ordered by creation time, oldest first. Documents
            that cannot be converted to a model are skipped.

        Raises:
            DatabaseError: If building or running the query fails.
        """
        if hours is None:
            hours = self.def_window_hours

        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        major_flags_str = [flag_type.value for flag_type in FlagType.all_major_types()]

        debug_str = (
            f"Getting flags with type ({major_flags_str}) waiting for "
            "admin review within last "
            f"{TimeUtil.formatted_dt(hours_ago)}"
        )
        LOG().info(debug_str)

        try:
            and_fields = [
                WhereField[_K](_K.FLAG_TYPE, _OP.IN, major_flags_str),
                WhereField[_K](_K.PROGRESS, _OP.EQUAL, IssueProgress.WAITING_ADMIN.value),
                WhereField[_K](_K.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
            ]
            order_by = OrderByField.created_at(descending=False)
            query = DbQuery.and_(and_fields, order_by=order_by)

            flag_docs = query.get_query(self.client, _FLAG).stream()
            entries = [
                flag for doc in flag_docs if (flag := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(entries)} major flags waiting for admin review.")

            return entries
        except Exception as e:
            msg = f"Failed to get flags waiting for admin review: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_FLAG,
                document_id=None,
            ) from e

    def get_recent_waiting_disputed_service(self, hours: int = 4) -> list[FlagWrapper]:
        """Get recent flags whose disputed progress is WAITING_ADMIN.

        Args:
            hours: Size of the look-back window in hours (defaults to 4).

        Returns:
            Matching flags ordered by creation time, oldest first. Documents
            that cannot be converted to a model are skipped.

        Raises:
            DatabaseError: If building or running the query fails.
        """
        hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
        debug_str = (
            f"Getting flags waiting for admin review within last "
            f"{TimeUtil.formatted_dt(hours_ago)}"
        )
        LOG().info(debug_str)

        try:
            # NOTE(review): the sibling queries filter on FlagKeys.CREATED_AT,
            # this one on FlagInternalKeys.CREATED_AT -- confirm both resolve
            # to the same stored field.
            and_ = [
                WhereField[_KeyType](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
                WhereField[_KeyType](
                    _K.DISPUTED_PROGRESS,
                    _OP.EQUAL,
                    DisputedProgress.WAITING_ADMIN.value,
                ),
            ]
            order_by = OrderByField.created_at(descending=False)
            query = DbQuery.and_(and_, order_by=order_by)

            flag_docs = query.get_query(self.client, _FLAG).stream()
            flags = [
                flag for doc in flag_docs if (flag := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(flags)} disputed flags waiting for admin review.")
            return flags
        except Exception as e:
            msg = f"Failed to get flags waiting for admin review: {e}"
            raise DatabaseError(
                msg,
                error_code=AppErrorCode.DATABASE,
                collection_name=_FLAG,
                document_id=None,
            ) from e