Coverage for functions \ flipdare \ voting \ ballot.py: 100%

0 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-05-08 12:22 +1000

1#!/usr/bin/env python 

2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved. 

3# 

4# This file is part of Flipdare's proprietary software and contains 

5# confidential and copyrighted material. Unauthorised copying, 

6# modification, distribution, or use of this file is strictly 

7# prohibited without prior written permission from Flipdare Pty Ltd. 

8# 

9# This software includes third-party components licensed under MIT, 

10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details. 

11# 

12 

13 

14import math 

15from dataclasses import dataclass 

16 

17import numpy as np 

18from scipy import stats 

19 

20from flipdare.app_log import LOG 

21from flipdare.constants import ( 

22 BAYESIAN_CDF_THRESHOLD, 

23 BAYESIAN_MAJORITY_THRESHOLD, 

24 BAYESIAN_MINORITY_THRESHOLD, 

25 IS_DEBUG, 

26 KEMENY_APPROVAL_THRESHOLD, 

27 KEMENY_MIN_PERCENT_PENDING, 

28 KEMENY_MIN_VOTES, 

29 KEMENY_PENDING_WEIGHT, 

30 KEMENY_REJECTION_THRESHOLD, 

31 SPRT_FALSE_NEGATIVE_RATE, 

32 SPRT_FALSE_POSITIVE_RATE, 

33 SPRT_MIN_VOTES, 

34 SPRT_SHOULD_APPROVE_PROBABILITY, 

35 SPRT_SHOULD_REJECT_PROBABILITY, 

36 THOMPSON_CONFIDENCE, 

37 THOMPSON_SAMPLING_ITERATIONS, 

38 THOMPSON_THRESHOLD, 

39 WILSON_CONFIDENCE_INTERVAL, 

40) 

41from flipdare.generated.shared.model.dare.ballot_algorithm_type import BallotAlgorithmType 

42from flipdare.generated.shared.model.dare.ballot_result import BallotResult 

43 

44__all__ = ["Ballot", "BallotOutcome", "VoteTally"] 

45 

46 

47rng = np.random.default_rng() 

48 

49 

50@dataclass(frozen=True, kw_only=True) 

51class BallotOutcome: 

52 result: BallotResult 

53 algorithm: BallotAlgorithmType | None = None 

54 

55 

56@dataclass(frozen=True, kw_only=True) 

57class VoteTally: 

58 accepted: int = 0 

59 rejected: int = 0 

60 undecided: int = 0 

61 

62 @property 

63 def empty(self) -> bool: 

64 return self.accepted == 0 and self.rejected == 0 and self.undecided == 0 

65 

66 

67class Ballot: 

68 

69 __slots__ = ("_min_kemeny_pending_percent", "_tally") 

70 

71 def __init__( 

72 self, 

73 tally: VoteTally, 

74 kemeny_min_percent_pending: float = KEMENY_MIN_PERCENT_PENDING, 

75 ) -> None: 

76 self._tally = tally 

77 self._min_kemeny_pending_percent = kemeny_min_percent_pending 

78 

79 @property 

80 def accepted(self) -> int: 

81 return self._tally.accepted 

82 

83 @property 

84 def rejected(self) -> int: 

85 return self._tally.rejected 

86 

87 @property 

88 def undecided(self) -> int: 

89 return self._tally.undecided 

90 

91 @property 

92 def kemeny_min_percent_pending(self) -> float: 

93 return self._min_kemeny_pending_percent 

94 

95 def count_votes(self, within_voting_period: bool) -> BallotOutcome: 

96 outcome = self._manual_count() 

97 result = outcome.result 

98 

99 if result.is_accepted: 

100 if IS_DEBUG: 

101 msg = f"Manual count decision for {self.stats_str()} - APPROVED: {outcome!s}" 

102 LOG().debug(msg) 

103 return outcome 

104 

105 if result.is_rejected and not within_voting_period: 

106 if IS_DEBUG: 

107 msg = f"Manual count decision for {self.stats_str()} - REJECTED: {outcome!s}" 

108 LOG().debug(msg) 

109 return outcome 

110 

111 # dont use auto algorithms until the votin period expires 

112 if within_voting_period: 

113 outcome = BallotOutcome(result=BallotResult.NOT_ENOUGH_VOTES) 

114 

115 if IS_DEBUG: 

116 msg = f"Decision {self.stats_str()} - in-conclusive (but within voting period) - {outcome!s}" 

117 LOG().debug(msg) 

118 

119 return outcome 

120 

121 # we want to wait until the end of the voting period to reject, 

122 # since there may be pending votes that could change the outcome 

123 if IS_DEBUG: 

124 msg = f"Outside voting period, using auto- counts for {self.stats_str()} - {outcome!s}" 

