Coverage for functions \ flipdare \ mailer \ user \ user_summary_email.py: 94%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13from typing import override 

14from dataclasses import dataclass 

15from flipdare.app_defaults import get_fallback_avatar_image 

16from flipdare.app_log import LOG 

17from flipdare.constants import ( 

18 IS_DEBUG, 

19 MAX_SUMMARY_EMAIL_ENTRIES, 

20 MAX_SUMMARY_EMAIL_ROW_ENTRIES, 

21) 

22from flipdare.mailer._jinja_email_template import JinjaEmailTemplate 

23from flipdare.mailer.app_email_params import AppEmailParams 

24from flipdare.mailer.app_email_type import AppEmailType 

25from flipdare.generated.model.backend.email.public_image_link_model import PublicImageLinkModel 

26from flipdare.generated.schema.email.subject.user.summary_subject_schema import ( 

27 SummarySubjectSchema, 

28) 

29from flipdare.generated.schema.email.body.user.summary_email_entry_schema import ( 

30 SummaryEmailEntrySchema, 

31) 

32from flipdare.generated.schema.email.body.user.summary_email_schema import SummaryEmailSchema 

33from flipdare.generated.shared.app_deep_link import AppDeepLink 

34from flipdare.generated.shared.backend.summary_email_category import SummaryEmailCategory 

35from flipdare.backend.app_storage_client import AppStorageClient 

36from flipdare.message.summary_message import ( 

37 SummaryManyContext, 

38 SummaryMessage, 

39 SummarySingleContext, 

40) 

41from flipdare.util.firebase_util import FirebaseUtil 

42from flipdare.util.public.public_url_factory import PublicUrlFactory 

43from flipdare.util.time_util import TimeUtil 

44from flipdare.wrapper.backend.user_summary_entry_wrapper import UserSummaryEntryWrapper 

45from flipdare.wrapper.backend.user_summary_wrapper import UserSummaryWrapper 

46 

# Maps each summary email category to the summary entry wrappers grouped under it.
type SummaryDictType = dict[SummaryEmailCategory, list[UserSummaryEntryWrapper]]

48 

49 

