Coverage for functions \ flipdare \ firestore \ _app_db.py: 100%

0 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from collections.abc import Awaitable, Sequence 

16from dataclasses import dataclass 

17from typing import TYPE_CHECKING, Any, NoReturn, cast 

18 

19from google.cloud.firestore import Client as FirestoreClient 

20from google.cloud.firestore_v1.base_document import DocumentSnapshot 

21from google.cloud.firestore_v1.base_query import FieldFilter 

22from google.cloud.firestore_v1.field_path import FieldPath 

23from google.cloud.firestore_v1 import CollectionReference, Query, aggregation 

24 

25from flipdare.analysis.data.nested.time_series_collection_data import ( 

26 CollectionStat, 

27 TimeSeriesCollectionData, 

28) 

29from flipdare.app_log import LOG 

30from flipdare.app_types import DatabaseDict 

31from flipdare.constants import DEF_RETRIEVAL_WINDOW_HOURS, IS_DEBUG 

32from flipdare.error.app_error import DatabaseError 

33from flipdare.firestore.core.app_base_model import AppBaseModel 

34from flipdare.firestore.core.collection_stat_query import CollectionStatQuery 

35from flipdare.firestore.core.db_query import FieldOp, OrderByField 

36from flipdare.generated.shared.app_error_code import AppErrorCode 

37from flipdare.generated.shared.backend.app_job_type import AppJobType 

38from flipdare.generated.shared.firestore_collections import FirestoreCollections 

39from flipdare.util.debug_util import stringify_query 

40from flipdare.util.time_util import TimeUtil 

41from flipdare.wrapper._persisted_wrapper import PersistedWrapper 

42 

43if TYPE_CHECKING: 

44 

45 from flipdare.backend.app_logger import AppLogger 

46 

# Names exported by ``from <module> import *``.
__all__ = ["AppDb"]

# NOTE: after upgrading, can't just use "id"; need to use FieldPath.document_id()
# Shared field path used when filtering queries on the document ID.
_id = FieldPath.document_id()

51 

52 

53@dataclass(frozen=True, slots=True) 

54class AggregateResult: # type: ignore[misc] 

55 count: float 

56 sum_total: float = 0 

57 

58 @property 

59 def is_error(self) -> bool: 

60 return self.count < 0 or self.sum_total < 0 

61 

62 

63class AppDb[W: PersistedWrapper[Any], T: AppBaseModel]: 

64 def __init__( 

65 self, 

66 client: FirestoreClient, 

67 collection_name: FirestoreCollections, 

68 wrapper_class: type[W], 

69 model_class: type[T], 

70 log_creator: AppLogger | None = None, 

71 def_window_hours: int = DEF_RETRIEVAL_WINDOW_HOURS, 

72 ) -> None: 

73 if def_window_hours <= 0: 

74 LOG().warning( 

75 f"Invalid def_window_hours {def_window_hours}, " 

76 f"using default {DEF_RETRIEVAL_WINDOW_HOURS}", 

77 ) 

78 def_window_hours = DEF_RETRIEVAL_WINDOW_HOURS 

79 

80 self._client = client 

81 self._collection_name = collection_name 

82 self._wrapper_class = wrapper_class 

83 self._model_class = model_class 

84 self._def_window_hours = def_window_hours 

85 self._log_creator = log_creator 

86 

87 # ----------------------------------------------------------------------------- 

88 # Properties 

89 # ----------------------------------------------------------------------------- 

90 

91 @property 

92 def collection_name(self) -> str: 

93 return self._collection_name.value 

94 

95 @property 

96 def wrapper_class(self) -> type[W]: 

97 return self._wrapper_class 

98 

99 @property 

100 def model_class(self) -> type[T]: 

101 return self._model_class 

102 

103 @property 

104 def def_window_hours(self) -> int: 

105 return self._def_window_hours 

106 

107 @property 

108 def client(self) -> FirestoreClient: 

109 return self._client 

110 

111 @property 

112 def log_creator(self) -> AppLogger: 

113 from flipdare.services import get_app_logger 

114 

115 if self._log_creator is None: 

116 self._log_creator = get_app_logger() 

117 

118 return self._log_creator 

119 

120 @property 

121 def document_count(self) -> int: 

122 """Get the total number of documents in the collection.""" 

123 try: 

124 col_ref = self.client.collection(self.collection_name) 

125 count_query = col_ref.count() 

