Coverage for functions \ flipdare \ service \ safety \ restriction_service.py: 34%
291 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
14from dataclasses import dataclass, field
15from typing import TYPE_CHECKING, Self
17from flipdare.app_types import CronResult
18from flipdare.core.cron_decorator import cron_decorator
19from flipdare.app_log import LOG
20from flipdare.constants import IS_DEBUG
21from flipdare.job_types import CronWithResultType
22from flipdare.result.app_result import AppResult
23from flipdare.result.job_result import JobResult
24from flipdare.firestore import FlagContextFactory
25from flipdare.generated import (
26 AppErrorCode,
27 RestrictionCategory,
28 RestrictionModel,
29 RestrictionStatus,
30 StopwatchModel,
31)
32from flipdare.generated.shared.backend.app_job_type import AppJobType
33from flipdare.message.user_message import UserMessage
34from flipdare.service.safety.core.restriction_calculator import RestrictionCalculator
35from flipdare.service._service_provider import ServiceProvider
36from flipdare.service.core.cron_processor import CronConfig, CronProcessor
37from flipdare.wrapper import (
38 FlagWrapper,
39 PersistedGuard,
40 RestrictionWrapper,
41 UserWrapper,
42)
44if TYPE_CHECKING:
45 from flipdare.manager.service_manager import ServiceManager
46 from flipdare.manager.db_manager import DbManager
47 from flipdare.manager.backend_manager import BackendManager
49__all__ = ["RestrictionService"]
52@dataclass
53class _ValidatedRestriction:
54 user: UserWrapper | None
55 applied_restriction: RestrictionWrapper | None = None
56 error: AppErrorCode | None = field(default=None)
58 @classmethod
59 def from_error(cls, error: AppErrorCode) -> Self:
60 return cls(user=None, error=error)
62 @property
63 def is_error(self) -> bool:
64 return self.error is not None
67class RestrictionService(ServiceProvider):
69 def __init__(
70 self,
71 backend_manager: BackendManager,
72 db_manager: DbManager,
73 service_manager: ServiceManager | None = None,
74 ) -> None:
75 super().__init__(
76 backend_manager=backend_manager,
77 db_manager=db_manager,
78 service_manager=service_manager,
79 )
81 @cron_decorator(job_type=AppJobType.CR_RESTRICT_INACTIVE)
82 def cron_inactive_restrictions(self) -> CronResult:
83 """Process unprocessed friends from last week."""
84 from flipdare.services import get_report_handler
86 job_type: CronWithResultType = AppJobType.CR_RESTRICT_INACTIVE
87 config = CronConfig(
88 job_type=job_type,
89 job_name="cron_restrict_inactive",
90 query_fn=lambda: self.restriction_db.get_inactive_actions(),
91 process_fn=lambda r: self._process_inactive_cron(r),
92 report_fn=lambda _, processed: get_report_handler().run_cron_with_result(
93 job_type=job_type,
94 processed=processed,
95 ),
96 )
97 return CronProcessor(config).process_result()
99 @cron_decorator(job_type=AppJobType.CR_RESTRICT_EXPIRED)
100 def cron_expired_restrictions(self) -> CronResult:
101 """Process unprocessed friends from last week."""
102 config = CronConfig(
103 job_type=AppJobType.CR_RESTRICT_EXPIRED,
104 job_name="cron_restrict_expired",
105 query_fn=lambda: self.restriction_db.get_expired_active_actions(),
106 process_fn=lambda r: self._process_expired_cron(r),
107 )
108 return CronProcessor(config).process_result()
110 def _process_expired_cron(
111 self, restriction: RestrictionWrapper
112 ) -> JobResult[RestrictionWrapper]:
113 restrict_id = restriction.doc_id
114 debug_label = f"RestrictionCron/Expired/{restrict_id}"
115 main_result = AppResult[RestrictionWrapper](task_name=debug_label, doc_id=restrict_id)
117 controller = self.restriction_service
118 validation_result = controller.validate_restriction_and_user(restriction)
119 if validation_result.is_error:
120 msg = f"{debug_label}: User {restriction.uid} not found for restriction {restriction.debug_str}"
121 LOG().error(msg)
122 main_result.add_error(AppErrorCode.USER_MISSING, msg)
123 return JobResult.from_result(main_result)
125 # Check if user has matching restriction to remove
126 user = validation_result.user
127 assert user is not None # narrowing, since is_error is False
129 user_id = user.doc_id
130 applied = validation_result.applied_restriction
132 if applied is None:
133 msg = f"{debug_label}: User {user_id} has no restriction to remove. Skipping."
134 LOG().warning(msg)
136 main_result.add_warning(msg)
137 return JobResult.skip_doc(restrict_id, message=msg)
139 if applied.category != restriction.category:
140 msg = (
141 f"{debug_label}: User {user_id} restriction type mismatch. "
142 f"Expected {restriction.category}, found {applied.category}. Skipping."
143 )
144 LOG().warning(msg)
146 main_result.add_warning(msg)
147 return JobResult.skip_doc(restrict_id, message=msg)
149 # Remove the restriction
150 try:
151 controller.remove_restriction(user, flag_id=restriction.flag_id)
152 msg = f"{debug_label}: Removed restriction from user {user_id}"
153 LOG().warning(msg)
154 return JobResult.ok(doc_id=restrict_id, message=msg)
155 except Exception as e:
156 cause = f"{debug_label}: Failed to remove restriction for user {user_id}: {e}"
157 main_result.add_error(AppErrorCode.RESTRICTION_REMOVAL, cause)
158 return JobResult.from_result(main_result)
160 def _process_inactive_cron(
161 self, restriction: RestrictionWrapper
162 ) -> JobResult[RestrictionWrapper]:
163 """Process all inactive restrictions and apply restrictions to users."""
164 restrict_id = restriction.doc_id
165 debug_label = f"RestrictionCron/Inactive/{restrict_id}"
167 main_result = AppResult[RestrictionWrapper](task_name=debug_label, doc_id=restrict_id)
169 # Skip restrictions that already have a stopwatch
170 if restriction.stopwatch is not None:
171 msg = f"{debug_label}: Has stopwatch, skipping."
172 if IS_DEBUG:
173 LOG().debug(msg)
175 main_result.add_warning(msg)
176 return JobResult.skip_doc(restrict_id, message=msg)
178 # Validate restriction and get user
179 restrict_id = restriction.doc_id
180 controller = self.restriction_service
182 validation_result = controller.validate_restriction_and_user(restriction)
184 if validation_result.is_error:
185 msg = f"{debug_label}: User {restriction.uid} not found for restriction {restriction.debug_str}"
186 LOG().error(msg)
188 main_result.add_error(AppErrorCode.USER_MISSING, msg)
189 return JobResult.from_result(main_result)
191 user = validation_result.user
192 assert user is not None # narrowing, since is_error is False
193 user_id = user.doc_id
195 LOG().info(f"{debug_label}: Processing inactive restriction for user {user_id}")
196 process_result = controller.process_inactive_restriction(
197 restriction,
198 user,
199 restrict_id,
200 )
202 if process_result.is_error:
203 main_result.merge(process_result)
204 return JobResult.from_result(main_result)
206 msg = f"{debug_label}: Processed restriction for user {user_id}"
207 return JobResult.ok(doc_id=restrict_id, message=msg)
209 def apply_temporary_restriction(self, flag: FlagWrapper) -> AppResult[RestrictionModel]:
210 """
211 Add a restriction to the user based on the flag action.
212 Algorithm:
213 - if is_severe:
214 - if not is_service_verified: block pledge and below for 72 hours/ 3 days so we can investigate
215 - if is_service_verified: block full access until manually removed
216 - if is_major:
217 - if not is_service_verified: block dare and below for 48 / 2 days hours so we can investigate
218 - if is_service_verified: block pledge and below for 7 days so we can monitor/further investigate
219 - if is_moderate:
220 - if not is_service_verified: block chat and below for 24 hours 1 day so we can investigate
221 - if is_service_verified: block dare and below for 72 hours
222 - if is_minor:
223 - if not is_service_verified: block profile and below for 24 hours/ 1 day so we can investigate
224 - if is_service_verified: block chat and below for 48 hours / 2 days
225 """
226 result = AppResult[RestrictionModel](task_name=f"for flag {flag.doc_id}")
228 flag_context = FlagContextFactory(db_manager=self.db_manager).create(flag)
229 if flag_context is None or not flag_context.validate():
230 result.add_error(
231 AppErrorCode.INVALID_DATA,
232 f"Invalid FlagContext for flag model: {flag.doc_id}",
233 extra={
234 "context_errors": (
235 flag_context._error_messages if flag_context else "No context created"
236 ),
237 "flag_model": flag.to_json_dict(),
238 },
239 )
240 return result
242 flag_id = flag.doc_id
243 to_user = flag_context.to_user
244 to_user_id = to_user.doc_id
245 flag_type = flag.flag_type
246 is_admin_ack = flag.is_admin_ack
247 user_in_danger = flag.user_in_danger
249 act = None
250 duration = None
251 calc_result = RestrictionCalculator.temp(
252 is_admin_ack,
253 user_in_danger,
254 flag_type,
255 ).calculate_temp_restriction()
257 if calc_result is None:
258 LOG().info(
259 f"No restriction to apply for user {to_user_id} "
260 f"based on flag {flag_id} of type {flag_type.value}",
261 )
262 return result
264 if IS_DEBUG:
265 msg = (
266 f"Calculated temporary restriction for user {to_user_id} based on flag {flag_id}: "
267 f"flag_type={flag_type.value}, is_admin_ack={is_admin_ack}, user_in_danger={user_in_danger}, "
268 f"result_action={calc_result.action.value}, result_duration={calc_result.duration.value}"
269 )
270 LOG().debug(msg)
272 act = calc_result.action
273 duration = calc_result.duration
274 stopwatch = StopwatchModel.from_now(duration)
275 restriction = RestrictionModel(
276 id=None,
277 slug_code="temp-restriction",
278 flag_id=flag_id,
279 flag_progress=flag.progress,
280 flag_type=flag_type,
281 uid=to_user_id,
282 in_danger=user_in_danger,
283 category=RestrictionCategory.ADMIN,
284 duration=duration,
285 action=act,
286 stopwatch=stopwatch,
287 reason=UserMessage.AUTOMATED_FLAG_ACTION_REASON,
288 status=RestrictionStatus.APPLIED,
289 )
291 if IS_DEBUG:
292 LOG().debug(
293 f"Calculated temporary restriction for user {to_user_id} based on flag {flag_id}: "
294 f"action={act.value}, duration={duration.value}, stopwatch={stopwatch.debug_str()}",
295 )
297 return self.apply_restriction(user=to_user, restriction=restriction)
299 def apply_restriction(
300 self,
301 user: UserWrapper,
302 restriction: RestrictionWrapper | RestrictionModel,
303 ) -> AppResult[RestrictionModel]:
304 """Apply the restriction to the user model and update in database."""
305 # When applying restrictions an email should already have been sent from flag_service.
306 # An email is only required when removing restrictions.
307 result: AppResult[RestrictionModel] = AppResult(
308 task_name=f"for flag {restriction.flag_id}",
309 )
311 # Validate inputs
312 if restriction.stopwatch is None:
313 msg = f"Restriction for flag {restriction.flag_id} has no stopwatch, cannot apply restriction."
314 LOG().info(msg)
315 result.add_error(AppErrorCode.INVALID_DATA, msg)
316 return result
318 # Save or update restriction in database
319 updated_restricted: RestrictionWrapper | None = None
321 try:
322 if PersistedGuard.is_restriction(restriction):
323 msg = (
324 f"Restriction {restriction.doc_id} is already persisted. Updating restriction."
325 )
326 LOG().info(msg)
327 updated = self.restriction_bridge.update(restriction)
328 if updated.is_error:
329 msg = f"Failed to update restriction {restriction.doc_id}: {updated.errors}"
330 LOG().error(msg)
331 result.merge(updated)
332 return result
333 assert updated.generated is not None # type narrowing
334 updated_restricted = updated.generated
335 else:
336 assert isinstance(restriction, RestrictionModel) # type narrowing
337 created = self.restriction_bridge.create(restriction)
338 if created.is_error:
339 msg = f"Failed to create restriction: {created.errors}"
340 LOG().error(msg)
341 result.merge(created)
342 return result
343 assert created.generated is not None # type narrowing
344 updated_restricted = created.generated
346 except Exception as e:
347 result.add_error(AppErrorCode.DATABASE_EX, f"Failed to save restriction: {e}")
348 return result
350 # Update the user model
351 # NOTE: email notification is handled in flag_service..
352 try:
353 user.restriction_id = updated_restricted.doc_id
354 user_update_result = self.user_bridge.update(user)
355 if user_update_result.is_error:
356 result.merge(user_update_result)
358 return result
359 except Exception as e:
360 result.add_error(AppErrorCode.DATABASE_EX, f"Failed to apply restriction to user: {e}")
361 return result
363 def remove_restriction(self, user: UserWrapper, flag_id: str) -> AppResult[RestrictionModel]:
364 """Remove restriction from user based on flag ID."""
365 result = AppResult[RestrictionModel](task_name=f"for flag {flag_id}")
366 applied_id = user.restriction_id
367 if applied_id is None:
368 LOG().warning(f"User {user.doc_id} has no restriction to remove for flag {flag_id}")
369 return AppResult[RestrictionModel].skip(
370 "remove_restriction_no_op",
371 f"User {user.doc_id} has no restriction to remove.",
372 )
374 # Get flag and validate
375 flag_result = self.flag_bridge.get(flag_id)
376 if flag_result.is_error:
377 LOG().error(
378 f"Failed to get flag {flag_id} for removing restriction: {flag_result.errors}",
379 )
380 result.merge(flag_result)
381 return result
383 flag = flag_result.generated
384 if flag is None or flag.restriction_id is None:
385 msg = f"Flag {flag_id} has no restriction (restriction_id is None), cannot remove restriction."
386 LOG().error(msg)
387 result.add_error(AppErrorCode.INVALID_DATA, msg)
388 return result
390 # Get restriction
391 restriction_id = flag.restriction_id
392 restrict_result = self.restriction_bridge.get(restriction_id)
393 if restrict_result.is_error:
394 msg = f"Failed to get restriction {restriction_id} for removing restriction: {restrict_result.errors}"
395 LOG().error(msg)
396 result.merge(restrict_result)
397 return result
399 restriction = restrict_result.generated
400 assert restriction is not None # type narrowing
402 # NOTE: email notification is handled in flag_service..
403 # Remove restriction from user
404 try:
405 LOG().info(f"Removing restriction {restriction_id} from user {user.doc_id}")
406 user.restriction_id = None
407 update_result = self.user_bridge.update(user)
408 if update_result.is_error:
409 result.merge(update_result)
411 except Exception as e:
412 LOG().error(f"Exception removing restriction from user {user.doc_id}: {e}")
413 msg = f"Exception removing restriction from user: {e}"
414 result.add_error(AppErrorCode.DATABASE_EX, msg)
416 return result
418 def process_inactive_restriction(
419 self,
420 requested: RestrictionWrapper,
421 user: UserWrapper,
422 restrict_id: str,
423 applied_restriction: RestrictionWrapper | None = None,
424 ) -> AppResult[RestrictionWrapper]:
425 """Process a single inactive restriction for a user."""
426 result = AppResult[RestrictionWrapper](
427 doc_id=restrict_id,
428 task_name=f"for user {user.doc_id}/{restrict_id}",
429 )
431 if applied_restriction is not None:
432 applied_id = user.doc_id
433 assert applied_id # type narrowing
435 # Handle duplicate restriction
436 if requested.equivalent(applied_restriction):
437 LOG().info(f"User {user.doc_id} has equivalent restriction. Marking as DUPLICATE.")
438 requested.status = RestrictionStatus.DUPLICATE
439 return self.restriction_bridge.update(requested)
441 # Determine if should apply requested restriction
442 should_apply = RestrictionCalculator.should_apply(
443 existing=applied_restriction,
444 requested=requested,
445 )
447 if not should_apply:
448 # Keep existing, mark requested as less severe
449 update_result = self._apply_restriction_to_db(user, applied_restriction)
450 if not update_result.is_error:
451 LOG().info(
452 f"Keeping existing restriction. Setting {restrict_id} to IGNORED_LESS_SEVERE.",
453 )
454 status_result = self._set_restriction_status(
455 requested,
456 RestrictionStatus.IGNORED_LESS_SEVERE,
457 )
458 result.merge(status_result)
459 else:
460 result.merge(update_result)
461 return result
463 # Apply the new restriction
464 LOG().info(f"Applying restriction {requested.category.value} for user {user.doc_id}")
465 apply_result = self.apply_restriction(user, requested)
467 if not apply_result.is_error:
468 requested.status = RestrictionStatus.APPLIED
469 update_result = self.restriction_bridge.update(requested)
470 result.merge(update_result)
471 else:
472 result.merge(apply_result)
474 return result
476 def _set_restriction_status(
477 self,
478 restriction: RestrictionWrapper,
479 status: RestrictionStatus,
480 ) -> AppResult[RestrictionWrapper]:
481 result: AppResult[RestrictionWrapper] = AppResult(
482 doc_id=restriction.doc_id,
483 task_name=f"for {restriction.doc_id}/status={status.value}",
484 )
485 try:
486 restriction.status = status
487 update_result = self.restriction_bridge.update(restriction)
488 if update_result.is_error:
489 result.merge(update_result)
490 elif update_result.generated is None:
491 result.add_error(
492 AppErrorCode.DATABASE_EX,
493 f"Updated restriction {restriction.doc_id} has no generated model.",
494 )
495 else:
496 result.generated = update_result.generated
497 except Exception as e:
498 result.add_error(
499 AppErrorCode.UPDATE_FAILED,
500 f"Failed to set restriction status to {status.value} for {restriction.doc_id}: {e}",
501 )
502 return result
504 def _apply_restriction_to_db(
505 self,
506 user: UserWrapper,
507 restriction: RestrictionWrapper,
508 ) -> AppResult[RestrictionWrapper]:
509 stopwatch = restriction.stopwatch
510 restrict_id = restriction.doc_id
511 assert restrict_id is not None # type narrowing
513 restriction_result: AppResult[RestrictionWrapper] = AppResult(
514 doc_id=restrict_id,
515 task_name=f"for user {user.doc_id}/{restrict_id}",
516 )
517 uid = user.doc_id
518 assert uid is not None # type narrowing
520 if stopwatch is not None:
521 LOG().info(f"User {uid} has existing applied restriction with stopwatch {stopwatch}")
522 else:
523 # this shouldn't really happen..
524 LOG().info(
525 f"User {uid} has existing restriction "
526 f"{stopwatch} of type {restriction.category.value} "
527 f"for flag action {restrict_id} with no stopwatch. "
528 "Applying stopwatch from requested restriction.",
529 )
530 restriction.start_stopwatch()
532 LOG().info(
533 f"User {uid} has existing applied restriction "
534 f"{restriction.doc_id} of type {restriction.category.value} "
535 f"for flag action {restrict_id}.",
536 )
538 restrict_update = self.restriction_bridge.update(restriction)
539 if restrict_update.is_error:
540 restriction_result.merge(restrict_update)
541 return restriction_result
543 new_restriction = restrict_update.generated
544 assert new_restriction # type narrowing
546 user.restriction_id = new_restriction.doc_id
547 user_update = self.user_bridge.update(user)
548 if user_update.is_error:
549 restriction_result.merge(user_update)
551 return restriction_result
553 def validate_restriction_and_user(
554 self,
555 restriction: RestrictionWrapper,
556 ) -> _ValidatedRestriction:
557 """
558 Validate restriction has required fields and user exists.
559 Returns (restrict_id, user, error_type).
561 """
562 uid = restriction.uid
563 user_result = self.user_bridge.get(uid)
564 if user_result.is_error:
565 return _ValidatedRestriction.from_error(AppErrorCode.USER_MISSING)
567 user = user_result.generated
568 assert user is not None # narrowing, since is_error is False
570 restriction_id = user.restriction_id
571 if restriction_id is None:
572 return _ValidatedRestriction(user=user)
574 # get the applied restriction to compare against
575 applied_result = self.restriction_bridge.get(restriction_id)
576 if applied_result.is_error:
577 msg = f"Failed to get applied restriction {restriction_id} for user {uid}: {applied_result.errors}"
578 LOG().error(msg)
579 return _ValidatedRestriction.from_error(AppErrorCode.RESTRICTION_MISSING)
581 applied_restriction = applied_result.generated
582 assert applied_restriction is not None # narrowing
583 return _ValidatedRestriction(user=user, applied_restriction=applied_restriction)