Coverage for functions \ flipdare \ service \ safety \ restriction_service.py: 34%

291 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14from dataclasses import dataclass, field 

15from typing import TYPE_CHECKING, Self 

16 

17from flipdare.app_types import CronResult 

18from flipdare.core.cron_decorator import cron_decorator 

19from flipdare.app_log import LOG 

20from flipdare.constants import IS_DEBUG 

21from flipdare.job_types import CronWithResultType 

22from flipdare.result.app_result import AppResult 

23from flipdare.result.job_result import JobResult 

24from flipdare.firestore import FlagContextFactory 

25from flipdare.generated import ( 

26 AppErrorCode, 

27 RestrictionCategory, 

28 RestrictionModel, 

29 RestrictionStatus, 

30 StopwatchModel, 

31) 

32from flipdare.generated.shared.backend.app_job_type import AppJobType 

33from flipdare.message.user_message import UserMessage 

34from flipdare.service.safety.core.restriction_calculator import RestrictionCalculator 

35from flipdare.service._service_provider import ServiceProvider 

36from flipdare.service.core.cron_processor import CronConfig, CronProcessor 

37from flipdare.wrapper import ( 

38 FlagWrapper, 

39 PersistedGuard, 

40 RestrictionWrapper, 

41 UserWrapper, 

42) 

43 

44if TYPE_CHECKING: 

45 from flipdare.manager.service_manager import ServiceManager 

46 from flipdare.manager.db_manager import DbManager 

47 from flipdare.manager.backend_manager import BackendManager 

48 

49__all__ = ["RestrictionService"] 

50 

51 

52@dataclass 

53class _ValidatedRestriction: 

54 user: UserWrapper | None 

55 applied_restriction: RestrictionWrapper | None = None 

56 error: AppErrorCode | None = field(default=None) 

57 

58 @classmethod 

59 def from_error(cls, error: AppErrorCode) -> Self: 

60 return cls(user=None, error=error) 

61 

62 @property 

63 def is_error(self) -> bool: 

64 return self.error is not None 

65 

66 

67class RestrictionService(ServiceProvider): 

68 

69 def __init__( 

70 self, 

71 backend_manager: BackendManager, 

72 db_manager: DbManager, 

73 service_manager: ServiceManager | None = None, 

74 ) -> None: 

75 super().__init__( 

76 backend_manager=backend_manager, 

77 db_manager=db_manager, 

78 service_manager=service_manager, 

79 ) 

80 

81 @cron_decorator(job_type=AppJobType.CR_RESTRICT_INACTIVE) 

82 def cron_inactive_restrictions(self) -> CronResult: 

83 """Process unprocessed friends from last week.""" 

84 from flipdare.services import get_report_handler 

85 

86 job_type: CronWithResultType = AppJobType.CR_RESTRICT_INACTIVE 

87 config = CronConfig( 

88 job_type=job_type, 

89 job_name="cron_restrict_inactive", 

90 query_fn=lambda: self.restriction_db.get_inactive_actions(), 

91 process_fn=lambda r: self._process_inactive_cron(r), 

92 report_fn=lambda _, processed: get_report_handler().run_cron_with_result( 

93 job_type=job_type, 

94 processed=processed, 

95 ), 

96 ) 

97 return CronProcessor(config).process_result() 

98 

99 @cron_decorator(job_type=AppJobType.CR_RESTRICT_EXPIRED) 

100 def cron_expired_restrictions(self) -> CronResult: 

101 """Process unprocessed friends from last week.""" 

102 config = CronConfig( 

103 job_type=AppJobType.CR_RESTRICT_EXPIRED, 

104 job_name="cron_restrict_expired", 

105 query_fn=lambda: self.restriction_db.get_expired_active_actions(), 

106 process_fn=lambda r: self._process_expired_cron(r), 

107 ) 

108 return CronProcessor(config).process_result() 

109 

