Coverage for functions \ flipdare \ analysis \ data_analysis.py: 87%
166 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
« prev ^ index » next coverage.py v7.13.0, created at 2026-05-08 12:22 +1000
1#!/usr/bin/env python
2# Copyright (c) 2026 Flipdare Pty Ltd. All rights reserved.
3#
4# This file is part of Flipdare's proprietary software and contains
5# confidential and copyrighted material. Unauthorised copying,
6# modification, distribution, or use of this file is strictly
7# prohibited without prior written permission from Flipdare Pty Ltd.
8#
9# This software includes third-party components licensed under MIT,
10# BSD, and Apache 2.0 licences. See THIRD_PARTY_NOTICES for details.
11#
13import math
14from dataclasses import dataclass
15from enum import Enum
16import numpy as np
17from scipy import stats
18from flipdare.app_types import AnalysisArrayType, NdFloatArrayType
19from flipdare.constants import (
20 APPROX_INFINITE,
21 MIN_ANALYSIS_COUNT,
22 STATS_IQR_MULTIPLIER,
23 STATS_Z_SCORE_THRESHOLD,
24)
25from flipdare.analysis.plotter import ScatterData
# Names exported by ``from <this module> import *``; every entry is a class
# defined further down in this module.
__all__ = [
    "DataAnalysis",
    "DataAnalysisProps",
    "OutlierAlgorithm",
    "AnalysisResult",
    "AnalysisEntry",
]
class OutlierAlgorithm(Enum):
    """Outlier-detection strategies that an analysis can be configured with."""

    Z_SCORE = "z_score"
    IQR = "iqr"

    @property
    def label(self) -> str:
        """Return the display name of this algorithm (used in plot legends)."""
        if self is OutlierAlgorithm.Z_SCORE:
            return "Z-Score"
        # Only two members exist, so anything that is not Z_SCORE is IQR.
        return "IQR"
@dataclass
class DataAnalysisProps:
    """Tunable settings for an outlier-detection run.

    Every field has a default, so ``DataAnalysisProps()`` yields the
    project-wide configuration taken from ``flipdare.constants``.
    """

    # Cut-off that absolute z-scores are compared against when the Z-score
    # algorithm is used; a score strictly above it marks an outlier.
    z_score_threshold: int = STATS_Z_SCORE_THRESHOLD
    # Multiple of the inter-quartile range subtracted from Q1 / added to Q3
    # to form the accepted band when the IQR algorithm is used.
    iqr_multiplier: float = STATS_IQR_MULTIPLIER
    # Detection algorithm to run; IQR unless the caller overrides it.
    algorithm: OutlierAlgorithm = OutlierAlgorithm.IQR
@dataclass
class AnalysisEntry:
    """Parallel sequences of data values and the positions they were taken from.

    ``values[k]`` belongs to position ``indicies[k]``. Construction fails if
    the two sequences do not have the same length.
    """

    values: AnalysisArrayType
    indicies: list[int]

    @classmethod
    def empty(cls) -> "AnalysisEntry":
        """Build an entry that holds no values and no indices."""
        return cls(values=[], indicies=[])

    def __post_init__(self) -> None:
        """Validate that ``values`` and ``indicies`` line up one-to-one.

        Raises:
            ValueError: If the two sequences differ in length.
        """
        if len(self.values) == len(self.indicies):
            return

        msg = f"Inconsistent array lengths: {len(self.values)} values != {len(self.indicies)} indices."
        raise ValueError(msg)

    def scatter_data(self, label: str) -> ScatterData:
        """Package the non-``None`` values and their positions for plotting.

        Args:
            label: Legend text attached to the returned scatter series.

        Returns:
            A ``ScatterData`` whose ``points`` and ``indices`` are equally
            long lists; both are empty when every value is ``None``.
        """
        kept_points = []
        kept_indices = []
        # Drop each None together with its index so the plotting layer never
        # receives a missing point.
        for value, position in zip(self.values, self.indicies, strict=True):
            if value is None:
                continue
            kept_points.append(value)
            kept_indices.append(position)

        return ScatterData(points=kept_points, indices=kept_indices, label=label)
@dataclass
class AnalysisResult:
    """Outcome of one detection run: the flagged entries and the accepted ones."""

    detect_type: OutlierAlgorithm
    outliers: AnalysisEntry
    valid: AnalysisEntry

    @property
    def has_outliers(self) -> bool:
        """Whether at least one outlier value is not ``None``."""
        for value in self.outliers.values:
            if value is not None:
                return True
        return False

    @property
    def has_valid(self) -> bool:
        """Whether at least one accepted value is not ``None``."""
        for value in self.valid.values:
            if value is not None:
                return True
        return False

    @property
    def notes(self) -> list[str]:
        """Two summary lines listing outlier and non-outlier values.

        ``None`` entries are left out; a side with nothing to show is
        rendered as ``N/A``.
        """

        def render(entry_values) -> str:
            # Skip missing entries so the text never contains the word "None".
            shown = [str(item) for item in entry_values if item is not None]
            if not shown:
                return "N/A"
            return ", ".join(shown)

        outlier_text = render(self.outliers.values)
        valid_text = render(self.valid.values)
        return [
            f"Outliers: [{outlier_text}]",
            f"Non Outliers: [{valid_text}]",
        ]

    @property
    def outlier_scatter_data(self) -> ScatterData | None:
        """Scatter series for the outliers, or ``None`` when there are none."""
        if self.has_outliers:
            return self.outliers.scatter_data(label=f"Outliers ({self.detect_type.label})")
        return None

    @property
    def valid_scatter_data(self) -> ScatterData | None:
        """Scatter series for the accepted values, or ``None`` when there are none."""
        if self.has_valid:
            return self.valid.scatter_data(label=f"Non-Outliers ({self.detect_type.label})")
        return None
