Coverage for functions \ flipdare \ service \ safety \ reputation_service.py: 75%

258 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15from google.cloud.firestore_v1 import DocumentSnapshot 

16from flipdare.constants import IS_TRACE 

17from flipdare.service._service_provider import ServiceProvider 

18from flipdare.app_log import LOG 

19from flipdare.app_types import CronResult 

20from flipdare.core.cron_decorator import cron_decorator 

21from flipdare.generated import AppJobType 

22from flipdare.generated.model.backend.metric.count_metric import CountMetric 

23from flipdare.util.time_util import TimeUtil 

24import math 

25import secrets 

26from typing import TYPE_CHECKING 

27from cachetools import LRUCache 

28from flipdare.constants import ( 

29 DEF_REPUTATION, 

30 DEF_REPUTATION_CACHE_SIZE, 

31 IS_DEBUG, 

32 REP_CLEAN_REWARD, 

33 REP_DECAY_RATE, 

34 REP_DEFAULT_MULTIPLIERS, 

35 REP_HIGH_RISK_THRESHOLD, 

36 REP_LOW_RISK_THRESHOLD, 

37 REP_PENALTY_GROWTH_RATE, 

38 REP_REVIEW_PENALTY, 

39 REP_REVIEW_THRESHOLD, 

40 REP_VIOLATION_PENALTY, 

41) 

42from flipdare.generated import AppErrorCode, FlagType 

43from flipdare.service.safety.safety_types import ReputationOutcome, ModerationType 

44from flipdare.generated.shared.firestore_collections import FirestoreCollections 

45from flipdare.wrapper import UserWrapper 

46 

47if TYPE_CHECKING: 

48 from flipdare.manager.backend_manager import BackendManager 

49 from flipdare.manager.db_manager import DbManager 

50 

51 

# Public API of this module: only the ReputationService class is exported.
__all__ = ["ReputationService"]

53 

54 