110 def _process_expired_cron( 

111 self, restriction: RestrictionWrapper 

112 ) -> JobResult[RestrictionWrapper]: 

113 restrict_id = restriction.doc_id 

114 debug_label = f"RestrictionCron/Expired/{restrict_id}" 

115 main_result = AppResult[RestrictionWrapper](task_name=debug_label, doc_id=restrict_id) 

116 

117 controller = self.restriction_service 

118 validation_result = controller.validate_restriction_and_user(restriction) 

119 if validation_result.is_error: 

120 msg = f"{debug_label}: User {restriction.uid} not found for restriction {restriction.debug_str}" 

121 LOG().error(msg) 

122 main_result.add_error(AppErrorCode.USER_MISSING, msg) 

123 return JobResult.from_result(main_result) 

124 

125 # Check if user has matching restriction to remove 

126 user = validation_result.user 

127 assert user is not None # narrowing, since is_error is False 

128 

129 user_id = user.doc_id 

130 applied = validation_result.applied_restriction 

131 

132 if applied is None: 

133 msg = f"{debug_label}: User {user_id} has no restriction to remove. Skipping." 

134 LOG().warning(msg) 

135 

136 main_result.add_warning(msg) 

137 return JobResult.skip_doc(restrict_id, message=msg) 

138 

139 if applied.category != restriction.category: 

140 msg = ( 

141 f"{debug_label}: User {user_id} restriction type mismatch. " 

142 f"Expected {restriction.category}, found {applied.category}. Skipping." 

143 ) 

144 LOG().warning(msg) 

145 

146 main_result.add_warning(msg) 

147 return JobResult.skip_doc(restrict_id, message=msg) 

148 

149 # Remove the restriction 

150 try: 

151 controller.remove_restriction(user, flag_id=restriction.flag_id) 

152 msg = f"{debug_label}: Removed restriction from user {user_id}" 

153 LOG().warning(msg) 

154 return JobResult.ok(doc_id=restrict_id, message=msg) 

155 except Exception as e: 

156 cause = f"{debug_label}: Failed to remove restriction for user {user_id}: {e}" 

157 main_result.add_error(AppErrorCode.RESTRICTION_REMOVAL, cause) 

158 return JobResult.from_result(main_result) 

159 

160 def _process_inactive_cron( 

161 self, restriction: RestrictionWrapper 

162 ) -> JobResult[RestrictionWrapper]: 

163 """Process all inactive restrictions and apply restrictions to users.""" 

164 restrict_id = restriction.doc_id 

165 debug_label = f"RestrictionCron/Inactive/{restrict_id}" 

166 

167 main_result = AppResult[RestrictionWrapper](task_name=debug_label, doc_id=restrict_id) 

168 

169 # Skip restrictions that already have a stopwatch 

170 if restriction.stopwatch is not None: 

171 msg = f"{debug_label}: Has stopwatch, skipping." 

172 if IS_DEBUG: 

173 LOG().debug(msg) 

174 

175 main_result.add_warning(msg) 

176 return JobResult.skip_doc(restrict_id, message=msg) 

177 

178 # Validate restriction and get user 

179 restrict_id = restriction.doc_id 

180 controller = self.restriction_service 

181 

182 validation_result = controller.validate_restriction_and_user(restriction) 

183 

184 if validation_result.is_error: 

185 msg = f"{debug_label}: User {restriction.uid} not found for restriction {restriction.debug_str}" 

186 LOG().error(msg) 

187 

188 main_result.add_error(AppErrorCode.USER_MISSING, msg) 

189 return JobResult.from_result(main_result) 

190 

191 user = validation_result.user 

192 assert user is not None # narrowing, since is_error is False 

193 user_id = user.doc_id 

194 

195 LOG().info(f"{debug_label}: Processing inactive restriction for user {user_id}") 

196 process_result = controller.process_inactive_restriction( 

197 restriction, 

198 user, 

199 restrict_id, 

200 ) 

