Coverage for functions \ flipdare \ firestore \ _app_db.py: 100%
0 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from collections.abc import Awaitable, Sequence
16from dataclasses import dataclass
17from typing import TYPE_CHECKING, Any, NoReturn, cast
19from google.cloud.firestore import Client as FirestoreClient
20from google.cloud.firestore_v1.base_document import DocumentSnapshot
21from google.cloud.firestore_v1.base_query import FieldFilter
22from google.cloud.firestore_v1.field_path import FieldPath
23from google.cloud.firestore_v1 import CollectionReference, Query, aggregation
25from flipdare.analysis.data.nested.time_series_collection_data import (
26 CollectionStat,
27 TimeSeriesCollectionData,
28)
29from flipdare.app_log import LOG
30from flipdare.app_types import DatabaseDict
31from flipdare.constants import DEF_RETRIEVAL_WINDOW_HOURS, IS_DEBUG
32from flipdare.error.app_error import DatabaseError
33from flipdare.firestore.core.app_base_model import AppBaseModel
34from flipdare.firestore.core.collection_stat_query import CollectionStatQuery
35from flipdare.firestore.core.db_query import FieldOp, OrderByField
36from flipdare.generated.shared.app_error_code import AppErrorCode
37from flipdare.generated.shared.backend.app_job_type import AppJobType
38from flipdare.generated.shared.firestore_collections import FirestoreCollections
39from flipdare.util.debug_util import stringify_query
40from flipdare.util.time_util import TimeUtil
41from flipdare.wrapper._persisted_wrapper import PersistedWrapper
43if TYPE_CHECKING:
45 from flipdare.backend.app_logger import AppLogger
# Only the generic repository base class is part of this module's public API.
__all__ = ["AppDb"]