125 LOG().debug(msg) 

126 

127 # if we get here, it means either the result is a tie, 

128 # not enough votes for a manual count, 

129 # or it's a rejection within the voting period 

130 outcome = self._auto_count() 

131 result = outcome.result 

132 

133 # if we get here, we are outside the voting period with no clear decision 

134 # so we have to rely on the statistical algorithms to make a final decision 

135 # based on the expressed confidence intervals. 

136 

137 if result.is_auto_accepted or result.is_auto_rejected: 

138 if IS_DEBUG: 

139 msg = f"Auto decision for {self.stats_str()} - {result.value.upper()}: {outcome!s}" 

140 LOG().debug(msg) 

141 return outcome 

142 

143 # mark as expired. 

144 if IS_DEBUG: 

145 msg = f"Auto decision for {self.stats_str()} - EXPIRED: {outcome!s}" 

146 LOG().debug(msg) 

147 

148 return BallotOutcome(result=BallotResult.EXPIRED, algorithm=outcome.algorithm) 

149 

150 def _manual_count(self) -> BallotOutcome: 

151 undecided = self.undecided 

152 accepted = self.accepted 

153 rejected = self.rejected 

154 

155 if undecided > 0: 

156 if IS_DEBUG: 

157 LOG().debug(f"Cannot decide yet, {undecided} undecided votes remain") 

158 

159 return BallotOutcome(result=BallotResult.NOT_ENOUGH_VOTES) 

160 

161 result: BallotResult 

162 if accepted == 0 and rejected == 0: 

163 result = BallotResult.NOT_ENOUGH_VOTES 

164 elif accepted == rejected: 

165 result = BallotResult.TIE 

166 elif accepted > rejected: 

167 result = BallotResult.ACCEPTED 

168 else: 

169 result = BallotResult.REJECTED 

170 

171 if IS_DEBUG: 

172 LOG().debug(f"Manual Vote Decision: {result} for {self.stats_str()}") 

173 

174 return BallotOutcome(result=result, algorithm=BallotAlgorithmType.VOTE_TOTALS) 

175 

176 def _auto_count(self) -> BallotOutcome: 

177 undecided = self.undecided 

178 accepted = self.accepted 

179 rejected = self.rejected 

180 if undecided == 0 and accepted == 0 and rejected == 0: 

181 return BallotOutcome(result=BallotResult.NOT_ENOUGH_VOTES) 

182 

183 can_approve: bool | None = None 

184 algorithm_type: BallotAlgorithmType | None = None 

185 

186 # NOTE: this code generates too many false positives. 

187 # NOTE: and needs to be investigated further. 

188 # auto_approve = self._auto_approve_with_kemeny_young() 

189 # if auto_approve is not None: 

190 # decision_type = VoteDecisionType.KEMENY_YOUNG 

191 # else: 

192 # auto_approve = self._auto_approve_with_thompson_sampling() 

193 # if auto_approve is not None: 

194 # decision_type = VoteDecisionType.THOMPSON_SAMPLING 

195 

196 total = accepted + rejected + undecided 

197 pending_percent = undecided / total 

198 kemeny_min = self.kemeny_min_percent_pending 

199 

200 if IS_DEBUG: 

201 msg = f"Auto-count pending percent {pending_percent:.2%} with Kemeny threshold {kemeny_min:.2%}" 

202 LOG().debug(msg) 

203 if pending_percent > kemeny_min: 

204 if IS_DEBUG: 

205 LOG().debug("High pending votes, using Kemeny-Young method for auto-decision") 

206 

207 can_approve = self._auto_approve_with_kemeny_young() 

208 algorithm_type = BallotAlgorithmType.KEMENY_YOUNG 

209 else: 

210 if IS_DEBUG: 

211 LOG().debug("Low pending votes, using Thompson Sampling method for auto-decision") 

212 

213 can_approve = self._auto_approve_with_thompson_sampling() 

214 algorithm_type = BallotAlgorithmType.THOMPSON_SAMPLING 

215 

216 if can_approve is None: 

217 return BallotOutcome(result=BallotResult.TIE, algorithm=algorithm_type) 

218 

219 result = BallotResult.AUTO_ACCEPTED if can_approve else BallotResult.AUTO_REJECTED 

220 return BallotOutcome(result=result, algorithm=algorithm_type) 

221 

222 def _auto_approve_with_sprt_robust( 

223 self, 

224 false_positive_rate: float = SPRT_FALSE_POSITIVE_RATE, 

225 false_negative_rate: float = SPRT_FALSE_NEGATIVE_RATE, 

226 min_votes: int = SPRT_MIN_VOTES, 

227 ) -> bool | None: # pragma: no cover 

