Coverage for functions \ flipdare \ service \ content_service.py: 44%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14from pathlib import Path 

15from typing import TYPE_CHECKING 

16from flipdare.service._service_provider import ServiceProvider 

17from flipdare.service.processor.content_processor import ContentProcessor 

18from flipdare.app_log import LOG 

19from flipdare.constants import DOWNLOAD_FILE_DIR 

20from flipdare.result.app_result import AppResult 

21from flipdare.core.job_type_decorator import job_type_decorator 

22from flipdare.result.job_result import JobResult 

23from flipdare.core.trigger_decorator import trigger_decorator 

24from flipdare.generated.shared.app_error_code import AppErrorCode 

25from flipdare.generated.shared.backend.app_job_type import AppJobType 

26from flipdare.generated.shared.firestore_collections import FirestoreCollections 

27from flipdare.wrapper import AppJobWrapper, ContentWrapper 

28from flipdare.app_types import CronResult 

29from flipdare.constants import IS_DEBUG 

30from flipdare.core.cron_decorator import cron_decorator 

31from flipdare.generated.model.backend.metric.count_metric import CountMetric 

32from flipdare.util.time_util import TimeUtil 

33 

34if TYPE_CHECKING: 

35 from flipdare.manager.db_manager import DbManager 

36 from flipdare.manager.backend_manager import BackendManager 

37 

38__all__ = ["ContentService"] 

39 

40 

41_JT = AppJobType 

42_COL = FirestoreCollections.CONTENT 

43 

44 

45class ContentService(ServiceProvider): 

46 

47 def __init__( 

48 self, 

49 db_manager: DbManager | None = None, 

50 backend_manager: BackendManager | None = None, 

51 local_path: Path = DOWNLOAD_FILE_DIR, 

52 ) -> None: 

53 super().__init__( 

54 backend_manager=backend_manager, 

55 db_manager=db_manager, 

56 ) 

57 

58 self._local_path = local_path 

59 self._content_processor: ContentProcessor | None = None 

60 

61 @property 

62 def content_processor(self) -> ContentProcessor: 

63 if self._content_processor is None: 

64 self._content_processor = ContentProcessor( 

65 bucket=self.storage_bucket, 

66 content_db=self.content_db, 

67 indexer_service=self.indexer, 

68 local_path=self._local_path, 

69 ) 

70 return self._content_processor 

71 

72 # ======================================================================== 

73 # CRONS 

74 # ======================================================================== 

75 

76 @cron_decorator(job_type=_JT.CR_CONTENT_UNPROCESSED) 

77 def cron_content_unprocessed(self) -> CronResult: 

78 content_processor = self.content_processor 

79 main_result = AppResult() 

80 success_ct = 0 

81 failed_ct = 0 

82 skipped_ct = 0 

83 start = TimeUtil.get_current_utc_dt() 

84 

85 try: 

86 items = self.content_db.get_recent_unprocessed() 

87 for content in items: 

88 if content.processing_complete: 

89 skipped_ct += 1 

90 continue 

91 log_result = content_processor.process_content(content, is_update=False) 

92 result = log_result.app_result 

93 if result.is_ok: 

94 success_ct += 1 

95 elif result.is_skipped: 

96 skipped_ct += 1 

97 else: 

98 failed_ct += 1 

99 main_result.merge(result) 

100 except Exception as e: 

101 msg = f"Exception during cron_content_unprocessed: {e}" 

102 LOG().error(msg) 

103 main_result.add_error(AppErrorCode.DATABASE_EX, msg) 

104 

105 end = TimeUtil.get_current_utc_dt() 

106 duration = TimeUtil.duration_in_seconds(start, end) 

107 

108 if IS_DEBUG: 

109 LOG().debug( 

110 f"cron_content_unprocessed: {success_ct} ok, " 

111 f"{failed_ct} failed, {skipped_ct} skipped." 

112 ) 

113 

114 if main_result.is_ok: 

115 return CountMetric( 

116 success_ct=success_ct, 

117 failed_ct=failed_ct, 

118 skipped_ct=skipped_ct, 

119 duration=duration, 

120 ) 

121 

122 LOG().error(f"cron_content_unprocessed errors:\n{main_result.formatted}") 

123 return CountMetric.error(duration=duration) 

124 

125 # ======================================================================== 

126 # TRIGGERS - Delegate to processors 

127 # ======================================================================== 

128 

129 @job_type_decorator(_JT.TR_CONTENT) 

130 @trigger_decorator(job_type=_JT.TR_CONTENT, collection=_COL, wrapper_class=ContentWrapper) 

131 def trigger_content( 

132 self, 

133 job: AppJobWrapper, 

134 *, 

135 wrapper: ContentWrapper, 

136 ) -> JobResult[ContentWrapper]: 

137 is_update = job.has_changes 

138 return self.content_processor.process_content(wrapper, is_update=is_update)