#!/usr/bin/env python
# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
#
# This file is part of Flipdare's proprietary software and contains
# confidential and copyrighted material. Unauthorised copying,
# modification, distribution, or use of this file is strictly
# prohibited without prior written permission from Flipdare Pty Ltd.
#
# This software includes third-party components licensed under MIT,
# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
#

from __future__ import annotations

from typing import TYPE_CHECKING

from google.cloud import language_v1
from google.cloud.language_v1 import ClassificationModelOptions

from flipdare.app_log import LOG
from flipdare.constants import IS_DEBUG
from flipdare.generated.shared.backend.app_job_type import AppJobType
from flipdare.generated.shared.firestore_collections import FirestoreCollections
from flipdare.generated.shared.model.dare.dare_status import DareStatus
from flipdare.generated.shared.model.restriction.moderation_decision import ModerationDecision
from flipdare.manager.db_manager import DbManager
from flipdare.service._service_provider import ServiceProvider
from flipdare.service.safety.core.moderation_scorer import ModerationScorer
from flipdare.service.safety.safety_types import (
    ModerationAssessment,
    ModerationCategory,
    ModerationOutcome,
    ReputationOutcome,
)
from flipdare.wrapper import DareWrapper, PersistedGuard
from flipdare.wrapper.chat_comment_wrapper import ChatCommentWrapper

if TYPE_CHECKING:
    from flipdare.manager.backend_manager import BackendManager
    from flipdare.manager.service_manager import ServiceManager


_D = ModerationDecision
_CategoriesVersion = ClassificationModelOptions.V2Model.ContentCategoriesVersion


class ModerationService(ServiceProvider):
    """A service for moderating user-generated text via the Google Cloud Natural Language API."""

    def __init__(
        self,
        client: language_v1.LanguageServiceClient | None = None,
        db_manager: DbManager | None = None,
        backend_manager: BackendManager | None = None,
        service_manager: ServiceManager | None = None,
    ) -> None:
        super().__init__(
            db_manager=db_manager,
            backend_manager=backend_manager,
            service_manager=service_manager,
        )
        self._client = client
        self._content_categories_version = _CategoriesVersion.V2

    @property
    def language_client(self) -> language_v1.LanguageServiceClient:
        # Created lazily so tests can inject a client before the first API call.
        if self._client is None:
            self._client = language_v1.LanguageServiceClient()
        return self._client

    @language_client.setter
    def language_client(self, value: language_v1.LanguageServiceClient) -> None:
        self._client = value

    @property
    def content_categories_version(self) -> _CategoriesVersion:
        return self._content_categories_version

    @content_categories_version.setter
    def content_categories_version(self, value: _CategoriesVersion) -> None:
        self._content_categories_version = value
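
    # A minimal usage sketch, assuming a test double; `FakeLanguageClient` and
    # `queue_for_human_review` are hypothetical names, not part of this codebase:
    #
    #     service = ModerationService(client=FakeLanguageClient())
    #     outcome = service.review_comment(comment)
    #     if outcome.decision is ModerationDecision.REVIEW_REQUIRED:
    #         queue_for_human_review(comment)
    #
    # Injecting `client` up front skips the lazy construction in
    # `language_client`, which would otherwise require real Google credentials.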

    def review_comment(self, comment: ChatCommentWrapper) -> ModerationOutcome:
        """Moderate a chat comment, gating the text-moderation call on the sender's reputation."""
        reputation_controller = self.reputation_service

        from_uid = comment.from_uid
        rep_outcome: ReputationOutcome = reputation_controller.should_analyze(from_uid)
        if not rep_outcome.should_analyze:
            if IS_DEBUG:
                LOG().debug(f"Auto-approving for user {from_uid} due to high reputation.")
            return ModerationOutcome(
                decision=_D.AUTO_APPROVE_REPUTATION,
                new_reputation=rep_outcome.new_reputation,
            )

        message = comment.message
        if message is None:
            if IS_DEBUG:
                LOG().debug(f"No message found in comment {comment.doc_id}, auto-approving.")
            return ModerationOutcome(
                decision=_D.AUTO_APPROVE_SENTIMENT,
                new_reputation=rep_outcome.new_reputation,
            )

        assessment = self._moderate_text(message)
        if assessment is None:
            if IS_DEBUG:
                LOG().debug(f"No assessment, auto-approved for comment: '{message}'")
            return ModerationOutcome(
                decision=_D.AUTO_APPROVE_SENTIMENT,
                new_reputation=rep_outcome.new_reputation,
            )

        if IS_DEBUG:
            LOG().debug(f"Moderation decision: {assessment} for comment: '{message}'")
        moderation_type = assessment.moderation_type
        rep_outcome = reputation_controller.confirm_review(from_uid, moderation_type)

        decision: _D
        if not rep_outcome.should_analyze:
            if IS_DEBUG:
                LOG().debug(f"Review not required: result: {rep_outcome} for comment: '{message}'")
            decision = _D.AUTO_APPROVE_SENTIMENT
        else:
            if IS_DEBUG:
                LOG().debug(f"Review REQUIRED: result: {rep_outcome} for comment: '{message}'")
            decision = _D.REVIEW_REQUIRED

        if IS_DEBUG:
            LOG().debug(f"Comment approval result: {decision}")

        return ModerationOutcome(
            decision=decision,
            new_reputation=rep_outcome.new_reputation,
            assessment=assessment,
        )
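
    # The reputation gate above is consulted twice: `should_analyze` before the
    # moderation call and `confirm_review` after it. The contract assumed from
    # this usage (not a verbatim definition of ReputationOutcome):
    #
    #     rep_outcome = reputation_service.should_analyze(from_uid)
    #     rep_outcome.should_analyze   # False => trusted sender, skip the API call
    #     rep_outcome.new_reputation   # updated score carried into ModerationOutcome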

    def review_dare(  # noqa: PLR0912, PLR0915
        self,
        dare_obj: DareWrapper | str,
    ) -> ModerationOutcome | None:
        """
        Moderate a dare's content using the external moderation service.

        Args:
            dare_obj (DareWrapper | str): The dare model or dare ID to moderate.

        Returns:
            ModerationOutcome | None: The moderation outcome, or None if the dare
            is missing, lacks an ID, or does not require moderation.

        """
        controller = self.reputation_service
        dare_db = self.dare_db

        dare_model: DareWrapper | None = None
        dare_id: str | None = None

        if PersistedGuard.is_dare(dare_obj):
            dare_model = dare_obj
            dare_id = dare_obj.doc_id
        elif isinstance(dare_obj, str):
            dare_id = dare_obj
            dare_model = dare_db.get(dare_id)

        if dare_model is None:
            LOG().error(f"Dare not found for moderation: {dare_id}")
            return None
        if dare_id is None:
            LOG().error(f"Dare model provided has no ID:\nData={dare_model.to_dict()}")
            return None

        from_uid = dare_model.from_uid
        old_decision = dare_model.moderation_decision
        status = dare_model.status

        if not status.requires_moderation:
            if IS_DEBUG:
                msg = f"Dare {dare_id} in status {status} does not require moderation; skipping."
                LOG().debug(msg)
            return None
        if old_decision is not None:
            if IS_DEBUG:
                msg = f"Dare {dare_id} already reviewed ({old_decision}), skipping moderation."
                LOG().debug(msg)
            return ModerationOutcome(decision=old_decision, new_reputation=-1)

        if status == DareStatus.DRAFT:
            if IS_DEBUG:
                LOG().debug(f"Dare {dare_id} is in DRAFT status; skipping moderation.")
            return None

        content = f"{dare_model.title}\n{dare_model.message}"

        decision: _D
        assessment: ModerationAssessment | None = None

        rep_outcome: ReputationOutcome = controller.should_analyze(from_uid)

        if not rep_outcome.should_analyze:
            if IS_DEBUG:
                LOG().debug(f"Auto-approving for user {from_uid} due to high reputation.")
            decision = _D.AUTO_APPROVE_REPUTATION
        else:
            debug_msg = f"dare {dare_id} from user {from_uid} with content '{content}'"
            assessment = self._moderate_text(content)
            if assessment is None:
                if IS_DEBUG:
                    LOG().debug(f"No result, auto-approved: {debug_msg}")
                decision = _D.AUTO_APPROVE_SENTIMENT
            else:
                if IS_DEBUG:
                    LOG().debug(f"Moderation assessment: {assessment}: {debug_msg}")
                moderation_type = assessment.moderation_type
                rep_outcome = controller.confirm_review(from_uid, moderation_type)
                if not rep_outcome.should_analyze:
                    if IS_DEBUG:
                        LOG().debug(f"Review not required: assessment: {assessment}: {debug_msg}")
                    decision = _D.AUTO_APPROVE_SENTIMENT
                else:
                    if IS_DEBUG:
                        LOG().debug(f"Review REQUIRED: assessment: {assessment}: {debug_msg}")
                    decision = _D.REVIEW_REQUIRED

        if IS_DEBUG:
            LOG().debug(f"Dare {dare_id} decision: {decision}")

        ok = self._update_issue_progress(decision, dare_id, dare_model)
        if not ok:
            return None

        return ModerationOutcome(
            decision=decision,
            new_reputation=rep_outcome.new_reputation,
            assessment=assessment,
        )
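
    # A sketch of the two accepted call shapes; the identifiers are
    # illustrative, not taken from real data:
    #
    #     outcome = service.review_dare(dare_wrapper)   # pre-fetched DareWrapper
    #     outcome = service.review_dare("dare_abc123")  # dare ID, fetched via dare_db
    #
    # Both shapes return None when the dare is missing, lacks an ID, is still a
    # DRAFT, or otherwise does not require moderation, so callers must handle None.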

    def _update_issue_progress(
        self,
        decision: ModerationDecision,
        dare_id: str,
        dare_model: DareWrapper,
    ) -> bool:
        dare_db = self.dare_db
        dare_model.moderation_decision = decision
        updates = dare_model.get_updates()
        if not updates:
            # We can't apply the update manually because, depending on the
            # decision, other dare fields may also need to change; instead,
            # the support team needs to investigate.
            msg = f"No updates found for dare {dare_id}; failed to update to {decision}"
            self.app_logger.unexpected_code_path(
                job_type=AppJobType.TR_DARE,
                collection=FirestoreCollections.DARE,
                message=msg,
                data={"moderation_decision": decision.value, "id": dare_id},
            )
            return False

        dare_db.update(doc_id=dare_id, updates=updates)
        LOG().info(f"Dare {dare_id} marked as {decision}.")
        return True

    def _moderate_text(self, content: str) -> ModerationAssessment | None:
        document = language_v1.Document(
            content=content,
            type_=language_v1.Document.Type.PLAIN_TEXT,
            language="en",
        )

        req = language_v1.ModerateTextRequest(document=document)

        response = self.language_client.moderate_text(request=req)  # type: ignore
        confidence_scores = self.parse_moderation_response(response)
        if not confidence_scores:
            LOG().warning(f"No moderation categories found in response for content: {content}")
            return None

        scorer = ModerationScorer(confidences=confidence_scores)
        result = scorer.get_weighted_result()

        if IS_DEBUG:
            LOG().debug(f"Scored {result} for content: {content}")

        return result
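
    # The scoring step above collapses per-category confidences into a single
    # ModerationAssessment. A rough sketch, assuming the category member exists:
    #
    #     scorer = ModerationScorer(confidences={ModerationCategory.TOXIC: 0.82})
    #     assessment = scorer.get_weighted_result()  # None when below thresholds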

    @staticmethod
    def parse_moderation_response(
        response: language_v1.ModerateTextResponse,
    ) -> dict[ModerationCategory, float]:
        category_scores: dict[ModerationCategory, float] = {}
        for category in response.moderation_categories:
            try:
                category_enum = ModerationCategory.from_string(category.name)
                category_scores[category_enum] = category.confidence
                if IS_DEBUG:
                    msg = f"Parsed category {category_enum} with confidence {category.confidence}"
                    LOG().debug(msg)
            except KeyError:
                LOG().warning(f"Unknown moderation category: {category.name}")

        return category_scores
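
    # Illustrative input/output for the parser above. "Toxic" and "Insult" are
    # real Cloud Natural Language moderation category names; the exact
    # ModerationCategory members shown are assumptions:
    #
    #     response.moderation_categories  # e.g. Toxic@0.71, Insult@0.44, NewCategory@0.10
    #     parse_moderation_response(response)
    #     # => {ModerationCategory.TOXIC: 0.71, ModerationCategory.INSULT: 0.44}
    #     # Unknown names like "NewCategory" are logged and skipped, not raised.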

    # NOTE: Text moderation is used for now, but sentiment analysis may be
    # necessary in future. (SentimentResult would need to be imported before
    # re-enabling this.)
    # def get_sentiment(self, content: str) -> SentimentResult:
    #     document = language_v1.Document(
    #         content=content,
    #         type_=language_v1.Document.Type.PLAIN_TEXT,
    #         language="en",
    #     )
    #
    #     response = self.language_client.analyze_sentiment(
    #         request={"document": document}
    #     )
    #
    #     sentiment = response.document_sentiment
    #
    #     return SentimentResult(
    #         score=sentiment.score,
    #         magnitude=sentiment.magnitude,
    #     )