Coverage for functions \ flipdare \ search \ result \ typesense_payload.py: 81%
139 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14import re
15from dataclasses import dataclass
16from typing import Self, override
18from pydantic import BaseModel
20from flipdare.app_log import LOG
21from flipdare.app_types import JsonDict, TypesenseDict
22from flipdare.generated.model.search.result_hint_model import ResultHintModel
23from flipdare.search.doc.general_document import GeneralDocument
24from flipdare.search.result.typesense_model_loader import TypesenseModelLoader
25from flipdare.search.result.typesense_models import (
26 HighlightGuards,
27 HighlightType,
28 TArrayHighlightModel,
29 TResultModel,
30 TStringHighlightModel,
31)
@dataclass(frozen=True, slots=True)
class HitResult:  # type: ignore[misc]
    """Immutable record describing a single hit of a search response.

    Instances are frozen and slotted, so fields cannot be reassigned and no
    extra attributes can be attached after construction.
    """

    # Identifier of the matched document.
    doc_id: str
    # The document mapping exactly as carried by the hit.
    document: JsonDict
    # Highlight entries attached to the hit, or None when there are none.
    highlights: list[HighlightType] | None = None
class THintValue(BaseModel):
    """Pydantic model holding one integer offset and three text fragments.

    NOTE(review): this model is not referenced by the code visible in this
    file; presumably ``before``/``snippet``/``after`` are the text around and
    inside a highlighted match and ``start`` is its offset - confirm against
    the callers.
    """

    start: int
    before: str
    snippet: str
    after: str
class THint(BaseModel):
    """Pydantic model pairing a list of integer indices with hint values.

    NOTE(review): not referenced by the code visible in this file; the
    relationship between ``indices`` and ``hints`` (e.g. whether they are
    parallel lists) cannot be determined from here - confirm with the callers.
    """

    indices: list[int]
    hints: list[THintValue]