201 

202 if process_result.is_error: 

203 main_result.merge(process_result) 

204 return JobResult.from_result(main_result) 

205 

206 msg = f"{debug_label}: Processed restriction for user {user_id}" 

207 return JobResult.ok(doc_id=restrict_id, message=msg) 

208 

209 def apply_temporary_restriction(self, flag: FlagWrapper) -> AppResult[RestrictionModel]: 

210 """ 

211 Add a restriction to the user based on the flag action. 

212 Algorithm: 

213 - if is_severe: 

214 - if not is_service_verified: block pledge and below for 72 hours/ 3 days so we can investigate 

215 - if is_service_verified: block full access until manually removed 

216 - if is_major: 

217 - if not is_service_verified: block dare and below for 48 / 2 days hours so we can investigate 

218 - if is_service_verified: block pledge and below for 7 days so we can monitor/further investigate 

219 - if is_moderate: 

220 - if not is_service_verified: block chat and below for 24 hours 1 day so we can investigate 

221 - if is_service_verified: block dare and below for 72 hours 

222 - if is_minor: 

223 - if not is_service_verified: block profile and below for 24 hours/ 1 day so we can investigate 

224 - if is_service_verified: block chat and below for 48 hours / 2 days 

225 """ 

226 result = AppResult[RestrictionModel](task_name=f"for flag {flag.doc_id}") 

227 

228 flag_context = FlagContextFactory(db_manager=self.db_manager).create(flag) 

229 if flag_context is None or not flag_context.validate(): 

230 result.add_error( 

231 AppErrorCode.INVALID_DATA, 

232 f"Invalid FlagContext for flag model: {flag.doc_id}", 

233 extra={ 

234 "context_errors": ( 

235 flag_context._error_messages if flag_context else "No context created" 

236 ), 

237 "flag_model": flag.to_json_dict(), 

238 }, 

239 ) 

240 return result 

241 

242 flag_id = flag.doc_id 

243 to_user = flag_context.to_user 

244 to_user_id = to_user.doc_id 

245 flag_type = flag.flag_type 

246 is_admin_ack = flag.is_admin_ack 

247 user_in_danger = flag.user_in_danger 

248 

249 act = None 

250 duration = None 

251 calc_result = RestrictionCalculator.temp( 

252 is_admin_ack, 

253 user_in_danger, 

254 flag_type, 

255 ).calculate_temp_restriction() 

256 

257 if calc_result is None: 

258 LOG().info( 

259 f"No restriction to apply for user {to_user_id} " 

260 f"based on flag {flag_id} of type {flag_type.value}", 

261 ) 

262 return result 

263 

264 if IS_DEBUG: 

265 msg = ( 

266 f"Calculated temporary restriction for user {to_user_id} based on flag {flag_id}: " 

267 f"flag_type={flag_type.value}, is_admin_ack={is_admin_ack}, user_in_danger={user_in_danger}, " 

268 f"result_action={calc_result.action.value}, result_duration={calc_result.duration.value}" 

269 ) 

270 LOG().debug(msg) 

271 

272 act = calc_result.action 

273 duration = calc_result.duration 

274 stopwatch = StopwatchModel.from_now(duration) 

275 restriction = RestrictionModel( 

276 id=None, 

277 slug_code="temp-restriction", 

278 flag_id=flag_id, 

279 flag_progress=flag.progress, 

280 flag_type=flag_type, 

281 uid=to_user_id, 

282 in_danger=user_in_danger, 

283 category=RestrictionCategory.ADMIN, 

284 duration=duration, 

285 action=act, 

286 stopwatch=stopwatch, 

287 reason=UserMessage.AUTOMATED_FLAG_ACTION_REASON, 

288 status=RestrictionStatus.APPLIED, 

289 ) 

290 

291 if IS_DEBUG: 

