Coverage for functions \ flipdare \ service \ safety \ core \ restriction_calculator.py: 90%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14from datetime import datetime 

15from typing import Self 

16 

17from flipdare.app_log import LOG 

18from flipdare.constants import CHAT_REJECT_REPUTATION_THRESHOLD, CHAT_REVIEW_REPUTATION_THRESHOLD 

19from flipdare.generated import ( 

20 FlagType, 

21 ModerationDecision, 

22 RestrictionAction, 

23) 

24from flipdare.generated.shared.model.core.stopwatch_duration import StopwatchDuration 

25from flipdare.service.safety.safety_types import RestrictionResult 

26from flipdare.util.time_util import TimeUtil 

27from flipdare.wrapper import RestrictionWrapper 

28 

29__all__ = ["RestrictionCalculator"] 

30 

31_D = StopwatchDuration 

32_A = RestrictionAction 

33 

34 

35class ChatRestrictionCalculator: 

36 """ 

37 Chat restrictions are more flexible that dare restrictions, 

38 as we want to be able to restrict users from chatting without 

39 restricting them from creating dares or pledges. 

40 The logic for chat restrictions is as follows: 

41 - Approved - No restriction 

42 - Rejected - Restrict (since a the support team reviewed manually) 

43 - Auto-reject due to reputation - Restrict if reputation is below threshold 

44 - Review required - Restrict if reputation is below threshold 

45 """ 

46 

47 def __init__(self, decision: ModerationDecision, reputation: int) -> None: 

48 self._decision = decision 

49 self._reputation = reputation 

50 

51 @property 

52 def decision(self) -> ModerationDecision: 

53 return self._decision 

54 

55 @property 

56 def reputation(self) -> int: 

57 return self._reputation 

58 

59 def should_block(self) -> bool: 

60 decision = self.decision 

61 reputation = self.reputation 

62 

63 match decision: 

64 case ( 

65 ModerationDecision.APPROVED 

66 | ModerationDecision.AUTO_APPROVE_REPUTATION 

67 | ModerationDecision.AUTO_APPROVE_SENTIMENT 

68 ): 

69 return False 

70 case ModerationDecision.REJECTED: 

71 return True 

72 case ( 

73 ModerationDecision.AUTO_REJECT_REPUTATION 

74 | ModerationDecision.AUTO_REJECT_SENTIMENT 

75 ): 

76 return reputation < CHAT_REJECT_REPUTATION_THRESHOLD 

77 case ModerationDecision.REVIEW_REQUIRED: 

78 return reputation < CHAT_REVIEW_REPUTATION_THRESHOLD 

79 

80 

81class RestrictionCalculator: 

82 _service_ack: bool 

83 _in_danger: bool 

84 _flag_type: FlagType 

85 

86 def __init__(self) -> None: 

87 msg = ( 

88 "Use RestrictionCalculator.temp or RestrictionCalculator.admin to create an instance." 

89 ) 

90 raise NotImplementedError(msg) 

91 

92 @classmethod 

93 def temp(cls, admin_ack: bool, in_danger: bool, flag_type: FlagType) -> Self: 

94 instance = cls.__new__(cls) 

95 instance._service_ack = admin_ack 

96 instance._in_danger = in_danger 

97 instance._flag_type = flag_type 

98 return instance 

99 

100 @classmethod 

101 def admin(cls, user_in_danger: bool, flag_type: FlagType) -> Self: 

102 instance = cls.__new__(cls) 

103 instance._service_ack = True 

104 instance._in_danger = user_in_danger 

105 instance._flag_type = flag_type 

106 return instance 

107 

108 @property 

109 def admin_ack(self) -> bool: 

110 return self._service_ack 

111 

112 @property 

113 def in_danger(self) -> bool: 

114 return self._in_danger 

115 

116 @property 

117 def flag_type(self) -> FlagType: 

118 return self._flag_type 

119 

120 def calculate_acknowledge_restriction(self) -> RestrictionResult | None: 

