Coverage for functions \ flipdare \ voting \ ballot.py: 100%
0 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
14import math
15from dataclasses import dataclass
17import numpy as np
18from scipy import stats
20from flipdare.app_log import LOG
21from flipdare.constants import (
22 BAYESIAN_CDF_THRESHOLD,
23 BAYESIAN_MAJORITY_THRESHOLD,
24 BAYESIAN_MINORITY_THRESHOLD,
25 IS_DEBUG,
26 KEMENY_APPROVAL_THRESHOLD,
27 KEMENY_MIN_PERCENT_PENDING,
28 KEMENY_MIN_VOTES,
29 KEMENY_PENDING_WEIGHT,
30 KEMENY_REJECTION_THRESHOLD,
31 SPRT_FALSE_NEGATIVE_RATE,
32 SPRT_FALSE_POSITIVE_RATE,
33 SPRT_MIN_VOTES,
34 SPRT_SHOULD_APPROVE_PROBABILITY,
35 SPRT_SHOULD_REJECT_PROBABILITY,
36 THOMPSON_CONFIDENCE,
37 THOMPSON_SAMPLING_ITERATIONS,
38 THOMPSON_THRESHOLD,
39 WILSON_CONFIDENCE_INTERVAL,
40)
41from flipdare.generated.shared.model.dare.ballot_algorithm_type import BallotAlgorithmType
42from flipdare.generated.shared.model.dare.ballot_result import BallotResult
44__all__ = ["Ballot", "BallotOutcome", "VoteTally"]
# Module-level NumPy random generator used by the sampling-based
# auto-approval code in this file. It is created without an explicit seed.
rng = np.random.default_rng()
@dataclass(frozen=True, kw_only=True)
class BallotOutcome:
    """Immutable, keyword-only record of what a ballot count concluded."""

    # The decision reached by the count.
    result: BallotResult
    # The algorithm that produced ``result``; stays ``None`` when the caller
    # does not record one.
    algorithm: BallotAlgorithmType | None = None
