Coverage for functions \ flipdare \ manager \ search_manager.py: 100%

0 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14import flask 

15import typesense 

16from typesense.exceptions import ObjectNotFound, TypesenseClientError 

17from flipdare.app_log import LOG 

18from flipdare.constants import SEARCH_NAT_LANG_CONFIG 

19from flipdare.core.singleton import Singleton 

20from flipdare.error.app_error import AppError, ServerError 

21from flipdare.error.error_context import ErrorContext 

22from flipdare.generated.shared.app_error_code import AppErrorCode 

23from flipdare.generated.shared.app_log_category import AppLogCategory 

24from flipdare.generated.shared.search.search_collections import SearchCollections 

25from flipdare.request.app_request import AppRequest 

26from flipdare.request.request_types import AppHttpRequestType 

27from flipdare.core.app_response import AppOkResponse 

28from flipdare.search.db.app_friend_search import AppFriendSearch 

29from flipdare.search.db.app_general_search import AppGeneralSearch 

30 

31__all__ = ["SearchManager"] 

32 

33 

class SearchManager(Singleton):
    """
    Primarily responsible for creation and management of search clients and collections.
    Also provides helper methods for common search operations.

    Maintenance Tasks:

    Blue/Green Re-index Every 3 days (2:00 AM)
        Ensures your index is 100% in sync with your primary
        DB and fixes any minor drift.

    Database Compaction Weekly (or after Re-index) 0 4 * * 0 (4:00 AM Sun)
        Only needed if you have high write/delete volume.
        Running it weekly is sufficient for most.
    """

    def __init__(
        self,
        client: typesense.Client | None = None,
        general: AppGeneralSearch | None = None,
        friend: AppFriendSearch | None = None,
    ) -> None:
        """
        Initialise the manager. All collaborators are optional; any left as
        None are created lazily on first property access (injection points
        exist mainly for tests).
        """
        super().__init__()

        # Lazily-created collaborators; see the corresponding properties.
        self._client = client
        self._general = general
        self._friend = friend

    @property
    def client(self) -> typesense.Client:
        """Lazily create and cache the underlying Typesense client."""
        if self._client is None:
            self._client = self._create_typesense_client()

        return self._client

    @property
    def general(self) -> AppGeneralSearch:
        """Lazily create and cache the general-search helper."""
        if self._general is None:
            self._general = AppGeneralSearch(self.client)

        return self._general

    @property
    def friend(self) -> AppFriendSearch:
        """Lazily create and cache the friend-search helper."""
        if self._friend is None:
            self._friend = AppFriendSearch(self.client)

        return self._friend

    @staticmethod
    def _create_typesense_client() -> typesense.Client:
        """
        Build a Typesense client from application configuration.

        Returns:
            A configured ``typesense.Client`` pointing at the single node
            described by the app config (plain HTTP).
        """
        # Local import to avoid a circular dependency at module load time.
        from flipdare.app_config import get_app_config

        config = get_app_config()
        hostname = config.search_ip
        port = config.search_port
        api_key = config.search_api_key
        timeout = config.search_timeout

        # SECURITY FIX: the previous version logged the API key in plaintext,
        # at error level, with a stack trace, for a routine operation. Never
        # log secrets; routine client creation is info-level.
        LOG().info(f"Creating Typesense client with settings: {hostname}:{port}")

        return typesense.Client(
            {
                "nodes": [{"host": hostname, "port": port, "protocol": "http"}],
                "api_key": api_key,
                "connection_timeout_seconds": timeout,
            },
        )

    # ---------------------------------------------------------------------------------------------
    # ADMIN
    # ---------------------------------------------------------------------------------------------

    def initialize_all(self) -> None:
        """
        Ensure every collection in ``SearchCollections`` exists, and that each
        alias points at its base collection. Finishes by configuring natural
        language search (if enabled).

        Raises:
            ServerError: wrapping any failure during collection/alias setup
                (error code SERVER_INIT).
        """
        client = self.client
        collections = client.collections.retrieve()
        existing_collections = {col["name"] for col in collections}

        try:
            for collection in SearchCollections:
                if collection.base_name not in existing_collections:
                    LOG().info(f"Creating missing collection: {collection.base_name}")
                    self.create_collection_if(collection)
                # Alias is (re)checked for every collection, even pre-existing
                # ones, so alias -> base_name mappings self-heal on startup.
                self.create_alias_if(collection.value, collection.base_name)
        except Exception as e:
            msg = f"Failed to initialize search collections: {e}"
            raise ServerError(
                message=msg,
                error_code=AppErrorCode.SERVER_INIT,
                error=e,
            ) from e

        self.initialize_nat_lang()

    def initialize_nat_lang(self) -> None:
        """
        Configure the natural-language search model in Typesense, if enabled
        in the app config. Best-effort: configuration failures are logged,
        never raised, so a bad/missing Gemini key cannot block startup.
        """
        # Local import to avoid a circular dependency at module load time.
        from flipdare.app_config import get_app_config

        app_config = get_app_config()
        enable_nat_lang = app_config.search_enable_nat_lang
        if not enable_nat_lang:
            LOG().info("Natural language search is disabled.")
            return

        LOG().info("Enabling natural language search settings...")
        api_key = app_config.gemini_api_key
        # Copy so the shared constant is never mutated with the secret key.
        model_config = SEARCH_NAT_LANG_CONFIG.copy()
        model_config["api_key"] = api_key

        client = self.client
        try:
            client.nl_search_models.create(model_config)  # type: ignore
            LOG().info(f"Model {model_config['id']} configured successfully.")
        except TypesenseClientError as e:
            LOG().error(f"Error configuring model: {e}")
        except Exception as e:
            LOG().error(f"Unexpected error: {e}")

    def create_collection_if(self, collection: SearchCollections) -> bool:
        """
        Create ``collection`` if it does not already exist.

        Returns:
            True if the collection was created, False if it already existed.
        """
        if self.collection_exists(collection):
            LOG().info(f"Skipped existing collection: {collection.base_name}")
            return False

        LOG().info(f"Creating collection: {collection.base_name}")
        col_def = collection.definition
        self.client.collections.create(col_def)  # type: ignore
        return True

    def collection_exists(self, collection: SearchCollections) -> bool:
        """
        Return True if the collection's base name is present in Typesense.
        Any retrieval error is logged and treated as "does not exist".
        """
        client = self.client
        col_name = collection.base_name
        try:
            collections = client.collections.retrieve()
            return any(col["name"] == col_name for col in collections)
        except Exception as error:
            LOG().error(f"Error checking collection existence for {col_name}: {error}")
            return False

    def create_alias_if(self, alias_name: str, collection_name: str) -> bool:
        """
        Upsert the alias ``alias_name`` -> ``collection_name`` unless it
        already exists and points at that collection.

        Returns:
            True if the alias was (re)created, False if it was already correct.
        """
        if self.alias_exists(alias_name, expected_destination=collection_name):
            LOG().info(f"Skipped existing alias {alias_name} -> {collection_name}.")
            return False

        LOG().info(f"Creating alias {alias_name} -> {collection_name}.")
        self.client.aliases.upsert(alias_name, {"collection_name": collection_name})
        return True

    def alias_exists(self, alias_name: str, expected_destination: str | None = None) -> bool:
        """
        Return True if ``alias_name`` exists (and, when ``expected_destination``
        is given, also points at that collection). Unexpected errors are logged
        and treated as "does not exist".
        """
        try:
            # Direct lookup is O(1) instead of O(n)
            alias_detail = self.client.aliases[alias_name].retrieve()

            if expected_destination is not None:
                return alias_detail["collection_name"] == expected_destination

            return True
        except ObjectNotFound:
            return False
        except Exception as error:
            LOG().error(f"Unexpected error checking alias {alias_name}: {error}")
            return False

    # ---------------------------------------------------------------------------------------------
    # MAINTENANCE
    # ---------------------------------------------------------------------------------------------

    def dump(self) -> None:
        """
        Dumps the configuration to log. Useful for debugging connection issues.
        Best-effort: failures are logged, never raised.
        """
        client = self.client
        try:
            collections = client.collections.retrieve()
            aliases = client.aliases.retrieve()
            LOG().info(f"Current Typesense Collections: {[col['name'] for col in collections]}")
            LOG().info(
                f"Current Typesense Aliases: {[alias['name'] for alias in aliases['aliases']]}",
            )
        except Exception as e:
            LOG().error(f"Error dumping Typesense configuration: {e}")

    def reindex(self) -> None:
        """
        Blue/green re-index of the search collections. Not yet implemented.

        Raises:
            NotImplementedError: always, until FLP-629 lands.
        """
        # FIXME: FLP-629 - implement blue/green reindexing strategy
        # NOTE: Just want to follow up for anyone else that might run into this.
        # NOTE: This has been running for a few days now and the disk size and re-indexing
        # NOTE: is remaining consistent.
        # NOTE: Making sure the db-compaction-interval and snapshot-interval-seconds
        # NOTE: flags were not overlapping between pipeline runs was the key here!
        # Ref: https://github.com/typesense/typesense/issues/2005

        # for Typesense to minimize downtime during index updates. This should include:
        # Process:
        # 1. Create new collection with updated schema (Green)
        # 2. Import data into Green collection in background
        # 3. Atomically swap alias to point from old collection (Blue) to new collection (Green)
        # 4. In a blue/green re-index, you should perform compaction at the very end of the process,
        #    specifically after you have deleted the old "blue" collection.
        ## 1. Import (Staggered)
        # client.collections[NEW_NAME].documents.import_(data, {'batch_size': 100})
        #
        ## 2. Swap Alias (Instant)
        # client.aliases.upsert('products', {'collection_name': NEW_NAME})
        #
        ## 3. Wait (Safety)
        # import time
        # time.sleep(300)  # 5 mins
        #
        ## 4. Delete Old
        # client.collections[OLD_NAME].delete()
        #
        ## 5. Compact (The only cleanup you need)
        # client.operations.perform('db/compact')
        raise NotImplementedError("Blue/Green re-indexing strategy is not yet implemented.")

    def compact(self) -> None:
        """
        Compacts the Typesense search index to optimize performance.
        Should be run periodically as part of maintenance.

        Raises:
            ServerError: wrapping any compaction failure
                (error code SERVER_MAINTENANCE).
        """
        try:
            self.client.operations.perform("db/compact")
        except Exception as e:
            msg = f"Failed to compact Typesense index: {e}"
            raise ServerError(
                message=msg,
                error_code=AppErrorCode.SERVER_MAINTENANCE,
                error=e,
            ) from e

    # ---------------------------------------------------------------------------------------------
    # HEALTH CHECK
    # ---------------------------------------------------------------------------------------------

    def ping_search(self, req: flask.Request) -> flask.Response:  # pragma: no cover
        """
        Health-check endpoint handler: authenticate the request, verify the
        search backend is healthy, and return an OK response. On failure an
        admin alert email is sent and an error response is returned.
        """
        # Local imports to avoid circular dependencies at module load time.
        from flipdare.services import get_search_manager
        from flipdare.services import get_admin_mailer
        from flipdare.core.app_response import AppErrorResponse

        # internal function, no coverage
        result = AppRequest.http(req, AppHttpRequestType.PING_SEARCH)
        mailer = get_admin_mailer()

        try:
            result.is_authenticated()
        except AppError as error:
            msg = f"Not Authenticated?:\n{error!s}\n"
            mailer.send_error(
                error_code=AppErrorCode.SERVER,
                category=AppLogCategory.COMMAND,
                message=msg,
                include_stack=True,
            )
            return AppErrorResponse.from_context(
                ctx=ErrorContext.unauthorized(req.url, message=msg),
            ).raw_response()

        try:
            manager = get_search_manager()
            self._check_categories(manager)
        except Exception as ex:
            msg = f"Search offline..: {ex}"
            mailer.send_error(
                error_code=AppErrorCode.SERVER,
                category=AppLogCategory.COMMAND,
                message=msg,
                include_stack=True,
            )
            return AppErrorResponse.from_context(
                ctx=ErrorContext.server_error(req.url, message=msg),
            ).raw_response()

        return AppOkResponse.ok().raw_response()

    def _check_categories(self, manager: SearchManager) -> None:
        """
        Probe the search backend's health endpoint via the general-search
        client.

        Raises:
            AppError: if the backend reports itself unhealthy.
        """
        search = manager.general
        ok = search.client.operations.is_healthy()
        if not ok:
            msg = f"Search is unhealthy: {ok}"
            raise AppError(AppErrorCode.SERVER, msg)