Coverage for functions \ flipdare \ backend \ exchange_rate_monitor.py: 76%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from __future__ import annotations 

14 

15import json 

16import requests 

17from requests.exceptions import Timeout 

18 

19from flipdare.app_log import LOG 

20from flipdare.app_types import ExchangeRateDict, JsonDict 

21from flipdare.constants import ( 

22 CURRENCY_API_TIMEOUT, 

23 EXCHANGE_RATE_BASE_CURRENCY, 

24 EXCHANGE_RATE_HEADERS, 

25 EXCHANGE_RATE_URL, 

26 IS_DEBUG, 

27 IS_TRACE, 

28) 

29from flipdare.core import Singleton 

30from flipdare.generated.shared.app_log_category import AppLogCategory 

31from flipdare.result.app_result import AppResult 

32from flipdare.result.job_result import JobResult 

33from flipdare.result.output_result import OutputResult 

34from flipdare.mailer.admin_mailer import AdminMailer 

35from flipdare.firestore.backend.exchange_rate_db import ExchangeRateDb 

36from flipdare.generated import StripeCurrencyCode 

37from flipdare.generated.model.internal.exchange_rate_model import ExchangeRateModel 

38from flipdare.generated.shared.app_error_code import AppErrorCode 

39from flipdare.generated.shared.app_http_code import AppHttpCode 

40from flipdare.generated.shared.backend.app_job_type import AppJobType 

41from flipdare.util.time_util import TimeUtil 

42from flipdare.wrapper.internal.exchange_rate_wrapper import ExchangeRateWrapper 

43 

44__all__ = ["ExchangeRateMonitor"] 

45 

46# 

47# Example response from Open Exchange Rates API 

48# { 

49# disclaimer: "https://openexchangerates.org/terms/", 

50# license: "https://openexchangerates.org/license/", 

51# timestamp: 1449877801, 

52# base: "USD", 

53# rates: { 

54# AED: 3.672538, 

55# AFN: 66.809999, 

56# ALL: 125.716501, 

57# AMD: 484.902502, 

58# ANG: 1.788575, 

59# AOA: 135.295998, 

60# ARS: 9.750101, 

61# AUD: 1.390866, 

62# /* ... */ 

63# } 

64# } 

65# 

66 

67 