56@dataclass(frozen=True, kw_only=True)
57class VoteTally:
58 accepted: int = 0
59 rejected: int = 0
60 undecided: int = 0
62 @property
63 def empty(self) -> bool:
64 return self.accepted == 0 and self.rejected == 0 and self.undecided == 0
class Ballot:
    """Decide the outcome of a ballot from a :class:`VoteTally`.

    ``count_votes`` is the entry point. It first tries a plain count of the
    vote totals and, once the voting period is over and that count is not
    conclusive, falls back to a statistical estimate (Kemeny-Young style
    weighting or Thompson Sampling, chosen by the share of undecided votes).
    """

    __slots__ = ("_min_kemeny_pending_percent", "_tally")

    def __init__(
        self,
        tally: VoteTally,
        kemeny_min_percent_pending: float = KEMENY_MIN_PERCENT_PENDING,
    ) -> None:
        """Store the tally and the pending-share cut-off.

        Args:
            tally: The accepted / rejected / undecided counts to evaluate.
            kemeny_min_percent_pending: Fraction of undecided votes above
                which the automatic count uses the Kemeny-Young method
                instead of Thompson Sampling.
        """
        self._tally = tally
        self._min_kemeny_pending_percent = kemeny_min_percent_pending

    @property
    def accepted(self) -> int:
        """Number of accepting votes in the tally."""
        return self._tally.accepted

    @property
    def rejected(self) -> int:
        """Number of rejecting votes in the tally."""
        return self._tally.rejected

    @property
    def undecided(self) -> int:
        """Number of undecided votes in the tally."""
        return self._tally.undecided

    @property
    def kemeny_min_percent_pending(self) -> float:
        """Pending-share cut-off that selects the Kemeny-Young method."""
        return self._min_kemeny_pending_percent

    def count_votes(self, within_voting_period: bool) -> BallotOutcome:
        """Return the ballot outcome for the current tally.

        Args:
            within_voting_period: ``True`` while votes can still be cast.

        Returns:
            * the manual outcome when it is an acceptance (at any time) or a
              rejection after the voting period has ended;
            * ``NOT_ENOUGH_VOTES`` for anything else while the voting period
              is still open;
            * otherwise the automatic outcome when it is an auto-acceptance
              or auto-rejection, and ``EXPIRED`` (carrying the algorithm that
              was tried) when the automatic count is inconclusive too.
        """
        outcome = self._manual_count()
        result = outcome.result

        # An acceptance from the plain totals is final straight away.
        if result.is_accepted:
            if IS_DEBUG:
                msg = f"Manual count decision for {self.stats_str()} - APPROVED: {outcome!s}"
                LOG().debug(msg)
            return outcome

        # We wait until the end of the voting period before rejecting,
        # since pending votes could still change the outcome.
        if result.is_rejected and not within_voting_period:
            if IS_DEBUG:
                msg = f"Manual count decision for {self.stats_str()} - REJECTED: {outcome!s}"
                LOG().debug(msg)
            return outcome

        # Don't use the automatic algorithms until the voting period expires.
        if within_voting_period:
            outcome = BallotOutcome(result=BallotResult.NOT_ENOUGH_VOTES)

            if IS_DEBUG:
                msg = f"Decision {self.stats_str()} - in-conclusive (but within voting period) - {outcome!s}"
                LOG().debug(msg)

            return outcome

        if IS_DEBUG:
            msg = f"Outside voting period, using auto- counts for {self.stats_str()} - {outcome!s}"
            LOG().debug(msg)

        # Reaching this point means the voting period is over and the manual
        # count was neither an acceptance nor a rejection (e.g. a tie, or not
        # enough votes for a manual decision). Rely on the statistical
        # algorithms to make a final decision.
        outcome = self._auto_count()
        result = outcome.result

        if result.is_auto_accepted or result.is_auto_rejected:
            if IS_DEBUG:
                msg = f"Auto decision for {self.stats_str()} - {result.value.upper()}: {outcome!s}"
                LOG().debug(msg)
            return outcome

        # No algorithm could decide: mark the ballot as expired.
        if IS_DEBUG:
            msg = f"Auto decision for {self.stats_str()} - EXPIRED: {outcome!s}"
            LOG().debug(msg)

        return BallotOutcome(result=BallotResult.EXPIRED, algorithm=outcome.algorithm)

    def _manual_count(self) -> BallotOutcome:
        """Decide from the raw totals alone.

        Returns ``NOT_ENOUGH_VOTES`` (with no algorithm) while undecided
        votes remain. Otherwise the outcome is tagged ``VOTE_TOTALS`` and is
        ``NOT_ENOUGH_VOTES`` for an empty tally, ``TIE`` for equal counts,
        ``ACCEPTED`` when accepts outnumber rejects and ``REJECTED`` when
        rejects outnumber accepts.
        """
        undecided = self.undecided
        accepted = self.accepted
        rejected = self.rejected

        if undecided > 0:
            if IS_DEBUG:
                LOG().debug(f"Cannot decide yet, {undecided} undecided votes remain")

            return BallotOutcome(result=BallotResult.NOT_ENOUGH_VOTES)

        result: BallotResult
        if accepted == 0 and rejected == 0:
            result = BallotResult.NOT_ENOUGH_VOTES
        elif accepted == rejected:
            result = BallotResult.TIE
        elif accepted > rejected:
            result = BallotResult.ACCEPTED
        else:
            result = BallotResult.REJECTED

        if IS_DEBUG:
            LOG().debug(f"Manual Vote Decision: {result} for {self.stats_str()}")

        return BallotOutcome(result=result, algorithm=BallotAlgorithmType.VOTE_TOTALS)

    def _auto_count(self) -> BallotOutcome:
        """Estimate the outcome statistically.

        Uses the Kemeny-Young method when the share of undecided votes is
        above ``kemeny_min_percent_pending`` and Thompson Sampling otherwise.

        Returns ``NOT_ENOUGH_VOTES`` for an empty tally, ``TIE`` when the
        chosen algorithm cannot decide, and ``AUTO_ACCEPTED`` /
        ``AUTO_REJECTED`` otherwise. Every non-empty outcome records the
        algorithm that was used.
        """
        undecided = self.undecided
        accepted = self.accepted
        rejected = self.rejected

        total = accepted + rejected + undecided
        if total == 0:
            # Nothing to estimate from; this also protects the division below.
            return BallotOutcome(result=BallotResult.NOT_ENOUGH_VOTES)

        can_approve: bool | None = None
        algorithm_type: BallotAlgorithmType | None = None

        # NOTE: this code generates too many false positives.
        # NOTE: and needs to be investigated further.
        # auto_approve = self._auto_approve_with_kemeny_young()
        # if auto_approve is not None:
        #     decision_type = VoteDecisionType.KEMENY_YOUNG
        # else:
        #     auto_approve = self._auto_approve_with_thompson_sampling()
        #     if auto_approve is not None:
        #         decision_type = VoteDecisionType.THOMPSON_SAMPLING

        pending_percent = undecided / total
        kemeny_min = self.kemeny_min_percent_pending

        if IS_DEBUG:
            msg = f"Auto-count pending percent {pending_percent:.2%} with Kemeny threshold {kemeny_min:.2%}"
            LOG().debug(msg)

        if pending_percent > kemeny_min:
            if IS_DEBUG:
                LOG().debug("High pending votes, using Kemeny-Young method for auto-decision")

            can_approve = self._auto_approve_with_kemeny_young()
            algorithm_type = BallotAlgorithmType.KEMENY_YOUNG
        else:
            if IS_DEBUG:
                LOG().debug("Low pending votes, using Thompson Sampling method for auto-decision")

            can_approve = self._auto_approve_with_thompson_sampling()
            algorithm_type = BallotAlgorithmType.THOMPSON_SAMPLING

        if can_approve is None:
            return BallotOutcome(result=BallotResult.TIE, algorithm=algorithm_type)

        result = BallotResult.AUTO_ACCEPTED if can_approve else BallotResult.AUTO_REJECTED
        return BallotOutcome(result=result, algorithm=algorithm_type)

    def _auto_approve_with_sprt_robust(
        self,
        false_positive_rate: float = SPRT_FALSE_POSITIVE_RATE,
        false_negative_rate: float = SPRT_FALSE_NEGATIVE_RATE,
        min_votes: int = SPRT_MIN_VOTES,
    ) -> bool | None:  # pragma: no cover
        """SPRT with explicit control over error rates.

        Args:
            false_positive_rate: Tolerated rate of wrongly approving.
            false_negative_rate: Tolerated rate of wrongly rejecting.
            min_votes: Minimum number of decided votes before testing.

        Returns:
            ``True`` to approve, ``False`` to reject, ``None`` when there are
            fewer than ``min_votes`` decided votes or the log-likelihood
            ratio lies between the two decision bounds.
        """
        approved = self.accepted
        rejected = self.rejected
        n = approved + rejected

        if n < min_votes:
            return None

        # Hypotheses: p0 is the approval rate under "should reject" and p1
        # the rate under "should approve" (the original note quotes 0.4 and
        # 0.6 -- the actual values come from the constants).
        p0, p1 = SPRT_SHOULD_REJECT_PROBABILITY, SPRT_SHOULD_APPROVE_PROBABILITY

        # SPRT decision bounds on the likelihood ratio.
        lower_bound = false_negative_rate / (1 - false_positive_rate)
        upper_bound = (1 - false_negative_rate) / false_positive_rate

        # Log-likelihood ratio of the observed votes under p1 versus p0.
        llr = approved * math.log(p1 / p0) + rejected * math.log((1 - p1) / (1 - p0))

        if llr >= math.log(upper_bound):
            return True
        if llr <= math.log(lower_bound):
            return False

        return None

    def _auto_approve_with_bayesian_decision(
        self,
        prior_alpha: int = 1,
        prior_beta: int = 1,
    ) -> bool | None:  # pragma: no cover
        """Bayesian inference with a Beta-Binomial model.

        Args:
            prior_alpha: Alpha parameter of the Beta prior.
            prior_beta: Beta parameter of the Beta prior.

        Returns:
            ``True`` when the posterior probability of the approval rate
            exceeding ``BAYESIAN_CDF_THRESHOLD`` is above
            ``BAYESIAN_MAJORITY_THRESHOLD``, ``False`` when it is below
            ``BAYESIAN_MINORITY_THRESHOLD``, and ``None`` otherwise or when
            there are no decided votes.
        """
        approved = self.accepted
        rejected = self.rejected
        n = approved + rejected
        if n <= 0:
            return None

        # Posterior distribution
        posterior_alpha = prior_alpha + approved
        posterior_beta = prior_beta + rejected

        # Probability that the approval rate exceeds the CDF threshold.
        # NOTE: the original note says a 0.45 threshold is used to account
        # NOTE: for pending votes -- confirm against BAYESIAN_CDF_THRESHOLD.
        prob_majority = 1 - stats.beta.cdf(
            BAYESIAN_CDF_THRESHOLD,
            posterior_alpha,
            posterior_beta,
        )

        # Decision thresholds
        if prob_majority > BAYESIAN_MAJORITY_THRESHOLD:  # Very confident it will pass
            return True
        if prob_majority < BAYESIAN_MINORITY_THRESHOLD:  # Very confident it will fail
            return False

        return None

    def _auto_approve_with_thompson_sampling(
        self,
        n_samples: int = THOMPSON_SAMPLING_ITERATIONS,
        threshold: float = THOMPSON_THRESHOLD,
        confidence: float = THOMPSON_CONFIDENCE,
    ) -> bool | None:
        """Thompson Sampling approach - samples from the posterior distribution.

        Args:
            n_samples: Number of posterior samples to draw.
            threshold: Approval rate a sample must exceed to count as approving.
            confidence: Fraction of samples required for a decision.

        Returns:
            ``True`` when at least ``confidence`` of the samples exceed
            ``threshold``, ``False`` when at most ``1 - confidence`` do, and
            ``None`` otherwise or when there are no decided votes.
        """
        approved = self.accepted
        rejected = self.rejected

        if approved + rejected == 0:
            return None

        # Sample from the Beta posterior (Beta(1, 1) prior).
        prior_alpha, prior_beta = 1, 1
        posterior_samples = rng.beta(approved + prior_alpha, rejected + prior_beta, n_samples)

        # Probability that true approval rate > threshold
        prob_approve = np.mean(posterior_samples > threshold)

        if prob_approve >= confidence:
            if IS_DEBUG:
                msg = f"Thompson Sampling: Approve with prob {prob_approve}:\n\t{self.stats_str()}"
                LOG().debug(msg)
            return True
        if prob_approve <= (1 - confidence):
            if IS_DEBUG:
                msg = f"Thompson Sampling: Reject with prob {1 - prob_approve}:\n\t{self.stats_str()}"
                LOG().debug(msg)
            return False

        if IS_DEBUG:
            msg = f"Thompson Sampling: No decision with prob {prob_approve}:\n\t{self.stats_str()}"
            LOG().debug(msg)
        return None

    def _auto_approve_with_kemeny_young(
        self,
        pending_weight: float = KEMENY_PENDING_WEIGHT,
    ) -> bool | None:
        """Kemeny-Young ranking considering pending votes as partial preferences.

        NOTE: This is a simplified adaptation for binary decisions
        NOTE: The lower the pending_weight, the more pending votes count towards rejection
        NOTE: e.g. 1,3,2 will return False if pending_weight=0.45, but None if pending_weight=0.5

        Args:
            pending_weight: Share of each undecided vote credited to
                approval; the remainder is credited to rejection.

        Returns:
            ``True`` when the weighted approval score is above
            ``KEMENY_APPROVAL_THRESHOLD``, ``False`` when it is below
            ``KEMENY_REJECTION_THRESHOLD``, and ``None`` otherwise or when
            the weighted total is below ``KEMENY_MIN_VOTES``.
        """
        approved = self.accepted
        rejected = self.rejected
        undecided = self.undecided

        # Weight undecided votes as partial support
        effective_approved = approved + (undecided * pending_weight)
        effective_rejected = rejected + (undecided * (1 - pending_weight))

        total = effective_approved + effective_rejected

        if total < KEMENY_MIN_VOTES:  # Need minimum votes
            if IS_DEBUG:
                LOG().debug(f"Kemeny-Young: Not enough votes:\n\t{self.stats_str()}")
            return None

        approval_score = effective_approved / total

        # Use stricter thresholds since we're estimating
        if approval_score > KEMENY_APPROVAL_THRESHOLD:
            if IS_DEBUG:
                msg = f"Kemeny-Young: Approve with score {approval_score}:\n\t{self.stats_str()}"
                LOG().debug(msg)
            return True
        if approval_score < KEMENY_REJECTION_THRESHOLD:
            if IS_DEBUG:
                msg = f"Kemeny-Young: Reject with score {approval_score}:\n\t{self.stats_str()}"
                LOG().debug(msg)
            return False

        if IS_DEBUG:
            msg = f"Kemeny-Young: No decision with score {approval_score}:\n\t{self.stats_str()}"
            LOG().debug(msg)
        return None

    def wilson_score_interval(
        self,
        confidence_level: float = WILSON_CONFIDENCE_INTERVAL,
    ) -> tuple[float, float]:  # pragma: no cover
        """Calculate the Wilson score confidence interval for the approval proportion.

        Args:
            confidence_level: Two-sided confidence level of the interval.

        Returns:
            ``(lower, upper)`` clamped to ``[0.0, 1.0]``; ``(0.0, 0.0)`` when
            there are no decided votes.
        """
        # FIXME: this could be replaced with from statsmodels.stats.proportion import proportion_confint
        # lower, upper = proportion_confint(self.accepted, n, alpha=1-confidence, method='wilson')

        pos = self.accepted
        neg = self.rejected

        n = pos + neg
        if n <= 0:
            return 0.0, 0.0

        p = pos / n

        # Z-score for the given confidence level
        z = stats.norm.ppf(1 - (1 - confidence_level) / 2)
        z_sq = z**2

        # Wilson score formula components
        denominator = 1 + z_sq / n
        center = (p + z_sq / (2 * n)) / denominator
        spread = z * ((p * (1 - p) / n + z_sq / (4 * n**2)) ** 0.5) / denominator

        return max(0.0, center - spread), min(1.0, center + spread)

    def stats_str(self) -> str:
        """Return a one-line summary of the tally for log messages.

        The undecided share is reported as ``0.0`` for an empty tally so that
        logging never raises ``ZeroDivisionError``.
        """
        total = self.accepted + self.rejected + self.undecided
        pending_percent = self.undecided / total if total else 0.0

        return (
            f"[Total: {total}, Approved: {self.accepted}, "
            f"Rejected: {self.rejected}, Undecided: {self.undecided}, "
            f"Undecided%: {pending_percent}]"
        )
