Coverage for functions \ flipdare \ task \ command_task_handler.py: 44%
32 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13from __future__ import annotations
14from flipdare.service._service_provider import ServiceProvider
15from flipdare.task.command.default_command import DefaultCommand
16from flipdare.result.output_result import OutputResult
17from flipdare.generated.shared.backend.app_job_type import AppJobType
18from flipdare.job_types import CommandJobType
20__all__ = ["CommandTaskHandler"]
23class CommandTaskHandler(ServiceProvider):
24 # NOTE: we need access to everything, so we defy the standard
25 # NOTE: for a core admin class.
26 def __init__(self) -> None:
27 super().__init__()
29 def run_command(self, job_type: CommandJobType) -> OutputResult:
30 match job_type:
31 case AppJobType.COMMAND_TYPESENSE_REINDEX:
32 return self._typesense_reindex()
33 case AppJobType.COMMAND_TYPESENSE_COMPACT:
34 return self._typesense_compact()
35 case AppJobType.COMMAND_FIREBASE_CLEANUP:
36 return self._firebase_cleanup()
37 case AppJobType.COMMAND_UPDATE_EXCHANGE_RATE:
38 return self._exchange_rate_update()
40 def _typesense_reindex(self) -> OutputResult:
41 command = DefaultCommand(
42 job_type=AppJobType.COMMAND_TYPESENSE_REINDEX,
43 command_callback=self.search_manager.reindex,
44 app_logger=self.app_logger,
45 mailer=self.admin_mailer,
46 )
47 return command.run_command()
49 def _typesense_compact(self) -> OutputResult:
50 command = DefaultCommand(
51 job_type=AppJobType.COMMAND_TYPESENSE_COMPACT,
52 command_callback=self.search_manager.compact,
53 app_logger=self.app_logger,
54 mailer=self.admin_mailer,
55 )
56 return command.run_command()
58 def _exchange_rate_update(self) -> OutputResult:
59 command = DefaultCommand(
60 job_type=AppJobType.COMMAND_UPDATE_EXCHANGE_RATE,
61 command_callback=self.exchange_rate_monitor.update_exchange_rate,
62 app_logger=self.app_logger,
63 mailer=self.admin_mailer,
64 )
65 return command.run_command()
67 def _firebase_cleanup(self) -> OutputResult:
68 command = DefaultCommand(
69 job_type=AppJobType.COMMAND_FIREBASE_CLEANUP,
70 command_callback=self.storage_client.cleanup,
71 app_logger=self.app_logger,
72 mailer=self.admin_mailer,
73 )
74 return command.run_command()