126 count_snapshot = count_query.get() 

127 return int(count_snapshot[0][0].value) # pyright: ignore[reportIndexIssue] # 

128 except Exception as error: 

129 LOG().error(f"Error counting documents in collection {self.collection_name}: {error}") 

130 return 0 

131 

132 # ----------------------------------------------------------------------------- 

133 # Get 

134 # ----------------------------------------------------------------------------- 

135 

136 def exists(self, doc_id: str) -> bool: 

137 """Check if a document exists in Firestore.""" 

138 return self._exists(doc_id) 

139 

140 def get(self, doc_id: str) -> W | None: 

141 """Get a document by ID from Firestore, returns PersistedWrapper or None""" 

142 data = self._get(doc_id) 

143 if data is None: 

144 return None 

145 

146 return self.wrapper_class.from_dict(data) 

147 

148 def get_bulk(self, doc_ids: list[str]) -> list[W]: 

149 """Get multiple documents by their IDs, returns list of PersistedWrapper.""" 

150 data_items = self._get_bulk(doc_ids) 

151 if len(data_items) == 0: 

152 return [] 

153 

154 results: list[W] = [] 

155 for item in data_items: 

156 wrapper = self.wrapper_class.from_dict(item) 

157 results.append(wrapper) 

158 return results 

159 

160 # ----------------------------------------------------------------------------- 

161 # Create/Update 

162 # ----------------------------------------------------------------------------- 

163 

164 def create(self, data: T | dict[str, Any], merge: bool = False) -> W: 

165 """ 

166 Create a new document in Firestore, returns PersistedWrapper. 

167 

168 Args: 

169 data: Model instance or dict to create from 

170 merge: Whether to merge with existing document 

171 

172 Returns: 

173 PersistedWrapper for the created document 

174 

175 """ 

176 # Convert to dict for Firestore 

177 payload = data if isinstance(data, dict) else data.to_dict() 

178 created_data = self._create(payload, merge=merge) 

179 

180 # from_dict returns PersistedWrapper[Model] 

181 return self.wrapper_class.from_dict(created_data) 

182 

183 def update_model(self, model: W) -> W | None: 

184 """Update document in Firestore using model instance, returns PersistedWrapper.""" 

185 doc_id = model.doc_id 

186 if not model.has_changes: 

187 if IS_DEBUG: 

188 LOG().debug(f"No changes to update for {doc_id}") 

189 # Model is already a PersistedWrapper, return it directly 

190 return model 

191 

192 updates = model.get_updates() 

193 if IS_DEBUG: 

194 LOG().debug(f"Updating model {doc_id} with changes: {updates}") 

195 

196 return self.update(doc_id, updates) 

197 

198 def update(self, doc_id: str, updates: DatabaseDict) -> W | None: 

199 """Update document in Firestore, returns PersistedWrapper.""" 

200 updated_data = self._update(doc_id, updates) 

201 if updated_data is None: 

202 return None 

203 

204 return self.wrapper_class.from_dict(updated_data) 

205 

206 def batch_update(self, updates: Sequence[W]) -> int: 

207 """Bulk update documents in Firestore.""" 

208 batch = self.client.batch() 

209 base_ref = self.client.collection(self.collection_name) 

210 ct = 0 

211 

212 for update in updates: 

213 doc_id = update.doc_id 

214 if not update.has_changes: 

215 if IS_DEBUG: 

216 LOG().debug(f"Skipping update for {doc_id} - no changes") 

217 continue 

218 

219 update_data = update.get_updates() 

220 doc_ref = base_ref.document(doc_id) 

221 batch.update(doc_ref, update_data) 

222 ct += 1 

223 

224 batch.commit() 

225 if IS_DEBUG: 

226 LOG().debug(f"Batch updated {ct} documents in collection {self.collection_name}") 

227 return ct 

228 

229 # ----------------------------------------------------------------------------- 

230 # Delete 

231 # ----------------------------------------------------------------------------- 

232 

233 def delete(self, doc_id: str) -> None: 

234 """Delete a document from Firestore.""" 

235 col_name = self.collection_name 

236 

237 try: 

238 doc_ref = self.client.collection(col_name).document(doc_id) 

239 doc_ref.delete() 

240 # LOG().debug(f"Deleted document with id: {doc_id} from collection {col_name}") 

241 except Exception as error: 

242 LOG().error(f"Error deleting document {doc_id} from collection {col_name}: {error}") 

