Coverage for functions \ flipdare \ backend \ indexer_service.py: 39%
308 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from dataclasses import dataclass
15from enum import Enum
16from typing import Any
18from flipdare.app_log import LOG
19from flipdare.constants import IS_DEBUG, MAX_SEARCH_TAG_COUNT, NO_DOC_ID
20from flipdare.result.app_result import AppResult
21from flipdare.service._service_provider import ServiceProvider
22from flipdare.core.tokenizer import Tokenizer
23from flipdare.firestore import DareContextFactory, FriendContext, GroupContextFactory
24from flipdare.generated import AppErrorCode
25from flipdare.generated.shared.search.search_obj_type import SearchObjType
26from flipdare.search import (
27 ContentSearchFactory,
28 DareSearchFactory,
29 FriendSearchFactory,
30 GeneralDocument,
31 GroupMemberSearchFactory,
32 GroupSearchFactory,
33 SearchDocument,
34 SearchDocumentFactory,
35 UserSearchFactory,
36)
37from flipdare.search.doc.friend_document import FriendDocument
38from flipdare.wrapper import (
39 ContentWrapper,
40 DareWrapper,
41 FriendWrapper,
42 GroupMemberWrapper,
43 GroupWrapper,
44 UserWrapper,
45)
47__all__ = ["IndexerService"]
50type IndexedDocType = GeneralDocument | FriendDocument
class IndexerType(Enum):
    """Kind of indexing operation: create, update or delete."""

    NEW = "new"
    UPDATE = "update"
    DELETE = "delete"
@dataclass
class _GeneralOpData:
    """Key of a general search document targeted by a delete operation."""

    # Passed as ``uid`` to the general search index.
    uid: str
    # Passed as ``identifier`` to the general search index.
    obj_id: str
@dataclass
class _FriendOpData:
    """Key of a friend search document targeted by a delete operation."""

    # Passed as ``uid`` to the friend search index.
    uid: str
    # Passed as ``identifier`` to the friend search index.
    friend_uid: str