130class DataAnalysis:
131 __slots__ = ("_props", "_values")
133 def __init__(
134 self,
135 values: AnalysisArrayType,
136 props: DataAnalysisProps | None = None,
137 ) -> None:
138 if props is None:
139 props = DataAnalysisProps()
141 self._values = values
142 self._props = props
144 @property
145 def z_score_threshold(self) -> int:
146 return self._props.z_score_threshold
148 @property
149 def iqr_multiplier(self) -> float:
150 return self._props.iqr_multiplier
152 @property
153 def outlier_type(self) -> OutlierAlgorithm:
154 return self._props.algorithm
156 @property
157 def values(self) -> AnalysisArrayType:
158 return self._values
160 @property
161 def z_scores_formatted(self) -> list[float] | None:
162 """Get a pretty-printed list of z-scores."""
163 zscores = self.z_scores
164 if zscores is None:
165 return None
167 entries = zscores.tolist()
168 if len(entries) == 0:
169 return None
171 scores: list[float] = []
172 for score in entries:
173 # check for nan
174 if math.isnan(score):
175 scores.append(0.0)
176 else:
177 scores.append(round(float(score), 2))
178 return scores
180 @property
181 def z_score_outliers(self) -> AnalysisResult | None:
182 # 1. Filter out Nones but keep track of original indices
183 indexed_data = [(i, v) for i, v in enumerate(self.values) if v is not None]
184 if len(indexed_data) < MIN_ANALYSIS_COUNT:
185 return None
187 indices = [x[0] for x in indexed_data]
188 clean_values = [x[1] for x in indexed_data]
190 # 2. Calculate Z-scores on clean data
191 std_dev = np.std(clean_values, ddof=1)
192 if std_dev < APPROX_INFINITE:
193 z_scores = np.zeros(len(clean_values))
194 else:
195 z_scores = np.abs(stats.zscore(clean_values, ddof=1))
197 threshold = self.z_score_threshold
199 # 3. Map results back using our 'indices' map
200 outlier_values: AnalysisArrayType = []
201 outlier_indices: list[int] = []
202 valid_values: AnalysisArrayType = []
203 valid_indices: list[int] = []
205 for i, score in enumerate(z_scores):
206 orig_idx = indices[i]
207 val = clean_values[i]
208 if score > threshold:
209 outlier_values.append(val)
210 outlier_indices.append(orig_idx)
211 else:
212 valid_values.append(val)
213 valid_indices.append(orig_idx)
215 return AnalysisResult(
216 detect_type=OutlierAlgorithm.Z_SCORE,
217 outliers=AnalysisEntry(values=outlier_values, indicies=outlier_indices),
218 valid=AnalysisEntry(values=valid_values, indicies=valid_indices),
219 )
221 @property
222 def interquartile_outliers(self) -> AnalysisResult | None:
223 indexed_data = [(i, v) for i, v in enumerate(self.values) if v is not None]
224 if len(indexed_data) < MIN_ANALYSIS_COUNT:
225 return None
227 indices = [x[0] for x in indexed_data]
228 data = np.array([x[1] for x in indexed_data])
230 q1, q3 = np.percentile(data, [25, 75])
231 iqr = q3 - q1
232 lower_bound = q1 - (self.iqr_multiplier * iqr)
233 upper_bound = q3 + (self.iqr_multiplier * iqr)
235 # Identify indices relative to the 'data' array
236 is_outlier = (data < lower_bound) | (data > upper_bound)
238 # Map those back to original indices
239 outliers_indices = [indices[i] for i, outlier in enumerate(is_outlier) if outlier]
240 outliers_values = data[is_outlier].tolist()
242 valid_indices = [indices[i] for i, outlier in enumerate(is_outlier) if not outlier]
243 valid_values = data[~is_outlier].tolist()
245 return AnalysisResult(
246 detect_type=OutlierAlgorithm.IQR,
247 outliers=AnalysisEntry(values=outliers_values, indicies=outliers_indices),
248 valid=AnalysisEntry(values=valid_values, indicies=valid_indices),
249 )
251 @property
252 def z_scores(self) -> NdFloatArrayType | None:
253 """Calculate the z-score for the values, correctly handling None/NaN."""
254 values = self.values
255 if len(values) < MIN_ANALYSIS_COUNT:
256 return None
258 # Convert to numpy array (None becomes np.nan)
259 data = np.array(values, dtype=float)
261 # Calculate standard deviation while ignoring NaNs
262 std_dev = np.nanstd(data, ddof=1)
264 if std_dev < APPROX_INFINITE:
265 # Match original length, setting zeros where data was valid
266 result = np.zeros(len(data))
267 result[np.isnan(data)] = np.nan
268 return result
270 # nan_policy='omit' ignores NaNs for stats but keeps the array shape
271 return np.abs(stats.zscore(data, ddof=1, nan_policy="omit"))
273 def analyze(self) -> AnalysisResult:
274 """Determine if the values contain outliers based on z-score threshold."""
275 result: AnalysisResult | None = None
277 match self.outlier_type:
278 case OutlierAlgorithm.IQR:
279 result = self.interquartile_outliers
280 case OutlierAlgorithm.Z_SCORE:
281 result = self.z_score_outliers
283 if result is not None:
284 return result
286 return AnalysisResult(
287 detect_type=self.outlier_type,
288 outliers=AnalysisEntry.empty(),
289 valid=AnalysisEntry(
290 values=self.values,
291 indicies=list(range(len(self.values))),
292 ),
293 )