292 LOG().debug( 

293 f"Calculated temporary restriction for user {to_user_id} based on flag {flag_id}: " 

294 f"action={act.value}, duration={duration.value}, stopwatch={stopwatch.debug_str()}", 

295 ) 

296 

297 return self.apply_restriction(user=to_user, restriction=restriction) 

298 

299 def apply_restriction( 

300 self, 

301 user: UserWrapper, 

302 restriction: RestrictionWrapper | RestrictionModel, 

303 ) -> AppResult[RestrictionModel]: 

304 """Apply the restriction to the user model and update in database.""" 

305 # When applying restrictions an email should already have been sent from flag_service. 

306 # An email is only required when removing restrictions. 

307 result: AppResult[RestrictionModel] = AppResult( 

308 task_name=f"for flag {restriction.flag_id}", 

309 ) 

310 

311 # Validate inputs 

312 if restriction.stopwatch is None: 

313 msg = f"Restriction for flag {restriction.flag_id} has no stopwatch, cannot apply restriction." 

314 LOG().info(msg) 

315 result.add_error(AppErrorCode.INVALID_DATA, msg) 

316 return result 

317 

318 # Save or update restriction in database 

319 updated_restricted: RestrictionWrapper | None = None 

320 

321 try: 

322 if PersistedGuard.is_restriction(restriction): 

323 msg = ( 

324 f"Restriction {restriction.doc_id} is already persisted. Updating restriction." 

325 ) 

326 LOG().info(msg) 

327 updated = self.restriction_bridge.update(restriction) 

328 if updated.is_error: 

329 msg = f"Failed to update restriction {restriction.doc_id}: {updated.errors}" 

330 LOG().error(msg) 

331 result.merge(updated) 

332 return result 

333 assert updated.generated is not None # type narrowing 

334 updated_restricted = updated.generated 

335 else: 

336 assert isinstance(restriction, RestrictionModel) # type narrowing 

337 created = self.restriction_bridge.create(restriction) 

338 if created.is_error: 

339 msg = f"Failed to create restriction: {created.errors}" 

340 LOG().error(msg) 

341 result.merge(created) 

342 return result 

343 assert created.generated is not None # type narrowing 

344 updated_restricted = created.generated 

345 

346 except Exception as e: 

347 result.add_error(AppErrorCode.DATABASE_EX, f"Failed to save restriction: {e}") 

348 return result 

349 

350 # Update the user model 

351 # NOTE: email notification is handled in flag_service.. 

352 try: 

353 user.restriction_id = updated_restricted.doc_id 

354 user_update_result = self.user_bridge.update(user) 

355 if user_update_result.is_error: 

356 result.merge(user_update_result) 

357 

358 return result 

359 except Exception as e: 

360 result.add_error(AppErrorCode.DATABASE_EX, f"Failed to apply restriction to user: {e}") 

361 return result 

362 

363 def remove_restriction(self, user: UserWrapper, flag_id: str) -> AppResult[RestrictionModel]: 

364 """Remove restriction from user based on flag ID.""" 

365 result = AppResult[RestrictionModel](task_name=f"for flag {flag_id}") 

366 applied_id = user.restriction_id 

367 if applied_id is None: 

368 LOG().warning(f"User {user.doc_id} has no restriction to remove for flag {flag_id}") 

369 return AppResult[RestrictionModel].skip( 

370 "remove_restriction_no_op", 

371 f"User {user.doc_id} has no restriction to remove.", 

372 ) 

373 

374 # Get flag and validate 

375 flag_result = self.flag_bridge.get(flag_id) 

376 if flag_result.is_error: 

377 LOG().error( 

378 f"Failed to get flag {flag_id} for removing restriction: {flag_result.errors}", 

379 ) 

380 result.merge(flag_result) 

381 return result 

382 

383 flag = flag_result.generated 

384 if flag is None or flag.restriction_id is None: 

