Skip to main content

saluki_core/observability/metrics/
aggregated.rs

1//! Aggregated metrics state.
2//!
3//! Builds on the [`reflector`][super::reflector] machinery to maintain a long-lived view of every
4//! internal metric, keyed by metric context. Counters are summed, gauges keep the latest value by
5//! timestamp, and histograms accumulate into fixed-bucket [`AggregatedHistogram`]s.
6
7use std::sync::Arc;
8
9use futures::stream::StreamExt as _;
10use papaya::HashMap;
11use saluki_context::Context;
12use tokio::sync::OnceCell;
13
14use super::histogram::AggregatedHistogram;
15use super::reflector::{Processor, Reflector};
16use super::MetricsStream;
17use crate::data_model::event::{metric::MetricValues, Event};
18
19/// Aggregated metric value.
20#[derive(Clone, Debug)]
21pub enum AggregatedMetricValue {
22    /// A counter.
23    Counter(f64),
24
25    /// A gauge.
26    Gauge(f64),
27
28    /// A histogram with fixed buckets.
29    Histogram(AggregatedHistogram),
30}
31
32impl AggregatedMetricValue {
33    /// Returns the scalar value of the metric, if this is a counter or gauge.
34    ///
35    /// Returns `0.0` for histograms.
36    pub fn value(&self) -> f64 {
37        match self {
38            AggregatedMetricValue::Counter(value) => *value,
39            AggregatedMetricValue::Gauge(value) => *value,
40            AggregatedMetricValue::Histogram(_) => 0.0,
41        }
42    }
43
44    /// Merges two aggregated metric values.
45    ///
46    /// Counters are summed, gauges use last-write-wins (the incoming value is kept), histograms are merged per-bucket and
47    /// by sum/count.
48    pub fn merge(&mut self, incoming: &AggregatedMetricValue) {
49        match (self, incoming) {
50            (Self::Counter(a), Self::Counter(b)) => {
51                *a += *b;
52            }
53            (Self::Histogram(a), Self::Histogram(b)) => {
54                a.merge(b);
55            }
56            (Self::Gauge(a), Self::Gauge(b)) => *a = *b,
57            // When the existing and incoming type _don't_ match, take the incoming value.
58            (existing, incoming) => *existing = incoming.clone(),
59        }
60    }
61}
62
63#[derive(Clone)]
64pub(crate) struct AggregatedMetric {
65    pub(crate) timestamp: Option<u64>,
66    pub(crate) value: AggregatedMetricValue,
67}
68
69impl AggregatedMetric {
70    fn counter(value: f64) -> Self {
71        Self {
72            timestamp: None,
73            value: AggregatedMetricValue::Counter(value),
74        }
75    }
76
77    fn gauge(timestamp: u64, value: f64) -> Self {
78        Self {
79            timestamp: Some(timestamp),
80            value: AggregatedMetricValue::Gauge(value),
81        }
82    }
83
84    fn histogram(histogram: AggregatedHistogram) -> Self {
85        Self {
86            timestamp: None,
87            value: AggregatedMetricValue::Histogram(histogram),
88        }
89    }
90
91    fn merge(&self, other: Self) -> Self {
92        match (&self.value, other.value) {
93            (AggregatedMetricValue::Counter(a), AggregatedMetricValue::Counter(b)) => Self {
94                timestamp: None,
95                value: AggregatedMetricValue::Counter(a + b),
96            },
97            (AggregatedMetricValue::Gauge(a), AggregatedMetricValue::Gauge(b)) => {
98                let ts_a = self.timestamp.unwrap_or(0);
99                let ts_b = other.timestamp.unwrap_or(0);
100                let (new_ts, new_value) = if ts_a > ts_b { (ts_a, *a) } else { (ts_b, b) };
101
102                Self {
103                    timestamp: Some(new_ts),
104                    value: AggregatedMetricValue::Gauge(new_value),
105                }
106            }
107            (AggregatedMetricValue::Histogram(a), AggregatedMetricValue::Histogram(b)) => {
108                let mut merged = a.clone();
109                merged.merge(&b);
110                Self {
111                    timestamp: None,
112                    value: AggregatedMetricValue::Histogram(merged),
113                }
114            }
115            (_, other_value) => Self {
116                timestamp: other.timestamp,
117                value: other_value,
118            },
119        }
120    }
121}
122
123struct Inner {
124    metrics: HashMap<Context, AggregatedMetric>,
125}
126
127/// Aggregated metrics state.
128pub struct AggregatedMetricsState {
129    inner: Arc<Inner>,
130}
131
132impl AggregatedMetricsState {
133    /// Visits all metrics.
134    pub fn visit_metrics<F>(&self, mut visitor: F)
135    where
136        F: FnMut(&Context, &AggregatedMetricValue),
137    {
138        self.inner
139            .metrics
140            .pin()
141            .iter()
142            .for_each(|(context, value)| visitor(context, &value.value));
143    }
144
145    /// Searches the state for a single metric with a matching name and tags.
146    ///
147    /// The latest value for the metric is returned. Histograms are skipped.
148    ///
149    /// If no metric is found with a matching name and tags, or if multiple metrics are found with a matching name and
150    /// tags, `None` is returned. Tags are matched on a partial basis: a metric must at least have the tags provided,
151    /// but may have additional tags as well.
152    pub fn find_single_with_tags(&self, name: &str, tags: &[&str]) -> Option<f64> {
153        let mut had_existing = false;
154        let mut maybe_metric = None;
155
156        self.visit_metrics(|context, value| {
157            if context.name() == name {
158                for tag in tags {
159                    if !context.tags().has_tag(tag) {
160                        return;
161                    }
162                }
163
164                match value {
165                    AggregatedMetricValue::Counter(v) | AggregatedMetricValue::Gauge(v) => {
166                        had_existing = maybe_metric.is_some();
167                        maybe_metric = Some(*v);
168                    }
169                    AggregatedMetricValue::Histogram(_) => {}
170                }
171            }
172        });
173
174        if had_existing {
175            None
176        } else {
177            maybe_metric
178        }
179    }
180
181    /// Searches the state for all counter metrics with a matching name and returns their aggregated value.
182    ///
183    /// This method allows rolling up counter metrics that share a common name. For example, a metric may be emitted
184    /// with the same name, but N different values for a specific tag, potentially as a way to break down the metric by
185    /// a certain facet. This method can allow re-aggregating the value of each different facet value into a single
186    /// value.
187    ///
188    /// If multiple metrics are found with a matching name, but they're not all counter metrics, or if no metrics are
189    /// found with a matching name, `0.0` is returned.
190    pub fn get_aggregated_with_tags(&self, name: &str, tags: &[&str]) -> f64 {
191        let mut total = 0.0;
192
193        self.visit_metrics(|context, value| {
194            if context.name() == name {
195                for tag in tags {
196                    if !context.tags().has_tag(tag) {
197                        return;
198                    }
199                }
200
201                if let AggregatedMetricValue::Counter(value) = value {
202                    total += *value;
203                }
204            }
205        });
206
207        total
208    }
209}
210
/// Aggregated metrics processor.
///
/// Maintains a map from metric context (name and tags) to the aggregated value of that metric. Counters, gauges, and
/// histograms are supported; every other metric type is ignored.
///
/// Aggregation rules:
///
/// - Counters: values are summed.
/// - Gauges: the latest value (by timestamp) is kept.
/// - Histograms: merged, with per-bucket counts, sum, and total count all summed.
///
/// Aggregated values carry no notion of "points" (a specific value at a specific timestamp) or of multiple data
/// points: each metric has exactly one value, representing everything aggregated since the processor was created.
#[derive(Clone)]
pub struct AggregatedMetricsProcessor;
227
228impl Processor for AggregatedMetricsProcessor {
229    type Input = Event;
230    type State = AggregatedMetricsState;
231
232    fn build_initial_state(&self) -> Self::State {
233        AggregatedMetricsState {
234            inner: Arc::new(Inner {
235                metrics: HashMap::new(),
236            }),
237        }
238    }
239
240    fn process(&self, input: Self::Input, state: &Self::State) {
241        if let Some(metric) = input.try_into_metric() {
242            let (context, values, _) = metric.into_parts();
243            if let Some(agg_metric) = metric_values_to_aggregated(context.name(), values) {
244                state.inner.metrics.pin().update_or_insert_with(
245                    context,
246                    |existing| existing.merge(agg_metric.clone()),
247                    || agg_metric.clone(),
248                );
249            }
250        }
251    }
252}
253
254fn metric_values_to_aggregated(metric_name: &str, values: MetricValues) -> Option<AggregatedMetric> {
255    match values {
256        MetricValues::Counter(points) => {
257            // Extract all points and sum their values together.
258            let value = points.into_iter().map(|(_, value)| value).sum();
259            Some(AggregatedMetric::counter(value))
260        }
261        MetricValues::Gauge(points) => {
262            // Take the last point value, which will be the latest timestamped point, to represent the "current" value.
263            points
264                .into_iter()
265                .last()
266                .map(|(ts, value)| AggregatedMetric::gauge(ts.map(|ts| ts.get()).unwrap_or(0), value))
267        }
268        MetricValues::Histogram(points) => {
269            let mut aggregated = AggregatedHistogram::new(metric_name);
270            for (_, histogram) in points {
271                aggregated.merge_histogram(&histogram);
272            }
273            if aggregated.count() == 0 {
274                None
275            } else {
276                Some(AggregatedMetric::histogram(aggregated))
277            }
278        }
279        _ => None,
280    }
281}
282
283/// Gets the shared metrics state, which provides unified access to internal metrics in a simplified interface.
284///
285/// This is lazily initialized and will only be created when it's first accessed.
286pub async fn get_shared_metrics_state() -> Reflector<AggregatedMetricsProcessor> {
287    static REFLECTOR: OnceCell<Reflector<AggregatedMetricsProcessor>> = OnceCell::const_new();
288    REFLECTOR
289        .get_or_init(|| async {
290            let metrics_stream = MetricsStream::register().map(Arc::unwrap_or_clone);
291            Reflector::new(metrics_stream, AggregatedMetricsProcessor).await
292        })
293        .await
294        .clone()
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data_model::event::metric::Metric;

    /// Runs `metrics` through a fresh processor and state, returning the resulting `(name, value)` pairs sorted by
    /// metric name.
    fn process_metrics(metrics: Vec<Event>) -> Vec<(String, AggregatedMetricValue)> {
        let processor = AggregatedMetricsProcessor;
        let state = processor.build_initial_state();

        metrics
            .into_iter()
            .for_each(|event| processor.process(event, &state));

        let mut collected = Vec::new();
        state.visit_metrics(|context, value| {
            collected.push((context.name().to_string(), value.clone()));
        });

        // Map iteration order is not relied upon; sort so assertions can index by position.
        collected.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
        collected
    }

    /// Asserts that `value` is a counter equal to `expected`.
    fn assert_counter(value: &AggregatedMetricValue, expected: f64) {
        let AggregatedMetricValue::Counter(actual) = value else {
            panic!("expected counter, got {value:?}");
        };
        assert_eq!(*actual, expected);
    }

    /// Asserts that `value` is a gauge equal to `expected`.
    fn assert_gauge(value: &AggregatedMetricValue, expected: f64) {
        let AggregatedMetricValue::Gauge(actual) = value else {
            panic!("expected gauge, got {value:?}");
        };
        assert_eq!(*actual, expected);
    }

    /// Asserts that `value` is a histogram and runs `check` against it.
    fn assert_histogram<F: FnOnce(&AggregatedHistogram)>(value: &AggregatedMetricValue, check: F) {
        let AggregatedMetricValue::Histogram(histogram) = value else {
            panic!("expected histogram, got {value:?}");
        };
        check(histogram);
    }

    #[test]
    fn test_aggregate_multiple() {
        // Distinct contexts stay separate.
        let aggregated = process_metrics(vec![
            Event::Metric(Metric::counter("counter", 14.0)),
            Event::Metric(Metric::gauge("gauge", 28.0)),
        ]);

        assert_eq!(aggregated.len(), 2);

        let (counter_name, counter_value) = &aggregated[0];
        assert_eq!(counter_name, "counter");
        assert_counter(counter_value, 14.0);

        let (gauge_name, gauge_value) = &aggregated[1];
        assert_eq!(gauge_name, "gauge");
        assert_gauge(gauge_value, 28.0);
    }

    #[test]
    fn test_aggregate_counters() {
        // Every point of every counter event is summed: 14 + 22 + 67 + 44.
        let aggregated = process_metrics(vec![
            Event::Metric(Metric::counter("counter", 14.0)),
            Event::Metric(Metric::counter("counter", [(123456, 22.0)])),
            Event::Metric(Metric::counter("counter", [(123456, 67.0), (123457, 44.0)])),
        ]);

        assert_eq!(aggregated.len(), 1);

        let (name, value) = &aggregated[0];
        assert_eq!(name, "counter");
        assert_counter(value, 147.0);
    }

    #[test]
    fn test_aggregate_gauges() {
        // The point with the greatest timestamp (123458) wins, even though it is not the last one processed.
        let aggregated = process_metrics(vec![
            Event::Metric(Metric::gauge("gauge", 14.0)),
            Event::Metric(Metric::gauge("gauge", [(123458, 44.0)])),
            Event::Metric(Metric::gauge("gauge", [(123455, 67.0), (123457, 88.0)])),
        ]);

        assert_eq!(aggregated.len(), 1);
        assert_gauge(&aggregated[0].1, 44.0);
    }

    #[test]
    fn test_aggregate_gauges_bias_incoming() {
        // With identical timestamps, the incoming value replaces the existing one.
        let aggregated = process_metrics(vec![
            Event::Metric(Metric::gauge("gauge", [(123456, 33.0)])),
            Event::Metric(Metric::gauge("gauge", [(123456, 66.0)])),
        ]);

        assert_eq!(aggregated.len(), 1);
        assert_gauge(&aggregated[0].1, 66.0);
    }

    #[test]
    fn test_aggregate_type_change() {
        // When aggregating metrics with identical contexts but different types (i.e. counter vs gauge), the aggregated
        // metric should be the last type seen.
        let aggregated = process_metrics(vec![
            Event::Metric(Metric::gauge("my_metric", 33.0)),
            Event::Metric(Metric::counter("my_metric", 42.0)),
        ]);

        assert_eq!(aggregated.len(), 1);
        assert_counter(&aggregated[0].1, 42.0);
    }

    #[test]
    fn test_aggregate_histograms() {
        // Histograms with the same context should merge: counts, sums, and per-bucket counts accumulate.
        let aggregated = process_metrics(vec![
            Event::Metric(Metric::histogram("h", [1.0, 2.0, 3.0])),
            Event::Metric(Metric::histogram("h", [4.0, 5.0])),
        ]);

        assert_eq!(aggregated.len(), 1);
        assert_histogram(&aggregated[0].1, |histogram| {
            assert_eq!(histogram.count(), 5);
            assert_eq!(histogram.sum(), 15.0);
        });
    }
}