class UserSummaryEmail(JinjaEmailTemplate[SummaryEmailSchema]):
    """User summary email built on ``JinjaEmailTemplate``.

    The template data is a list of per-category sections (title, image
    entries, message, names and an optional deep link); the subject schema
    carries the formatted report date.
    """

    SCHEMA_CLASS = SummaryEmailSchema

    def __init__(
        self,
        summary: UserSummaryWrapper,
        summaries: SummaryDictType,
        storage_util: AppStorageClient,
        user_tz_str: str | None = None,
        max_names: int = MAX_SUMMARY_EMAIL_ROW_ENTRIES,
    ) -> None:
        """Build the section data and initialise the underlying template.

        Args:
            summary: Summary wrapper supplying ``created_at`` and the
                fallback ``tz_str``.
            summaries: Entries grouped by category (see ``build_summaries``).
            storage_util: Storage client used to generate temporary public
                image URLs for the email.
            user_tz_str: Timezone to format the report date in; when
                ``None`` the summary's own ``tz_str`` is used.
            max_names: Maximum number of names spelled out per section.
        """
        data = self._build_data(summaries, storage_util, max_names)

        # An explicitly supplied timezone wins over the one stored on the summary.
        tz_str = user_tz_str if user_tz_str is not None else summary.tz_str
        report_date = TimeUtil.formatted_user_day(summary.created_at, tz_str)

        super().__init__(
            data=data,
            params=AppEmailParams(
                email_type=AppEmailType.USR_SUMMARY,
                schema=SummarySubjectSchema(date_str=report_date),
            ),
        )

    @override
    def newline_fields(self) -> list[str]:
        """Return an empty list: this template declares no newline fields."""
        return []

    @staticmethod
    def build_summaries(
        entries: list[UserSummaryEntryWrapper],
        max_entries: int = MAX_SUMMARY_EMAIL_ENTRIES,
    ) -> SummaryDictType:
        """Group summary entries by category, in report order.

        Args:
            entries: Summary entries to group.
            max_entries: Maximum number of entries kept for each category.

        Returns:
            A dict whose keys follow ``SummaryEmailCategory.report_order()``
            and whose values hold at most ``max_entries`` entries each, in
            their original relative order. Categories without entries are
            omitted, as are entries whose category is not in the report order.
        """
        # Group in a single pass instead of re-scanning ``entries`` once per
        # category; the relative order of entries inside a category is kept.
        grouped: SummaryDictType = {}
        for entry in entries:
            grouped.setdefault(entry.entry_type.category, []).append(entry)

        # Insert in report order so consumers iterating the dict see the
        # categories in the order they are reported.
        # NOTE(review): the cap below applies per category, not to the total
        # number of entries across categories - confirm that is the intent.
        summaries: SummaryDictType = {}
        for category in SummaryEmailCategory.report_order():
            category_entries = grouped.get(category)
            if not category_entries:
                continue
            if IS_DEBUG:
                msg = f"Found {len(category_entries)} entries for category {category}."
                LOG().debug(msg)
            summaries[category] = category_entries[:max_entries]

        return summaries

    def _build_data(
        self,
        summaries: SummaryDictType,
        storage_util: AppStorageClient,
        max_names: int,
    ) -> list[SummaryEmailSchema]:
        """Build section data for template rendering.

        Args:
            summaries: Entries grouped by category.
            storage_util: Storage client used to generate temporary public
                image URLs.
            max_names: Maximum number of names spelled out in a section's
                message.

        Returns:
            One section per non-empty category, in report order.
        """
        sections: list[SummaryEmailSchema] = []

        for category in SummaryEmailCategory.report_order():
            summary_entries = summaries.get(category, [])
            if not summary_entries:
                continue

            if IS_DEBUG:
                # Log the number of entries about to be processed (the list of
                # built schemas is still empty at this point).
                LOG().debug(
                    f"Processing {len(summary_entries)} entries for email category {category}."
                )

            # Resolve a public image URL for every entry that has an image.
            entries: list[SummaryEmailEntrySchema] = []
            for summary_entry in summary_entries:
                image = summary_entry.image
                if image is None:
                    # Entries without an image get no image row, but they are
                    # still counted in the section message built below.
                    continue

                image_url = image.source.url
                public_url = PublicUrlFactory.create_temp_email_url(image_url)
                source_remote_path = FirebaseUtil.gs_url_parts(image_url).path

                if IS_DEBUG:
                    msg = f"Generating public url for {image_url} with: {source_remote_path} -> {public_url}"
                    LOG().debug(msg)

                url_str = storage_util.generate_temp_email_url(
                    source_remote_path=source_remote_path,
                    public_url=public_url,
                )
                if url_str is None:
                    # No URL was produced: fall back to the default avatar.
                    url_str = get_fallback_avatar_image()

                if IS_DEBUG:
                    LOG().debug(f"Using public URL for {image}: {url_str}")

                public_link = PublicImageLinkModel(image=image, public_url=url_str)
                entries.append(
                    SummaryEmailEntrySchema(
                        from_name=summary_entry.from_name,
                        public_url=public_link.public_url,
                        scaled_width=public_link.scaled_width,
                        scaled_height=public_link.scaled_height,
                    ),
                )

            # we generate the html and then override the _parse_for_text
            # to generate the text
            summary_message = _SummaryMessage(
                summary_entries,
                max_name_count=max_names,
                is_html=True,
            )
            section = SummaryEmailSchema(
                {
                    "title": category.report_title,
                    "entries": entries,
                    "message": summary_message.message,
                    "names": summary_message.names.name_str,
                },
            )

            # The section's action link is derived from its first entry only.
            deep_link = AppDeepLink.for_summary(entry=summary_entries[0])
            if deep_link is not None:
                section["action_link"] = deep_link

            sections.append(section)

        return sections

176 

177 

@dataclass
class _NameResult:
    """Formatted name list plus the number of names it spells out."""

    name_str: str  # HTML fragment, e.g. "<b>Ann</b> and <b>Bob</b>"
    count: int  # number of individual names written out in name_str


