Coverage for functions \ flipdare \ service \ safety \ core \ restriction_calculator.py: 90%
126 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14from datetime import datetime
15from typing import Self
17from flipdare.app_log import LOG
18from flipdare.constants import CHAT_REJECT_REPUTATION_THRESHOLD, CHAT_REVIEW_REPUTATION_THRESHOLD
19from flipdare.generated import (
20 FlagType,
21 ModerationDecision,
22 RestrictionAction,
23)
24from flipdare.generated.shared.model.core.stopwatch_duration import StopwatchDuration
25from flipdare.service.safety.safety_types import RestrictionResult
26from flipdare.util.time_util import TimeUtil
27from flipdare.wrapper import RestrictionWrapper
29__all__ = ["RestrictionCalculator"]
31_D = StopwatchDuration
32_A = RestrictionAction
35class ChatRestrictionCalculator:
36 """
37 Chat restrictions are more flexible that dare restrictions,
38 as we want to be able to restrict users from chatting without
39 restricting them from creating dares or pledges.
40 The logic for chat restrictions is as follows:
41 - Approved - No restriction
42 - Rejected - Restrict (since a the support team reviewed manually)
43 - Auto-reject due to reputation - Restrict if reputation is below threshold
44 - Review required - Restrict if reputation is below threshold
45 """
47 def __init__(self, decision: ModerationDecision, reputation: int) -> None:
48 self._decision = decision
49 self._reputation = reputation
51 @property
52 def decision(self) -> ModerationDecision:
53 return self._decision
55 @property
56 def reputation(self) -> int:
57 return self._reputation
59 def should_block(self) -> bool:
60 decision = self.decision
61 reputation = self.reputation
63 match decision:
64 case (
65 ModerationDecision.APPROVED
66 | ModerationDecision.AUTO_APPROVE_REPUTATION
67 | ModerationDecision.AUTO_APPROVE_SENTIMENT
68 ):
69 return False
70 case ModerationDecision.REJECTED:
71 return True
72 case (
73 ModerationDecision.AUTO_REJECT_REPUTATION
74 | ModerationDecision.AUTO_REJECT_SENTIMENT
75 ):
76 return reputation < CHAT_REJECT_REPUTATION_THRESHOLD
77 case ModerationDecision.REVIEW_REQUIRED:
78 return reputation < CHAT_REVIEW_REPUTATION_THRESHOLD
81class RestrictionCalculator:
82 _service_ack: bool
83 _in_danger: bool
84 _flag_type: FlagType
86 def __init__(self) -> None:
87 msg = (
88 "Use RestrictionCalculator.temp or RestrictionCalculator.admin to create an instance."
89 )
90 raise NotImplementedError(msg)
92 @classmethod
93 def temp(cls, admin_ack: bool, in_danger: bool, flag_type: FlagType) -> Self:
94 instance = cls.__new__(cls)
95 instance._service_ack = admin_ack
96 instance._in_danger = in_danger
97 instance._flag_type = flag_type
98 return instance
100 @classmethod
101 def admin(cls, user_in_danger: bool, flag_type: FlagType) -> Self:
102 instance = cls.__new__(cls)
103 instance._service_ack = True
104 instance._in_danger = user_in_danger
105 instance._flag_type = flag_type
106 return instance
108 @property
109 def admin_ack(self) -> bool:
110 return self._service_ack
112 @property
113 def in_danger(self) -> bool:
114 return self._in_danger
116 @property
117 def flag_type(self) -> FlagType:
118 return self._flag_type
120 def calculate_acknowledge_restriction(self) -> RestrictionResult | None:
121 in_danger = self.in_danger
122 flag_type = self.flag_type
123 if in_danger:
124 return RestrictionResult(_A.FULL, _D.PERMANENT)
125 if flag_type.is_severe:
126 return RestrictionResult(_A.PLEDGE, _D.ONE_MONTH)
127 if flag_type.is_major:
128 return RestrictionResult(_A.DARE, _D.ONE_WEEK)
129 if flag_type.is_moderate:
130 return RestrictionResult(_A.CHAT, _D.THREE_DAYS)
131 return None
133 def calculate_temp_restriction(self) -> RestrictionResult | None:
134 in_danger = self.in_danger
135 admin_ack = self.admin_ack
136 flag_type = self.flag_type
137 """Get the default restriction and duration based on flag type and admin verification."""
138 if in_danger:
139 if admin_ack:
140 return RestrictionResult(_A.FULL, _D.PERMANENT)
141 return RestrictionResult(_A.FULL, _D.THREE_DAYS)
143 if flag_type.is_severe:
144 if admin_ack:
145 return RestrictionResult(_A.FULL, _D.PERMANENT)
146 return RestrictionResult(_A.PLEDGE, _D.THREE_DAYS)
147 if flag_type.is_major:
148 if admin_ack:
149 return RestrictionResult(_A.PLEDGE, _D.ONE_WEEK)
150 return RestrictionResult(_A.DARE, _D.TWO_DAYS)
151 if flag_type.is_moderate:
152 if admin_ack:
153 return RestrictionResult(_A.CHAT, _D.THREE_DAYS)
154 return RestrictionResult(_A.CHAT, _D.ONE_DAY)
155 if admin_ack:
156 return RestrictionResult(_A.CHAT, _D.TWO_DAYS)
158 return None
160 @staticmethod
161 def should_apply(existing: RestrictionWrapper, requested: RestrictionWrapper) -> bool:
162 existing_flag = existing.flag_type
163 required_flag = requested.flag_type
164 existing_type = existing.category
165 requested_type = requested.category
167 now = TimeUtil.get_current_utc_dt()
169 existing_expires: datetime
170 requested_expires: datetime
172 if existing.stopwatch is not None:
173 existing_expires = existing.stopwatch.expires_at_dt
174 else:
175 existing_expires = TimeUtil.get_utc_time_future_days(now, existing.duration.days)
177 if requested.stopwatch is not None:
178 requested_expires = requested.stopwatch.expires_at_dt
179 else:
180 requested_expires = TimeUtil.get_utc_time_future_days(now, requested.duration.days)
182 if existing_type == requested_type and existing_expires >= requested_expires:
183 LOG().debug(
184 f"Restriction already exists and applied, {requested_type}, no need to reapply.",
185 )
186 return False
188 # Conditions to apply a new restriction:
189 # 1. New restriction is more severe than existing.
190 # 2. New restriction expires later than existing.
191 if required_flag.severity > existing_flag.severity:
192 LOG().debug(
193 f"Existing restriction '{existing_flag.label}' less severe applying "
194 f"more severe restriction {required_flag.label}.",
195 )
196 return True
197 if requested_type.severity > existing_type.severity:
198 LOG().debug(
199 f"Existing restriction {existing_type.label} "
200 f"less severe applying more severe restriction {requested_type.label}.",
201 )
202 return True
203 if requested_expires > existing_expires:
204 LOG().debug(
205 f"Existing restriction {existing_type.label} "
206 f"expiring at {existing_expires}, "
207 f"applying new restriction {requested_type.label} "
208 f"with later expiry {requested_expires}.",
209 )
210 return True
212 return False