243 raise DatabaseError( 

244 f"Failed to delete document {doc_id}", 

245 error_code=AppErrorCode.DATABASE, 

246 collection_name=col_name, 

247 document_id=doc_id, 

248 ) from error 

249 

250 # ----------------------------------------------------------------------------- 

251 # Aggregate Stats 

252 # ----------------------------------------------------------------------------- 

253 

254 def get_collection_stats(self, days: int = 7) -> TimeSeriesCollectionData: 

255 date_ranges = TimeUtil.get_date_range( 

256 days, 

257 start=TimeUtil.get_start_of_day_utc(), 

258 reverse=True, 

259 ) 

260 agg_stats = TimeSeriesCollectionData() 

261 

262 client = self.client 

263 col_name = self.collection_name 

264 

265 for date_range in date_ranges: 

266 from_date = date_range.from_date 

267 to_date = date_range.to_date 

268 proc_ct = 0.0 

269 unproc_ct = 0.0 

270 

271 proc_qry = CollectionStatQuery.processed(from_date, to_date) 

272 proc_agg = self._get_agg_value(proc_qry.get_query(client, col_name)) 

273 proc_ct = proc_agg.count if proc_agg is not None else -1.0 

274 

275 unproc_qry = CollectionStatQuery.unprocessed(from_date, to_date) 

276 unproc_agg = self._get_agg_value(unproc_qry.get_query(client, col_name)) 

277 unproc_ct = unproc_agg.count if unproc_agg is not None else -1.0 

278 

279 error_qry = CollectionStatQuery.error(from_date, to_date) 

280 error_agg = self._get_agg_value( 

281 error_qry.get_query(client, col_name), 

282 sum_field=error_qry.sum_field, 

283 ) 

284 error_ct = error_agg.count if error_agg is not None else -1.0 

285 

286 if proc_ct < 0 or unproc_ct < 0 or error_ct < 0: 

287 agg_stats.increment_error() 

288 else: 

289 stat = CollectionStat( 

290 total_ct=proc_ct + error_ct + unproc_ct, 

291 processed_ct=proc_ct, 

292 unprocessed_ct=unproc_ct, 

293 error_ct=error_ct, 

294 ) 

295 agg_stats.add(from_date, FirestoreCollections.DARE, stat) 

296 

297 return agg_stats 

298 

299 def _get_agg_value( 

300 self, 

301 query: Query | CollectionReference, 

302 sum_field: str | None = None, 

303 ) -> AggregateResult | None: 

304 try: 

305 agg_query = aggregation.AggregationQuery(query) 

306 agg_query.count(alias="count") 

307 if sum_field is not None: 

308 agg_query.sum(sum_field, alias="sum") 

309 

310 results = agg_query.get() 

311 count_value = results[0][0].value # type: ignore 

312 

313 sum_value = 0.0 

314 if sum_field is not None: 

315 sum_value = results[0][1].value # type: ignore 

316 

317 return AggregateResult(count=float(count_value), sum_total=float(sum_value)) 

318 

319 except Exception as e: 

320 msg = f"Error getting aggregate stats {self.collection_name}: {e}\n{stringify_query(query)}" 

321 LOG().error(msg) 

322 return None 

323 

324 # ----------------------------------------------------------------------------- 

325 # Get Utilities 

326 # ----------------------------------------------------------------------------- 

327 

328 def _exists(self, doc_id: str) -> bool: # pragma: no cover 

329 """Check if a document exists in a specific collection.""" 

330 doc_data = self._get(doc_id) 

331 return doc_data is not None 

332 

333 def _exists_sub(self, parent_id: str, sub_col_name: str, doc_id: str) -> bool: 

334 """Check if a sub-collection document exists.""" 

335 doc_data = self._get_sub(parent_id, sub_col_name, doc_id) 

336 return doc_data is not None 

337 

338 def _get(self, doc_id: str) -> DatabaseDict | None: 

339 """Get a document by ID from a specific collection, None otherwise""" 

340 # LOG().debug(f"Getting document for collection {col_name}, id: {doc_id}") 

341 col_name = self.collection_name 

342 try: 

343 doc = self.client.collection(col_name).document(doc_id).get() 

344 doc = cast("DocumentSnapshot", doc) 

345 

346 if doc.exists: 

347 return self._cvt_snap_to_data(doc) 

348 return None 

349 except Exception as error: 

