Coverage for functions \ flipdare \ firestore \ invite_db.py: 47%
70 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from typing import Any
16from google.cloud.firestore import Client as FirestoreClient
17from flipdare.app_log import LOG
18from flipdare.constants import IS_DEBUG
19from flipdare.error.app_error import DatabaseError
20from flipdare.firestore._app_db import AppDb
21from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField
22from flipdare.generated import InviteModel
23from flipdare.generated.model.invite_model import InviteInternalKeys
24from flipdare.generated.shared.app_error_code import AppErrorCode
25from flipdare.generated.shared.firestore_collections import FirestoreCollections
26from flipdare.util.time_util import TimeUtil
27from flipdare.wrapper import InviteWrapper
# Public API of this module.
__all__ = ["InviteDb"]

# Short aliases that keep the query definitions below compact.
_I = InviteInternalKeys
_OP = FieldOp

# String value of the invite collection enum member, used wherever a plain
# collection name is needed (queries, document references, error reports).
_INVITE: str = FirestoreCollections.INVITE.value
class InviteDb(AppDb[InviteWrapper, InviteModel]):
    """Database operations for documents in the invite collection.

    Every public method wraps any failure in a ``DatabaseError`` carrying
    ``AppErrorCode.DATABASE_EX`` and the invite collection name, with the
    original exception chained as the cause.
    """

    def __init__(self, client: FirestoreClient) -> None:
        """Bind the invite collection, model and wrapper classes to ``client``.

        Args:
            client: Firestore client used for all queries and writes.
        """
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.INVITE,
            model_class=InviteModel,
            wrapper_class=InviteWrapper,
        )

    def mark_invite_signup(self, to_email: str) -> None:
        """Mark every unprocessed invite addressed to ``to_email`` as processed.

        There can be more than one invite to the same email, so all matching
        documents are updated in a single batch. ``reminder_sent`` is set to
        ``True`` as well to avoid re-processing. When no unprocessed invite
        matches, a debug message is logged and nothing is written.

        Args:
            to_email: Value matched against the ``to_email`` field of the
                invite documents.

        Raises:
            DatabaseError: If querying, validating the update payload, or
                committing the batch raises any exception.
        """
        try:
            where_fields = [
                WhereField[Any](_I.PROCESSED, _OP.EQUAL, False),
                WhereField[Any]("to_email", _OP.EQUAL, to_email),
            ]
            query = DbQuery.and_(where_fields=where_fields)
            results = query.get_query(self.client, _INVITE).get()

            if len(results) == 0:
                LOG().debug(f"No unprocessed invites found for {to_email}")
                return

            # The payload is the same for every matched invite, so build and
            # validate it once instead of once per document. This stays inside
            # the ``try`` so a validation failure is still reported as a
            # DatabaseError.
            data: dict[str, Any] = {"processed": True, "reminder_sent": True}
            updates = InviteModel.validate_partial(**data)
            invite_collection = self.client.collection(_INVITE)

            batch = self.client.batch()
            for doc in results:
                batch.update(invite_collection.document(doc.id), updates)
            batch.commit()
        except Exception as error:
            msg = f"Error marking invites as processed for {to_email}."
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_INVITE,
                document_id=None,
                message=msg,
            ) from error

    def get_reminder_invites(self) -> list[InviteWrapper]:
        """Get invites from the last 7 days that have not had a reminder sent.

        The query selects documents whose ``CREATED_AT`` is greater than or
        equal to ``TimeUtil.get_utc_time_days_ago(7)`` and whose
        ``REMINDER_SENT`` flag is ``False``.

        Returns:
            The matching invites. Snapshots for which ``_cvt_snap_to_model``
            returns ``None`` are dropped. The list is empty when nothing
            matches.

        Raises:
            DatabaseError: If building or running the query, or converting
                the results, raises any exception.
        """
        LOG().debug("Getting invites from the last week")
        try:
            one_week_ago = TimeUtil.get_utc_time_days_ago(7)
            query = DbQuery.and_(
                where_fields=[
                    WhereField[_I](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, one_week_ago),
                    WhereField[_I](_I.REMINDER_SENT, _OP.EQUAL, False),
                ],
            )
            results = query.get_query(self.client, _INVITE).get()

            if len(results) == 0:
                LOG().debug("No invites found from the last week")
                return []

            # Skip snapshots that could not be converted (conversion yields None).
            invites = [
                invite for doc in results if (invite := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(invites)} invites that can be reminded.")
            return invites
        except Exception as error:
            msg = "Error getting invites from last week."
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_INVITE,
                document_id=None,
                message=msg,
            ) from error

    def get_recent_unprocessed_invites(self) -> list[InviteWrapper]:
        """Get unprocessed invites from the last day.

        The query selects documents whose ``CREATED_AT`` is greater than or
        equal to ``TimeUtil.get_utc_time_days_ago(1)`` and whose ``PROCESSED``
        flag is ``False``.

        Returns:
            The matching invites. Snapshots for which ``_cvt_snap_to_model``
            returns ``None`` are dropped. The list is empty when nothing
            matches.

        Raises:
            DatabaseError: If building or running the query, or converting
                the results, raises any exception.
        """
        LOG().debug("Getting invites from the last day")
        try:
            one_day_ago = TimeUtil.get_utc_time_days_ago(1)
            query = DbQuery.and_(
                where_fields=[
                    WhereField[_I](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, one_day_ago),
                    WhereField[_I](_I.PROCESSED, _OP.EQUAL, False),
                ],
            )
            results = query.get_query(self.client, _INVITE).get()

            if len(results) == 0:
                LOG().debug("No invites found from the last day")
                return []

            # Skip snapshots that could not be converted (conversion yields None).
            invites = [
                invite for doc in results if (invite := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(invites)} invites that can be processed.")

            return invites
        except Exception as error:
            msg = "Error getting invites from last day."
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_INVITE,
                document_id=None,
                message=msg,
            ) from error