Coverage for functions \ flipdare \ firestore \ context \ flag_context.py: 63%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from typing import TYPE_CHECKING, Any, override 

16 

17from flipdare.app_log import LOG 

18from flipdare.app_types import DatabaseDict 

19from flipdare.firestore.context._model_context import ModelContext 

20from flipdare.firestore.context._model_context_factory import ModelContextFactory 

21from flipdare.generated.model.issue.flag_model import FlagModel 

22 

23from flipdare.wrapper import FlagWrapper, UserWrapper 

24from flipdare.wrapper.persisted_guard import PersistedGuard 

25 

26if TYPE_CHECKING: 

27 from flipdare.manager.db_manager import DbManager 

28 

29 

30class FlagContextFactory(ModelContextFactory[FlagWrapper, "FlagContext"]): 

31 

32 def __init__(self, db_manager: DbManager | None = None) -> None: 

33 super().__init__(db_manager=db_manager) 

34 

35 @override 

36 def create(self, obj: Any) -> FlagContext | None: 

37 if isinstance(obj, FlagContext): 

38 return obj 

39 if PersistedGuard.is_flag(obj): 

40 # Unwrap PersistedWrapper to get the inner model 

41 return self._from_model(obj) 

42 if isinstance(obj, FlagModel): 

43 return self._from_model(obj) 

44 if isinstance(obj, str): 

45 return self._from_id(obj) 

46 

47 obj_data: DatabaseDict = obj # set explicity type for type checkers. 

48 return self._from_data(obj_data) 

49 

50 @override 

51 def _from_id(self, doc_id: str) -> FlagContext | None: 

52 flag_db = self.flag_db 

53 flag: FlagWrapper | None = None 

54 try: 

55 flag = flag_db.get(doc_id) 

56 if flag is None: 

57 # need at least the flag 

58 LOG().error(f"Flag {doc_id} not found in db.") 

59 return None 

60 

61 # Convert PersistedWrapper[FlagModel] to FlagWrapper 

62 flag_wrapper = FlagWrapper.from_model(flag.model) 

63 return self._from_model(flag_wrapper) 

64 except Exception: 

65 LOG().error(f"Error retrieving flag {doc_id} from db.") 

66 return None 

67 

68 @override 

69 def _from_data(self, data: DatabaseDict) -> FlagContext | None: 

70 flag: FlagWrapper | None = None 

71 try: 

72 flag = FlagWrapper.from_dict(data) 

73 except Exception: 

74 return None 

75 

76 return self._from_model(flag) 

77 

78 @override 

79 def _from_model(self, model: FlagWrapper | FlagModel) -> FlagContext | None: 

80 from_user = self.get_user(model.from_uid) # Returns UserWrapper | None 

81 to_user = self.get_user(model.to_uid) # Returns UserWrapper | None 

82 if isinstance(model, FlagModel): 

83 model = FlagWrapper.from_model(model) 

84 

85 return FlagContext(flag_model=model, from_user=from_user, to_user=to_user) 

86 

87 

88class FlagContext(ModelContext): 

89 

90 def __init__( 

91 self, 

92 flag_model: FlagWrapper, 

93 from_user: UserWrapper | None = None, 

94 to_user: UserWrapper | None = None, 

95 ) -> None: 

96 self._flag_model = flag_model 

97 self._from_user = from_user 

98 self._to_user = to_user 

99 super().__init__() 

100 

101 @property 

102 def flag(self) -> FlagWrapper: 

103 self._require_valid("access flag_model") 

104 return self._flag_model 

105 

106 @flag.setter 

107 def flag(self, new_flag: FlagWrapper) -> None: 

108 self._flag_model = new_flag 

109 

110 @property 

111 def from_user(self) -> UserWrapper: 

112 self._require_valid("access from_user") 

113 assert self._from_user is not None # narrowing 

114 return self._from_user 

115 

116 @property 

117 def to_user(self) -> UserWrapper: 

118 self._require_valid("access to_user") 

119 assert self._to_user is not None # narrowing 

120 return self._to_user 

121 

122 @property 

123 @override 

124 def doc_id(self) -> str: 

125 return self.flag.doc_id 

126 

127 @override 

128 def validate(self) -> bool: 

129 """Validate that all required models exist and have doc_ids.""" 

130 return ( 

131 self._is_model_valid(self._flag_model) 

132 and self._is_model_valid(self._from_user) 

133 and self._is_model_valid(self._to_user) 

134 ) 

135 

136 @property 

137 @override 

138 def _error_messages(self) -> list[str]: 

139 """Build list of validation errors.""" 

140 errors: list[str] = [] 

141 if err := self._validate_model(self._flag_model, "flag_model"): 

142 errors.append(err) 

143 if err := self._validate_model(self._from_user, "from_user"): 

144 errors.append(err) 

145 if err := self._validate_model(self._to_user, "to_user"): 

146 errors.append(err) 

147 

148 return errors