385 msg = f"Flag {flag_id} has no restriction (restriction_id is None), cannot remove restriction." 

386 LOG().error(msg) 

387 result.add_error(AppErrorCode.INVALID_DATA, msg) 

388 return result 

389 

390 # Get restriction 

391 restriction_id = flag.restriction_id 

392 restrict_result = self.restriction_bridge.get(restriction_id) 

393 if restrict_result.is_error: 

394 msg = f"Failed to get restriction {restriction_id} for removing restriction: {restrict_result.errors}" 

395 LOG().error(msg) 

396 result.merge(restrict_result) 

397 return result 

398 

399 restriction = restrict_result.generated 

400 assert restriction is not None # type narrowing 

401 

402 # NOTE: email notification is handled in flag_service.. 

403 # Remove restriction from user 

404 try: 

405 LOG().info(f"Removing restriction {restriction_id} from user {user.doc_id}") 

406 user.restriction_id = None 

407 update_result = self.user_bridge.update(user) 

408 if update_result.is_error: 

409 result.merge(update_result) 

410 

411 except Exception as e: 

412 LOG().error(f"Exception removing restriction from user {user.doc_id}: {e}") 

413 msg = f"Exception removing restriction from user: {e}" 

414 result.add_error(AppErrorCode.DATABASE_EX, msg) 

415 

416 return result 

417 

418 def process_inactive_restriction( 

419 self, 

420 requested: RestrictionWrapper, 

421 user: UserWrapper, 

422 restrict_id: str, 

423 applied_restriction: RestrictionWrapper | None = None, 

424 ) -> AppResult[RestrictionWrapper]: 

425 """Process a single inactive restriction for a user.""" 

426 result = AppResult[RestrictionWrapper]( 

427 doc_id=restrict_id, 

428 task_name=f"for user {user.doc_id}/{restrict_id}", 

429 ) 

430 

431 if applied_restriction is not None: 

432 applied_id = user.doc_id 

433 assert applied_id # type narrowing 

434 

435 # Handle duplicate restriction 

436 if requested.equivalent(applied_restriction): 

437 LOG().info(f"User {user.doc_id} has equivalent restriction. Marking as DUPLICATE.") 

438 requested.status = RestrictionStatus.DUPLICATE 

439 return self.restriction_bridge.update(requested) 

440 

441 # Determine if should apply requested restriction 

442 should_apply = RestrictionCalculator.should_apply( 

443 existing=applied_restriction, 

444 requested=requested, 

445 ) 

446 

447 if not should_apply: 

448 # Keep existing, mark requested as less severe 

449 update_result = self._apply_restriction_to_db(user, applied_restriction) 

450 if not update_result.is_error: 

451 LOG().info( 

452 f"Keeping existing restriction. Setting {restrict_id} to IGNORED_LESS_SEVERE.", 

453 ) 

454 status_result = self._set_restriction_status( 

455 requested, 

456 RestrictionStatus.IGNORED_LESS_SEVERE, 

457 ) 

458 result.merge(status_result) 

459 else: 

460 result.merge(update_result) 

461 return result 

462 

463 # Apply the new restriction 

464 LOG().info(f"Applying restriction {requested.category.value} for user {user.doc_id}") 

465 apply_result = self.apply_restriction(user, requested) 

466 

467 if not apply_result.is_error: 

468 requested.status = RestrictionStatus.APPLIED 

469 update_result = self.restriction_bridge.update(requested) 

470 result.merge(update_result) 

471 else: 

472 result.merge(apply_result) 

473 

474 return result 

475 

476 def _set_restriction_status( 

477 self, 

478 restriction: RestrictionWrapper, 

479 status: RestrictionStatus, 

480 ) -> AppResult[RestrictionWrapper]: 

481 result: AppResult[RestrictionWrapper] = AppResult( 

482 doc_id=restriction.doc_id, 

483 task_name=f"for {restriction.doc_id}/status={status.value}", 

484 ) 

