Coverage for functions \ flipdare \ firestore \ invite_db.py: 47%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from typing import Any 

15 

16from google.cloud.firestore import Client as FirestoreClient 

17from flipdare.app_log import LOG 

18from flipdare.constants import IS_DEBUG 

19from flipdare.error.app_error import DatabaseError 

20from flipdare.firestore._app_db import AppDb 

21from flipdare.firestore.core.db_query import DbQuery, FieldOp, WhereField 

22from flipdare.generated import InviteModel 

23from flipdare.generated.model.invite_model import InviteInternalKeys 

24from flipdare.generated.shared.app_error_code import AppErrorCode 

25from flipdare.generated.shared.firestore_collections import FirestoreCollections 

26from flipdare.util.time_util import TimeUtil 

27from flipdare.wrapper import InviteWrapper 

28 

# Public API of this module.
__all__ = ["InviteDb"]

# Name of the Firestore collection that holds invite documents; passed to
# every query and document lookup in this module.
_INVITE: str = FirestoreCollections.INVITE.value

# Short aliases used when building query filters.
_I = InviteInternalKeys
_OP = FieldOp

35 

36 

class InviteDb(AppDb[InviteWrapper, InviteModel]):
    """Class for managing invite-related database operations."""

    def __init__(self, client: FirestoreClient) -> None:
        """Bind ``client`` to the invite collection, model and wrapper types."""
        super().__init__(
            client=client,
            collection_name=FirestoreCollections.INVITE,
            model_class=InviteModel,
            wrapper_class=InviteWrapper,
        )

    def mark_invite_signup(self, to_email: str) -> None:
        """Mark every unprocessed invite addressed to ``to_email`` as processed.

        There can be more than one invite to the same email, so all matching
        documents are updated in a single write batch. ``reminder_sent`` is
        also set to ``True`` so the invites are not picked up again for
        reminders.

        Args:
            to_email: Value matched against the ``to_email`` field of each
                invite document.

        Raises:
            DatabaseError: If querying, validating or committing fails; the
                original exception is chained as the cause.
        """
        try:
            where_fields = [
                WhereField[Any](_I.PROCESSED, _OP.EQUAL, False),
                WhereField[Any]("to_email", _OP.EQUAL, to_email),
            ]
            query = DbQuery.and_(where_fields=where_fields)
            results = query.get_query(self.client, _INVITE).get()

            if not results:
                LOG().debug(f"No unprocessed invites found for {to_email}")
                return

            # The payload is identical for every document, so build and
            # validate it once instead of once per loop iteration.
            data: dict[str, Any] = {"processed": True, "reminder_sent": True}
            updates = InviteModel.validate_partial(**data)

            collection = self.client.collection(_INVITE)
            batch = self.client.batch()
            for doc in results:
                batch.update(collection.document(doc.id), updates)
            batch.commit()
        except Exception as error:
            msg = f"Error marking invites as processed for {to_email}."
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_INVITE,
                document_id=None,
                message=msg,
            ) from error

    def get_reminder_invites(self) -> list[InviteWrapper]:
        """Get invites from the last 7 days that have not had a reminder sent.

        Returns:
            Wrappers for invites whose ``CREATED_AT`` is at or after
            ``TimeUtil.get_utc_time_days_ago(7)`` and whose ``REMINDER_SENT``
            flag is ``False``; an empty list when nothing matches.

        Raises:
            DatabaseError: If the query or conversion fails.
        """
        return self._get_recent_pending_invites(
            days=7,
            pending_key=_I.REMINDER_SENT,
            period="week",
            action="reminded",
        )

    def get_recent_unprocessed_invites(self) -> list[InviteWrapper]:
        """Get unprocessed invites from the last day.

        Returns:
            Wrappers for invites whose ``CREATED_AT`` is at or after
            ``TimeUtil.get_utc_time_days_ago(1)`` and whose ``PROCESSED``
            flag is ``False``; an empty list when nothing matches.

        Raises:
            DatabaseError: If the query or conversion fails.
        """
        return self._get_recent_pending_invites(
            days=1,
            pending_key=_I.PROCESSED,
            period="day",
            action="processed",
        )

    def _get_recent_pending_invites(
        self,
        *,
        days: int,
        pending_key: InviteInternalKeys,
        period: str,
        action: str,
    ) -> list[InviteWrapper]:
        """Shared query behind the two public "recent invites" getters.

        Args:
            days: Look-back window handed to ``TimeUtil.get_utc_time_days_ago``.
            pending_key: Boolean invite field that must still be ``False``.
            period: Human-readable window name used in log/error messages.
            action: Verb used in the "Retrieved ..." debug message.

        Returns:
            Converted invites; snapshots for which ``_cvt_snap_to_model``
            yields ``None`` are dropped.

        Raises:
            DatabaseError: If anything inside the query/convert step raises;
                the original exception is chained as the cause.
        """
        LOG().debug(f"Getting invites from the last {period}")
        try:
            cutoff = TimeUtil.get_utc_time_days_ago(days)
            query = DbQuery.and_(
                where_fields=[
                    WhereField[_I](_I.CREATED_AT, _OP.GREATER_THAN_OR_EQUAL, cutoff),
                    WhereField[_I](pending_key, _OP.EQUAL, False),
                ],
            )
            results = query.get_query(self.client, _INVITE).get()

            if not results:
                LOG().debug(f"No invites found from the last {period}")
                return []

            invites = [
                invite for doc in results if (invite := self._cvt_snap_to_model(doc)) is not None
            ]
            if IS_DEBUG:
                LOG().debug(f"Retrieved {len(invites)} invites that can be {action}.")
            return invites
        except Exception as error:
            msg = f"Error getting invites from last {period}."
            raise DatabaseError(
                error_code=AppErrorCode.DATABASE_EX,
                collection_name=_INVITE,
                document_id=None,
                message=msg,
            ) from error