Coverage for functions\flipdare\job\trigger_data.py: 81%
151 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13"""
14Firestore trigger event validation and data extraction.
16VERSION Field Tracking:
17-----------------------
18All models include a VERSION field (integer) used to determine the source of updates:
20- Backend/Server Updates: VERSION field remains unchanged
21 * Triggered by Cloud Functions
22 * Triggered by admin operations
23 * Triggered by scheduled jobs
25- User/Client Updates: VERSION field increments
26 * Triggered by mobile app
27 * Triggered by web app
28 * Triggered by direct user actions
30The UpdateTriggerData.version_changed() method checks this field to allow
31triggers to differentiate between user actions and backend processing.
32"""
34from __future__ import annotations
36from typing import TYPE_CHECKING, Any, cast, override
38from flipdare.app_log import LOG
39from flipdare.app_types import DatabaseDict, EventType
40from flipdare.constants import NO_DOC_ID
41from flipdare.generated.shared.backend.app_job_type import AppJobType
42from flipdare.result.app_result import AppResult
43from flipdare.generated.shared.app_error_code import AppErrorCode
44from flipdare.job.event_parser import ChangeEventDict, EventDict, EventParser
46if TYPE_CHECKING:
47 from flipdare.wrapper import PersistedWrapper
49__all__ = ["TriggerData", "SubCollectionTriggerData", "UpdateTriggerData"]
52class TriggerData[W: PersistedWrapper[Any], D: EventDict = EventDict]:
53 def __init__(
54 self,
55 job_type: AppJobType,
56 event: EventType,
57 wrapper_class: type[W],
58 data_class: type[D] | None = None,
59 ) -> None:
60 self._job_type = job_type
61 self._wrapper_class = wrapper_class
62 self._event = event
63 data_class = data_class or cast("type[D]", EventDict)
65 event_parser = EventParser[D](job_type, event, data_class)
66 self._result = event_parser.errors or event_parser.event_dict
67 self._params = event_parser.params
68 self._doc_id = event_parser.doc_id
70 @property
71 def job_type(self) -> AppJobType:
72 return self._job_type
74 @property
75 def event(self) -> EventType:
76 return self._event
78 @property
79 def wrapper_class(self) -> type[W]:
80 return self._wrapper_class
82 @property
83 def is_error(self) -> bool:
84 return isinstance(self._result, list) and len(self._result) > 0
86 @property
87 def doc_id(self) -> str | None:
88 return self._doc_id
90 @property
91 def params(self) -> dict[str, str] | None:
92 return self._params
94 @property
95 def errors(self) -> list[str] | None:
96 if self.is_error:
97 return cast("list[str]", self._result)
98 return None
100 def error_str(self) -> str | None:
101 errors = self.errors
102 if not errors:
103 return None
104 return f"Trigger event validation errors for {self._job_type}: {'; '.join(errors)}"
106 @property
107 def event_dict(self) -> D | None:
108 if self.is_error:
109 return None
110 if isinstance(self._result, EventDict):
111 return self._result
112 return None
114 @property
115 def data(self) -> DatabaseDict | None:
116 if self.is_error:
117 return None
118 return self.event_dict.data if self.event_dict else None
120 def _create_wrapper_from_dict(
121 self,
122 data: DatabaseDict | None,
123 is_before: bool = False,
124 ) -> W | None:
125 """
126 Helper method to safely create a model from dict data.
128 Args:
129 data: Dictionary data to create model from
130 is_before: If True, this is before state (less critical)
132 Returns:
133 Model instance or None if creation fails
135 """
136 if data is None:
137 LOG().error(f"No data found in trigger event for {self._job_type}")
138 return None
140 # Inject document ID from event path (trigger data doesn't include it)
141 if "id" not in data and self._doc_id:
142 data = data.copy() # Don't modify original
143 data["id"] = self._doc_id
145 try:
146 return self._wrapper_class.from_dict(data)
147 except Exception as e:
148 msg = f"Error creating {'before ' if is_before else ''}model from trigger data: {e}"
149 LOG().error(msg)
150 return None
152 @property
153 def wrapper(self) -> W | None:
154 dict_data = self.event_dict.data if self.event_dict else None
155 return self._create_wrapper_from_dict(dict_data)
157 def valid(self) -> AppResult[W]:
158 doc_id = self.doc_id or NO_DOC_ID
159 app_result = AppResult[W](doc_id=doc_id, task_name=f"validate_trigger_{self._job_type}")
161 if self.is_error:
162 msg = f"Trigger {self._job_type.value} called with invalid event data: {self.errors}"
163 app_result.add_error(AppErrorCode.TRIGGER, msg)
164 return app_result
166 if self.wrapper is None:
167 msg = f"Trigger {self._job_type.value} failed to create wrapper from event data"
168 app_result.add_error(AppErrorCode.CREATE_FAILED, msg)
169 return app_result
171 return app_result
174class SubCollectionTriggerData[W: PersistedWrapper[Any]](TriggerData[W]):
175 """Validator for Firestore sub-collection trigger events."""
177 def __init__(
178 self,
179 job_name: AppJobType,
180 event: EventType,
181 wrapper_class: type[W],
182 parent_key: str = "parent_id",
183 ) -> None:
184 self._parent_key = parent_key
186 # NOTE: call this last because it calls _validate.
187 super().__init__(
188 job_type=job_name,
189 event=event,
190 wrapper_class=wrapper_class,
191 )
193 @property
194 def parent_key(self) -> str:
195 return self._parent_key
197 @override
198 def valid(self) -> AppResult[W]:
199 app_result = super().valid()
200 if app_result.is_error:
201 return app_result
203 # Check parent ID key in params
204 parent_id_check = self._parent_id_check()
205 if parent_id_check.is_error:
206 app_result.merge(parent_id_check)
208 return app_result
210 def _parent_id_check(self) -> AppResult[W]:
211 """Validate that required parent ID key exists in params."""
212 parent_id_key = self.parent_key
213 params = self._params
215 doc_id = self.doc_id or NO_DOC_ID
216 app_result: AppResult[W] = AppResult(
217 doc_id=doc_id,
218 task_name=f"Validate parent ID for {self._job_type}",
219 )
221 if params is None:
222 msg = f"Trigger {self._job_type.value} called with missing params: expected {parent_id_key}"
223 app_result.add_error(AppErrorCode.TRIGGER, msg)
224 return app_result
226 if parent_id_key not in params:
227 msg = f"Trigger {self._job_type.value} called with missing {parent_id_key} in params: {params}"
228 app_result.add_error(AppErrorCode.TRIGGER, msg)
230 return app_result
233class UpdateTriggerData[W: PersistedWrapper[Any]](TriggerData[W, ChangeEventDict]):
234 """
235 Validator for Firestore update triggers with before/after comparison.
237 Extends TriggerData to handle document updates that provide both before and after states.
239 Type Parameters:
240 TModel: The wrapper class (e.g., UserWrapper, DareWrapper)
242 Returns:
243 - model: PersistedWrapper[Any] representing the AFTER state
244 - before_model: PersistedWrapper[Any] representing the BEFORE state
246 """
248 def __init__(
249 self,
250 job_name: AppJobType,
251 event: EventType,
252 wrapper_class: type[W],
253 ) -> None:
254 super().__init__(
255 job_type=job_name,
256 event=event,
257 wrapper_class=wrapper_class,
258 data_class=ChangeEventDict,
259 )
261 def updates(self) -> DatabaseDict | None:
262 """Get dictionary of changed fields between before and after."""
263 event_dict = self.event_dict
264 if event_dict is None:
265 LOG().error("Error accessing event dict for updates")
266 return None
268 before_data = event_dict.before
269 if not before_data:
270 return None
271 after_data = event_dict.after
272 if not after_data:
273 return None
275 # Return only changed fields
276 updates: DatabaseDict = {}
277 for key, after_value in after_data.items():
278 before_value = before_data.get(key)
279 if before_value != after_value:
280 updates[key] = after_value
282 return updates
284 @property
285 def before_wrapper(self) -> W | None:
286 event_dict = self.event_dict
287 if event_dict is None:
288 return None
289 before_data = event_dict.before
290 return self._create_wrapper_from_dict(before_data, is_before=True)
292 @property
293 def before_data(self) -> DatabaseDict | None:
294 event_dict = self.event_dict
295 if event_dict is None:
296 return None
297 return event_dict.before