class TypesensePayload:
    """Read-only wrapper around a parsed Typesense search response.

    The wrapped ``TResultModel`` is exposed through simple properties
    (query, counters, collection name), as a list of ``HitResult`` records,
    as ``GeneralDocument`` objects, and as highlight hints derived from the
    first hit.
    """

    __slots__ = ("_model",)
    _model: TResultModel

    def __init__(
        self,
        model: TResultModel,
    ) -> None:
        """Store the already-parsed result model."""
        self._model = model

    @classmethod
    def from_result(cls, result: TypesenseDict) -> Self:
        """Build a payload by loading ``result`` through ``TypesenseModelLoader``."""
        model = TypesenseModelLoader(result).load()
        return cls(model)

    @property
    def query(self) -> str:
        """The ``q`` value of the request parameters."""
        return self._model.request_params.q

    @property
    def found(self) -> int:
        """The ``found`` counter of the result model."""
        return self._model.found

    @property
    def page(self) -> int:
        """The ``page`` value of the result model."""
        return self._model.page

    @property
    def out_of(self) -> int:
        """The ``out_of`` counter of the result model."""
        return self._model.out_of

    @property
    def collection_name(self) -> str:
        """The ``collection_name`` value of the request parameters."""
        return self._model.request_params.collection_name

    @property
    def hits(self) -> list[HitResult]:
        """All hits as ``HitResult`` records.

        A hit whose document has no ``"id"`` key gets an empty-string
        ``doc_id``. A new list is built on every access.
        """
        return [
            HitResult(
                doc_id=model_hit.document.get("id", ""),
                document=model_hit.document,
                highlights=model_hit.highlights,
            )
            for model_hit in self._model.hits
        ]

    def general_docs(self) -> list[GeneralDocument]:
        """Convert the hits to ``GeneralDocument`` objects.

        Empty documents and documents without an ``"id"`` are skipped.
        Conversion is best-effort: a document that fails to convert is logged
        and skipped so that one bad hit does not discard the whole result.
        """
        # Parse hits WITHOUT timestamp conversion for internal document objects
        general_docs: list[GeneralDocument] = []
        for hit in self._model.hits:
            raw_doc = hit.document
            if not raw_doc:
                continue

            doc_id = raw_doc.get("id")
            if doc_id is None:
                LOG().debug(f"Document without ID found in search results: {raw_doc}")
                continue

            LOG().debug(f"Processing document from search results: {doc_id}")
            try:
                # Use raw document data directly without timestamp conversion
                document = GeneralDocument.from_payload(doc_id, raw_doc)
                if document.doc_id is not None:
                    general_docs.append(document)
                else:
                    # Not expected in practice: from_payload is expected to
                    # raise rather than return a document without an ID.
                    msg = f"Failed to convert document ID {doc_id} to GeneralDocument: {raw_doc}"
                    LOG().error(msg)
            except Exception as e:
                # Deliberately broad: skip the offending hit, keep the rest.
                LOG().warning(f"Error converting document ID {doc_id} to search document: {e}")

        return general_docs

    def hints(self) -> list[ResultHintModel]:
        """Build highlight hints from the first hit only.

        Returns an empty list when there are no hits, when the first hit has
        no highlights, or when the highlights are of an unrecognised kind.
        """
        model_hits = self._model.hits
        if not model_hits:
            return []

        highlights = model_hits[0].highlights
        if not highlights:
            return []

        if HighlightGuards.is_array_list(highlights):
            return self._build_array_hints(highlights)
        if HighlightGuards.is_string_list(highlights):
            return self._build_string_hint(highlights)

        LOG().warning(f"Unknown highlight type in search results: {highlights[0].kind}")
        return []

    def _build_array_hints(self, highlights: list[TArrayHighlightModel]) -> list[ResultHintModel]:
        """Build one hint per array highlight, using its first entry only.

        Highlights whose ``indices``, ``matched_tokens`` and ``snippets``
        differ in length are logged and skipped. Highlights with no entries
        (or with an empty first token list) are skipped as well instead of
        raising ``IndexError``.
        """
        hints: list[ResultHintModel] = []

        for highlight in highlights:
            indices = highlight.indices
            matched_tokens = highlight.matched_tokens
            snippets = highlight.snippets
            if len(indices) != len(matched_tokens) or len(indices) != len(snippets):
                LOG().warning(
                    f"Highlight data length mismatch in hit highlights: "
                    f"indices={indices}, matched_tokens={matched_tokens}, snippets={snippets}",
                )
                continue

            # Equal lengths do not imply non-empty: three empty lists pass the
            # check above, and indexing [0] below would raise IndexError.
            if not indices or not matched_tokens[0]:
                LOG().debug("Skipping array highlight without any matched entries")
                continue

            # NOTE(review): the Typesense API documents ``indices`` for array
            # fields as positions of the matching array elements, yet the
            # value is used here as the hint's start offset - confirm intent.
            hint = self._build_hint(snippets[0], matched_tokens[0][0], indices[0])
            if hint is not None:
                hints.append(hint)
        return hints

    def _build_string_hint(self, highlights: list[TStringHighlightModel]) -> list[ResultHintModel]:
        """Build one hint per string highlight whose snippet contains a marked match."""
        hints: list[ResultHintModel] = []

        for highlight in highlights:
            hint = self._build_raw_hint(highlight.snippet)
            if hint is not None:
                hints.append(hint)
        return hints

    @staticmethod
    def _build_hint(snippet: str, matched: str, start_tag_idx: int) -> ResultHintModel | None:
        """Build a hint from a snippet, the matched text and a start index.

        Returns ``None`` unless the snippet contains both ``<mark>`` and
        ``</mark>``. Otherwise the hint carries ``start_tag_idx`` as start,
        ``start_tag_idx + len(matched)`` as end, and the snippet with all
        mark tags removed as text.
        """
        if "<mark>" not in snippet or "</mark>" not in snippet:
            return None

        # The "original" text is the snippet with ONLY the tags removed.
        original = re.sub(r"</?mark>", "", snippet)

        # The end index is the start index plus the length of the matched text.
        start_idx = start_tag_idx
        end_idx = start_idx + len(matched)

        return ResultHintModel(start=start_idx, end=end_idx, matched=matched, text=original)

    @staticmethod
    def _build_raw_hint(snippet: str) -> ResultHintModel | None:
        """Build a hint from the first ``<mark>...</mark>`` span of a snippet.

        Returns ``None`` when the snippet lacks either tag or contains no
        well-formed ``<mark>...</mark>`` span on a single line.
        """
        if "<mark>" not in snippet or "</mark>" not in snippet:
            return None

        match = re.search(r"<mark>(.*?)</mark>", snippet)
        if not match:
            return None

        # No tag precedes the first <mark>, so its position in the tagged
        # snippet equals the match position in the tag-free text. The shared
        # builder strips the tags and computes the end index.
        return TypesensePayload._build_hint(snippet, match.group(1), match.start())

    @override
    def __str__(self) -> str:
        """Summarise the query, collection, counters and number of hits."""
        return (
            f"TypesensePayload(query={self.query}, collection_name={self.collection_name}, "
            f"found={self.found}, page={self.page}, out_of={self.out_of}, "
            f"hits_count={len(self.hits)})"
        )

    @override
    def __repr__(self) -> str:
        """Same text as ``__str__``."""
        return self.__str__()