Coverage for flipdare/service/safety/core/moderation_scorer.py: 96%
85 statements
coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
#!/usr/bin/env python
# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
#
# This file is part of Flipdare's proprietary software and contains
# confidential and copyrighted material. Unauthorised copying,
# modification, distribution, or use of this file is strictly
# prohibited without prior written permission from Flipdare Pty Ltd.
#
# This software includes third-party components licensed under MIT,
# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
#

from flipdare.app_log import LOG
from flipdare.constants import (
    IS_DEBUG,
    IS_TRACE,
    MOD_CONFIDENCE_THRESHOLD,
    MOD_MIN_AGREEMENT_CATEGORIES,
    MOD_SHARPNESS,
    MOD_WEIGHTED_FLAGGED_SCORE,
    MOD_WEIGHTED_REVIEW_SCORE,
)
from flipdare.service.safety.safety_types import (
    ModerationAssessment,
    ModerationCategory,
    ModerationType,
)


class ModerationScorer:
    """Turns per-category moderation confidences into an overall assessment."""

    def __init__(
        self,
        confidences: dict[ModerationCategory, float],
        confidence_threshold: float = MOD_CONFIDENCE_THRESHOLD,
        sharpness: float = MOD_SHARPNESS,
        min_agreement_categories: int = MOD_MIN_AGREEMENT_CATEGORIES,
        weighted_flagged_score: float = MOD_WEIGHTED_FLAGGED_SCORE,
        weighted_review_score: float = MOD_WEIGHTED_REVIEW_SCORE,
    ) -> None:
        # Normalise the incoming categories, dropping any the enum does not
        # recognise rather than letting them skew the score.
        category_confidences: dict[ModerationCategory, float] = {}
        for category, confidence in confidences.items():
            try:
                category_enum = ModerationCategory.from_string(category.value)
                category_confidences[category_enum] = confidence
            except ValueError:
                LOG().warning(f"Unknown moderation category: {category}")

        self._category_confidences = category_confidences
        self._confidence_threshold = confidence_threshold
        self._sharpness = sharpness
        self._weighted_flagged_score = weighted_flagged_score
        self._weighted_review_score = weighted_review_score
        self._min_agreement_categories = min_agreement_categories

    @property
    def category_confidences(self) -> dict[ModerationCategory, float]:
        return self._category_confidences

    @property
    def confidence_threshold(self) -> float:
        return self._confidence_threshold

    @property
    def sharpness(self) -> float:
        return self._sharpness

    @property
    def weighted_flagged_score(self) -> float:
        return self._weighted_flagged_score

    @property
    def weighted_review_score(self) -> float:
        return self._weighted_review_score

    @property
    def min_agreement_categories(self) -> int:
        return self._min_agreement_categories

    def get_weighted_result(self) -> ModerationAssessment:
        weighted_score, top_category = self.get_weighted_score()
        if top_category is None:
            LOG().debug("No top category identified, returning SAFE result.")
            return ModerationAssessment(
                score=weighted_score,
                moderation_type=ModerationType.SAFE,
                moderation_category=None,
            )

        # Lower thresholds for the weighted method due to its dampening factors
        if weighted_score >= self.weighted_flagged_score:
            result = ModerationType.FLAGGED
        elif weighted_score >= self.weighted_review_score:
            result = ModerationType.REVIEW
        else:
            result = ModerationType.SAFE

        if IS_TRACE:
            LOG().trace(f"Weighted result: score={weighted_score}, result={result}")

        return ModerationAssessment(
            score=weighted_score,
            moderation_type=result,
            moderation_category=top_category,
        )

    def get_weighted_score(self) -> tuple[float, ModerationCategory | None]:
        """
        Hybrid approach: confidence penalty + ensemble voting.
        Minimizes false positives while still catching true violations.
        """
        confidence_threshold = self.confidence_threshold
        sharpness = self.sharpness
        min_agreement_categories = self.min_agreement_categories

        # Count high-confidence violations
        high_confidence_violations = 0
        total_score = 0.0

        items = self.category_confidences.items()
        if IS_DEBUG:
            LOG().debug(
                f"Calculating weighted score from category confidences: {len(items)} items"
            )

        category_scores: dict[ModerationCategory, float] = {}

        for category, confidence in items:
            weighting = category.weighting

            if weighting == 0.0:
                if IS_TRACE:
                    LOG().trace(f"Skipping category {category} with zero weighting")
                continue

            # Only count high-confidence predictions
            if confidence < confidence_threshold:
                if IS_TRACE:
                    LOG().trace(
                        f"Skipping category {category} with low confidence {confidence}"
                    )
                continue

            category_scores[category] = confidence
            high_confidence_violations += 1
            # Apply aggressive confidence penalty
            confidence_penalty = confidence**sharpness
            total_score += confidence_penalty * weighting
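            # With sharpness = 2.0, for example, a confidence of 0.60 contributes
            # 0.36 * weighting while 0.90 contributes 0.81 * weighting, so marginal
            # predictions are damped far more than confident ones (illustrative values).
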
        # Require multiple categories to agree (reduces false positives)
        if high_confidence_violations < min_agreement_categories:
            if IS_DEBUG:
                LOG().debug(
                    f"Not enough agreeing categories: "
                    f"{high_confidence_violations} < {min_agreement_categories}",
                )
            return 0.0, None  # Not enough agreement

        # Apply dampening factor based on number of agreeing categories
        agreement_factor = min(1.0, high_confidence_violations / 3.0)
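        # e.g. one agreeing category gives a factor of about 0.33, two give
        # about 0.67, and three or more give 1.0, so breadth of agreement
        # scales the final score.
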
        # Return the highest-scoring category for explainability
        top_category = (
            max(category_scores, key=category_scores.get) if category_scores else None
        )

        score = total_score * agreement_factor
        if IS_DEBUG:
            LOG().debug(
                f"Weighted score calculation: total_score={total_score}, "
                f"high_confidence_violations={high_confidence_violations}, "
                f"agreement_factor={agreement_factor}, final_score={score}, "
                f"top_category={top_category}",
            )
        return score, top_category

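
# Illustrative usage sketch (the category members are taken from the reference
# code below; the confidence values are invented):
#
#     scorer = ModerationScorer(
#         confidences={
#             ModerationCategory.SEXUAL: 0.82,
#             ModerationCategory.VIOLENT: 0.74,
#         }
#     )
#     assessment = scorer.get_weighted_result()
#     # assessment.moderation_type is SAFE, REVIEW, or FLAGGED, and
#     # assessment.score is the dampened weighted score.
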
#
# OLD METHODS - KEEP FOR REFERENCE
#
# (These reference the older SentimentResult / ModerationResult types rather
# than the current ModerationAssessment / ModerationType.)
#
# def get_bayesian_result(self, prior_positive_rate=0.05) -> SentimentResult:
#     bayes_score = self.get_bayesian_score(prior_positive_rate=prior_positive_rate)
#     # Higher thresholds for Bayesian posterior probabilities
#     if bayes_score >= 0.85:
#         result = ModerationResult.BLOCK
#     elif bayes_score >= 0.60:
#         result = ModerationResult.REVIEW
#     else:
#         result = ModerationResult.SAFE
#     return SentimentResult(score=bayes_score, result=result)
# def get_bayesian_score(self, prior_positive_rate=0.05) -> float:
#     """
#     Bayesian approach: P(truly harmful | observed scores)
#
#     prior_positive_rate: Base rate of truly harmful content (e.g., 5%)
#     """
#     # Likelihood of observing these scores if content IS harmful
#     likelihood_positive = 1.0
#     # Likelihood of observing these scores if content is NOT harmful
#     likelihood_negative = 1.0
#
#     for category, confidence in self.category_confidences.items():
#         weighting = category.weighting
#
#         if weighting == 0.0:
#             continue
#
#         # Model: harmful content has high confidence, safe content has low
#         # P(high_confidence | harmful) vs P(high_confidence | safe)
#         likelihood_positive *= confidence * weighting + (1 - weighting) * 0.1
#         likelihood_negative *= (1 - confidence * weighting) * 0.9 + 0.1
#
#     # Bayes theorem
#     prior_negative_rate = 1 - prior_positive_rate
#     posterior = (likelihood_positive * prior_positive_rate) / \
#         (likelihood_positive * prior_positive_rate +
#          likelihood_negative * prior_negative_rate)
#
#     return posterior
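#
# For intuition (illustrative numbers only): with prior_positive_rate = 0.05,
# likelihood_positive = 0.6, and likelihood_negative = 0.2, the posterior is
# (0.6 * 0.05) / (0.6 * 0.05 + 0.2 * 0.95) = 0.03 / 0.22 ≈ 0.136, so even a
# strong likelihood ratio yields a modest posterior under a low base rate.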
# def get_weighted_score_with_confidence_penalty(self,
#                                                confidence_threshold=0.5,
#                                                sharpness=2.0) -> float:
#     """
#     Penalizes low-confidence predictions to reduce false positives
#     sharpness: Higher = more aggressive penalty (2.0 is balanced)
#     """
#     total_score = 0.0
#     for category, confidence in self.category_confidences.items():
#         weighting = category.weighting
#         if weighting == 0.0:
#             continue
#
#         # Only count confidences above threshold
#         if confidence < confidence_threshold:
#             continue
#
#         # Apply confidence penalty: confidence^sharpness
#         # Low confidence (0.6) gets heavily penalized vs high (0.9)
#         confidence_penalty = confidence ** sharpness
#         total_score += confidence_penalty * weighting
#     return total_score
#
# def get_geometric_mean_score(self, epsilon=0.01) -> float:
#     """
#     Geometric mean reduces impact of single high-confidence false positives
#     More conservative than arithmetic mean
#     """
#     product = 1.0
#     count = 0
#
#     for category, confidence in self.category_confidences.items():
#         weighting = category.weighting
#
#         if weighting == 0.0 or confidence < 0.5:
#             continue
#
#         # Add small epsilon to avoid log(0)
#         weighted_conf = (confidence * weighting) + epsilon
#         product *= weighted_conf
#         count += 1
#
#     if count == 0:
#         return 0.0
#
#     # Geometric mean: (product)^(1/n)
#     return product ** (1.0 / count)
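#
# Illustrative numbers: two categories at confidence 0.6 and 0.7, each with
# weighting 1.0 and epsilon 0.01, give product = 0.61 * 0.71 = 0.4331 and a
# geometric mean of 0.4331 ** 0.5 ≈ 0.658.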
#
#
# def get_harmonic_mean_score(self, category_ceiling=0.8) -> float:
#     """
#     Harmonic mean is very conservative - punishes inconsistency
#     Category ceiling prevents single category from dominating
#     """
#     weighted_reciprocals = 0.0
#     total_weight = 0.0
#
#     for category, confidence in self.category_confidences.items():
#         weighting = category.weighting
#
#         if weighting == 0.0 or confidence < 0.5:
#             continue
#
#         # Cap individual category contribution
#         capped_confidence = min(confidence, category_ceiling)
#
#         # Harmonic mean calculation
#         if capped_confidence > 0:
#             weighted_reciprocals += weighting / capped_confidence
#             total_weight += weighting
#
#     if total_weight == 0 or weighted_reciprocals == 0:
#         return 0.0
#
#     # Harmonic mean: n / (1/x1 + 1/x2 + ... + 1/xn)
#     return total_weight / weighted_reciprocals
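#
# Illustrative numbers: confidences 0.9 (capped to 0.8) and 0.6, each with
# weighting 1.0, give reciprocals 1/0.8 + 1/0.6 ≈ 2.917 and a weighted
# harmonic mean of 2.0 / 2.917 ≈ 0.686.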
#
# def get_gated_score(self) -> tuple[float, str]:
#     """
#     Multi-stage gating with reason codes
#     Returns (score, reason) for explainability
#     """
#     # Stage 1: Check for any high-confidence severe violations
#     critical_categories = [
#         ModerationCategory.DEATH_HARM_TRAGEDY,
#         ModerationCategory.SEXUAL,
#     ]
#
#     for category in critical_categories:
#         confidence = self.category_confidences.get(category, 0.0)
#         if confidence > 0.85:  # Very high bar
#             return (1.0, f"Critical: {category.value}")
#
#     # Stage 2: Check for multiple medium-confidence violations
#     medium_violations = []
#     for category, confidence in self.category_confidences.items():
#         if category.weighting > 0.3 and confidence > 0.70:
#             medium_violations.append((category, confidence))
#
#     if len(medium_violations) >= 2:
#         avg_confidence = sum(c for _, c in medium_violations) / len(medium_violations)
#         return (avg_confidence * 0.8, f"Multiple violations: {len(medium_violations)}")
#
#     # Stage 3: Aggregated weighted score (conservative)
#     total_score = 0.0
#     for category, confidence in self.category_confidences.items():
#         if confidence > 0.75:  # High confidence only
#             total_score += confidence ** 2 * category.weighting
#
#     if total_score > 0.5:
#         return (total_score, "Aggregated score")
#
#     return (0.0, "Clean")
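#
# Illustrative trace: SEXUAL at confidence 0.90 trips Stage 1 and returns
# (1.0, "Critical: ..."); two categories at confidence 0.72 with weighting
# above 0.3 instead trip Stage 2 and return (0.72 * 0.8, "Multiple violations: 2"),
# i.e. a score of 0.576.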
#
#
# def get_ensemble_score(self,
#                        high_severity_threshold=0.75,
#                        medium_severity_threshold=0.60,
#                        min_high_severity_votes=1,
#                        min_medium_severity_votes=2) -> float:
#     """
#     Ensemble voting: requires multiple categories to agree
#     Dramatically reduces false positives
#     """
#     high_severity_categories = [
#         ModerationCategory.DEATH_HARM_TRAGEDY,
#         ModerationCategory.SEXUAL,
#         ModerationCategory.VIOLENT,
#     ]
#
#     high_severity_votes = 0
#     medium_severity_votes = 0
#     max_confidence = 0.0
#
#     for category, confidence in self.category_confidences.items():
#         weighting = category.weighting
#
#         if weighting == 0.0:
#             continue
#
#         max_confidence = max(max_confidence, confidence)
#
#         if category in high_severity_categories:
#             if confidence >= high_severity_threshold:
#                 high_severity_votes += weighting
#         else:
#             if confidence >= medium_severity_threshold:
#                 medium_severity_votes += weighting
#
#     # Require consensus from multiple categories
#     if high_severity_votes >= min_high_severity_votes:
#         return max_confidence * high_severity_votes
#     elif medium_severity_votes >= min_medium_severity_votes:
#         return max_confidence * medium_severity_votes * 0.7  # Discount medium
#
#     return 0.0  # Not enough agreement = not harmful
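#
# Illustrative numbers: VIOLENT at confidence 0.80 with weighting 1.0 casts a
# high-severity vote (0.80 >= 0.75), so with the defaults the method returns
# max_confidence * high_severity_votes = 0.80 * 1.0 = 0.80.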