121 in_danger = self.in_danger 

122 flag_type = self.flag_type 

123 if in_danger: 

124 return RestrictionResult(_A.FULL, _D.PERMANENT) 

125 if flag_type.is_severe: 

126 return RestrictionResult(_A.PLEDGE, _D.ONE_MONTH) 

127 if flag_type.is_major: 

128 return RestrictionResult(_A.DARE, _D.ONE_WEEK) 

129 if flag_type.is_moderate: 

130 return RestrictionResult(_A.CHAT, _D.THREE_DAYS) 

131 return None 

132 

133 def calculate_temp_restriction(self) -> RestrictionResult | None: 

134 in_danger = self.in_danger 

135 admin_ack = self.admin_ack 

136 flag_type = self.flag_type 

137 """Get the default restriction and duration based on flag type and admin verification.""" 

138 if in_danger: 

139 if admin_ack: 

140 return RestrictionResult(_A.FULL, _D.PERMANENT) 

141 return RestrictionResult(_A.FULL, _D.THREE_DAYS) 

142 

143 if flag_type.is_severe: 

144 if admin_ack: 

145 return RestrictionResult(_A.FULL, _D.PERMANENT) 

146 return RestrictionResult(_A.PLEDGE, _D.THREE_DAYS) 

147 if flag_type.is_major: 

148 if admin_ack: 

149 return RestrictionResult(_A.PLEDGE, _D.ONE_WEEK) 

150 return RestrictionResult(_A.DARE, _D.TWO_DAYS) 

151 if flag_type.is_moderate: 

152 if admin_ack: 

153 return RestrictionResult(_A.CHAT, _D.THREE_DAYS) 

154 return RestrictionResult(_A.CHAT, _D.ONE_DAY) 

155 if admin_ack: 

156 return RestrictionResult(_A.CHAT, _D.TWO_DAYS) 

157 

158 return None 

159 

160 @staticmethod 

161 def should_apply(existing: RestrictionWrapper, requested: RestrictionWrapper) -> bool: 

162 existing_flag = existing.flag_type 

163 required_flag = requested.flag_type 

164 existing_type = existing.category 

165 requested_type = requested.category 

166 

167 now = TimeUtil.get_current_utc_dt() 

168 

169 existing_expires: datetime 

170 requested_expires: datetime 

171 

172 if existing.stopwatch is not None: 

173 existing_expires = existing.stopwatch.expires_at_dt 

174 else: 

175 existing_expires = TimeUtil.get_utc_time_future_days(now, existing.duration.days) 

176 

177 if requested.stopwatch is not None: 

178 requested_expires = requested.stopwatch.expires_at_dt 

179 else: 

180 requested_expires = TimeUtil.get_utc_time_future_days(now, requested.duration.days) 

181 

182 if existing_type == requested_type and existing_expires >= requested_expires: 

183 LOG().debug( 

184 f"Restriction already exists and applied, {requested_type}, no need to reapply.", 

185 ) 

186 return False 

187 

188 # Conditions to apply a new restriction: 

189 # 1. New restriction is more severe than existing. 

190 # 2. New restriction expires later than existing. 

191 if required_flag.severity > existing_flag.severity: 

192 LOG().debug( 

193 f"Existing restriction '{existing_flag.label}' less severe applying " 

194 f"more severe restriction {required_flag.label}.", 

195 ) 

196 return True 

197 if requested_type.severity > existing_type.severity: 

198 LOG().debug( 

199 f"Existing restriction {existing_type.label} " 

200 f"less severe applying more severe restriction {requested_type.label}.", 

201 ) 

202 return True 

203 if requested_expires > existing_expires: 

204 LOG().debug( 

205 f"Existing restriction {existing_type.label} " 

206 f"expiring at {existing_expires}, " 

207 f"applying new restriction {requested_type.label} " 

208 f"with later expiry {requested_expires}.", 

209 ) 

210 return True 

211 

212 return False