Skip to main content

saluki_core/data_model/event/metric/value/
histogram.rs

1use std::{fmt, num::NonZeroU64};
2
3use ordered_float::OrderedFloat;
4use smallvec::SmallVec;
5
6use super::{SampleRate, TimestampedValue, TimestampedValues};
7
8/// A weighted sample.
9///
10/// A weighted sample contains a value and a weight. The weight corresponds to the sample rate of the value, where the
11/// value can be considered to be responsible for `weight` samples overall. For example, if a metric was emitted 10
12/// times with the same value, you could instead emit it once with a sample rate of 10%, which would be equivalent to a
13/// weight of 10 (1/0.1).
14#[derive(Clone, Debug, Eq, PartialEq)]
15pub struct WeightedSample {
16    /// The sample value.
17    pub value: OrderedFloat<f64>,
18
19    /// The sample weight.
20    pub weight: OrderedFloat<f64>,
21}
22
23/// A basic histogram.
24#[derive(Clone, Debug, Default, Eq, PartialEq)]
25pub struct Histogram {
26    samples: SmallVec<[WeightedSample; 3]>,
27    /// Weight shared by every sample so far; `0.0` means no samples have been inserted yet.
28    shared_weight: OrderedFloat<f64>,
29    /// Set to `true` on the first insertion whose weight differs from [`shared_weight`].
30    weights_vary: bool,
31}
32
33impl Histogram {
34    /// Insert a sample into the histogram.
35    pub fn insert(&mut self, value: f64, sample_rate: SampleRate) {
36        let weight = OrderedFloat(sample_rate.raw_weight());
37        if self.shared_weight == OrderedFloat(0.0) {
38            self.shared_weight = weight;
39        } else if weight != self.shared_weight {
40            self.weights_vary = true;
41        }
42        self.samples.push(WeightedSample {
43            value: OrderedFloat(value),
44            weight,
45        });
46    }
47
48    /// Returns a summary view over the histogram.
49    ///
50    /// The summary view contains basic aggregates (sum, count, min, max, etc) and allows querying the histogram for
51    /// various quantiles.
52    pub fn summary_view(&mut self) -> HistogramSummary<'_> {
53        // Sort the samples by their value, lowest to highest.
54        //
55        // This is required as we depend on it as an invariant in `HistogramSummary` to quickly answer aggregates like
56        // minimum and maximum, as well as quantile queries.
57        self.samples.sort_unstable_by_key(|sample| sample.value);
58
59        // Compute count and sum in a single pass using compensated (Neumaier) summation.
60        // Four accumulators: (count_s, count_c) for the weight total and (sum_s, sum_c) for the
61        // value total, keeping one correction term per accumulator.
62        let (count, sum) = if self.weights_vary {
63            // Varying weights: accumulate value * weight for sum, weight for count.
64            let (cs, cc, ss, sc) =
65                self.samples
66                    .iter()
67                    .fold((0.0_f64, 0.0_f64, 0.0_f64, 0.0_f64), |(cs, cc, ss, sc), sample| {
68                        let (cs, cc) = neumaier_add(cs, cc, sample.weight.0);
69                        let (ss, sc) = neumaier_add(ss, sc, sample.value.0 * sample.weight.0);
70                        (cs, cc, ss, sc)
71                    });
72            (cs + cc, ss + sc)
73        } else {
74            // Uniform weights: accumulate raw values for sum (scaled once at the end), weight for count.
75            let (cs, cc, ss, sc) =
76                self.samples
77                    .iter()
78                    .fold((0.0_f64, 0.0_f64, 0.0_f64, 0.0_f64), |(cs, cc, ss, sc), sample| {
79                        let (cs, cc) = neumaier_add(cs, cc, sample.weight.0);
80                        let (ss, sc) = neumaier_add(ss, sc, sample.value.0);
81                        (cs, cc, ss, sc)
82                    });
83            (cs + cc, (ss + sc) * self.shared_weight.0)
84        };
85
86        HistogramSummary {
87            histogram: self,
88            count,
89            sum,
90        }
91    }
92
93    /// Merges another histogram into this one.
94    pub fn merge(&mut self, other: &mut Histogram) {
95        if !self.weights_vary {
96            if other.weights_vary {
97                self.weights_vary = true;
98            } else if self.shared_weight == OrderedFloat(0.0) {
99                self.shared_weight = other.shared_weight;
100            } else if other.shared_weight != OrderedFloat(0.0) && self.shared_weight != other.shared_weight {
101                self.weights_vary = true;
102            }
103        }
104        self.samples.extend(other.samples.drain(..));
105    }
106
107    /// Returns the raw weighted samples of the histogram.
108    pub fn samples(&self) -> &[WeightedSample] {
109        &self.samples
110    }
111}
112
113/// Summary view over a [`Histogram`].
114pub struct HistogramSummary<'a> {
115    histogram: &'a Histogram,
116    count: f64,
117    sum: f64,
118}
119
120impl HistogramSummary<'_> {
121    /// Returns the number of samples in the histogram.
122    ///
123    /// This is adjusted by the weight of each sample, based on the sample rate given during insertion.
124    ///
125    /// The underlying weight accumulation uses compensated (Neumaier) summation over float weights,
126    /// and the result is rounded to the nearest integer. For standard sample rates whose reciprocals
127    /// are exact integers (for example, `0.1`, `0.25`, `0.5`, `1.0`) rounding has no effect; for
128    /// non-integer-reciprocal rates (for example, `0.21` → weight ≈ 4.762) it gives a closer approximation
129    /// than truncation.
130    pub fn count(&self) -> u64 {
131        self.count.round() as u64
132    }
133
134    /// Returns the sum of all samples in the histogram.
135    ///
136    /// This is adjusted by the weight of each sample, based on the sample rate given during insertion.
137    pub fn sum(&self) -> f64 {
138        self.sum
139    }
140
141    /// Returns the minimum value in the histogram.
142    ///
143    /// If the histogram is empty, `None` will be returned.
144    pub fn min(&self) -> Option<f64> {
145        // NOTE: Samples are sorted before `HistogramSummary` is created, so we know that the first value is the minimum.
146        self.histogram.samples.first().map(|sample| sample.value.0)
147    }
148
149    /// Returns the maximum value in the histogram.
150    ///
151    /// If the histogram is empty, `None` will be returned.
152    pub fn max(&self) -> Option<f64> {
153        // NOTE: Samples are sorted before `HistogramSummary` is created, so we know that the last value is the maximum.
154        self.histogram.samples.last().map(|sample| sample.value.0)
155    }
156
157    /// Returns the average value in the histogram.
158    pub fn avg(&self) -> f64 {
159        self.sum / self.count
160    }
161
162    /// Returns the median value in the histogram.
163    ///
164    /// If the histogram is empty, `None` will be returned.
165    pub fn median(&self) -> Option<f64> {
166        self.quantile(0.5)
167    }
168
169    /// Returns the given quantile in the histogram.
170    ///
171    /// If the histogram is empty, or if `quantile` is less than 0.0 or greater than 1.0, `None` will be returned.
172    pub fn quantile(&self, quantile: f64) -> Option<f64> {
173        if !(0.0..=1.0).contains(&quantile) {
174            return None;
175        }
176
177        // target is the cumulative weight threshold: walk samples until the running weight exceeds it.
178        let target = quantile * self.count - 0.01;
179
180        let mut ws = 0.0_f64;
181        let mut wc = 0.0_f64;
182        for sample in &self.histogram.samples {
183            (ws, wc) = neumaier_add(ws, wc, sample.weight.0);
184            if ws + wc > target {
185                return Some(sample.value.0);
186            }
187        }
188
189        None
190    }
191}
192
193/// A set of histogram points.
194///
195/// Used to represent the data points of histograms. Each data point is attached to an optional timestamp.
196///
197/// Histograms are conceptually similar to sketches, but hold all raw samples directly and thus have a slightly worse
198/// memory efficiency as the number of samples grows.
199#[derive(Clone, Debug, Eq, PartialEq)]
200pub struct HistogramPoints(TimestampedValues<Histogram, 1>);
201
202impl HistogramPoints {
203    pub(super) fn inner(&self) -> &TimestampedValues<Histogram, 1> {
204        &self.0
205    }
206
207    pub(super) fn inner_mut(&mut self) -> &mut TimestampedValues<Histogram, 1> {
208        &mut self.0
209    }
210
211    pub(super) fn drain_timestamped(&mut self) -> Self {
212        Self(self.0.drain_timestamped())
213    }
214
215    pub(super) fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
216        self.0.split_at_timestamp(timestamp).map(Self)
217    }
218
219    /// Returns `true` if this set is empty.
220    pub fn is_empty(&self) -> bool {
221        self.0.values.is_empty()
222    }
223
224    /// Returns the number of points in this set.
225    pub fn len(&self) -> usize {
226        self.0.values.len()
227    }
228
229    /// Merges another set of points into this one.
230    ///
231    /// If a point with the same timestamp exists in both sets, the histograms will be merged together. Otherwise, the
232    /// points will appended to the end of the set.
233    pub fn merge(&mut self, other: Self) {
234        let mut needs_sort = false;
235        for mut other_value in other.0.values {
236            if let Some(existing_value) = self
237                .0
238                .values
239                .iter_mut()
240                .find(|value| value.timestamp == other_value.timestamp)
241            {
242                existing_value.value.merge(&mut other_value.value);
243            } else {
244                self.0.values.push(other_value);
245                needs_sort = true;
246            }
247        }
248
249        if needs_sort {
250            self.0.sort_by_timestamp();
251        }
252    }
253}
254
255impl From<Histogram> for HistogramPoints {
256    fn from(value: Histogram) -> Self {
257        Self(TimestampedValue::from(value).into())
258    }
259}
260
261impl From<f64> for HistogramPoints {
262    fn from(value: f64) -> Self {
263        let mut histogram = Histogram::default();
264        histogram.insert(value, SampleRate::unsampled());
265
266        Self(TimestampedValue::from(histogram).into())
267    }
268}
269
270impl From<(u64, f64)> for HistogramPoints {
271    fn from((ts, value): (u64, f64)) -> Self {
272        let mut histogram = Histogram::default();
273        histogram.insert(value, SampleRate::unsampled());
274
275        Self(TimestampedValue::from((ts, histogram)).into())
276    }
277}
278
279impl<const N: usize> From<[f64; N]> for HistogramPoints {
280    fn from(values: [f64; N]) -> Self {
281        let mut histogram = Histogram::default();
282        for value in values {
283            histogram.insert(value, SampleRate::unsampled());
284        }
285
286        Self(TimestampedValue::from(histogram).into())
287    }
288}
289
290impl<const N: usize> From<(u64, [f64; N])> for HistogramPoints {
291    fn from((ts, values): (u64, [f64; N])) -> Self {
292        let mut histogram = Histogram::default();
293        for value in values {
294            histogram.insert(value, SampleRate::unsampled());
295        }
296
297        Self(TimestampedValue::from((ts, histogram)).into())
298    }
299}
300
301impl<const N: usize> From<[(u64, f64); N]> for HistogramPoints {
302    fn from(values: [(u64, f64); N]) -> Self {
303        Self(
304            values
305                .into_iter()
306                .map(|(ts, value)| {
307                    let mut histogram = Histogram::default();
308                    histogram.insert(value, SampleRate::unsampled());
309
310                    (ts, histogram)
311                })
312                .into(),
313        )
314    }
315}
316
317impl<'a> From<&'a [f64]> for HistogramPoints {
318    fn from(values: &'a [f64]) -> Self {
319        let mut histogram = Histogram::default();
320        for value in values {
321            histogram.insert(*value, SampleRate::unsampled());
322        }
323
324        Self(TimestampedValue::from(histogram).into())
325    }
326}
327
328impl<'a, const N: usize> From<&'a [f64; N]> for HistogramPoints {
329    fn from(values: &'a [f64; N]) -> Self {
330        let mut histogram = Histogram::default();
331        for value in values {
332            histogram.insert(*value, SampleRate::unsampled());
333        }
334
335        Self(TimestampedValue::from(histogram).into())
336    }
337}
338
339impl IntoIterator for HistogramPoints {
340    type Item = (Option<NonZeroU64>, Histogram);
341    type IntoIter = HistogramIter;
342
343    fn into_iter(self) -> Self::IntoIter {
344        HistogramIter {
345            inner: self.0.values.into_iter(),
346        }
347    }
348}
349
350impl<'a> IntoIterator for &'a HistogramPoints {
351    type Item = (Option<NonZeroU64>, &'a Histogram);
352    type IntoIter = HistogramIterRef<'a>;
353
354    fn into_iter(self) -> Self::IntoIter {
355        HistogramIterRef {
356            inner: self.0.values.iter(),
357        }
358    }
359}
360
361impl<'a> IntoIterator for &'a mut HistogramPoints {
362    type Item = (Option<NonZeroU64>, &'a mut Histogram);
363    type IntoIter = HistogramIterRefMut<'a>;
364
365    fn into_iter(self) -> Self::IntoIter {
366        HistogramIterRefMut {
367            inner: self.0.values.iter_mut(),
368        }
369    }
370}
371
372impl fmt::Display for HistogramPoints {
373    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374        write!(f, "[")?;
375        for (i, point) in self.0.values.iter().enumerate() {
376            if i > 0 {
377                write!(f, ",")?;
378            }
379
380            let ts = point.timestamp.map(|ts| ts.get()).unwrap_or_default();
381            write!(f, "({}, [", ts)?;
382            for (j, sample) in point.value.samples().iter().enumerate() {
383                if j > 0 {
384                    write!(f, ",")?;
385                }
386                write!(f, "{{{} * {}}}", sample.value, sample.weight)?;
387            }
388            write!(f, "])")?;
389        }
390        write!(f, "]")
391    }
392}
393
394pub struct HistogramIter {
395    inner: smallvec::IntoIter<[TimestampedValue<Histogram>; 1]>,
396}
397
398impl Iterator for HistogramIter {
399    type Item = (Option<NonZeroU64>, Histogram);
400
401    fn next(&mut self) -> Option<Self::Item> {
402        self.inner.next().map(|value| (value.timestamp, value.value))
403    }
404}
405
406pub struct HistogramIterRef<'a> {
407    inner: std::slice::Iter<'a, TimestampedValue<Histogram>>,
408}
409
410impl<'a> Iterator for HistogramIterRef<'a> {
411    type Item = (Option<NonZeroU64>, &'a Histogram);
412
413    fn next(&mut self) -> Option<Self::Item> {
414        self.inner.next().map(|value| (value.timestamp, &value.value))
415    }
416}
417
418pub struct HistogramIterRefMut<'a> {
419    inner: std::slice::IterMut<'a, TimestampedValue<Histogram>>,
420}
421
422impl<'a> Iterator for HistogramIterRefMut<'a> {
423    type Item = (Option<NonZeroU64>, &'a mut Histogram);
424
425    fn next(&mut self) -> Option<Self::Item> {
426        self.inner.next().map(|value| (value.timestamp, &mut value.value))
427    }
428}
429
430/// Performs a single Neumaier (compensated) addition step.
431///
432/// Returns the updated running sum `t` and the compensation term `c` that captures
433/// the rounding error lost when adding `x` to `s`.
434fn neumaier_add(s: f64, c: f64, x: f64) -> (f64, f64) {
435    let t = s + x;
436    let c = if s.abs() >= x.abs() {
437        c + ((s - t) + x)
438    } else {
439        c + ((x - t) + s)
440    };
441    (t, c)
442}
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447
448    fn histogram_from_values(values: &[(f64, u64)]) -> Histogram {
449        let mut h = Histogram::default();
450        for &(value, weight) in values {
451            let weight = OrderedFloat(weight as f64);
452            if h.shared_weight == OrderedFloat(0.0) {
453                h.shared_weight = weight;
454            } else if weight != h.shared_weight {
455                h.weights_vary = true;
456            }
457            h.samples.push(WeightedSample {
458                value: OrderedFloat(value),
459                weight,
460            });
461        }
462        h
463    }
464
465    #[test]
466    fn compensated_sum_catastrophic_cancellation() {
467        // Naive summation: (1 + 1e100) + (1 - 1e100) = 0 due to float cancellation.
468        // Compensated summation must return 2.0.
469        let mut h = histogram_from_values(&[(1.0, 1), (1e100, 1), (1.0, 1), (-1e100, 1)]);
470        let view = h.summary_view();
471        assert_eq!(view.sum(), 2.0, "compensated sum should be 2.0, not 0.0");
472    }
473
474    #[test]
475    fn compensated_sum_empty() {
476        let mut h = Histogram::default();
477        let view = h.summary_view();
478        assert_eq!(view.sum(), 0.0);
479        assert_eq!(view.count(), 0);
480    }
481
482    #[test]
483    fn compensated_sum_uniform_weights_positives() {
484        let mut h = histogram_from_values(&[(1.0, 2), (2.0, 2), (3.0, 2)]);
485        let view = h.summary_view();
486        // sum = (1+2+3)*2 = 12, count = 6
487        assert_eq!(view.sum(), 12.0);
488        assert_eq!(view.count(), 6);
489    }
490
491    #[test]
492    fn compensated_sum_uniform_weights_negatives() {
493        let mut h = histogram_from_values(&[(-3.0, 1), (-2.0, 1), (-1.0, 1)]);
494        let view = h.summary_view();
495        assert_eq!(view.sum(), -6.0);
496        assert_eq!(view.count(), 3);
497    }
498
499    #[test]
500    fn compensated_sum_varying_weights() {
501        // Different weights trigger the fallback Neumaier path.
502        let mut h = histogram_from_values(&[(1.0, 1), (2.0, 2), (3.0, 4)]);
503        let view = h.summary_view();
504        // sum = 1*1 + 2*2 + 3*4 = 1 + 4 + 12 = 17, count = 7
505        assert_eq!(view.sum(), 17.0);
506        assert_eq!(view.count(), 7);
507    }
508
509    #[test]
510    fn compensated_sum_all_zeros() {
511        let mut h = histogram_from_values(&[(0.0, 1), (0.0, 1), (0.0, 1)]);
512        let view = h.summary_view();
513        assert_eq!(view.sum(), 0.0);
514        assert_eq!(view.count(), 3);
515    }
516
517    #[test]
518    fn compensated_sum_single_value() {
519        let mut h = histogram_from_values(&[(42.0, 5)]);
520        let view = h.summary_view();
521        assert_eq!(view.sum(), 210.0);
522        assert_eq!(view.count(), 5);
523    }
524}