Skip to main content

saluki_core/observability/metrics/
processor.rs

1//! Helper for processing [`AggregatedMetricsState`] into Prometheus text exposition format.
2//!
3//! [`TelemetryProcessor`] wraps a [`PrometheusRenderer`] and an optional set of [`RemapperRule`]s.
4//! It has two modes:
5//!
6//! - **No rules**: every metric in the state is rendered under its raw name and tags.
7//! - **With rules**: each metric is matched against the rules; only matched metrics are rendered,
8//!   under their remapped name and tags. Counters, gauges, and histograms are all supported in
9//!   both modes.
10
11use std::collections::BTreeMap;
12
13use prometheus_exposition::{MetricType, PrometheusRenderer};
14use stringtheory::MetaString;
15
16use super::aggregated::{AggregatedMetricValue, AggregatedMetricsState};
17use super::remapper::RemapperRule;
18
19/// Processes an [`AggregatedMetricsState`] into a Prometheus text exposition payload.
20///
21/// Owns the underlying [`PrometheusRenderer`] (whose internal buffers are reused across calls) and
22/// an optional set of [`RemapperRule`]s. See the module-level docs for the two rendering modes.
23pub struct TelemetryProcessor {
24    renderer: PrometheusRenderer,
25    rules: Vec<RemapperRule>,
26}
27
28impl TelemetryProcessor {
29    /// Creates a new `TelemetryProcessor` with no remapper rules.
30    ///
31    /// In this mode, every metric in the state is rendered under its raw name and tags.
32    pub fn new() -> Self {
33        Self {
34            renderer: PrometheusRenderer::new(),
35            rules: Vec::new(),
36        }
37    }
38
39    /// Configures the set of [`RemapperRule`]s applied during rendering.
40    ///
41    /// In this mode, only metrics that match at least one rule are rendered; rule order matters
42    /// (see [`RemapperRule::with_continued_matching`] for how a single source metric can fan out
43    /// to multiple remapped metrics).
44    pub fn with_remapper_rules(mut self, rules: Vec<RemapperRule>) -> Self {
45        self.rules = rules;
46        self
47    }
48
49    /// Processes the given state and returns the resulting Prometheus exposition payload.
50    pub fn process(&mut self, state: &AggregatedMetricsState) -> String {
51        self.renderer.clear();
52
53        if self.rules.is_empty() {
54            self.process_all(state);
55        } else {
56            self.process_remapped(state);
57        }
58
59        self.renderer.output().to_string()
60    }
61
62    fn process_all(&mut self, state: &AggregatedMetricsState) {
63        // Group metrics first by name, then by sorted tag set. Duplicate (name, tags) entries are
64        // merged so counters sum, gauges take the latest, and histograms merge buckets/sum/count.
65        let mut groups: BTreeMap<MetaString, BTreeMap<Vec<MetaString>, AggregatedMetricValue>> = BTreeMap::new();
66
67        state.visit_metrics(|context, value| {
68            let mut tags: Vec<MetaString> = context.tags().into_iter().map(|tag| tag.clone().into_inner()).collect();
69            tags.sort();
70
71            let series = groups.entry(context.name().clone()).or_default();
72            series
73                .entry(tags)
74                .and_modify(|existing| existing.merge(value))
75                .or_insert_with(|| value.clone());
76        });
77
78        for (name, series) in &groups {
79            render_group(&mut self.renderer, name, None, series);
80        }
81    }
82
83    fn process_remapped(&mut self, state: &AggregatedMetricsState) {
84        // Collect, deduplicate, and group matched metrics by their remapped name and tags.
85        //
86        // Multiple source metrics can remap to the same (name, tags) identity. We merge those via
87        // `AggregatedMetricValue::merge`, and tags are sorted to normalize their order so identical
88        // tag sets in different orders deduplicate correctly.
89        let mut groups: BTreeMap<&'static str, BTreeMap<Vec<MetaString>, AggregatedMetricValue>> = BTreeMap::new();
90        let mut help_text: BTreeMap<&'static str, &'static str> = BTreeMap::new();
91
92        state.visit_metrics(|context, value| {
93            for rule in &self.rules {
94                if let Some(mut remapped) = rule.try_match_no_context(context) {
95                    remapped.tags.sort();
96
97                    let continue_matching = rule.should_continue_matching();
98                    if let Some(text) = rule.help_text() {
99                        help_text.entry(remapped.name).or_insert(text);
100                    }
101                    let series = groups.entry(remapped.name).or_default();
102                    series
103                        .entry(remapped.tags)
104                        .and_modify(|existing| existing.merge(value))
105                        .or_insert_with(|| value.clone());
106
107                    if !continue_matching {
108                        return;
109                    }
110                }
111            }
112        });
113
114        for (name, series) in &groups {
115            render_group(&mut self.renderer, name, help_text.get(name).copied(), series);
116        }
117    }
118}
119
120impl Default for TelemetryProcessor {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126fn render_group(
127    renderer: &mut PrometheusRenderer, name: &str, help_text: Option<&str>,
128    series: &BTreeMap<Vec<MetaString>, AggregatedMetricValue>,
129) {
130    // Determine the metric type from the first series value.
131    let metric_type = match series.values().next() {
132        Some(AggregatedMetricValue::Counter(_)) => MetricType::Counter,
133        Some(AggregatedMetricValue::Gauge(_)) => MetricType::Gauge,
134        Some(AggregatedMetricValue::Histogram(_)) => MetricType::Histogram,
135        None => return,
136    };
137
138    match metric_type {
139        MetricType::Counter | MetricType::Gauge => {
140            let rendered = series.iter().map(|(tags, value)| (split_tags(tags), value.value()));
141            renderer.render_scalar_group(name, metric_type, help_text, rendered);
142        }
143        MetricType::Histogram => {
144            renderer.begin_group(name, metric_type, help_text);
145            for (tags, value) in series {
146                if let AggregatedMetricValue::Histogram(histogram) = value {
147                    renderer.write_histogram_series(
148                        split_tags(tags),
149                        histogram.buckets(),
150                        histogram.sum(),
151                        histogram.count(),
152                    );
153                }
154            }
155            renderer.finish_group();
156        }
157        MetricType::Summary => {}
158    }
159}
160
161/// Splits a sorted `key:value` tag list into Prometheus label pairs, dropping bare tags.
162fn split_tags(tags: &[MetaString]) -> impl Iterator<Item = (&str, &str)> {
163    tags.iter().filter_map(|tag| tag.as_ref().split_once(':'))
164}
165
#[cfg(test)]
mod tests {
    use saluki_context::Context;

    use super::super::aggregated::AggregatedMetricsProcessor;
    use super::super::reflector::Processor as _;
    use super::*;
    use crate::data_model::event::{metric::Metric, Event};

    /// Feeds `events`, in order, through an [`AggregatedMetricsProcessor`] and returns the
    /// state it built up.
    fn build_state(events: Vec<Event>) -> AggregatedMetricsState {
        let aggregator = AggregatedMetricsProcessor;
        let state = aggregator.build_initial_state();
        events.into_iter().for_each(|event| {
            aggregator.process(event, &state);
        });
        state
    }

    /// Without rules, counters and gauges are rendered under their raw names, one series per
    /// tag set.
    #[test]
    fn renders_counter_and_gauge_groups_without_rules() {
        let get_requests = Metric::counter(
            Context::from_static_parts("adp.requests_total", &["method:get"]),
            10.0,
        );
        let post_requests = Metric::counter(
            Context::from_static_parts("adp.requests_total", &["method:post"]),
            3.0,
        );
        let queue_depth = Metric::gauge(Context::from_static_parts("adp.queue_depth", &["queue:work"]), 5.0);
        let state = build_state(vec![
            Event::Metric(get_requests),
            Event::Metric(post_requests),
            Event::Metric(queue_depth),
        ]);

        let mut telemetry = TelemetryProcessor::new();
        let output = telemetry.process(&state);

        assert!(output.contains("# TYPE adp__requests_total counter"));
        assert!(output.contains("adp__requests_total{method=\"get\"} 10"));
        assert!(output.contains("adp__requests_total{method=\"post\"} 3"));
        assert!(output.contains("# TYPE adp__queue_depth gauge"));
        assert!(output.contains("adp__queue_depth{queue=\"work\"} 5"));
    }

    /// Without rules, histograms are rendered with bucket, sum, and count lines per tag set.
    #[test]
    fn renders_histogram_groups_without_rules() {
        let read_latency = Metric::histogram(
            Context::from_static_parts("adp.latency_seconds", &["op:read"]),
            [0.001, 0.002, 0.5],
        );
        let write_latency = Metric::histogram(
            Context::from_static_parts("adp.latency_seconds", &["op:write"]),
            [0.01, 0.02],
        );
        let state = build_state(vec![Event::Metric(read_latency), Event::Metric(write_latency)]);

        let mut telemetry = TelemetryProcessor::new();
        let output = telemetry.process(&state);

        assert!(output.contains("# TYPE adp__latency_seconds histogram"));
        assert!(output.contains("adp__latency_seconds_bucket{op=\"read\","));
        assert!(output.contains("adp__latency_seconds_bucket{op=\"write\","));
        assert!(output.contains("le=\"+Inf\""));
        assert!(output.contains("adp__latency_seconds_sum{op=\"read\"}"));
        assert!(output.contains("adp__latency_seconds_count{op=\"read\"} 3"));
        assert!(output.contains("adp__latency_seconds_count{op=\"write\"} 2"));
    }

    /// With rules configured, only the metric matched by a rule shows up, under its new name.
    #[test]
    fn renders_only_matched_metrics_with_rules() {
        let matched = Metric::counter(Context::from_static_parts("src.matched", &["component_id:x"]), 42.0);
        let unmatched = Metric::counter(
            Context::from_static_parts("src.unmatched", &["component_id:x"]),
            100.0,
        );
        let state = build_state(vec![Event::Metric(matched), Event::Metric(unmatched)]);

        let rename_rule = RemapperRule::by_name("src.matched", "dst.renamed").with_help_text("Renamed counter help text");
        let mut telemetry = TelemetryProcessor::new().with_remapper_rules(vec![rename_rule]);
        let output = telemetry.process(&state);

        assert!(output.contains("# HELP dst__renamed Renamed counter help text"));
        assert!(output.contains("# TYPE dst__renamed counter"));
        assert!(output.contains("dst__renamed 42"));
        // Nothing from the counter that no rule matched may leak into the payload.
        assert!(!output.contains("unmatched"));
    }

    /// Histograms pass through remapper rules, keeping the tags the rule asks to preserve.
    #[test]
    fn renders_histograms_through_rules() {
        let latency = Metric::histogram(
            Context::from_static_parts("src.latency_seconds", &["op:read"]),
            [0.001, 0.5],
        );
        let state = build_state(vec![Event::Metric(latency)]);

        let latency_rule = RemapperRule::by_name("src.latency_seconds", "dst.latency_seconds")
            .with_original_tags(["op"])
            .with_help_text("Remapped latency");
        let mut telemetry = TelemetryProcessor::new().with_remapper_rules(vec![latency_rule]);
        let output = telemetry.process(&state);

        assert!(output.contains("# HELP dst__latency_seconds Remapped latency"));
        assert!(output.contains("# TYPE dst__latency_seconds histogram"));
        assert!(output.contains("dst__latency_seconds_bucket{op=\"read\","));
        assert!(output.contains("dst__latency_seconds_count{op=\"read\"} 2"));
    }
}