228 """ 

229 SPRT with explicit control over error rates 

230 """ 

231 approved = self.accepted 

232 rejected = self.rejected 

233 n = approved + rejected 

234 

235 if n < min_votes: 

236 return None 

237 

238 # Hypotheses: p0 = 0.4 (should reject), p1 = 0.6 (should approve) 

239 p0, p1 = SPRT_SHOULD_REJECT_PROBABILITY, SPRT_SHOULD_APPROVE_PROBABILITY 

240 

241 # SPRT thresholds 

242 A = false_negative_rate / (1 - false_positive_rate) # noqa: N806 

243 B = (1 - false_negative_rate) / false_positive_rate # noqa: N806 

244 

245 # Log-likelihood ratio 

246 llr = approved * math.log(p1 / p0) + rejected * math.log((1 - p1) / (1 - p0)) 

247 

248 if llr >= math.log(B): 

249 return True 

250 if llr <= math.log(A): 

251 return False 

252 

253 return None 

254 

255 def _auto_approve_with_bayesian_decision( 

256 self, 

257 prior_alpha: int = 1, 

258 prior_beta: int = 1, 

259 ) -> bool | None: # pragma: no cover 

260 """ 

261 Bayesian inference with Beta-Binomial model 

262 """ 

263 approved = self.accepted 

264 rejected = self.rejected 

265 n = approved + rejected 

266 if n <= 0: 

267 return None 

268 

269 # Posterior distribution 

270 posterior_alpha = prior_alpha + approved 

271 posterior_beta = prior_beta + rejected 

272 

273 # Probability that approval rate > 0.45 

274 # NOTE: Using 0.45 threshold to account for pending votes 

275 prob_majority = 1 - stats.beta.cdf( 

276 BAYESIAN_CDF_THRESHOLD, 

277 posterior_alpha, 

278 posterior_beta, 

279 ) 

280 

281 # Decision thresholds 

282 if prob_majority > BAYESIAN_MAJORITY_THRESHOLD: # Very confident it will pass 

283 return True 

284 if prob_majority < BAYESIAN_MINORITY_THRESHOLD: # Very confident it will fail 

285 return False 

286 

287 return None 

288 

289 def _auto_approve_with_thompson_sampling( 

290 self, 

291 n_samples: int = THOMPSON_SAMPLING_ITERATIONS, 

292 threshold: float = THOMPSON_THRESHOLD, 

293 confidence: float = THOMPSON_CONFIDENCE, 

294 ) -> bool | None: 

295 """ 

296 Thompson Sampling approach - samples from posterior distribution 

297 """ 

298 approved = self.accepted 

299 rejected = self.rejected 

300 

301 if approved + rejected == 0: 

302 return None 

303 

304 # Sample from Beta posterior 

305 prior_alpha, prior_beta = 1, 1 

306 posterior_samples = rng.beta(approved + prior_alpha, rejected + prior_beta, n_samples) 

307 

308 # Probability that true approval rate > threshold 

309 prob_approve = np.mean(posterior_samples > threshold) 

310 

311 if prob_approve >= confidence: 

312 if IS_DEBUG: 

313 msg = f"Thompson Sampling: Approve with prob {prob_approve}:\n\t{self.stats_str()}" 

314 LOG().debug(msg) 

315 return True 

316 if prob_approve <= (1 - confidence): 

317 if IS_DEBUG: 

318 msg = f"Thompson Sampling: Reject with prob {1 - prob_approve}:\n\t{self.stats_str()}" 

319 LOG().debug(msg) 

320 return False 

321 

322 if IS_DEBUG: 

323 msg = f"Thompson Sampling: No decision with prob {prob_approve}:\n\t{self.stats_str()}" 

324 LOG().debug(msg) 

325 return None 

326 

327 def _auto_approve_with_kemeny_young( 

328 self, 

329 pending_weight: float = KEMENY_PENDING_WEIGHT, 

330 ) -> bool | None: 

331 """ 

332 Kemeny-Young ranking considering pending votes as partial preferences 

333 NOTE: This is a simplified adaptation for binary decisions 

334 NOTE: The lower the pending_weight, the more pending votes count towards rejection 

335 NOTE: e.g. 1,3,2 will return False if pending_weight=0.45, but None if pending_weight=0.5 

336 """ 

337 approved = self.accepted 

338 rejected = self.rejected 

339 undecided = self.undecided 

340 

341 # Weight undecided votes as partial support 

342 effective_approved = approved + (undecided * pending_weight) 

343 effective_rejected = rejected + (undecided * (1 - pending_weight)) 

344 

345 total = effective_approved + effective_rejected 

346 

347 if total < KEMENY_MIN_VOTES: # Need minimum votes 

348 if IS_DEBUG: 