412# OLD CODE - DEPRECATED
413#
414# **** This returns too many None results, not useful ****
415# def auto_approve_with_entropy_threshold(self, max_entropy=0.5,
416# approval_threshold=0.5):
417# """
418# Only decide when uncertainty (entropy) is low enough
419# """
420# approved = self.approved
421# rejected = self.rejected
422# n = approved + rejected
423#
424# if n == 0:
425# return None
426#
427# p = (approved + 1) / (n + 2) # Laplace smoothing
428#
429# # Shannon entropy of the outcome
430# if p == 0 or p == 1:
431# entropy = 0
432# else:
433# entropy = -p * math.log2(p) - (1-p) * math.log2(1-p)
434#
435# # Only decide if uncertainty is low AND result is clear
436# if entropy > max_entropy:
437# return None # Too uncertain
438#
439# if p > approval_threshold:
440# return True
441# elif p < (1 - approval_threshold):
442# return False
443#
444# return None
445#
446# **** This returns too many None results, not useful ****
447# def auto_approve_with_confidence_intervals(self) -> bool | None:
448# """
449# Use Wilson score interval for proportion confidence
450# """
451# approved = self.approved
452# rejected = self.rejected
453#
454# n = approved + rejected
455# if n <= 0:
456# return None
457#
458# p = approved / n
459#
460# # Wilson score 90% confidence interval
461# z = 1.645 # 90% confidence
462# denominator = 1 + z**2 / n
463# center = (p + z**2 / (2 * n)) / denominator
464# margin = z * ((p * (1 - p) / n + z**2 / (4 * n**2)) ** 0.5) / denominator
465#
466# ci_lower = center - margin
467# ci_upper = center + margin
468#
469# # Auto-approve if lower bound > threshold
470# if ci_lower > 0.65:
471# return True
472# # Auto-reject if upper bound < threshold
473# elif ci_upper < 0.35:
474# return False
475#
476# return None # Too uncertain