Coverage for functions \ flipdare \ job \ event_parser.py: 85%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from dataclasses import dataclass 

15from typing import cast 

16 

17from google.cloud.firestore_v1.base_document import DocumentSnapshot 

18 

19from flipdare.app_log import LOG 

20from flipdare.app_types import DatabaseDict, EventType 

21 

22__all__ = ["EventDict", "ChangeEventDict", "EventParser"] 

23 

24 

@dataclass
class EventDict:
    """Parsed payload of a single-document event.

    Produced by the event parser for payloads that are a plain dict or a
    single document snapshot (no before/after pair).
    """

    # Document fields of the event payload.
    data: DatabaseDict

28 

29 

@dataclass
class ChangeEventDict(EventDict):
    """Parsed payload of a change event carrying before/after document state.

    Inherits ``data`` from ``EventDict`` and adds the two sides of the change.
    """

    # Document fields prior to the change.
    before: DatabaseDict
    # Document fields following the change.
    after: DatabaseDict

34 

35 

36class EventParser[D: EventDict]: 

37 def __init__(self, job_name: str, event: EventType, data_class: type[D]) -> None: 

38 self._job_name = job_name 

39 self._event = event 

40 self._dict_data: D | None = None 

41 self._errors: list[str] = [] 

42 self._data_class = data_class 

43 self._params = getattr(event, "params", None) 

44 

45 try: 

46 self._dict_data = cast("D", _RawEventParser(job_name, self._event).parse()) 

47 except Exception as e: 

48 cause = f"Failed to parse event data for {job_name}: {e!s}" 

49 LOG().error(cause) 

50 self._errors.append(cause) 

51 

52 @property 

53 def params(self) -> dict[str, str] | None: 

54 # A dictionary containing any wildcard values from the document path (e.g., userId). 

55 return self._params 

56 

57 @property 

58 def event_dict(self) -> D | None: 

59 if len(self._errors) > 0: 

60 return None 

61 if self._dict_data is None: 

62 return None 

63 

64 return self._dict_data 

65 

66 @property 

67 def errors(self) -> list[str]: 

68 return self._errors 

69 

70 @property 

71 def doc_id(self) -> str | None: 

72 data_dict = self.event_dict 

73 if data_dict is None: 

74 return None 

75 # if we get here, we know "id" is present because of validation in data_from_event 

76 # databaseDict is str,Any but id is always a string, so we can safely cast here 

77 return str(data_dict.data.get("id")) 

78 

79 

class _RawEventParser:
    """Turn an event's ``data`` payload into an ``EventDict``.

    Three payload shapes are handled: a plain dict, a ``DocumentSnapshot``,
    and anything else, which is assumed to be a change object exposing
    ``before`` / ``after`` snapshots.
    """

    def __init__(self, job_name: str, event: EventType) -> None:
        """Store the job name (used in messages) and the event to parse."""
        self._job_name = job_name
        self._event = event

    def parse(self) -> EventDict:  # noqa: PLR0915, PLR0912
        """Convert the event payload to a dict-based result with error handling.

        Returns:
            An ``EventDict`` for dict and ``DocumentSnapshot`` payloads, or a
            ``ChangeEventDict`` for before/after payloads. The returned
            ``data`` always contains an ``"id"`` key.

        Raises:
            ValueError: If the event has no data, no document dict can be
                obtained from it, or no document ID can be determined.
        """
        job_name = self._job_name
        event_data = self._event.data

        if event_data is None:
            raise ValueError(f"No data available for {job_name}")

        data: DatabaseDict | None = None
        before_data: DatabaseDict | None = None
        after_data: DatabaseDict | None = None
        doc_id: str | None = None
        is_change = False

        if isinstance(event_data, dict):
            data = event_data
            doc_id = data.get("id")
        elif isinstance(event_data, DocumentSnapshot):
            data = event_data.to_dict()
            doc_id = event_data.id
        else:
            # Assume it's a Change object with before/after
            is_change = True
            before = event_data.before
            after = event_data.after
            before_data = before.to_dict() if before is not None else None
            after_data = after.to_dict() if after is not None else None
            # Prefer the post-change state. Compare against None explicitly so
            # an existing but empty document ({}) is not mistaken for missing.
            data = after_data if after_data is not None else before_data

            if after is not None:
                doc_id = after.id
            elif before is not None:
                doc_id = before.id

            if doc_id is None:
                # Fall back to the path parameters. They are looked up on the
                # event itself first; the payload is still consulted
                # afterwards to stay compatible with the previous lookup.
                params = getattr(self._event, "params", None)
                if params is None:
                    params = getattr(event_data, "params", None)
                if params is not None:
                    doc_id = params.get("doc_id")

        if data is None:
            cause = f"Could not convert DocumentSnapshot to dict for {job_name}"
            LOG().error(cause)
            raise ValueError(cause)
        if doc_id is None:
            cause = f"Document ID is missing from data for {job_name}"
            LOG().error(cause)
            raise ValueError(cause)

        # Keep an "id" field already stored in the document; only fill it in
        # when absent.
        if "id" not in data:
            data["id"] = doc_id

        if not is_change:
            # This means we have a create or delete event, so we can use data as the main data
            return EventDict(data=data)

        # Change payload: ``data`` is not None here, so at least one side
        # exists, and ``data`` is that side whenever the other one is missing.
        if before_data is None or after_data is None:
            # A one-sided change always carries the resolved document ID,
            # replacing any "id" field stored in the document.
            data["id"] = doc_id
        if before_data is None:
            before_data = data
        if after_data is None:
            after_data = data

        return ChangeEventDict(data=data, before=before_data, after=after_data)