class ExchangeRateMonitor(Singleton):
    """Fetch exchange rates from the remote API, persist them, and convert amounts.

    Rates are requested from ``EXCHANGE_RATE_URL`` (with the API key substituted
    in), written to the exchange-rate database once per ``StripeCurrencyCode``,
    and later read back to convert an amount in another currency to USD cents.
    Failures are logged and reported through the admin mailer.
    """

    __slots__ = (
        # "_admin_mailer" is assigned in __init__, so it must be declared here too.
        "_admin_mailer",
        "_api_headers",
        "_api_key",
        "_base_currency",
        "_currency_db",
        "_exchange_rate_url",
    )

    def __init__(
        self,
        currency_db: ExchangeRateDb | None = None,
        admin_mailer: AdminMailer | None = None,
        api_key: str | None = None,
        api_headers: dict[str, str] = EXCHANGE_RATE_HEADERS,
        base_currency: StripeCurrencyCode = EXCHANGE_RATE_BASE_CURRENCY,
    ) -> None:
        """Create the monitor.

        Args:
            currency_db: Exchange-rate database; resolved lazily when ``None``.
            admin_mailer: Mailer used for error reports; resolved lazily when ``None``.
            api_key: API key for the rate service; read from the app config when ``None``.
            api_headers: HTTP headers sent with the rate request (not mutated here).
            base_currency: Currency the stored rates are expressed against.
        """
        # Imported here rather than at module level, as in the original code.
        from flipdare.app_config import get_app_config

        if api_key is None:
            api_key = get_app_config().api_key(is_backend=False)

        self._currency_db = currency_db
        self._admin_mailer = admin_mailer
        self._api_key = api_key
        self._api_headers = api_headers
        self._exchange_rate_url = EXCHANGE_RATE_URL.replace("__API_KEY__", api_key)
        self._base_currency = base_currency

    @property
    def currency_db(self) -> ExchangeRateDb:
        """Return the exchange-rate database, resolving it on first access."""
        if self._currency_db is None:
            from flipdare.services import get_exchange_rate_db

            self._currency_db = get_exchange_rate_db()
        return self._currency_db

    @property
    def admin_mailer(self) -> AdminMailer:
        """Return the admin mailer, resolving it on first access."""
        if self._admin_mailer is None:
            from flipdare.services import get_admin_mailer

            self._admin_mailer = get_admin_mailer()
        return self._admin_mailer

    @property
    def api_key(self) -> str:
        """Return the API key used to build the request URL."""
        return self._api_key

    @property
    def exchange_rate_url(self) -> str:
        """Return the request URL with the API key already substituted."""
        return self._exchange_rate_url

    @property
    def api_headers(self) -> dict[str, str]:
        """Return the HTTP headers sent with the rate request."""
        return self._api_headers

    def convert_cents_to_usd_cents(self, units: int, currency: StripeCurrencyCode) -> int | None:
        """Convert an amount in ``currency`` to USD cents using the stored rate.

        Args:
            units: Amount in the smallest unit of ``currency``: cents for decimal
                currencies, whole units for zero-decimal currencies.
            currency: Currency that ``units`` is denominated in.

        Returns:
            The amount in USD cents, rounded and never negative. ``units`` is
            returned unchanged when ``currency`` is already USD. ``None`` is
            returned when no usable exchange rate is stored for ``currency``.
        """
        if currency == StripeCurrencyCode.USD:
            # Already USD cents; nothing to convert.
            return units

        db = self.currency_db
        rate_id = db.get_currency_id(self._base_currency, currency)
        if rate_id is None:
            LOG().error(f"No exchange rate found for currency: {currency}")
            return None

        if IS_TRACE:
            LOG().trace(f"Found exchange rate ID {rate_id} for currency: {currency}")

        rate_model = db.get(rate_id)
        if rate_model is None:
            LOG().error(f"No exchange rate found for currency: {currency}")
            return None

        if IS_TRACE:
            msg = f"Exchange rate model retrieved: {rate_model} with rate {rate_model.rate}"
            LOG().trace(msg)

        exchange_rate = rate_model.rate
        if not exchange_rate or exchange_rate <= 0:
            # A missing, zero or negative rate cannot be divided by; treat it
            # the same as "no rate available" instead of raising.
            LOG().error(f"Invalid exchange rate {exchange_rate} for currency: {currency}")
            return None

        # update_exchange_rate() stores the API value keyed by (base, currency),
        # i.e. "1 <base> = rate <currency>", so converting back always divides.
        if currency.is_zero_decimal:
            # Zero-decimal input is whole currency units (e.g. 1111 JPY), so the
            # quotient is whole USD dollars and must be scaled to cents.
            # NOTE(review): this branch previously multiplied by the rate, which
            # contradicted the decimal branch below — confirm no caller or
            # fixture relied on the old direction.
            if IS_TRACE:
                msg = f"Converting zero-decimal currency {currency} whole units to USD cents"
                LOG().trace(msg)

            usd_cents = (float(units) / exchange_rate) * 100.0
        else:
            # Decimal input is already in cents (e.g. 1100 EUR cents = 11.00 EUR),
            # so the quotient is USD cents directly.
            usd_cents = float(units) / exchange_rate

        if IS_TRACE:
            msg = (
                f"Converting {units} {currency} to USD at rate {exchange_rate}:"
                f" {usd_cents} USD cents before rounding"
            )
            LOG().trace(msg)

        # Clamp negatives to zero, then round to the nearest whole cent.
        return int(round(max(usd_cents, 0.0)))

    def update_exchange_rate(self, rates: ExchangeRateDict | None = None) -> OutputResult:
        """Write one exchange-rate record per ``StripeCurrencyCode`` to the database.

        Args:
            rates: Mapping of upper-case currency code to rate. Fetched from the
                API via ``get_latest_exchange_rates`` when ``None``.

        Returns:
            An OK result when every currency had a rate; otherwise an error
            result listing the missing codes (or ``rates: None`` when no rates
            could be obtained at all).
        """
        result = AppResult[ExchangeRateWrapper]()
        start = TimeUtil.get_current_utc_dt()

        if rates is None:
            rates = self.get_latest_exchange_rates()

        if rates is None:
            msg = "No exchange rates available to update."
            LOG().error(msg)
            result.add_error(AppErrorCode.CURRENCY_API, msg)
            return JobResult.from_result(result, data={"rates": None})

        db = self.currency_db
        base_currency = self._base_currency
        missing_codes: list[str] = []

        passed_ct = 0
        failed_ct = 0

        if IS_DEBUG:
            msg = f"Starting exchange rate update with {len(rates)} rates fetched from API\n{rates}\n"
            LOG().debug(msg)

        for currency in StripeCurrencyCode:
            code = currency.code.upper()
            if code not in rates:
                LOG().warning(f"Missing exchange rate for currency: {code}")
                missing_codes.append(code)
                failed_ct += 1
                continue

            rate = rates[code]

            if IS_TRACE:
                LOG().trace(f"Exchange rate for {currency}[{base_currency}]: {rate}")

            model = ExchangeRateModel(
                id=None,
                base_currency=base_currency,
                target_currency=currency,
                rate=rate,
            )

            # Create the record on first sight, otherwise overwrite the existing one.
            doc_id = db.get_currency_id(base_currency, currency)
            passed_ct += 1
            if doc_id is None:
                db.create(model)
                if IS_TRACE:
                    msg = f"Created exchange rate for {currency}[{base_currency}] in database."
                    LOG().trace(msg)
            else:
                json_data = model.to_dict()
                db.update(doc_id, json_data)
                if IS_TRACE:
                    msg = f"Updated exchange rate for {currency}[{base_currency}] in database."
                    LOG().trace(msg)

        end = TimeUtil.get_current_utc_dt()
        duration = TimeUtil.duration_in_seconds(start, end)

        if not missing_codes:
            msg = f"All required exchange rates updated: passed_ct={passed_ct}, failed_ct={failed_ct}"
            LOG().info(msg)
            return OutputResult.ok(
                job_type=AppJobType.COMMAND_UPDATE_EXCHANGE_RATE,
                message=msg,
                duration=duration,
            )

        msg = f"Missing exchange rates for currencies: {', '.join(missing_codes)}"
        self._send_error(msg)
        result.add_error(AppErrorCode.CURRENCY_API, msg)
        return JobResult.from_result(result, data={"missing_codes": missing_codes})

    def get_latest_exchange_rates(self) -> ExchangeRateDict | None:
        """Fetch the latest rates from the exchange-rate API.

        Returns:
            The ``"rates"`` mapping from the response, or ``None`` when the
            request fails, returns a non-OK status, cannot be parsed as JSON, or
            contains no rates. Every failure is reported via ``_send_error``.
        """
        url = self.exchange_rate_url
        headers = self.api_headers

        response: requests.Response | None = None
        fetch_error: str | None = None
        try:
            response = requests.get(url, headers=headers, timeout=CURRENCY_API_TIMEOUT)
        except Timeout:
            fetch_error = f"Timeout ({CURRENCY_API_TIMEOUT} secs) fetching exchange rates."
        except Exception as e:
            # Best-effort boundary: any transport failure becomes a reported error.
            # NOTE(review): the API key is embedded in the URL, and the exception
            # text may include that URL — confirm it is acceptable in logs/emails.
            fetch_error = f"Error while fetching exchange rates from API: {e}"

        if response is None:
            prefix = "No response received from exchange rate API"
            # _send_error() already logs, so no separate LOG().error() call here.
            self._send_error(f"{prefix}. Error: {fetch_error}" if fetch_error else prefix)
            return None

        if response.status_code != AppHttpCode.OK.status:
            msg = f"Failed to fetch exchange rates: {response.status_code} - {response.text}"
            self._send_error(msg)
            return None

        data: JsonDict
        try:
            data = json.loads(response.text)
        except (TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            msg = f"Error parsing exchange rates response: {e}"
            self._send_error(msg)
            return None

        # Tolerate a payload that is not an object, or whose "rates" is null or
        # not a mapping, and report it as "no rates" rather than raising.
        rates = data.get("rates") if isinstance(data, dict) else None
        if not isinstance(rates, dict) or not rates:
            msg = f"No exchange rates found in response (code={response.status_code}): {response.text}"
            self._send_error(msg)
            return None

        LOG().debug(f"Fetched {len(rates)} exchange rates from API.")
        return rates

    def _send_error(self, msg: str) -> None:
        """Log ``msg`` as an error and email it to the admins as a currency-API failure."""
        LOG().error(msg)
        self.admin_mailer.send_error(
            error_code=AppErrorCode.CURRENCY_API,
            category=AppLogCategory.API,
            message=msg,
            include_stack=False,
        )