class ReputationService(ServiceProvider):
    """Per-user reputation scoring that drives adaptive content moderation.

    Reputation is an integer stored on each user document (read and written
    through ``self.user_db``) and fronted by an in-process LRU cache. The
    service:

    * samples content for analysis with a probability that falls as
      reputation rises (:meth:`should_analyze`);
    * rewards or penalises a user after an analysis result
      (:meth:`confirm_review`) or a flag (:meth:`penalize_user_for_flag`);
    * periodically pulls every score back toward ``def_reputation``
      (:meth:`cron_decay_reputation`).
    """

    # Inclusive bounds that adjusted reputations are clamped to.
    _MIN_REPUTATION = 0
    _MAX_REPUTATION = 100

    # Sampling probabilities used by should_analyze().
    _HIGH_RISK_SAMPLE_RATE = 1.0  # below high_risk_threshold: analyse everything
    _MEDIUM_RISK_MAX_SAMPLE_RATE = 0.5  # rate at exactly high_risk_threshold
    _LOW_RISK_SAMPLE_RATE = 0.02  # above low_risk_threshold: 2% spot checks

    def __init__(
        self,
        db_manager: DbManager | None = None,
        backend_manager: BackendManager | None = None,
        rnd: secrets.SystemRandom | None = None,
        high_risk_threshold: int = REP_HIGH_RISK_THRESHOLD,
        low_risk_threshold: int = REP_LOW_RISK_THRESHOLD,
        review_penalty: int = REP_REVIEW_PENALTY,
        review_threshold: int = REP_REVIEW_THRESHOLD,
        violation_penalty: int = REP_VIOLATION_PENALTY,
        clean_reward: int = REP_CLEAN_REWARD,
        decay_rate: float = REP_DECAY_RATE,
        multipliers: dict[str, float] = REP_DEFAULT_MULTIPLIERS,
        def_reputation: int = DEF_REPUTATION,
        def_cache_size: int = DEF_REPUTATION_CACHE_SIZE,
        penalty_growth_rate: float = REP_PENALTY_GROWTH_RATE,
    ) -> None:
        """Create the service.

        Args:
            db_manager: Passed through to ``ServiceProvider``.
            backend_manager: Passed through to ``ServiceProvider``.
            rnd: Random source for sampling; a ``secrets.SystemRandom`` is
                created lazily when omitted.
            high_risk_threshold: Reputation strictly below this is always analysed.
            low_risk_threshold: Reputation strictly above this is only spot-checked.
            review_penalty: Added to reputation on a ``ModerationType.REVIEW`` result.
            review_threshold: After a violation, content is sent for review when
                the new reputation is at or below this value.
            violation_penalty: Added to reputation on any other non-SAFE result.
            clean_reward: Added to reputation on a SAFE result.
            decay_rate: Fraction of the distance from ``def_reputation`` that is
                kept on each decay run.
            multipliers: Per-content-type sampling multipliers (copied).
            def_reputation: Reputation used for unknown users and as the decay target.
            def_cache_size: Maximum number of cached reputations.
            penalty_growth_rate: Growth factor used by :meth:`get_flag_penalty`.
        """
        super().__init__(
            db_manager=db_manager,
            backend_manager=backend_manager,
        )

        # Sampling thresholds.
        self._high_risk_threshold = high_risk_threshold
        self._low_risk_threshold = low_risk_threshold
        # Reputation adjustments. confirm_review() *adds* the penalties, so they
        # are presumably negative -- NOTE(review): confirm REP_* constant signs.
        self._review_penalty = review_penalty
        self._violation_penalty = violation_penalty
        self._clean_reward = clean_reward
        # At or below this (after a violation) the content must be reviewed.
        self._review_threshold = review_threshold
        self._decay_rate = decay_rate
        # Copy so instance-level changes never mutate the shared module default.
        self._multipliers = dict(multipliers)
        self._def_reputation = def_reputation
        self._def_cache_size = def_cache_size
        self._penalty_growth_rate = penalty_growth_rate

        self._random = rnd
        self._reputation_cache: LRUCache[str, int] = LRUCache(maxsize=def_cache_size)

        LOG().debug(
            f"Reputation Controller initialized with settings:\n"
            f" high_risk_threshold={high_risk_threshold}\n"
            f" low_risk_threshold={low_risk_threshold}\n"
            f" review_penalty={review_penalty}\n"
            f" review_threshold={review_threshold}\n"
            f" violation_penalty={violation_penalty}\n"
            f" clean_reward={clean_reward}\n"
            f" decay_rate={decay_rate}\n"
            f" penalty_growth_rate={penalty_growth_rate}\n"
            f" multipliers={multipliers}\n"
            f" def_reputation={def_reputation}\n"
            f" def_cache_size={def_cache_size}\n",
        )

    @property
    def rnd(self) -> secrets.SystemRandom:
        """Random source used for sampling, created on first use."""
        if self._random is None:
            self._random = secrets.SystemRandom()
        return self._random

    @property
    def multipliers(self) -> dict[str, float]:
        return self._multipliers

    @property
    def high_risk_threshold(self) -> int:
        return self._high_risk_threshold

    @property
    def low_risk_threshold(self) -> int:
        return self._low_risk_threshold

    @property
    def review_penalty(self) -> int:
        return self._review_penalty

    @property
    def review_threshold(self) -> int:
        return self._review_threshold

    @property
    def violation_penalty(self) -> int:
        return self._violation_penalty

    @property
    def clean_reward(self) -> int:
        return self._clean_reward

    @property
    def decay_rate(self) -> float:
        return self._decay_rate

    @property
    def def_reputation(self) -> int:
        return self._def_reputation

    @property
    def penalty_growth_rate(self) -> float:
        return self._penalty_growth_rate

    def clear_reputation_cache(self) -> None:
        """Clear the reputation cache, e.g. after a bulk update."""
        self._reputation_cache.clear()

        LOG().info("Cleared reputation cache..")

    @classmethod
    def _clamp_reputation(cls, reputation: int) -> int:
        """Clamp ``reputation`` into ``[_MIN_REPUTATION, _MAX_REPUTATION]``."""
        return max(cls._MIN_REPUTATION, min(cls._MAX_REPUTATION, reputation))

    def _get_content_multiplier(self, content_type: str) -> float:
        """Return the sampling multiplier for ``content_type`` (1.0 if unknown)."""
        # NOTE: We currently only analyse text, but this allows future expansion.
        return self.multipliers.get(content_type, 1.0)

    def _decayed_reputation(self, current_rep: int) -> int:
        """Return ``current_rep`` moved one decay step toward ``def_reputation``.

        The remaining distance is scaled by ``decay_rate`` and rounded toward
        the default (``math.trunc`` rounds toward zero). Rounding always up
        would leave scores just above the default stuck there forever.
        """
        target = self.def_reputation
        delta = (current_rep - target) * self.decay_rate
        return target + math.trunc(delta)

    @cron_decorator(job_type=AppJobType.CR_USER_DECAY_REPUTATION)
    def cron_decay_reputation(self) -> CronResult:
        """Periodically pull every user's reputation toward ``def_reputation``.

        Intended to be called daily/weekly so that both low and high scores
        drift back toward the default. Users are paged via
        ``user_db.get_users_to_decay``.

        Returns:
            A ``CountMetric`` with the number of users updated (success), left
            unchanged (skipped) and that failed to update (failed).
        """
        user_db = self.user_db
        job_type = AppJobType.CR_USER_DECAY_REPUTATION
        last_doc: DocumentSnapshot | None = None
        success_ct = 0
        skipped_ct = 0
        failed_ct = 0

        start = TimeUtil.get_current_utc_dt()
        msg = (
            f"Starting reputation decay with decay_rate={self.decay_rate} "
            f"and def_reputation={self.def_reputation}."
        )
        LOG().info(msg)

        while True:
            decay_entries = user_db.get_users_to_decay(last_doc)
            users_to_decay = decay_entries.users
            last_doc = decay_entries.last_doc

            if not users_to_decay:
                break

            for user_model in users_to_decay:
                user_id = user_model.doc_id
                try:
                    current_rep = user_model.reputation
                    new_rep = self._decayed_reputation(current_rep)
                    if new_rep == current_rep:
                        # Already at the default: nothing to write.
                        skipped_ct += 1
                        continue

                    if IS_DEBUG:
                        LOG().debug(
                            f"Decaying reputation for user {user_id}: {current_rep} -> {new_rep}",
                        )

                    if self.update_user_reputation(user_model, new_rep, job_type=job_type):
                        success_ct += 1
                    else:
                        # update_user_reputation has already logged the cause.
                        failed_ct += 1
                except Exception as e:
                    failed_ct += 1
                    LOG().error(f"Error decaying reputation for user {user_id}: {e}")

            if last_doc is None:
                break

        # Many reputations changed; drop the whole cache rather than track each one.
        self.clear_reputation_cache()

        duration = TimeUtil.duration_in_seconds(start, TimeUtil.get_current_utc_dt())
        LOG().info(
            f"Reputation decay completed: success={success_ct}, "
            f"skipped={skipped_ct}, failed={failed_ct}.",
        )
        return CountMetric(
            success_ct=success_ct,
            failed_ct=failed_ct,
            skipped_ct=skipped_ct,
            duration=duration,
        )

    def should_analyze(self, user_id: str, content_type: str = "text") -> ReputationOutcome:
        """Decide by adaptive, reputation-weighted sampling whether to analyse content.

        * reputation < ``high_risk_threshold``: always sampled (rate 1.0);
        * reputation > ``low_risk_threshold``: sampled at 2%;
        * otherwise the rate is linearly interpolated from 0.5 down to 0.02.

        The rate is then scaled by the content-type multiplier (capped at 1.0).
        Only ``"text"`` is currently supported; other types are never analysed.

        Returns:
            ``ReputationOutcome`` carrying the user's current (unchanged)
            reputation and the sampling decision.
        """
        reputation = self.get_reputation(user_id)
        if IS_DEBUG:
            msg = f"Evaluating should_analyze for user {user_id}: reputation {reputation}, content_type {content_type}."
            LOG().debug(msg)

        if content_type != "text":
            msg = f"Content type {content_type} currently not supported for analysis."
            LOG().debug(msg)
            return ReputationOutcome(new_reputation=reputation, should_analyze=False)

        high = self.high_risk_threshold
        low = self.low_risk_threshold
        if reputation < high:
            base_rate = self._HIGH_RISK_SAMPLE_RATE
        elif reputation > low:
            base_rate = self._LOW_RISK_SAMPLE_RATE
        else:
            # Linear interpolation for medium risk. Equal thresholds give an
            # empty range; treat that as the high-risk edge instead of dividing by zero.
            range_size = low - high
            position = (reputation - high) / range_size if range_size > 0 else 0.0
            base_rate = (
                self._MEDIUM_RISK_MAX_SAMPLE_RATE * (1.0 - position)
                + self._LOW_RISK_SAMPLE_RATE * position
            )

        content_multiplier = self._get_content_multiplier(content_type)
        effective_rate = min(1.0, base_rate * content_multiplier)

        should_analyze = self.rnd.random() < effective_rate

        if IS_TRACE:
            msg = (
                f"User {user_id} with reputation {reputation} has "
                f"base_rate={base_rate:.4f}, "
                f"content_multiplier={content_multiplier:.2f}, "
                f"effective_rate={effective_rate:.4f} -> "
                f"should_analyze={should_analyze}"
            )
            LOG().trace(msg)

        return ReputationOutcome(new_reputation=reputation, should_analyze=should_analyze)

    def confirm_review(
        self,
        user_id: str,
        moderation_type: ModerationType,
    ) -> ReputationOutcome:
        """Update the user's reputation after analysis and decide whether to review.

        A ``SAFE`` result adds ``clean_reward``. ``REVIEW`` adds
        ``review_penalty``, and any other result adds ``violation_penalty``.
        The new score is clamped and persisted via :meth:`set_reputation`.

        Returns:
            ``ReputationOutcome`` with the new reputation. ``should_analyze`` is
            True only when a violation was found and the new reputation is at
            or below ``review_threshold`` (i.e. the content needs moderation).
        """
        violation_found = moderation_type != ModerationType.SAFE

        current_rep = self.get_reputation(user_id)
        penalty = (
            self.review_penalty
            if moderation_type == ModerationType.REVIEW
            else self.violation_penalty
        )

        if IS_TRACE:
            msg = (
                f"Updating reputation for user {user_id}: "
                f"current={current_rep}, penalty={penalty}, "
                f" violation_found={violation_found}"
            )
            LOG().trace(msg)

        if violation_found:
            # Apply the penalty for the flagged content.
            new_rep = self._clamp_reputation(current_rep + penalty)
        else:
            # Small reward for clean content.
            new_rep = self._clamp_reputation(current_rep + self.clean_reward)

        self.set_reputation(user_id, new_rep)

        if not violation_found:
            if IS_DEBUG:
                msg = f"User {user_id} content is clean. Rewarding reputation: {current_rep} -> {new_rep}"
                LOG().debug(msg)
            return ReputationOutcome(new_reputation=new_rep, should_analyze=False)

        if new_rep > self.review_threshold:
            if IS_DEBUG:
                LOG().debug(
                    f"User {user_id} reputation {new_rep} above review threshold; no review required.",
                )
            return ReputationOutcome(new_reputation=new_rep, should_analyze=False)

        if IS_DEBUG:
            LOG().debug(f"User {user_id} reputation {new_rep} requires review.")

        return ReputationOutcome(new_reputation=new_rep, should_analyze=True)

    def get_reputation(self, user_id: str) -> int:
        """Return the user's reputation, consulting the LRU cache first.

        Falls back to ``def_reputation`` when the user is missing or has no
        stored reputation. The result is cached either way.
        """
        cached_reputation = self._reputation_cache.get(user_id)
        if cached_reputation is not None:
            if IS_DEBUG:
                msg = f"Cached reputation for user {user_id}: {cached_reputation}."
                LOG().debug(msg)
            return cached_reputation

        reputation: int = self.def_reputation
        user_model = self.user_db.get(user_id)
        if user_model is not None and user_model.reputation is not None:
            reputation = user_model.reputation
            if IS_DEBUG:
                LOG().debug(f"Fetched reputation for user {user_id}: {reputation}.")
        elif IS_DEBUG:
            msg = f"User {user_id} not found or has no reputation; using default {reputation}."
            LOG().debug(msg)

        self._reputation_cache[user_id] = reputation
        return reputation

    def set_reputation(self, user_id: str, reputation: int) -> None:
        """Persist ``reputation`` for ``user_id`` and update the cache.

        Unknown users are ignored. If the DB write fails, the cache entry is
        evicted so the next read goes back to the DB.
        """
        user_db = self.user_db
        user_model = user_db.get(user_id)
        if user_model is None:
            if IS_DEBUG:
                LOG().debug(f"User {user_id} not found, cannot set reputation.")
            return

        self._reputation_cache[user_id] = reputation

        user_model.reputation = reputation
        updates = user_model.get_updates()
        if not updates:
            if IS_DEBUG:
                LOG().debug(f"No updates for user {user_id}, cannot set reputation.")
            return

        if user_db.update(user_id, updates) is None:
            # Do not let the cache claim a value the DB never stored.
            self._reputation_cache.pop(user_id, None)
            LOG().warning(f"Failed to persist reputation {reputation} for user {user_id}.")
            return

        if IS_DEBUG:
            LOG().debug(f"Set reputation for user {user_id} to {reputation}.")

    def get_flag_penalty(self, flag_type: FlagType, violation_count: int = 1) -> int:
        """
        Calculate a reputation penalty that grows logarithmically with violations.

        Uses ``flag_type.severity`` as the base weight and ``penalty_growth_rate``
        as the growth factor, so repeated flags (which may include false
        positives) are penalised with diminishing increments.

        Formula: penalty = int(severity * ln(1 + violation_count * penalty_growth_rate))

        This provides:
        - Higher penalties for more severe flag types
        - Diminishing returns for repeated violations (slower growth)
        - Protection against over-penalization from false positives

        Args:
            flag_type: The type of flag violation.
            violation_count: Number of violations (default: 1). Values <= 0
                yield no penalty.

        Returns:
            Reputation penalty points as a non-negative int (truncated), for
            a non-negative growth rate.

        Examples (illustrative, with penalty_growth_rate=0.5):
            severity=100:
                1 violation: 40 points
                2 violations: 69 points
                3 violations: 91 points
                5 violations: 125 points

            severity=10:
                1 violation: 4 points
                2 violations: 6 points
                3 violations: 9 points
                5 violations: 12 points
        """
        if violation_count <= 0:
            return 0

        base_severity = flag_type.severity

        # log(1 + x) grows quickly at first, then flattens out.
        penalty = base_severity * math.log(1 + violation_count * self.penalty_growth_rate)
        return int(penalty)

    def penalize_user_for_flag(self, flag_type: FlagType, user: UserWrapper) -> None:
        """Lower ``user``'s reputation in response to a flag of ``flag_type``.

        The penalty comes from :meth:`get_flag_penalty` using the user's flag
        count (at least 1). Errors are logged and reported as DB errors rather
        than raised.
        """
        job_type = AppJobType.CR_FLAG_UNPROCESSED
        user_id = user.doc_id
        assert user_id  # NOTE: type narrowing.
        try:
            current_rep = user.reputation

            # Count at least the flag being processed right now.
            flag_ct = max(1, user.flags or 0)

            penalty = self.get_flag_penalty(flag_type, violation_count=flag_ct)
            new_rep = self._clamp_reputation(current_rep - penalty)

            if IS_DEBUG:
                LOG().debug(
                    f"Penalizing user {user_id} for flag {flag_type} with {flag_ct} violations: "
                    f"penalty={penalty}, reputation {current_rep} -> {new_rep}",
                )

            # Failures are logged inside update_user_reputation.
            self.update_user_reputation(user, new_rep, job_type=job_type)

        except Exception as e:
            msg = f"Error penalizing user {user_id} for flag {flag_type}: {e}"
            LOG().error(msg)
            self.app_logger.db_error(
                error_code=AppErrorCode.DATABASE_EX,
                collection=FirestoreCollections.USER,
                job_type=job_type,
                message=msg,
            )

    def update_user_reputation(
        self,
        user: UserWrapper,
        new_reputation: int,
        job_type: AppJobType,
    ) -> bool:
        """Persist ``new_reputation`` on ``user`` and keep the cache coherent.

        Returns:
            True if the reputation was already ``new_reputation`` or was written
            successfully. False if the wrapper reported no pending changes or
            the DB update returned None; both cases are logged via
            :meth:`_log_user_error`.
        """
        doc_id = user.doc_id
        if user.reputation == new_reputation:
            if IS_DEBUG:
                LOG().debug(f"No reputation change for user {doc_id}, skipping update.")
            return True

        if IS_DEBUG:
            msg = f"Updating reputation for user {doc_id}: {user.reputation} -> {new_reputation}"
            LOG().debug(msg)

        user.reputation = new_reputation

        updates = user.get_updates()
        if not updates:
            # Not a DB failure: the wrapper did not register the change.
            self._log_user_error(
                user_id=doc_id,
                error_code=AppErrorCode.UNEXPECTED_CODE_PATH,
                job_type=job_type,
                cause=f"No changes detected for reputation update for user {doc_id}.",
                db_error=False,
            )
            return False

        if self.user_db.update(doc_id, updates) is None:
            self._log_user_error(
                user_id=doc_id,
                error_code=AppErrorCode.DATABASE_EX,
                job_type=job_type,
                cause=f"Failed to update reputation for user {doc_id} in DB.",
            )
            return False

        # Keep the read cache in sync so get_reputation() / confirm_review()
        # do not act on the pre-update score.
        if doc_id:
            self._reputation_cache[doc_id] = new_reputation

        if IS_DEBUG:
            LOG().debug(f"Updated reputation for user {doc_id} to {new_reputation}.")
        return True

    def _log_user_error(
        self,
        user_id: str,
        error_code: AppErrorCode,
        job_type: AppJobType,
        cause: str,
        db_error: bool = True,
    ) -> None:
        """Log a user-related error locally and report it to ``app_logger``.

        With ``db_error`` True the report goes to ``app_logger.db_error`` with
        ``error_code``. Otherwise it goes to ``app_logger.unexpected_code_path``,
        which does not take an error code.
        """
        LOG().error(f"Error for user {user_id}: {cause}")
        if db_error:
            self.app_logger.db_error(
                error_code=error_code,
                collection=FirestoreCollections.USER,
                job_type=job_type,
                message=f"User {user_id}: {cause}",
            )
        else:
            self.app_logger.unexpected_code_path(
                collection=FirestoreCollections.USER,
                job_type=job_type,
                message=f"User {user_id}: {cause}",
            )