Coverage for functions \ flipdare \ service \ processor \ group_member_processor.py: 32%
82 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from pathlib import Path
15from typing import TYPE_CHECKING, Any
17from google.cloud.storage.bucket import Bucket as StorageBucket
18from flipdare.service.notification_service import NotificationService
19from flipdare.service.processor._processor_mixin import ProcessorMixin
20from flipdare.app_log import LOG
21from flipdare.constants import DOWNLOAD_FILE_DIR, NO_DOC_ID
22from flipdare.result.app_result import AppResult
23from flipdare.service.core.step_processor import ProcessingStep, StepProcessor
24from flipdare.firestore.context.group_member_context import (
25 GroupMemberContext,
26 GroupMemberContextFactory,
27)
28from flipdare.firestore.group_db import GroupDb
29from flipdare.generated import AppErrorCode
30from flipdare.generated.model.group_member_model import GroupMemberKeys
31from flipdare.wrapper import GroupMemberWrapper
33if TYPE_CHECKING:
35 from flipdare.backend.indexer_service import IndexerService
37_M = GroupMemberKeys
40class GroupMemberProcessor(ProcessorMixin):
42 def __init__(
43 self,
44 bucket: StorageBucket,
45 group_db: GroupDb,
46 indexer_service: "IndexerService",
47 notification_service: NotificationService,
48 local_path: Path = DOWNLOAD_FILE_DIR,
49 ) -> None:
50 super().__init__(bucket=bucket, local_path=local_path)
51 self._indexer_service = indexer_service
52 self._group_db = group_db
53 self._notification_service = notification_service
55 @property
56 def indexer_service(self) -> "IndexerService":
57 return self._indexer_service
59 @property
60 def group_db(self) -> GroupDb:
61 return self._group_db
63 def _update_member(
64 self,
65 parent_id: str,
66 member: GroupMemberWrapper,
67 ) -> AppResult[GroupMemberWrapper]:
68 """Save updated member to the database."""
69 main_result = AppResult[GroupMemberWrapper](
70 task_name=f"UpdateGroupMember for {member.doc_id or NO_DOC_ID}",
71 )
73 try:
74 self.group_db.update_member(
75 group_id=parent_id,
76 member_id=member.doc_id,
77 member_data=member,
78 )
79 main_result.generated = member
80 except Exception as e:
81 msg = f"Exception updating GroupMember {member.doc_id}: {e}"
82 main_result.add_error(AppErrorCode.DATABASE_EX, msg, extra=member.to_json_dict())
84 return main_result
86 def process_group_member(
87 self,
88 group_member: GroupMemberWrapper,
89 ) -> AppResult[GroupMemberWrapper]:
90 group_member_id = group_member.doc_id
91 main_result = AppResult[GroupMemberWrapper](
92 task_name=f"ProcessGroupMember for {group_member_id or NO_DOC_ID}",
93 )
95 # Check if already complete
96 if group_member.processing_complete:
97 msg = f"GroupMember already processed for {group_member_id}"
98 LOG().info(msg)
99 return AppResult[GroupMemberWrapper].ok(doc_id=group_member_id, message=msg)
101 # Use StepProcessor for the workflow
102 processor = self._member_processor(group_member)
103 if processor is None:
104 main_result.add_error(
105 AppErrorCode.PROCESSING_STEP,
106 f"Failed to build processor for GroupMember {group_member_id}",
107 )
108 return main_result
110 result = processor.execute()
111 if result.is_error:
112 main_result.merge(result)
114 return main_result
116 def _member_processor(
117 self,
118 member: GroupMemberWrapper,
119 ) -> StepProcessor[GroupMemberWrapper] | None:
120 member_id = member.doc_id
121 steps = [
122 ProcessingStep[_M, GroupMemberWrapper](
123 state_key=_M.REQUEST_NOTIFICATION_SENT,
124 handler=lambda m: self._send_notif_step(m, is_request=True),
125 description="Add request notification",
126 required=True,
127 ),
128 ProcessingStep[_M, GroupMemberWrapper](
129 state_key=_M.STATUS_NOTIFICATION_SENT,
130 handler=lambda m: self._send_notif_step(m, is_request=False),
131 description="Add status notification",
132 required=True,
133 ),
134 ]
136 return StepProcessor(
137 wrapper=member,
138 steps=steps,
139 save_handler=lambda m: self._update_member(parent_id=member_id, member=m),
140 process_name=f"process_member_{member_id}",
141 )
143 def _send_notif_step(
144 self,
145 group_member: GroupMemberWrapper,
146 is_request: bool,
147 ) -> AppResult[Any]:
148 """Retrieve context information for a group member."""
149 main_result = AppResult[Any](
150 task_name=f"GetGroupContext for {group_member.doc_id or NO_DOC_ID}",
151 )
153 context_result = self._get_member_context(group_member)
154 if context_result.is_error:
155 main_result.merge(context_result)
156 return main_result
158 context = context_result.generated
159 assert context
161 try:
162 self._notification_service.send_group_notif(context, is_request=is_request)
163 except Exception as e:
164 msg = f"Exception sending notification for member {group_member.doc_id}: {e}"
165 main_result.add_error(AppErrorCode.NOTIFICATION, msg)
167 return main_result
169 def _get_member_context(self, member: GroupMemberWrapper) -> AppResult[GroupMemberContext]:
170 """Create FriendContext from FriendModel."""
171 doc_id = member.doc_id
172 main_result = AppResult[GroupMemberContext](doc_id=doc_id)
174 try:
175 member_context = GroupMemberContextFactory().create(member)
176 if member_context is not None:
177 main_result.generated = member_context
178 else:
179 cause = f"Failed to build GroupMemberContext for {member.doc_id}"
180 main_result.add_error(
181 AppErrorCode.CONTEXT,
182 cause,
183 extra=member.to_json_dict(),
184 )
185 except Exception as e:
186 cause = f"Exception creating GroupMemberContext for {member.doc_id}: {e}"
187 main_result.add_error(AppErrorCode.CONTEXT, cause, extra=member.to_json_dict())
189 return main_result