349 LOG().debug(f"Kemeny-Young: Not enough votes:\n\t{self.stats_str()}") 

350 return None 

351 

352 approval_score = effective_approved / total 

353 

354 # Use stricter thresholds since we're estimating 

355 if approval_score > KEMENY_APPROVAL_THRESHOLD: 

356 if IS_DEBUG: 

357 msg = f"Kemeny-Young: Approve with score {approval_score}:\n\t{self.stats_str()}" 

358 LOG().debug(msg) 

359 return True 

360 if approval_score < KEMENY_REJECTION_THRESHOLD: 

361 if IS_DEBUG: 

362 msg = f"Kemeny-Young: Reject with score {approval_score}:\n\t{self.stats_str()}" 

363 LOG().debug(msg) 

364 return False 

365 

366 if IS_DEBUG: 

367 msg = f"Kemeny-Young: No decision with score {approval_score}:\n\t{self.stats_str()}" 

368 LOG().debug(msg) 

369 return None 

370 

371 def wilson_score_interval( 

372 self, 

373 confidence_level: float = WILSON_CONFIDENCE_INTERVAL, 

374 ) -> tuple[float, float]: # pragma: no cover 

375 """ 

376 Calculate Wilson score confidence interval for approval proportion 

377 """ 

378 # FIXME: this could be replaced with from statsmodels.stats.proportion import proportion_confint 

379 # lower, upper = proportion_confint(self.accepted, n, alpha=1-confidence, method='wilson') 

380 

381 pos = self.accepted 

382 neg = self.rejected 

383 

384 n = pos + neg 

385 if n <= 0: 

386 return 0.0, 0.0 

387 

388 p = pos / n 

389 

390 # Z-score for the given confidence level 

391 z = stats.norm.ppf(1 - (1 - confidence_level) / 2) 

392 z_sq = z**2 

393 

394 # 3. Wilson Score Formula components 

395 denominator = 1 + z_sq / n 

396 center = (p + z_sq / (2 * n)) / denominator 

397 spread = z * ((p * (1 - p) / n + z_sq / (4 * n**2)) ** 0.5) / denominator 

398 

399 return max(0.0, center - spread), min(1.0, center + spread) 

400 

401 def stats_str(self) -> str: 

402 total = self.accepted + self.rejected + self.undecided 

403 pending_percent = self.undecided / total 

404 

405 return ( 

406 f"[Total: {total}, Approved: {self.accepted}, " 

407 f"Rejected: {self.rejected}, Undecided: {self.undecided}, " 

408 f"Undecided%: {pending_percent}]" 

409 ) 

410 

411 

412# OLD CODE - DEPRECATED 

413# 

414# **** This returns too many None results, not useful **** 

415# def auto_approve_with_entropy_threshold(self, max_entropy=0.5, 

416# approval_threshold=0.5): 

417# """ 

418# Only decide when uncertainty (entropy) is low enough 

419# """ 

420# approved = self.approved 

421# rejected = self.rejected 

422# n = approved + rejected 

423# 

424# if n == 0: 

425# return None 

426# 

427# p = (approved + 1) / (n + 2) # Laplace smoothing 

428# 

429# # Shannon entropy of the outcome 

430# if p == 0 or p == 1: 

431# entropy = 0 

432# else: 

433# entropy = -p * math.log2(p) - (1-p) * math.log2(1-p) 

434# 

435# # Only decide if uncertainty is low AND result is clear 

436# if entropy > max_entropy: 

437# return None # Too uncertain 

438# 

439# if p > approval_threshold: 

440# return True 

441# elif p < (1 - approval_threshold): 

442# return False 

443# 

444# return None 

445# 

446# **** This returns too many None results, not useful **** 

447# def auto_approve_with_confidence_intervals(self) -> bool | None: 

448# """ 

449# Use Wilson score interval for proportion confidence 

450# """ 

451# approved = self.approved 

452# rejected = self.rejected 

453# 

454# n = approved + rejected 

455# if n <= 0: 

456# return None 

457# 

458# p = approved / n 

459# 

460# # Wilson score 90% confidence interval 

461# z = 1.645 # 90% confidence 

462# denominator = 1 + z**2 / n 

463# center = (p + z**2 / (2 * n)) / denominator 

464# margin = z * ((p * (1 - p) / n + z**2 / (4 * n**2)) ** 0.5) / denominator 

465# 

466# ci_lower = center - margin 

467# ci_upper = center + margin 

468# 

469# # Auto-approve if lower bound > threshold 

470# if ci_lower > 0.65: 

471# return True 

472# # Auto-reject if upper bound < threshold 

473# elif ci_upper < 0.35: 

474# return False 

475# 

476# return None # Too uncertain