Coverage for functions \ flipdare \ firestore \ content_db.py: 91%
33 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
2#
3# This file is part of Flipdare's proprietary software and contains
4# confidential and copyrighted material. Unauthorised copying,
5# modification, distribution, or use of this file is strictly
6# prohibited without prior written permission from Flipdare Pty Ltd.
7#
8# This software includes third-party components licensed under MIT,
9# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
10#
12from google.cloud.firestore import Client as FirestoreClient
13from flipdare.app_log import LOG
14from flipdare.error.app_error import DatabaseError
15from flipdare.firestore._app_db import AppDb
16from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField
17from flipdare.generated.model.content_model import ContentInternalKeys, ContentModel
18from flipdare.generated.shared.app_error_code import AppErrorCode
19from flipdare.generated.shared.firestore_collections import FirestoreCollections
20from flipdare.util.time_util import TimeUtil
21from flipdare.wrapper import ContentWrapper
23__all__ = ["ContentDb"]
25_CONTENT = FirestoreCollections.CONTENT.value
27_I = ContentInternalKeys
28_OP = FieldOp
31class ContentDb(AppDb[ContentWrapper, ContentModel]):
32 """Class for managing user profile content-related database operations."""
34 def __init__(self, client: FirestoreClient) -> None:
35 super().__init__(
36 client=client,
37 collection_name=FirestoreCollections.CONTENT,
38 wrapper_class=ContentWrapper,
39 model_class=ContentModel,
40 )
42 def get_recent_unprocessed(self, hours: int | None = None) -> list[ContentWrapper]:
43 if hours is None:
44 hours = self.def_window_hours
46 hours_ago = TimeUtil.get_utc_time_hours_ago(hours)
47 msg = (
48 f"Getting unprocessed content within the last {TimeUtil.formatted_dt(hours_ago)} hours"
49 )
50 LOG().debug(msg)
52 try:
53 and_fields = [
54 WhereField[_I](_I.PROCESSED, _OP.EQUAL, False),
55 WhereField[_I](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, hours_ago),
56 ]
57 query = DbQuery.and_(and_fields)
59 content_docs = query.get_query(self.client, _CONTENT).stream()
60 contents = [
61 ContentWrapper.from_dict(data=data)
62 for doc in content_docs
63 if (data := self._cvt_snap_to_data(doc)) is not None
64 ]
66 LOG().debug(f"Retrieved {len(contents)} content items waiting for admin review.")
67 return contents
68 except Exception as e:
69 msg = f"Failed to get content items waiting for admin review: {e}"
70 raise DatabaseError(
71 msg,
72 error_code=AppErrorCode.DATABASE,
73 collection_name=_CONTENT,
74 ) from e