Coverage for functions \ flipdare \ service \ safety \ reputation_service.py: 75%
258 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
15from google.cloud.firestore_v1 import DocumentSnapshot
16from flipdare.constants import IS_TRACE
17from flipdare.service._service_provider import ServiceProvider
18from flipdare.app_log import LOG
19from flipdare.app_types import CronResult
20from flipdare.core.cron_decorator import cron_decorator
21from flipdare.generated import AppJobType
22from flipdare.generated.model.backend.metric.count_metric import CountMetric
23from flipdare.util.time_util import TimeUtil
24import math
25import secrets
26from typing import TYPE_CHECKING
27from cachetools import LRUCache
28from flipdare.constants import (
29 DEF_REPUTATION,
30 DEF_REPUTATION_CACHE_SIZE,
31 IS_DEBUG,
32 REP_CLEAN_REWARD,
33 REP_DECAY_RATE,
34 REP_DEFAULT_MULTIPLIERS,
35 REP_HIGH_RISK_THRESHOLD,
36 REP_LOW_RISK_THRESHOLD,
37 REP_PENALTY_GROWTH_RATE,
38 REP_REVIEW_PENALTY,
39 REP_REVIEW_THRESHOLD,
40 REP_VIOLATION_PENALTY,
41)
42from flipdare.generated import AppErrorCode, FlagType
43from flipdare.service.safety.safety_types import ReputationOutcome, ModerationType
44from flipdare.generated.shared.firestore_collections import FirestoreCollections
45from flipdare.wrapper import UserWrapper
47if TYPE_CHECKING:
48 from flipdare.manager.backend_manager import BackendManager
49 from flipdare.manager.db_manager import DbManager
# Public API of this module: only the service class is exported.
__all__ = ["ReputationService"]
class ReputationService(ServiceProvider):
    """Track user reputation and use it to drive adaptive moderation sampling.

    Reputation is an integer score per user. It is read from and written to the
    user collection through ``self.user_db`` and memoised in a bounded LRU cache.
    The service uses it to:

    * decide how often a user's content is sampled for analysis
      (:meth:`should_analyze`);
    * adjust the score after a moderation verdict (:meth:`confirm_review`);
    * penalise users for flags (:meth:`penalize_user_for_flag`);
    * periodically pull scores back toward ``def_reputation``
      (:meth:`cron_decay_reputation`).
    """

    def __init__(
        self,
        db_manager: DbManager | None = None,
        backend_manager: BackendManager | None = None,
        rnd: secrets.SystemRandom | None = None,
        high_risk_threshold: int = REP_HIGH_RISK_THRESHOLD,
        low_risk_threshold: int = REP_LOW_RISK_THRESHOLD,
        review_penalty: int = REP_REVIEW_PENALTY,
        review_threshold: int = REP_REVIEW_THRESHOLD,
        violation_penalty: int = REP_VIOLATION_PENALTY,
        clean_reward: int = REP_CLEAN_REWARD,
        decay_rate: float = REP_DECAY_RATE,
        multipliers: dict[str, float] = REP_DEFAULT_MULTIPLIERS,
        def_reputation: int = DEF_REPUTATION,
        def_cache_size: int = DEF_REPUTATION_CACHE_SIZE,
        penalty_growth_rate: float = REP_PENALTY_GROWTH_RATE,
    ) -> None:
        """Create the service.

        Args:
            db_manager: Forwarded to ``ServiceProvider``.
            backend_manager: Forwarded to ``ServiceProvider``.
            rnd: Random source used for sampling. If None, a
                ``secrets.SystemRandom`` is created lazily on first use.
            high_risk_threshold: Reputations strictly below this are always
                analysed (base sampling rate 100%).
            low_risk_threshold: Reputations strictly above this use a base
                sampling rate of 2%.
            review_penalty: Delta *added* to the reputation for a REVIEW verdict.
            review_threshold: After a violation, a resulting reputation at or
                below this value requires review.
            violation_penalty: Delta *added* to the reputation for any other
                non-SAFE verdict.
            clean_reward: Reputation gained for a SAFE verdict (result capped at 100).
            decay_rate: Fraction of the distance from ``def_reputation`` that is
                kept on each decay run (1.0 = no decay, 0.0 = full reset).
            multipliers: Per-content-type sampling-rate multipliers. The mapping
                is copied.
            def_reputation: Reputation assumed for unknown users and the target
                of decay.
            def_cache_size: Maximum number of entries in the reputation cache.
            penalty_growth_rate: Growth factor used by :meth:`get_flag_penalty`.
        """
        super().__init__(
            db_manager=db_manager,
            backend_manager=backend_manager,
        )

        # Sampling thresholds: below "high" -> always analysed, above "low" -> rarely.
        self._high_risk_threshold = high_risk_threshold
        self._low_risk_threshold = low_risk_threshold
        # Reputation deltas. Penalties are *added* in confirm_review, so they are
        # presumably configured as negative numbers — NOTE(review): confirm.
        self._review_penalty = review_penalty
        self._violation_penalty = violation_penalty
        self._clean_reward = clean_reward
        # After a violation, reputations at or below this value require review;
        # reputations above it do not.
        self._review_threshold = review_threshold
        # Fraction of the gap to def_reputation retained per decay run.
        self._decay_rate = decay_rate
        # Copy so that mutating this service's multipliers never alters the
        # shared module-level default dict (or a caller's dict).
        self._multipliers = dict(multipliers)
        self._def_reputation = def_reputation
        self._def_cache_size = def_cache_size
        self._penalty_growth_rate = penalty_growth_rate

        self._random = rnd
        self._reputation_cache: LRUCache[str, int] = LRUCache(maxsize=def_cache_size)

        LOG().debug(
            f"Reputation Controller initialized with settings:\n"
            f"  high_risk_threshold={high_risk_threshold}\n"
            f"  low_risk_threshold={low_risk_threshold}\n"
            f"  review_penalty={review_penalty}\n"
            f"  review_threshold={review_threshold}\n"
            f"  violation_penalty={violation_penalty}\n"
            f"  clean_reward={clean_reward}\n"
            f"  decay_rate={decay_rate}\n"
            f"  penalty_growth_rate={penalty_growth_rate}\n"
            f"  multipliers={multipliers}\n"
            f"  def_reputation={def_reputation}\n"
            f"  def_cache_size={def_cache_size}\n",
        )

    @property
    def rnd(self) -> secrets.SystemRandom:
        """Random source for sampling, created lazily if none was injected."""
        if self._random is None:
            self._random = secrets.SystemRandom()
        return self._random

    @property
    def multipliers(self) -> dict[str, float]:
        return self._multipliers

    @property
    def high_risk_threshold(self) -> int:
        return self._high_risk_threshold

    @property
    def low_risk_threshold(self) -> int:
        return self._low_risk_threshold

    @property
    def review_penalty(self) -> int:
        return self._review_penalty

    @property
    def review_threshold(self) -> int:
        return self._review_threshold

    @property
    def violation_penalty(self) -> int:
        return self._violation_penalty

    @property
    def clean_reward(self) -> int:
        return self._clean_reward

    @property
    def decay_rate(self) -> float:
        return self._decay_rate

    @property
    def def_reputation(self) -> int:
        return self._def_reputation

    @property
    def penalty_growth_rate(self) -> float:
        return self._penalty_growth_rate

    def clear_reputation_cache(self) -> None:
        """Clear the reputation cache, e.g. after a bulk update."""
        self._reputation_cache.clear()

        LOG().info("Cleared reputation cache..")

    def _get_content_multiplier(self, content_type: str) -> float:
        """Return the sampling-rate multiplier for ``content_type`` (default 1.0)."""
        # NOTE: We currently only analyse text, but this allows future expansion
        return self.multipliers.get(content_type, 1.0)

    @staticmethod
    def _decay_toward(current_rep: int, target: int, decay_rate: float) -> int:
        """Move ``current_rep`` toward ``target``, keeping ``decay_rate`` of the gap.

        The result is rounded *toward* ``target``: ceil from below, floor from
        above. For 0 <= decay_rate < 1 this guarantees progress of at least one
        point per run without overshooting. A score just above the target
        therefore cannot get stuck, which it could with an unconditional ceil.
        """
        decayed = target + (current_rep - target) * decay_rate
        if current_rep < target:
            return math.ceil(decayed)
        return math.floor(decayed)

    @cron_decorator(job_type=AppJobType.CR_USER_DECAY_REPUTATION)
    def cron_decay_reputation(self) -> CronResult:
        """
        Periodically decay every eligible user's reputation toward ``def_reputation``.

        Pages through ``user_db.get_users_to_decay`` until an empty page or no
        continuation cursor is returned. Each user's score is moved by
        :meth:`_decay_toward`. Users whose score would not change are counted as
        skipped. Per-user errors are logged and counted as failed without
        aborting the run. The reputation cache is cleared at the end.

        Returns:
            A ``CountMetric`` with success / skipped / failed counts and the run
            duration in seconds.
        """
        user_db = self.user_db
        decay_rate = self.decay_rate
        def_reputation = self.def_reputation
        job_type = AppJobType.CR_USER_DECAY_REPUTATION
        last_doc: DocumentSnapshot | None = None
        success_ct = 0
        skipped_ct = 0
        failed_ct = 0

        start = TimeUtil.get_current_utc_dt()
        msg = f"Starting reputation decay with decay_rate={decay_rate} and def_reputation={def_reputation}."
        LOG().info(msg)

        while True:
            decay_entries = user_db.get_users_to_decay(last_doc)
            users_to_decay = decay_entries.users
            last_doc = decay_entries.last_doc

            if not users_to_decay:
                break

            for user_model in users_to_decay:
                try:
                    user_id = user_model.doc_id
                    current_rep = user_model.reputation
                    new_rep = self._decay_toward(current_rep, def_reputation, decay_rate)
                    if new_rep == current_rep:
                        # Already at the target (or rounds to the current value): nothing to write.
                        skipped_ct += 1
                        continue

                    if IS_DEBUG:
                        msg = f"Decaying reputation for user {user_id}: {current_rep} -> {new_rep}"
                        LOG().debug(msg)

                    if self.update_user_reputation(
                        user_model,
                        new_rep,
                        job_type=job_type,
                    ):
                        success_ct += 1
                    else:
                        # update_user_reputation has already logged the failure.
                        failed_ct += 1

                except Exception as e:
                    failed_ct += 1
                    LOG().error(f"Error decaying reputation for user {user_model.doc_id}: {e}")

            if last_doc is None:
                break

        # Clear entire cache after decay since many reputations changed
        self.clear_reputation_cache()

        # log
        end = TimeUtil.get_current_utc_dt()
        duration = TimeUtil.duration_in_seconds(start, end)

        if IS_DEBUG:
            LOG().debug(
                f"Reputation decay completed: success={success_ct}, "
                f"skipped={skipped_ct}, failed={failed_ct}.",
            )
        return CountMetric(
            success_ct=success_ct,
            failed_ct=failed_ct,
            skipped_ct=skipped_ct,
            duration=duration,
        )

    def should_analyze(self, user_id: str, content_type: str = "text") -> ReputationOutcome:
        """Decide, by reputation-weighted random sampling, whether to analyse content.

        Base sampling rate:
            * reputation < high_risk_threshold  -> 1.0
            * reputation > low_risk_threshold   -> 0.02
            * otherwise linear from 0.5 (at high_risk_threshold) down to 0.02
              (at low_risk_threshold).
        The base rate is multiplied by the content-type multiplier and capped
        at 1.0. Non-"text" content is never analysed.

        Returns:
            ``ReputationOutcome`` carrying the (unchanged) reputation and the
            sampling decision.
        """
        # Get current reputation
        reputation = self.get_reputation(user_id)
        if IS_DEBUG:
            msg = f"Evaluating should_analyze for user {user_id}: reputation {reputation}, content_type {content_type}."
            LOG().debug(msg)

        if content_type != "text":
            msg = f"Content type {content_type} currently not supported for analysis."
            LOG().debug(msg)
            return ReputationOutcome(new_reputation=reputation, should_analyze=False)

        # Calculate sampling rate based on reputation
        if reputation < self.high_risk_threshold:
            base_rate = 1.0  # 100% for high risk
        elif reputation > self.low_risk_threshold:
            base_rate = 0.02  # 2% for low risk
        else:
            # Linear interpolation for medium risk. If both thresholds are equal
            # the band is a single point; treat it as its start instead of
            # dividing by zero.
            range_size = self.low_risk_threshold - self.high_risk_threshold
            if range_size > 0:
                position = (reputation - self.high_risk_threshold) / range_size
            else:
                position = 0.0
            base_rate = 0.5 * (1.0 - position) + 0.02 * position

        # Content type multiplier
        content_multiplier = self._get_content_multiplier(content_type)
        effective_rate = min(1.0, base_rate * content_multiplier)

        should_analyze = self.rnd.random() < effective_rate

        if IS_TRACE:
            msg = (
                f"User {user_id} with reputation {reputation} has "
                f"base_rate={base_rate:.4f}, "
                f"content_multiplier={content_multiplier:.2f}, "
                f"effective_rate={effective_rate:.4f} -> "
                f"should_analyze={should_analyze}"
            )
            LOG().trace(msg)

        return ReputationOutcome(new_reputation=reputation, should_analyze=should_analyze)

    def confirm_review(
        self,
        user_id: str,
        moderation_type: ModerationType,
    ) -> ReputationOutcome:
        """Update user reputation after analysis and decide whether review is needed.

        * SAFE: reputation += clean_reward (capped at 100); no review.
        * REVIEW: reputation += review_penalty (floored at 0).
        * any other verdict: reputation += violation_penalty (floored at 0).

        For a violation, review is required when the new reputation is at or
        below ``review_threshold``. The new reputation is persisted via
        :meth:`set_reputation`.

        Returns:
            ``ReputationOutcome`` with the new reputation; ``should_analyze`` is
            True when the content must be sent for review.
        """
        violation_found = moderation_type != ModerationType.SAFE

        current_rep = self.get_reputation(user_id)
        penalty = self.violation_penalty
        if moderation_type == ModerationType.REVIEW:
            penalty = self.review_penalty

        if IS_TRACE:
            msg = (
                f"Updating reputation for user {user_id}: "
                f"current={current_rep}, penalty={penalty}, "
                f"violation_found={violation_found}"
            )
            LOG().trace(msg)

        if violation_found:
            # Penalty is added (assumed negative); never drop below 0.
            new_rep = max(0, current_rep + penalty)
        else:
            # Small reward for clean content, capped at 100.
            new_rep = min(100, current_rep + self.clean_reward)

        self.set_reputation(user_id, new_rep)
        if not violation_found:
            if IS_DEBUG:
                msg = f"User {user_id} content is clean. Rewarding reputation: {current_rep} -> {new_rep}"
                LOG().debug(msg)

            return ReputationOutcome(new_reputation=new_rep, should_analyze=False)

        if new_rep > self.review_threshold:
            if IS_DEBUG:
                LOG().debug(f"User {user_id} reputation {new_rep} above review threshold.")
            return ReputationOutcome(new_reputation=new_rep, should_analyze=False)

        if IS_DEBUG:
            LOG().debug(f"User {user_id} reputation {new_rep} requires review.")

        return ReputationOutcome(new_reputation=new_rep, should_analyze=True)

    def get_reputation(self, user_id: str) -> int:
        """Return the user's reputation, using the LRU cache when possible.

        Falls back to ``def_reputation`` when the user is not found. The
        result, including the default, is cached.
        """
        # Check cache first
        if user_id in self._reputation_cache:
            cached_reputation = self._reputation_cache[user_id]
            if IS_DEBUG:
                msg = f"Cached reputation for user {user_id}: {cached_reputation}."
                LOG().debug(msg)

            return cached_reputation

        user_db = self.user_db
        reputation: int = self.def_reputation

        user_model = user_db.get(user_id)
        if user_model is not None:
            reputation = user_model.reputation
            if IS_DEBUG:
                LOG().debug(f"Fetched reputation for user {user_id}: {reputation}.")
        elif IS_DEBUG:
            msg = f"User {user_id} not found or has no reputation; using default {reputation}."
            LOG().debug(msg)

        # Cache the result
        self._reputation_cache[user_id] = reputation
        return reputation

    def set_reputation(self, user_id: str, reputation: int) -> None:
        """Cache ``reputation`` for the user and persist it if the user exists.

        The cache is updated even when the user document is missing or the
        write is skipped because nothing changed.
        """
        # Update cache with new reputation
        self._reputation_cache[user_id] = reputation

        user_db = self.user_db
        user_model = user_db.get(user_id)
        if user_model is None:
            if IS_DEBUG:
                LOG().debug(f"User {user_id} not found, cannot set reputation.")
            return

        user_model.reputation = reputation
        updates = user_model.get_updates()
        if not updates:
            if IS_DEBUG:
                LOG().debug(f"No updates for user {user_id}, cannot set reputation.")
            return

        user_db.update(user_id, updates)
        if IS_DEBUG:
            LOG().debug(f"Set reputation for user {user_id} to {reputation}.")

    def get_flag_penalty(self, flag_type: FlagType, violation_count: int = 1) -> int:
        """
        Calculate a reputation penalty that grows logarithmically with violations.

        Formula: ``penalty = int(flag_type.severity * ln(1 + violation_count * penalty_growth_rate))``

        The logarithm gives:
        - higher penalties for more severe flag types;
        - diminishing increments for repeated violations;
        - some protection against over-penalising false positives.

        Args:
            flag_type: The type of flag violation; its ``severity`` is the base weight.
            violation_count: Number of violations (default: 1). Values <= 0
                yield 0.

        Returns:
            Reputation penalty points as a non-negative integer (truncated).
            Callers subtract this value from the reputation.

        Examples (with ``penalty_growth_rate=0.5``):
            severity=100: 1 -> 40, 2 -> 69, 3 -> 91, 5 -> 125
            severity=10:  1 -> 4,  2 -> 6,  3 -> 9,  5 -> 12
        """
        if violation_count <= 0:
            return 0

        base_severity = flag_type.severity

        # log(1 + x) grows quickly at first then slows down
        penalty = base_severity * math.log(1 + violation_count * self.penalty_growth_rate)
        return int(penalty)

    def penalize_user_for_flag(self, flag_type: FlagType, user: UserWrapper) -> None:
        """Lower ``user``'s reputation by :meth:`get_flag_penalty` for ``flag_type``.

        Uses ``user.flags`` as the violation count, treating 0 as 1. The result
        is floored at 0. Errors are logged and reported via ``app_logger`` rather
        than raised.
        """
        job_type = AppJobType.CR_FLAG_UNPROCESSED
        col = FirestoreCollections.USER
        user_id = user.doc_id
        assert user_id  # NOTE: type narrowing.
        try:
            current_rep = user.reputation

            flag_ct = user.flags
            if flag_ct == 0:
                flag_ct = 1  # need to set 1 at least

            penalty = self.get_flag_penalty(flag_type, violation_count=flag_ct)
            new_rep = max(0, current_rep - penalty)

            if IS_DEBUG:
                LOG().debug(
                    f"Penalizing user {user_id} for flag {flag_type} with {flag_ct} violations: "
                    f"penalty={penalty}, reputation {current_rep} -> {new_rep}",
                )

            # update_user_reputation logs its own failures and invalidates the cache.
            self.update_user_reputation(user, new_rep, job_type=job_type)

        except Exception as e:
            msg = f"Error penalizing user {user_id} for flag {flag_type}: {e}"
            LOG().error(msg)
            self.app_logger.db_error(
                error_code=AppErrorCode.DATABASE_EX,
                collection=col,
                job_type=job_type,
                message=msg,
            )

    def update_user_reputation(
        self,
        user: UserWrapper,
        new_reputation: int,
        job_type: AppJobType,
    ) -> bool:
        """Persist ``new_reputation`` on ``user`` and invalidate its cached value.

        Returns:
            True if the reputation was already ``new_reputation`` or the DB write
            succeeded. False if no update could be built or the DB update returned
            None (both cases are logged via :meth:`_log_user_error`).
        """
        if user.reputation == new_reputation:
            if IS_DEBUG:
                LOG().debug(f"No reputation change for user {user.doc_id}, skipping update.")
            return True

        if IS_DEBUG:
            msg = f"Updating reputation for user {user.doc_id}: {user.reputation} -> {new_reputation}"
            LOG().debug(msg)

        user.reputation = new_reputation
        doc_id = user.doc_id

        updates = user.get_updates()
        if not updates:
            msg = f"No changes detected for reputation update for user {doc_id}."
            LOG().warning(msg)
            self._log_user_error(
                user_id=doc_id,
                error_code=AppErrorCode.UNEXPECTED_CODE_PATH,
                job_type=job_type,
                cause=msg,
            )
            return False

        user_db = self.user_db
        updated_user = user_db.update(doc_id, updates)
        if updated_user is None:
            msg = f"Failed to update reputation for user {doc_id} in DB."
            LOG().warning(msg)
            self._log_user_error(
                user_id=doc_id,
                error_code=AppErrorCode.DATABASE_EX,
                job_type=job_type,
                cause=msg,
            )
            return False

        # Drop any cached value so get_reputation/should_analyze see the new score.
        self._reputation_cache.pop(doc_id, None)

        if IS_DEBUG:
            LOG().debug(f"Updated reputation for user {doc_id} to {new_reputation}.")
        return True

    def _log_user_error(
        self,
        user_id: str,
        error_code: AppErrorCode,
        job_type: AppJobType,
        cause: str,
        db_error: bool = True,
    ) -> None:
        """Log a user-related error locally and to ``app_logger``.

        With ``db_error=True`` (the default) it is reported via
        ``app_logger.db_error`` with ``error_code``. Otherwise it goes through
        ``app_logger.unexpected_code_path``, and ``error_code`` is not forwarded.
        """
        LOG().error(f"Error for user {user_id}: {cause}")
        if db_error:
            self.app_logger.db_error(
                error_code=error_code,
                collection=FirestoreCollections.USER,
                job_type=job_type,
                message=f"User {user_id}: {cause}",
            )
        else:
            self.app_logger.unexpected_code_path(
                collection=FirestoreCollections.USER,
                job_type=job_type,
                message=f"User {user_id}: {cause}",
            )