Skip to main content

saluki_core/observability/metrics/
histogram.rs

1//! Fixed-bucket histogram aggregation for Prometheus exposition.
2
3use std::sync::LazyLock;
4
5use crate::data_model::event::metric::Histogram;
6
7const HISTOGRAM_BUCKET_COUNT: usize = 30;
8static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); HISTOGRAM_BUCKET_COUNT]> =
9    LazyLock::new(|| histogram_buckets::<HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));
10static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); HISTOGRAM_BUCKET_COUNT]> =
11    LazyLock::new(|| histogram_buckets::<HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));
12
/// A histogram aggregated into a fixed set of cumulative buckets, suitable for Prometheus
/// exposition.
///
/// Buckets follow a log-linear schedule picked from the metric name: names ending in `_seconds`
/// get the time-oriented schedule, all other names get the non-time schedule.
#[derive(Clone, Debug)]
pub struct AggregatedHistogram {
    /// Running sum of all observed sample values, each multiplied by its weight.
    sum: f64,
    /// Running total weight of all observed samples.
    count: u64,
    /// One `(upper_bound, upper_bound_str, cumulative_count)` entry per bucket.
    buckets: Vec<(f64, &'static str, u64)>,
}
23
24impl AggregatedHistogram {
25    /// Creates a new `AggregatedHistogram` for the given metric name.
26    ///
27    /// The metric name determines the bucket schedule: names ending in `_seconds` use a
28    /// time-oriented schedule, everything else uses a generic non-time schedule.
29    pub fn new(metric_name: &str) -> Self {
30        let base_buckets = if metric_name.ends_with("_seconds") {
31            &TIME_HISTOGRAM_BUCKETS[..]
32        } else {
33            &NON_TIME_HISTOGRAM_BUCKETS[..]
34        };
35
36        let buckets = base_buckets
37            .iter()
38            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
39            .collect();
40
41        Self {
42            sum: 0.0,
43            count: 0,
44            buckets,
45        }
46    }
47
48    /// Merges another aggregated histogram into this one.
49    ///
50    /// The two histograms must have been constructed with the same bucket schedule (in practice,
51    /// this means they were constructed for the same metric name).
52    pub fn merge(&mut self, other: &AggregatedHistogram) {
53        self.sum += other.sum;
54        self.count += other.count;
55        for (dst, src) in self.buckets.iter_mut().zip(other.buckets.iter()) {
56            dst.2 += src.2;
57        }
58    }
59
60    /// Folds the samples from `histogram` into this aggregated histogram.
61    pub fn merge_histogram(&mut self, histogram: &Histogram) {
62        for sample in histogram.samples() {
63            self.add_sample(sample.value.into_inner(), sample.weight.0 as u64);
64        }
65    }
66
67    fn add_sample(&mut self, value: f64, weight: u64) {
68        self.sum += value * weight as f64;
69        self.count += weight;
70
71        for (upper_bound, _, count) in &mut self.buckets {
72            if value <= *upper_bound {
73                *count += weight;
74            }
75        }
76    }
77
78    /// Returns the running sum of all observed samples.
79    pub fn sum(&self) -> f64 {
80        self.sum
81    }
82
83    /// Returns the running count of all observed samples.
84    pub fn count(&self) -> u64 {
85        self.count
86    }
87
88    /// Returns an iterator over the buckets as `(upper_bound_str, cumulative_count)` pairs, in
89    /// ascending order by upper bound.
90    pub fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
91        self.buckets
92            .iter()
93            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
94    }
95}
96
/// Builds `N` log-linear histogram bucket upper bounds, each paired with its string label.
///
/// Buckets come in pairs. For pair index `n`:
/// - the first bound is `base * scale^n`;
/// - the second bound is the midpoint between that value and `base * scale^(n+1)`.
///
/// When `N` is odd, the last pair stops after its first bound. For example, with `base=2` and
/// `scale=4`, the sequence is `2, 5, 8, 20, 32, 80, 128, 320, 512, ...`.
///
/// Each label is the bound's `Display` rendering, leaked to obtain a `&'static str`. In this file
/// the function is only called from `LazyLock` initializers, so the leak happens once per
/// schedule.
fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    std::array::from_fn(|slot| {
        let pair = slot / 2;
        let lower = base * scale.powf(pair as f64);

        // Even slots hold the power itself; odd slots hold the midpoint to the next power.
        let bound = if slot % 2 == 0 {
            lower
        } else {
            let upper = base * scale.powf((pair + 1) as f64);
            (lower + upper) / 2.0
        };

        let label: &'static str = bound.to_string().leak();
        (bound, label)
    })
}