350 LOG().error(f"Error searching for collection {col_name}, id: {doc_id}: {error}") 

351 return None 

352 

353 def _get_bulk(self, doc_ids: list[str]) -> list[DatabaseDict]: 

354 """Get multiple documents by their IDs.""" 

355 col_name = self.collection_name 

356 try: 

357 documents = [] 

358 # Firestore allows up to 10 in 'in' queries 

359 chunk_size = 10 

360 for i in range(0, len(doc_ids), chunk_size): 

361 chunk = doc_ids[i : i + chunk_size] 

362 doc_refs = [self.client.collection(col_name).document(doc_id) for doc_id in chunk] 

363 query = ( 

364 self.client.collection(col_name) 

365 .where(filter=FieldFilter(_id, FieldOp.IN.value, doc_refs)) 

366 .get() 

367 ) 

368 for snap in query: 

369 data = self._cvt_snap_to_data(snap) 

370 if data is not None: 

371 documents.append(data) 

372 

373 return documents 

374 except Exception as error: 

375 LOG().error(f"Error getting documents for ids {doc_ids}: {error}") 

376 return [] 

377 

378 # ----------------------------------------------------------------------------- 

379 # Get for Sub Collections utilities 

380 # ----------------------------------------------------------------------------- 

381 

382 def _get_sub(self, parent_id: str, sub_col_name: str, doc_id: str) -> DatabaseDict | None: 

383 """Get a sub-collection document by ID, None otherwise""" 

384 # LOG().debug( 

385 # f"Getting sub-document for collection " 

386 # f"{col_name}/{parent_id}/{sub_col_name}, id: {doc_id}" 

387 # ) 

388 col_name = self.collection_name 

389 try: 

390 doc = cast( 

391 "DocumentSnapshot", 

392 ( 

393 self.client.collection(col_name) 

394 .document(parent_id) 

395 .collection(sub_col_name) 

396 .document(doc_id) 

397 .get() 

398 ), 

399 ) 

400 if doc.exists: 

401 return self._cvt_snap_to_data(doc) 

402 return None 

403 except Exception as error: 

404 LOG().error( 

405 f"Error searching for sub-collection " 

406 f"{col_name}/{parent_id}/{sub_col_name}, id: {doc_id}: {error}", 

407 ) 

408 return None 

409 

410 def _get_all_sub( 

411 self, 

412 parent_id: str, 

413 sub_col_name: str, 

414 order_by: OrderByField[Any] | None = None, 

415 limit: int | None = None, 

416 ) -> list[DatabaseDict]: 

417 """Get all documents in a sub-collection.""" 

418 # LOG().debug( 

419 # f"Getting all sub-documents for collection " 

420 # f"{col_name}/{parent_id}/{sub_col_name}" 

421 # ) 

422 col_name = self.collection_name 

423 try: 

424 collection_ref = ( 

425 self.client.collection(col_name).document(parent_id).collection(sub_col_name) 

426 ) 

427 

428 if order_by is not None: 

429 collection_ref = collection_ref.order_by( 

430 order_by.key, 

431 direction=order_by.direction, 

432 ) 

433 

434 docs = ( 

435 collection_ref.get() 

436 if limit is None or limit <= 0 

437 else collection_ref.limit(limit).get() 

438 ) 

439 

440 results = [] 

441 for doc in docs: 

442 data = self._cvt_snap_to_data(doc) 

443 if data is not None: 

444 results.append(data) 

445 return results 

446 except Exception as error: 

447 LOG().error( 

448 f"Error getting all sub-documents for collection " 

449 f"{col_name}/{parent_id}/{sub_col_name}: {error}", 

450 ) 

451 return [] 

452 

453 def _get_bulk_sub( 

454 self, 

455 parent_id: str, 

456 sub_col_name: str, 

457 doc_ids: list[str], 

458 ) -> list[DatabaseDict]: 

459 """Get multiple sub-collection documents by their IDs.""" 

460 col_name = self.collection_name 

461 

462 try: 

463 documents = [] 

464 # Firestore allows up to 10 in 'in' queries 

465 chunk_size = 10 

466 for i in range(0, len(doc_ids), chunk_size): 

467 chunk = doc_ids[i : i + chunk_size] 

468 doc_refs = [self.client.collection(col_name).document(doc_id) for doc_id in chunk] 

469 query = ( 

470 self.client.collection(col_name) 

471 .document(parent_id) 

472 .collection(sub_col_name) 

473 .where(filter=FieldFilter(_id, "in", doc_refs)) 

474 .get() 

475 ) 