class _SummaryMessage:
    """Builds the message text and the name list for one group of entries."""

    def __init__(
        self,
        entries: list[UserSummaryEntryWrapper],
        max_name_count: int,
        is_html: bool = False,
    ) -> None:
        """Store the inputs; nothing is computed until a property is read.

        Args:
            entries: Entries the message is built from; ``message`` reads the
                first one, so it must not be empty for that property.
            max_name_count: Maximum number of names spelled out by ``names``.
            is_html: Whether single-entry values are wrapped in HTML tags.
        """
        self.entries = entries
        self.max_name_count = max_name_count
        self.is_html = is_html

    @property
    def message(self) -> str:
        """Return the summary message for the entries.

        A single entry uses a ``SummarySingleContext`` built from that entry's
        fields; several entries use a ``SummaryManyContext`` built from
        ``names``. The entry type is always taken from the first entry.

        Raises:
            IndexError: If ``entries`` is empty.
        """
        entries = self.entries
        first = entries[0]
        entry_type = first.entry_type

        if len(entries) == 1:
            email_data = self._parse_entry(first, is_html=self.is_html)
            single_ctx = SummarySingleContext(
                from_name=email_data.get("from_name", ""),
                description=email_data.get("description", ""),
                group_name=email_data.get("group_name", ""),
                accepted_name=email_data.get("accepted_name", ""),
            )
            return SummaryMessage.from_entry(entry_type, single_ctx)

        name_result = self.names
        many_ctx = SummaryManyContext(
            names=name_result.name_str,
            count=name_result.count,
        )
        return SummaryMessage.from_entry(entry_type, many_ctx)

    @property
    def names(self) -> _NameResult:
        """Format the entries' names into a grammatically correct HTML string.

        At most ``max_name_count`` names are spelled out; any remainder is
        summarised as "N other(s)". Examples with a limit of 3::

            1 entry   -> <b>A</b>
            2 entries -> <b>A</b> and <b>B</b>
            3 entries -> <b>A</b>, <b>B</b> and <b>C</b>
            5 entries -> <b>A</b>, <b>B</b>, <b>C</b> and <b>2 others</b>

        Returns:
            The formatted string and the number of names spelled out in it.
            An empty entry list yields an empty string and a count of 0.
        """
        # NOTE(review): names are interpolated into HTML unescaped and the
        # output is HTML regardless of ``is_html`` - confirm names are
        # sanitised upstream and that no plain-text caller relies on this.
        entries = self.entries
        total = len(entries)
        # Clamp so a zero or negative limit cannot produce surprising slices.
        limit = max(self.max_name_count, 0)
        shown = [f"<b>{e.from_name}</b>" for e in entries[:limit]]

        if total <= limit:
            # Every name fits within the limit.
            if not shown:
                return _NameResult(name_str="", count=0)
            if total == 1:
                return _NameResult(name_str=shown[0], count=1)
            head = ", ".join(shown[:-1])
            return _NameResult(name_str=f"{head} and {shown[-1]}", count=total)

        # More entries than the limit allows: summarise the remainder. This
        # also covers two entries with a limit below two, which previously
        # indexed past the end of the truncated name list.
        extra = total - limit
        noun = "other" if extra == 1 else "others"
        tail = f"<b>{extra} {noun}</b>"
        name_str = f"{', '.join(shown)} and {tail}" if shown else tail
        return _NameResult(name_str=name_str, count=limit)

    def _parse_entry(self, entry: UserSummaryEntryWrapper, is_html: bool) -> dict[str, str]:
        """Get the data dict for a single entry.

        Args:
            entry: Entry to read ``from_name``, ``accepted_name``,
                ``group_name`` and ``description`` from.
            is_html: When true, wrap the description in ``<i>`` and every
                other value in ``<b>``.

        Returns:
            A dict that always holds ``from_name`` and holds each optional
            field only when the entry's value is not ``None``.
        """
        data: dict[str, str] = {"from_name": entry.from_name}

        if (accepted_name := entry.accepted_name) is not None:
            data["accepted_name"] = accepted_name
        if (group_name := entry.group_name) is not None:
            data["group_name"] = group_name
        if (description := entry.description) is not None:
            data["description"] = description

        if not is_html:
            return data

        # The description is emphasised in italics; all names are bold.
        return {
            key: f"<i>{value}</i>" if key == "description" else f"<b>{value}</b>"
            for key, value in data.items()
        }

265 

266 return data