485 try: 

486 restriction.status = status 

487 update_result = self.restriction_bridge.update(restriction) 

488 if update_result.is_error: 

489 result.merge(update_result) 

490 elif update_result.generated is None: 

491 result.add_error( 

492 AppErrorCode.DATABASE_EX, 

493 f"Updated restriction {restriction.doc_id} has no generated model.", 

494 ) 

495 else: 

496 result.generated = update_result.generated 

497 except Exception as e: 

498 result.add_error( 

499 AppErrorCode.UPDATE_FAILED, 

500 f"Failed to set restriction status to {status.value} for {restriction.doc_id}: {e}", 

501 ) 

502 return result 

503 

504 def _apply_restriction_to_db( 

505 self, 

506 user: UserWrapper, 

507 restriction: RestrictionWrapper, 

508 ) -> AppResult[RestrictionWrapper]: 

509 stopwatch = restriction.stopwatch 

510 restrict_id = restriction.doc_id 

511 assert restrict_id is not None # type narrowing 

512 

513 restriction_result: AppResult[RestrictionWrapper] = AppResult( 

514 doc_id=restrict_id, 

515 task_name=f"for user {user.doc_id}/{restrict_id}", 

516 ) 

517 uid = user.doc_id 

518 assert uid is not None # type narrowing 

519 

520 if stopwatch is not None: 

521 LOG().info(f"User {uid} has existing applied restriction with stopwatch {stopwatch}") 

522 else: 

523 # this shouldn't really happen.. 

524 LOG().info( 

525 f"User {uid} has existing restriction " 

526 f"{stopwatch} of type {restriction.category.value} " 

527 f"for flag action {restrict_id} with no stopwatch. " 

528 "Applying stopwatch from requested restriction.", 

529 ) 

530 restriction.start_stopwatch() 

531 

532 LOG().info( 

533 f"User {uid} has existing applied restriction " 

534 f"{restriction.doc_id} of type {restriction.category.value} " 

535 f"for flag action {restrict_id}.", 

536 ) 

537 

538 restrict_update = self.restriction_bridge.update(restriction) 

539 if restrict_update.is_error: 

540 restriction_result.merge(restrict_update) 

541 return restriction_result 

542 

543 new_restriction = restrict_update.generated 

544 assert new_restriction # type narrowing 

545 

546 user.restriction_id = new_restriction.doc_id 

547 user_update = self.user_bridge.update(user) 

548 if user_update.is_error: 

549 restriction_result.merge(user_update) 

550 

551 return restriction_result 

552 

553 def validate_restriction_and_user( 

554 self, 

555 restriction: RestrictionWrapper, 

556 ) -> _ValidatedRestriction: 

557 """ 

558 Validate restriction has required fields and user exists. 

559 Returns (restrict_id, user, error_type). 

560 

561 """ 

562 uid = restriction.uid 

563 user_result = self.user_bridge.get(uid) 

564 if user_result.is_error: 

565 return _ValidatedRestriction.from_error(AppErrorCode.USER_MISSING) 

566 

567 user = user_result.generated 

568 assert user is not None # narrowing, since is_error is False 

569 

570 restriction_id = user.restriction_id 

571 if restriction_id is None: 

572 return _ValidatedRestriction(user=user) 

573 

574 # get the applied restriction to compare against 

575 applied_result = self.restriction_bridge.get(restriction_id) 

576 if applied_result.is_error: 

577 msg = f"Failed to get applied restriction {restriction_id} for user {uid}: {applied_result.errors}" 

578 LOG().error(msg) 

579 return _ValidatedRestriction.from_error(AppErrorCode.RESTRICTION_MISSING) 

580 

581 applied_restriction = applied_result.generated 

582 assert applied_restriction is not None # narrowing 

583 return _ValidatedRestriction(user=user, applied_restriction=applied_restriction)