# NOTE: since the Firestore client upgrade, the literal field name "id" no
# longer works for document-id filters; FieldPath.document_id() is required.
_id = FieldPath.document_id()
53@dataclass(frozen=True, slots=True)
54class AggregateResult: # type: ignore[misc]
55 count: float
56 sum_total: float = 0
58 @property
59 def is_error(self) -> bool:
60 return self.count < 0 or self.sum_total < 0
63class AppDb[W: PersistedWrapper[Any], T: AppBaseModel]:
64 def __init__(
65 self,
66 client: FirestoreClient,
67 collection_name: FirestoreCollections,
68 wrapper_class: type[W],
69 model_class: type[T],
70 log_creator: AppLogger | None = None,
71 def_window_hours: int = DEF_RETRIEVAL_WINDOW_HOURS,
72 ) -> None:
73 if def_window_hours <= 0:
74 LOG().warning(
75 f"Invalid def_window_hours {def_window_hours}, "
76 f"using default {DEF_RETRIEVAL_WINDOW_HOURS}",
77 )
78 def_window_hours = DEF_RETRIEVAL_WINDOW_HOURS
80 self._client = client
81 self._collection_name = collection_name
82 self._wrapper_class = wrapper_class
83 self._model_class = model_class
84 self._def_window_hours = def_window_hours
85 self._log_creator = log_creator
87 # -----------------------------------------------------------------------------
88 # Properties
89 # -----------------------------------------------------------------------------
91 @property
92 def collection_name(self) -> str:
93 return self._collection_name.value
95 @property
96 def wrapper_class(self) -> type[W]:
97 return self._wrapper_class
99 @property
100 def model_class(self) -> type[T]:
101 return self._model_class
103 @property
104 def def_window_hours(self) -> int:
105 return self._def_window_hours
107 @property
108 def client(self) -> FirestoreClient:
109 return self._client
111 @property
112 def log_creator(self) -> AppLogger:
113 from flipdare.services import get_app_logger
115 if self._log_creator is None:
116 self._log_creator = get_app_logger()
118 return self._log_creator
120 @property
121 def document_count(self) -> int:
122 """Get the total number of documents in the collection."""
123 try:
124 col_ref = self.client.collection(self.collection_name)
125 count_query = col_ref.count()
126 count_snapshot = count_query.get()
127 return int(count_snapshot[0][0].value) # pyright: ignore[reportIndexIssue] #
128 except Exception as error:
129 LOG().error(f"Error counting documents in collection {self.collection_name}: {error}")
130 return 0
132 # -----------------------------------------------------------------------------
133 # Get
134 # -----------------------------------------------------------------------------
136 def exists(self, doc_id: str) -> bool:
137 """Check if a document exists in Firestore."""
138 return self._exists(doc_id)
140 def get(self, doc_id: str) -> W | None:
141 """Get a document by ID from Firestore, returns PersistedWrapper or None"""
142 data = self._get(doc_id)
143 if data is None:
144 return None
146 return self.wrapper_class.from_dict(data)
148 def get_bulk(self, doc_ids: list[str]) -> list[W]:
149 """Get multiple documents by their IDs, returns list of PersistedWrapper."""
150 data_items = self._get_bulk(doc_ids)
151 if len(data_items) == 0:
152 return []
154 results: list[W] = []
155 for item in data_items:
156 wrapper = self.wrapper_class.from_dict(item)
157 results.append(wrapper)
158 return results
160 # -----------------------------------------------------------------------------
161 # Create/Update
162 # -----------------------------------------------------------------------------
164 def create(self, data: T | dict[str, Any], merge: bool = False) -> W:
165 """
166 Create a new document in Firestore, returns PersistedWrapper.
168 Args:
169 data: Model instance or dict to create from
170 merge: Whether to merge with existing document
172 Returns:
173 PersistedWrapper for the created document
175 """
176 # Convert to dict for Firestore
177 payload = data if isinstance(data, dict) else data.to_dict()
178 created_data = self._create(payload, merge=merge)
180 # from_dict returns PersistedWrapper[Model]
181 return self.wrapper_class.from_dict(created_data)
183 def update_model(self, model: W) -> W | None:
184 """Update document in Firestore using model instance, returns PersistedWrapper."""
185 doc_id = model.doc_id
186 if not model.has_changes:
187 if IS_DEBUG:
188 LOG().debug(f"No changes to update for {doc_id}")
189 # Model is already a PersistedWrapper, return it directly
190 return model
192 updates = model.get_updates()
193 if IS_DEBUG:
194 LOG().debug(f"Updating model {doc_id} with changes: {updates}")
196 return self.update(doc_id, updates)
198 def update(self, doc_id: str, updates: DatabaseDict) -> W | None:
199 """Update document in Firestore, returns PersistedWrapper."""
200 updated_data = self._update(doc_id, updates)
201 if updated_data is None:
202 return None
204 return self.wrapper_class.from_dict(updated_data)
206 def batch_update(self, updates: Sequence[W]) -> int:
207 """Bulk update documents in Firestore."""
208 batch = self.client.batch()
209 base_ref = self.client.collection(self.collection_name)
210 ct = 0
212 for update in updates:
213 doc_id = update.doc_id
214 if not update.has_changes:
215 if IS_DEBUG:
216 LOG().debug(f"Skipping update for {doc_id} - no changes")
217 continue
219 update_data = update.get_updates()
220 doc_ref = base_ref.document(doc_id)
221 batch.update(doc_ref, update_data)
222 ct += 1
224 batch.commit()
225 if IS_DEBUG:
226 LOG().debug(f"Batch updated {ct} documents in collection {self.collection_name}")
227 return ct
229 # -----------------------------------------------------------------------------
230 # Delete
231 # -----------------------------------------------------------------------------
233 def delete(self, doc_id: str) -> None:
234 """Delete a document from Firestore."""
235 col_name = self.collection_name
237 try:
238 doc_ref = self.client.collection(col_name).document(doc_id)
239 doc_ref.delete()
240 # LOG().debug(f"Deleted document with id: {doc_id} from collection {col_name}")
241 except Exception as error:
242 LOG().error(f"Error deleting document {doc_id} from collection {col_name}: {error}")
243 raise DatabaseError(
244 f"Failed to delete document {doc_id}",
245 error_code=AppErrorCode.DATABASE,
246 collection_name=col_name,
247 document_id=doc_id,
248 ) from error
250 # -----------------------------------------------------------------------------
251 # Aggregate Stats
252 # -----------------------------------------------------------------------------
254 def get_collection_stats(self, days: int = 7) -> TimeSeriesCollectionData:
255 date_ranges = TimeUtil.get_date_range(
256 days,
257 start=TimeUtil.get_start_of_day_utc(),
258 reverse=True,
259 )
260 agg_stats = TimeSeriesCollectionData()
262 client = self.client
263 col_name = self.collection_name
265 for date_range in date_ranges:
266 from_date = date_range.from_date
267 to_date = date_range.to_date
268 proc_ct = 0.0
269 unproc_ct = 0.0
271 proc_qry = CollectionStatQuery.processed(from_date, to_date)
272 proc_agg = self._get_agg_value(proc_qry.get_query(client, col_name))
273 proc_ct = proc_agg.count if proc_agg is not None else -1.0
275 unproc_qry = CollectionStatQuery.unprocessed(from_date, to_date)
276 unproc_agg = self._get_agg_value(unproc_qry.get_query(client, col_name))
277 unproc_ct = unproc_agg.count if unproc_agg is not None else -1.0
279 error_qry = CollectionStatQuery.error(from_date, to_date)
280 error_agg = self._get_agg_value(
281 error_qry.get_query(client, col_name),
282 sum_field=error_qry.sum_field,
283 )
284 error_ct = error_agg.count if error_agg is not None else -1.0
286 if proc_ct < 0 or unproc_ct < 0 or error_ct < 0:
287 agg_stats.increment_error()
288 else:
289 stat = CollectionStat(
290 total_ct=proc_ct + error_ct + unproc_ct,
291 processed_ct=proc_ct,
292 unprocessed_ct=unproc_ct,
293 error_ct=error_ct,
294 )
295 agg_stats.add(from_date, FirestoreCollections.DARE, stat)
297 return agg_stats
299 def _get_agg_value(
300 self,
301 query: Query | CollectionReference,
302 sum_field: str | None = None,
303 ) -> AggregateResult | None:
304 try:
305 agg_query = aggregation.AggregationQuery(query)
306 agg_query.count(alias="count")
307 if sum_field is not None:
308 agg_query.sum(sum_field, alias="sum")
310 results = agg_query.get()
311 count_value = results[0][0].value # type: ignore
313 sum_value = 0.0
314 if sum_field is not None:
315 sum_value = results[0][1].value # type: ignore
317 return AggregateResult(count=float(count_value), sum_total=float(sum_value))
319 except Exception as e:
320 msg = f"Error getting aggregate stats {self.collection_name}: {e}\n{stringify_query(query)}"
321 LOG().error(msg)
322 return None
324 # -----------------------------------------------------------------------------
325 # Get Utilities
326 # -----------------------------------------------------------------------------
328 def _exists(self, doc_id: str) -> bool: # pragma: no cover
329 """Check if a document exists in a specific collection."""
330 doc_data = self._get(doc_id)
331 return doc_data is not None
333 def _exists_sub(self, parent_id: str, sub_col_name: str, doc_id: str) -> bool:
334 """Check if a sub-collection document exists."""
335 doc_data = self._get_sub(parent_id, sub_col_name, doc_id)
336 return doc_data is not None
338 def _get(self, doc_id: str) -> DatabaseDict | None:
339 """Get a document by ID from a specific collection, None otherwise"""
340 # LOG().debug(f"Getting document for collection {col_name}, id: {doc_id}")
341 col_name = self.collection_name
342 try:
343 doc = self.client.collection(col_name).document(doc_id).get()
344 doc = cast("DocumentSnapshot", doc)
346 if doc.exists:
347 return self._cvt_snap_to_data(doc)
348 return None
349 except Exception as error:
350 LOG().error(f"Error searching for collection {col_name}, id: {doc_id}: {error}")
351 return None
353 def _get_bulk(self, doc_ids: list[str]) -> list[DatabaseDict]:
354 """Get multiple documents by their IDs."""
355 col_name = self.collection_name
356 try:
357 documents = []
358 # Firestore allows up to 10 in 'in' queries
359 chunk_size = 10
360 for i in range(0, len(doc_ids), chunk_size):
361 chunk = doc_ids[i : i + chunk_size]
362 doc_refs = [self.client.collection(col_name).document(doc_id) for doc_id in chunk]
363 query = (
364 self.client.collection(col_name)
365 .where(filter=FieldFilter(_id, FieldOp.IN.value, doc_refs))
366 .get()
367 )
368 for snap in query:
369 data = self._cvt_snap_to_data(snap)
370 if data is not None:
371 documents.append(data)
373 return documents
374 except Exception as error:
375 LOG().error(f"Error getting documents for ids {doc_ids}: {error}")
376 return []
378 # -----------------------------------------------------------------------------
379 # Get for Sub Collections utilities
380 # -----------------------------------------------------------------------------
382 def _get_sub(self, parent_id: str, sub_col_name: str, doc_id: str) -> DatabaseDict | None:
383 """Get a sub-collection document by ID, None otherwise"""
384 # LOG().debug(
385 # f"Getting sub-document for collection "
386 # f"{col_name}/{parent_id}/{sub_col_name}, id: {doc_id}"
387 # )
388 col_name = self.collection_name
389 try:
390 doc = cast(
391 "DocumentSnapshot",
392 (
393 self.client.collection(col_name)
394 .document(parent_id)
395 .collection(sub_col_name)
396 .document(doc_id)
397 .get()
398 ),
399 )
400 if doc.exists:
401 return self._cvt_snap_to_data(doc)
402 return None
403 except Exception as error:
404 LOG().error(
405 f"Error searching for sub-collection "
406 f"{col_name}/{parent_id}/{sub_col_name}, id: {doc_id}: {error}",
407 )
408 return None
410 def _get_all_sub(
411 self,
412 parent_id: str,
413 sub_col_name: str,
414 order_by: OrderByField[Any] | None = None,
415 limit: int | None = None,
416 ) -> list[DatabaseDict]:
417 """Get all documents in a sub-collection."""
418 # LOG().debug(
419 # f"Getting all sub-documents for collection "
420 # f"{col_name}/{parent_id}/{sub_col_name}"
421 # )
422 col_name = self.collection_name
423 try:
424 collection_ref = (
425 self.client.collection(col_name).document(parent_id).collection(sub_col_name)
426 )
428 if order_by is not None:
429 collection_ref = collection_ref.order_by(
430 order_by.key,
431 direction=order_by.direction,
432 )
434 docs = (
435 collection_ref.get()
436 if limit is None or limit <= 0
437 else collection_ref.limit(limit).get()
438 )
440 results = []
441 for doc in docs:
442 data = self._cvt_snap_to_data(doc)
443 if data is not None:
444 results.append(data)
445 return results
446 except Exception as error:
447 LOG().error(
448 f"Error getting all sub-documents for collection "
449 f"{col_name}/{parent_id}/{sub_col_name}: {error}",
450 )
451 return []
453 def _get_bulk_sub(
454 self,
455 parent_id: str,
456 sub_col_name: str,
457 doc_ids: list[str],
458 ) -> list[DatabaseDict]:
459 """Get multiple sub-collection documents by their IDs."""
460 col_name = self.collection_name
462 try:
463 documents = []
464 # Firestore allows up to 10 in 'in' queries
465 chunk_size = 10
466 for i in range(0, len(doc_ids), chunk_size):
467 chunk = doc_ids[i : i + chunk_size]
468 doc_refs = [self.client.collection(col_name).document(doc_id) for doc_id in chunk]
469 query = (
470 self.client.collection(col_name)
471 .document(parent_id)
472 .collection(sub_col_name)
473 .where(filter=FieldFilter(_id, "in", doc_refs))
474 .get()
475 )
477 for snap in query:
478 data = self._cvt_snap_to_data(snap)
479 if data is not None:
480 documents.append(data)
482 return documents
483 except Exception as error:
484 LOG().error(f"Error getting sub-documents for ids {doc_ids}: {error}")
485 return []
487 # -----------------------------------------------------------------------------
488 # Create/Update utilities
489 # -----------------------------------------------------------------------------
491 def _create(self, data: DatabaseDict, merge: bool = False) -> DatabaseDict:
492 """Create a new document in Firestore."""
493 col_name = self.collection_name
494 saved_data: DatabaseDict | None
495 try:
496 col_ref = self.client.collection(col_name)
497 doc_ref = col_ref.document()
498 doc_ref.set(data, merge=merge)
499 snap = doc_ref.get()
500 # quotes required for type checking and slight performance
501 snap = cast("DocumentSnapshot", snap)
502 saved_data = self._cvt_snap_to_data(snap)
503 except Exception as error:
504 LOG().error(f"Error creating document for collection {col_name}: {error}\n\t{data}")
505 raise DatabaseError(
506 f"Failed to create document for collection {col_name}",
507 error_code=AppErrorCode.DATABASE,
508 collection_name=col_name,
509 ) from error
511 if saved_data is not None:
512 return saved_data
514 raise DatabaseError(
515 f"Failed to create document for collection {col_name}",
516 error_code=AppErrorCode.DATABASE,
517 collection_name=col_name,
518 document_id=doc_ref.id,
519 )
521 def _update(self, doc_id: str, updates: DatabaseDict) -> DatabaseDict | None:
522 """Update document in Firestore."""
523 col_name = self.collection_name
525 # LOG().debug(f"Updating document for id: {doc_id}")
526 try:
527 doc_ref = self.client.collection(col_name).document(doc_id)
528 if not self._snap_exists(doc_ref.get()):
529 LOG().error(f"Document not found for update: {col_name}/{doc_id}")
530 return None
532 doc_ref.set(updates, merge=True)
533 updated_snap = doc_ref.get()
535 # we know it exists, so convert directly
536 return self._cvt_snap_to_data(updated_snap)
537 except Exception as error:
538 LOG().error(f"Error updating document {doc_id}: {error}")
539 raise DatabaseError(
540 f"Failed to update document {doc_id}",
541 error_code=AppErrorCode.DATABASE,
542 collection_name=col_name,
543 document_id=doc_id,
544 ) from error
546 # -----------------------------------------------------------------------------
547 # Create for Sub-Collection utilities
548 # -----------------------------------------------------------------------------
550 def _create_sub(self, parent_id: str, sub_col_name: str, data: DatabaseDict) -> DatabaseDict:
551 """Create a new sub-collection document in Firestore."""
552 col_name = self.collection_name
553 saved_data: DatabaseDict | None = None
554 try:
555 col_ref = self.client.collection(col_name).document(parent_id).collection(sub_col_name)
556 doc_ref = col_ref.document()
557 doc_ref.set(data)
558 snap = cast("DocumentSnapshot", doc_ref.get())
559 saved_data = self._cvt_snap_to_data(snap)
560 except Exception as error:
561 self._raise_sub_error(parent_id, sub_col_name, "create", data, error)
563 if saved_data is None:
564 self._raise_sub_error(parent_id, sub_col_name, "create", data)
566 return saved_data
568 def _update_sub(
569 self,
570 parent_id: str,
571 sub_col_name: str,
572 doc_id: str,
573 updates: DatabaseDict,
574 ) -> DatabaseDict | None:
575 """Update sub-collection document in Firestore."""
576 # LOG().debug(f"Updating sub-document for id: {doc_id}")
577 col_name = self.collection_name
579 try:
580 doc_ref = (
581 self.client.collection(col_name)
582 .document(parent_id)
583 .collection(sub_col_name)
584 .document(doc_id)
585 )
586 if not self._snap_exists(doc_ref.get()):
587 LOG().warning(f"Sub-document not found for id: {doc_id}")
588 return None
590 doc_ref.set(updates, merge=True)
591 # LOG().debug(f"Found sub document for {parent_id}/{doc_id}, updating with "
592 # f"{stringify_debug(updates)}")
594 return self._cvt_snap_to_data(doc_ref.get())
595 except Exception as error:
596 LOG().error(f"Error updating sub-document {doc_id}: {error}")
597 raise DatabaseError(
598 f"Failed to update sub-document {doc_id}",
599 error_code=AppErrorCode.DATABASE,
600 collection_name=f"{col_name}/{parent_id}/{sub_col_name}",
601 document_id=doc_id,
602 ) from error
604 # -----------------------------------------------------------------------------
605 # Misc utilities
606 # -----------------------------------------------------------------------------
608 def _snap_exists(self, snap: DocumentSnapshot | Awaitable[DocumentSnapshot]) -> bool:
609 """Check if DocumentSnapshot exists, supports Awaitable for async compatibility."""
610 if isinstance(snap, Awaitable):
611 msg = "Expected DocumentSnapshot, got Awaitable. Are you accidentally using async?"
612 raise TypeError(msg)
614 return snap.exists
616 def _cvt_snap_to_model(self, snap: DocumentSnapshot) -> W | None:
617 """Convert DocumentSnapshot to PersistedWrapper[model]."""
618 data = self._cvt_snap_to_data(snap)
619 if data is None:
620 return None
622 return self.wrapper_class.from_dict(data)
624 def _cvt_snap_to_data(
625 self,
626 snap: DocumentSnapshot | Awaitable[DocumentSnapshot],
627 ) -> DatabaseDict | None:
628 """Convert DocumentSnapshot to DatabaseDict with 'id'."""
629 if isinstance(snap, Awaitable):
630 msg = ("Expected DocumentSnapshot, got Awaitable. Are you accidentally using async?",)
631 raise TypeError(msg)
633 if not snap.exists:
634 return None
636 data: DatabaseDict | None = snap.to_dict()
637 if data is None:
638 return None
640 data["id"] = snap.id
641 return data
643 def _process_results(self, results: list[DocumentSnapshot]) -> list[W]:
644 """Convert list of DocumentSnapshots to list of PersistedWrapper."""
645 models: list[W] = []
646 for snap in results:
647 model = self._cvt_snap_to_model(snap)
648 if model is not None:
649 models.append(model)
650 return models
652 # -----------------------------------------------------------------------------
653 # Error handling
654 # -----------------------------------------------------------------------------
656 def _raise_sub_error(
657 self,
658 parent_id: str,
659 sub_col_name: str,
660 operation: str,
661 data: DatabaseDict,
662 error: Exception | None = None,
663 ) -> NoReturn:
664 """Helper to raise a standardized error for sub-collection operations."""
665 col_name = self.collection_name
667 LOG().error(
668 f"Error performing {operation} sub-document for collection "
669 f"{col_name}/{parent_id}/{sub_col_name}: {error}\n\t{data}",
670 )
671 db_error = DatabaseError(
672 f"Error in collection {self.collection_name}/{parent_id}/{sub_col_name}: {error}",
673 error_code=AppErrorCode.DATABASE,
674 collection_name=f"{self.collection_name}/{parent_id}/{sub_col_name}",
675 )
677 if error is None:
678 raise db_error
679 raise db_error from error
681 def log_error(
682 self,
683 job_type: AppJobType,
684 message: str,
685 error_code: AppErrorCode = AppErrorCode.DATABASE,
686 notify_admin: bool = True,
687 ) -> None:
688 from flipdare.services import get_app_logger
690 collection = self._collection_name
691 LOG().error(f"Logging error for collection {collection}: {message}")
693 get_app_logger().db_error(
694 message=message,
695 error_code=error_code,
696 job_type=job_type,
697 collection=collection,
698 notify_admin=notify_admin,
699 )