476 

477 for snap in query: 

478 data = self._cvt_snap_to_data(snap) 

479 if data is not None: 

480 documents.append(data) 

481 

482 return documents 

483 except Exception as error: 

484 LOG().error(f"Error getting sub-documents for ids {doc_ids}: {error}") 

485 return [] 

486 

487 # ----------------------------------------------------------------------------- 

488 # Create/Update utilities 

489 # ----------------------------------------------------------------------------- 

490 

491 def _create(self, data: DatabaseDict, merge: bool = False) -> DatabaseDict: 

492 """Create a new document in Firestore.""" 

493 col_name = self.collection_name 

494 saved_data: DatabaseDict | None 

495 try: 

496 col_ref = self.client.collection(col_name) 

497 doc_ref = col_ref.document() 

498 doc_ref.set(data, merge=merge) 

499 snap = doc_ref.get() 

500 # quotes required for type checking and slight performance 

501 snap = cast("DocumentSnapshot", snap) 

502 saved_data = self._cvt_snap_to_data(snap) 

503 except Exception as error: 

504 LOG().error(f"Error creating document for collection {col_name}: {error}\n\t{data}") 

505 raise DatabaseError( 

506 f"Failed to create document for collection {col_name}", 

507 error_code=AppErrorCode.DATABASE, 

508 collection_name=col_name, 

509 ) from error 

510 

511 if saved_data is not None: 

512 return saved_data 

513 

514 raise DatabaseError( 

515 f"Failed to create document for collection {col_name}", 

516 error_code=AppErrorCode.DATABASE, 

517 collection_name=col_name, 

518 document_id=doc_ref.id, 

519 ) 

520 

521 def _update(self, doc_id: str, updates: DatabaseDict) -> DatabaseDict | None: 

522 """Update document in Firestore.""" 

523 col_name = self.collection_name 

524 

525 # LOG().debug(f"Updating document for id: {doc_id}") 

526 try: 

527 doc_ref = self.client.collection(col_name).document(doc_id) 

528 if not self._snap_exists(doc_ref.get()): 

529 LOG().error(f"Document not found for update: {col_name}/{doc_id}") 

530 return None 

531 

532 doc_ref.set(updates, merge=True) 

533 updated_snap = doc_ref.get() 

534 

535 # we know it exists, so convert directly 

536 return self._cvt_snap_to_data(updated_snap) 

537 except Exception as error: 

538 LOG().error(f"Error updating document {doc_id}: {error}") 

539 raise DatabaseError( 

540 f"Failed to update document {doc_id}", 

541 error_code=AppErrorCode.DATABASE, 

542 collection_name=col_name, 

543 document_id=doc_id, 

544 ) from error 

545 

546 # ----------------------------------------------------------------------------- 

547 # Create for Sub-Collection utilities 

548 # ----------------------------------------------------------------------------- 

549 

550 def _create_sub(self, parent_id: str, sub_col_name: str, data: DatabaseDict) -> DatabaseDict: 

551 """Create a new sub-collection document in Firestore.""" 

552 col_name = self.collection_name 

553 saved_data: DatabaseDict | None = None 

554 try: 

555 col_ref = self.client.collection(col_name).document(parent_id).collection(sub_col_name) 

556 doc_ref = col_ref.document() 

557 doc_ref.set(data) 

558 snap = cast("DocumentSnapshot", doc_ref.get()) 

559 saved_data = self._cvt_snap_to_data(snap) 

560 except Exception as error: 

561 self._raise_sub_error(parent_id, sub_col_name, "create", data, error) 

562 

563 if saved_data is None: 

564 self._raise_sub_error(parent_id, sub_col_name, "create", data) 

565 

566 return saved_data 

567 

568 def _update_sub( 

569 self, 

570 parent_id: str, 

571 sub_col_name: str, 

572 doc_id: str, 

573 updates: DatabaseDict, 

574 ) -> DatabaseDict | None: 

575 """Update sub-collection document in Firestore.""" 

576 # LOG().debug(f"Updating sub-document for id: {doc_id}") 

577 col_name = self.collection_name 

578 

579 try: 

580 doc_ref = ( 

581 self.client.collection(col_name) 

582 .document(parent_id) 

583 .collection(sub_col_name) 

584 .document(doc_id) 

585 ) 