71type _OpData = _GeneralOpData | _FriendOpData
74class IndexerService(ServiceProvider):
76 def __init__(
77 self,
78 tokenizer: Tokenizer | None = None,
79 max_tags: int = MAX_SEARCH_TAG_COUNT,
80 ) -> None:
81 super().__init__()
83 self._tokenizer = tokenizer
84 self._max_tags = max_tags
86 @property
87 def tokenizer(self) -> Tokenizer:
88 if self._tokenizer is None:
89 self._tokenizer = Tokenizer.instance()
90 return self._tokenizer
92 @property
93 def max_tags(self) -> int:
94 return self._max_tags
96 def delete_friend(self, friend: FriendWrapper) -> AppResult:
97 return self._delete(_FriendOpData(friend.to_uid, friend.from_uid))
99 def delete_all_friends(self, uid: str) -> AppResult:
100 # this will delete all friend documents for the user, which includes both from and to friends
101 return self._delete(_FriendOpData(uid, uid))
103 def delete_general(self, uid: str, obj_id: str) -> AppResult:
104 return self._delete(_GeneralOpData(uid, obj_id))
106 def process_content(
107 self,
108 content: ContentWrapper,
109 is_update: bool,
110 ) -> AppResult[ContentWrapper]:
111 doc_id = content.doc_id
112 content_result = AppResult[ContentWrapper](doc_id=doc_id)
114 content_id = content.doc_id
115 if not content.is_user and not content.is_group:
116 cause = "ContentWrapper objType is not USER or GROUP, cannot add to search."
117 content_result.add_error(AppErrorCode.INVALID_DATA, cause)
118 return content_result
119 if content.description is None:
120 msg = f"Content {content_id} has no description, skipping indexing."
121 return AppResult[ContentWrapper].skip(content_id, message=msg)
122 description = content.description
123 assert description # narrowing
125 search_document: SearchDocumentFactory | None = None
126 if content.is_user:
127 user_result = self.user_bridge.get(content.obj_id)
128 if user_result.is_error:
129 msg = f"Unable to retrieve user {content.obj_id} for content {content_id}"
130 content_result.add_error(AppErrorCode.DATABASE_EX, msg)
131 return content_result
133 user_model = user_result.generated
134 assert user_model # narrowing
136 user_id = user_model.doc_id
137 if user_model.model.can_share:
138 search_document = ContentSearchFactory(
139 content,
140 description,
141 user_model,
142 self.tokenizer,
143 )
144 else:
145 if not is_update:
146 LOG().debug(f"User {user_id} is not searchable, skipping.")
147 content_result.merge(user_result)
148 return content_result
150 # Delete existing search entries if the user is no longer searchable
151 LOG().debug(f"User {user_id} is no longer searchable, deleting from search.")
152 delete_result = self._delete(_GeneralOpData(content_id, SearchObjType.USER))
153 content_result.merge(user_result)
154 content_result.merge(delete_result)
155 return content_result
156 else:
157 group_result = self.group_bridge.get(content.obj_id)
158 if group_result.is_error:
159 msg = f"Unable to retrieve group {content.obj_id} for content {content_id}"
160 content_result.add_error(AppErrorCode.DATABASE_EX, msg)
161 return content_result
163 group_model = group_result.generated
164 if group_model is None:
165 msg = f"No group found for content {content_id} with group id {content.obj_id}"
166 content_result.add_error(AppErrorCode.NOT_FOUND, msg)
167 return content_result
169 search_document = ContentSearchFactory(
170 content,
171 description,
172 group_model,
173 self.tokenizer,
174 )
176 try:
177 result = self._add(search_document, is_update=is_update)
178 if not result.is_error:
179 return content_result
181 content_result.merge(result)
182 return content_result
183 except Exception as error:
184 msg = f"Error building searchable user for user {content_id}: {error}"
185 content_result.add_error(AppErrorCode.SERVER_EX, msg)
186 return content_result
188 def process_friend(
189 self,
190 friend_context: FriendContext,
191 updated: bool,
192 ) -> AppResult[FriendWrapper]:
194 doc_id = friend_context.doc_id
195 friend_id = friend_context.friend_id
197 friend_result: AppResult[FriendWrapper] = AppResult(
198 doc_id=doc_id,
199 task_name=f" for friend {friend_id}",
200 )
202 if IS_DEBUG:
203 LOG().debug(f"Processing friend {friend_id} for search.")
205 try:
206 model_tags = FriendSearchFactory(friend_context, self.tokenizer)
207 result = self._add(model_tags, updated)
208 if not result.is_error:
209 return friend_result
211 friend_result.merge(result)
212 except Exception as error:
213 msg = f"Error building searchable friend for friend {friend_id}: {error}"
214 friend_result.add_error(AppErrorCode.SERVER_EX, msg)
216 return friend_result
218 def process_user(self, user: UserWrapper, is_update: bool) -> AppResult[UserWrapper]:
219 user_id = user.doc_id
220 user_result: AppResult[UserWrapper] = AppResult(
221 doc_id=user_id,
222 task_name=f" for user {user_id}",
223 )
225 if not user.model.can_share:
226 if not is_update:
227 LOG().debug(f"User {user_id} is not searchable, skipping.")
228 return user_result
230 # Delete existing search entries if the user is no longer searchable
231 LOG().debug(f"User {user_id} is no longer searchable, deleting from search.")
232 delete_result = self._delete(_GeneralOpData(user_id, SearchObjType.USER))
233 user_result.merge(delete_result)
234 return user_result
236 try:
237 model_tags = UserSearchFactory(user, self.tokenizer)
238 result = self._add(model_tags, is_update=is_update)
239 if not result.is_error:
240 return user_result
242 user_result.merge(result)
243 return user_result
244 except Exception as error:
245 msg = f"Error building searchable user for user {user_id}: {error}"
246 user_result.add_error(AppErrorCode.SERVER_EX, msg)
247 return user_result
249 def process_dare(self, dare_model: DareWrapper, is_update: bool) -> AppResult[DareWrapper]:
250 dare_id = dare_model.doc_id
251 dare_result: AppResult[DareWrapper] = AppResult(
252 doc_id=dare_id,
253 task_name=f" for dare {dare_id}",
254 )
256 obj_type = SearchObjType.GROUP_DARE if dare_model.is_group_dare else SearchObjType.DARE
257 dare_context = DareContextFactory().create(dare_model)
258 if dare_context is None:
259 msg = f"Unable to create dare context for dare id: {dare_id}"
260 dare_result.add_error(AppErrorCode.INVALID_DATA, msg)
261 return dare_result
263 if not dare_context.valid:
264 msg = f"Invalid dare context for dare id: {dare_id}"
265 msg += f"\n{dare_context.error_str}"
267 dare_result.add_error(AppErrorCode.INVALID_DATA, msg)
268 return dare_result
270 dare = dare_context.dare
271 if not dare.can_share:
272 if not is_update:
273 LOG().info(f"Dare {dare_id} is not searchable, skipping.")
274 return dare_result
276 # Delete existing search entries if the dare is no longer searchable
277 LOG().info(f"Dare {dare_id} is no longer searchable, deleting from search.")
278 delete_result = self._delete(_GeneralOpData(dare_id, obj_type))
279 dare_result.merge(delete_result)
280 return dare_result
282 try:
283 model_tags = DareSearchFactory(dare_context, self.tokenizer)
284 result = self._add(model_tags, is_update)
285 if result.is_error:
286 dare_result.merge(result)
287 return dare_result
288 except Exception as error:
289 msg = f"Error building searchable dare for dare {dare_id}: {error}\n"
290 msg += f"Context:\n{dare_context.error_str}"
291 dare_result.add_error(AppErrorCode.TAGGING, msg)
292 return dare_result
294 def process_group(
295 self,
296 group_model: GroupWrapper,
297 is_update: bool,
298 ) -> AppResult[GroupWrapper]:
299 group_id = group_model.doc_id
300 group_result: AppResult[GroupWrapper] = AppResult(
301 doc_id=group_id,
302 task_name=f" for group {group_id}",
303 )
305 try:
306 group_context = GroupContextFactory().create(group_model)
307 if group_context is None or not group_context.valid:
308 msg = f"Unable to create valid group context for group id: {group_id}"
309 msg += f"\n{group_context.error_str}" if group_context else ""
310 group_result.add_error(AppErrorCode.INVALID_DATA, msg)
311 return group_result
313 model_tags = GroupSearchFactory(group_context, self.tokenizer)
314 result = self._add(model_tags, is_update)
315 if result.is_error:
316 group_result.merge(result)
317 return group_result
319 members: list[GroupMemberWrapper] | None = None
320 try:
321 members = group_context.members()
322 if members is None:
323 # could just be a new group
324 return group_result
325 except Exception as error:
326 msg = f"Error retrieving members for group {group_id}: {error}"
327 group_result.add_error(AppErrorCode.SERVER_EX, msg)
328 return group_result
330 for i in range(len(members)):
331 if i >= self._max_tags:
332 LOG().warning(
333 f"Group {group_id} has more than {self._max_tags} members, "
334 f"skipping remaining members for search tagging.",
335 )
336 break
337 try:
338 member_tags = GroupMemberSearchFactory(group_context, i, self.tokenizer)
339 result = self._add(member_tags, is_update=is_update)
340 if result.is_error:
341 group_result.merge(result)
342 except Exception as error:
343 msg = f"Error creating GroupMemberModelTag for member index {i} in group {group_id}: {error}"
344 LOG().error(msg)
345 continue
346 return group_result
347 except Exception as error:
348 group_result.add_error(
349 AppErrorCode.SERVER_EX,
350 f"Error building group context for group data: {error}",
351 )
352 return group_result
354 def _add(self, doc_factory: SearchDocumentFactory, is_update: bool) -> AppResult:
355 all_docs: list[SearchDocument[Any]] | None = doc_factory.get_documents()
356 result: AppResult = AppResult(task_name=f"for obj {doc_factory.obj_type}")
358 if all_docs is None:
359 LOG().debug("No search documents generated, skipping.")
360 return result
362 if IS_DEBUG:
363 LOG().debug(f"Adding {len(all_docs)} documents to search (is_update={is_update}).")
365 for doc in all_docs:
366 debug_str = doc.debug_str
368 if is_update:
369 # not we put the larger code at the start, because we may need to create ...
370 try:
371 if isinstance(doc, (GeneralDocument, FriendDocument)):
372 changed = self._has_changed(doc.uid, doc)
373 else:
374 result.add_error(
375 AppErrorCode.INVALID_DATA,
376 f"Unsupported document type: {type(doc)}",
377 )
378 continue
380 if changed is None:
381 msg = f"No existing document found for {debug_str}, creating new document."
382 LOG().info(msg)
383 else:
384 _, has_changed = changed
385 if has_changed:
386 self._update(doc)
387 except Exception as error:
388 msg = f"Error checking for changes and updating document for {debug_str}: {error}"
389 LOG().error(msg)
390 result.add_error(AppErrorCode.SERVER_EX, msg)
392 try:
393 self._create(doc)
394 except Exception as error:
395 result.add_error(
396 AppErrorCode.SERVER_EX,
397 f"Error adding new document for {debug_str}: {error}",
398 )
400 return result
402 def _create(self, doc: SearchDocument[Any]) -> AppResult:
403 result: AppResult = AppResult(
404 doc_id=doc.doc_id or NO_DOC_ID,
405 task_name=f"for user {doc.uid} and obj {doc.obj_type}",
406 )
407 try:
408 if isinstance(doc, GeneralDocument):
409 self.search_manager.general.create(doc)
410 elif isinstance(doc, FriendDocument):
411 self.search_manager.friend.create(doc)
412 else:
413 result.add_error(
414 AppErrorCode.INVALID_DATA,
415 f"Unsupported document type: {type(doc)}",
416 )
417 return result
418 except Exception as error:
419 result.add_error(
420 AppErrorCode.SERVER_EX,
421 f"Error adding new document for {doc.debug_str}: {error}",
422 )
424 return result
426 def _update(self, doc: SearchDocument[Any]) -> AppResult:
427 result: AppResult = AppResult(
428 doc_id=doc.doc_id or NO_DOC_ID,
429 task_name=f"for user {doc.uid} and obj {doc.obj_type}",
430 )
431 try:
432 if isinstance(doc, GeneralDocument):
433 self.search_manager.general.update(doc)
434 elif isinstance(doc, FriendDocument):
435 self.search_manager.friend.update(doc)
436 else:
437 result.add_error(
438 AppErrorCode.INVALID_DATA,
439 f"Unsupported document type: {type(doc)}",
440 )
441 return result
442 except Exception as error:
443 result.add_error(
444 AppErrorCode.SERVER_EX,
445 f"Error updating document for {doc.debug_str}: {error}",
446 )
448 return result
450 def _get(self, uid: str, doc: IndexedDocType) -> IndexedDocType | None:
451 mgr = self.search_manager
453 match doc:
454 case GeneralDocument():
455 general = mgr.general
456 return general.get_user_type(
457 uid=uid,
458 identifier=doc.obj_id,
459 )
460 case FriendDocument():
461 friend = mgr.friend
462 return friend.get_user_type(
463 uid=uid,
464 identifier=doc.friend_uid,
465 )
467 def _has_changed(self, uid: str, doc: IndexedDocType) -> tuple[str, bool] | None:
468 mgr = self.search_manager
469 saved_doc: IndexedDocType | None
471 obj_id: str
472 match doc:
473 case GeneralDocument():
474 obj_id = doc.obj_id
475 general = mgr.general
476 saved_doc = general.get_user_type(
477 uid=uid,
478 identifier=doc.obj_id,
479 )
480 case FriendDocument():
481 friend = mgr.friend
482 obj_id = doc.friend_uid
483 saved_doc = friend.get_user_type(
484 uid=uid,
485 identifier=doc.friend_uid,
486 )
488 if saved_doc is None:
489 # no doc, has_changed = True, so we add
490 return None
492 doc_id = saved_doc.doc_id
493 assert doc_id is not None # narrowing
495 if saved_doc.payload_equal(doc):
496 LOG().debug(f"Document for {doc_id}/{obj_id} unchanged.")
497 return doc_id, False
499 return doc_id, True
501 def _delete(self, data: _OpData) -> AppResult:
502 result: AppResult = AppResult(task_name=f"for user {data.uid}")
503 mgr = self.search_manager
505 match data:
506 case _GeneralOpData():
507 mgr.general.delete_user_type(
508 uid=data.uid,
509 identifier=data.obj_id,
510 )
511 case _FriendOpData():
512 mgr.friend.delete_user_type(
513 uid=data.uid,
514 identifier=data.friend_uid,
515 )
517 return result