586 if not self._snap_exists(doc_ref.get()): 

587 LOG().warning(f"Sub-document not found for id: {doc_id}") 

588 return None 

589 

590 doc_ref.set(updates, merge=True) 

591 # LOG().debug(f"Found sub document for {parent_id}/{doc_id}, updating with " 

592 # f"{stringify_debug(updates)}") 

593 

594 return self._cvt_snap_to_data(doc_ref.get()) 

595 except Exception as error: 

596 LOG().error(f"Error updating sub-document {doc_id}: {error}") 

597 raise DatabaseError( 

598 f"Failed to update sub-document {doc_id}", 

599 error_code=AppErrorCode.DATABASE, 

600 collection_name=f"{col_name}/{parent_id}/{sub_col_name}", 

601 document_id=doc_id, 

602 ) from error 

603 

604 # ----------------------------------------------------------------------------- 

605 # Misc utilities 

606 # ----------------------------------------------------------------------------- 

607 

608 def _snap_exists(self, snap: DocumentSnapshot | Awaitable[DocumentSnapshot]) -> bool: 

609 """Check if DocumentSnapshot exists, supports Awaitable for async compatibility.""" 

610 if isinstance(snap, Awaitable): 

611 msg = "Expected DocumentSnapshot, got Awaitable. Are you accidentally using async?" 

612 raise TypeError(msg) 

613 

614 return snap.exists 

615 

616 def _cvt_snap_to_model(self, snap: DocumentSnapshot) -> W | None: 

617 """Convert DocumentSnapshot to PersistedWrapper[model].""" 

618 data = self._cvt_snap_to_data(snap) 

619 if data is None: 

620 return None 

621 

622 return self.wrapper_class.from_dict(data) 

623 

624 def _cvt_snap_to_data( 

625 self, 

626 snap: DocumentSnapshot | Awaitable[DocumentSnapshot], 

627 ) -> DatabaseDict | None: 

628 """Convert DocumentSnapshot to DatabaseDict with 'id'.""" 

629 if isinstance(snap, Awaitable): 

630 msg = ("Expected DocumentSnapshot, got Awaitable. Are you accidentally using async?",) 

631 raise TypeError(msg) 

632 

633 if not snap.exists: 

634 return None 

635 

636 data: DatabaseDict | None = snap.to_dict() 

637 if data is None: 

638 return None 

639 

640 data["id"] = snap.id 

641 return data 

642 

643 def _process_results(self, results: list[DocumentSnapshot]) -> list[W]: 

644 """Convert list of DocumentSnapshots to list of PersistedWrapper.""" 

645 models: list[W] = [] 

646 for snap in results: 

647 model = self._cvt_snap_to_model(snap) 

648 if model is not None: 

649 models.append(model) 

650 return models 

651 

652 # ----------------------------------------------------------------------------- 

653 # Error handling 

654 # ----------------------------------------------------------------------------- 

655 

656 def _raise_sub_error( 

657 self, 

658 parent_id: str, 

659 sub_col_name: str, 

660 operation: str, 

661 data: DatabaseDict, 

662 error: Exception | None = None, 

663 ) -> NoReturn: 

664 """Helper to raise a standardized error for sub-collection operations.""" 

665 col_name = self.collection_name 

666 

667 LOG().error( 

668 f"Error performing {operation} sub-document for collection " 

669 f"{col_name}/{parent_id}/{sub_col_name}: {error}\n\t{data}", 

670 ) 

671 db_error = DatabaseError( 

672 f"Error in collection {self.collection_name}/{parent_id}/{sub_col_name}: {error}", 

673 error_code=AppErrorCode.DATABASE, 

674 collection_name=f"{self.collection_name}/{parent_id}/{sub_col_name}", 

675 ) 

676 

677 if error is None: 

678 raise db_error 

679 raise db_error from error 

680 

681 def log_error( 

682 self, 

683 job_type: AppJobType, 

684 message: str, 

685 error_code: AppErrorCode = AppErrorCode.DATABASE, 

686 notify_admin: bool = True, 

687 ) -> None: 

688 from flipdare.services import get_app_logger 

689 

690 collection = self._collection_name 

691 LOG().error(f"Logging error for collection {collection}: {message}") 

692 

693 get_app_logger().db_error( 

694 message=message, 

695 error_code=error_code, 

696 job_type=job_type, 

697 collection=collection, 

698 notify_admin=notify_admin, 

699 )