// saluki_core/observability/metrics/processor.rs
use std::collections::BTreeMap;
12
13use prometheus_exposition::{MetricType, PrometheusRenderer};
14use stringtheory::MetaString;
15
16use super::aggregated::{AggregatedMetricValue, AggregatedMetricsState};
17use super::remapper::RemapperRule;
18
19pub struct TelemetryProcessor {
24 renderer: PrometheusRenderer,
25 rules: Vec<RemapperRule>,
26}
27
28impl TelemetryProcessor {
29 pub fn new() -> Self {
33 Self {
34 renderer: PrometheusRenderer::new(),
35 rules: Vec::new(),
36 }
37 }
38
39 pub fn with_remapper_rules(mut self, rules: Vec<RemapperRule>) -> Self {
45 self.rules = rules;
46 self
47 }
48
49 pub fn process(&mut self, state: &AggregatedMetricsState) -> String {
51 self.renderer.clear();
52
53 if self.rules.is_empty() {
54 self.process_all(state);
55 } else {
56 self.process_remapped(state);
57 }
58
59 self.renderer.output().to_string()
60 }
61
62 fn process_all(&mut self, state: &AggregatedMetricsState) {
63 let mut groups: BTreeMap<MetaString, BTreeMap<Vec<MetaString>, AggregatedMetricValue>> = BTreeMap::new();
66
67 state.visit_metrics(|context, value| {
68 let mut tags: Vec<MetaString> = context.tags().into_iter().map(|tag| tag.clone().into_inner()).collect();
69 tags.sort();
70
71 let series = groups.entry(context.name().clone()).or_default();
72 series
73 .entry(tags)
74 .and_modify(|existing| existing.merge(value))
75 .or_insert_with(|| value.clone());
76 });
77
78 for (name, series) in &groups {
79 render_group(&mut self.renderer, name, None, series);
80 }
81 }
82
83 fn process_remapped(&mut self, state: &AggregatedMetricsState) {
84 let mut groups: BTreeMap<&'static str, BTreeMap<Vec<MetaString>, AggregatedMetricValue>> = BTreeMap::new();
90 let mut help_text: BTreeMap<&'static str, &'static str> = BTreeMap::new();
91
92 state.visit_metrics(|context, value| {
93 for rule in &self.rules {
94 if let Some(mut remapped) = rule.try_match_no_context(context) {
95 remapped.tags.sort();
96
97 let continue_matching = rule.should_continue_matching();
98 if let Some(text) = rule.help_text() {
99 help_text.entry(remapped.name).or_insert(text);
100 }
101 let series = groups.entry(remapped.name).or_default();
102 series
103 .entry(remapped.tags)
104 .and_modify(|existing| existing.merge(value))
105 .or_insert_with(|| value.clone());
106
107 if !continue_matching {
108 return;
109 }
110 }
111 }
112 });
113
114 for (name, series) in &groups {
115 render_group(&mut self.renderer, name, help_text.get(name).copied(), series);
116 }
117 }
118}
119
120impl Default for TelemetryProcessor {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126fn render_group(
127 renderer: &mut PrometheusRenderer, name: &str, help_text: Option<&str>,
128 series: &BTreeMap<Vec<MetaString>, AggregatedMetricValue>,
129) {
130 let metric_type = match series.values().next() {
132 Some(AggregatedMetricValue::Counter(_)) => MetricType::Counter,
133 Some(AggregatedMetricValue::Gauge(_)) => MetricType::Gauge,
134 Some(AggregatedMetricValue::Histogram(_)) => MetricType::Histogram,
135 None => return,
136 };
137
138 match metric_type {
139 MetricType::Counter | MetricType::Gauge => {
140 let rendered = series.iter().map(|(tags, value)| (split_tags(tags), value.value()));
141 renderer.render_scalar_group(name, metric_type, help_text, rendered);
142 }
143 MetricType::Histogram => {
144 renderer.begin_group(name, metric_type, help_text);
145 for (tags, value) in series {
146 if let AggregatedMetricValue::Histogram(histogram) = value {
147 renderer.write_histogram_series(
148 split_tags(tags),
149 histogram.buckets(),
150 histogram.sum(),
151 histogram.count(),
152 );
153 }
154 }
155 renderer.finish_group();
156 }
157 MetricType::Summary => {}
158 }
159}
160
161fn split_tags(tags: &[MetaString]) -> impl Iterator<Item = (&str, &str)> {
163 tags.iter().filter_map(|tag| tag.as_ref().split_once(':'))
164}
165
#[cfg(test)]
mod tests {
    use saluki_context::Context;

    use super::super::aggregated::AggregatedMetricsProcessor;
    use super::super::reflector::Processor as _;
    use super::*;
    use crate::data_model::event::{metric::Metric, Event};

    /// Feeds every event through an `AggregatedMetricsProcessor` and returns the resulting state.
    fn aggregate(events: Vec<Event>) -> AggregatedMetricsState {
        let aggregator = AggregatedMetricsProcessor;
        let state = aggregator.build_initial_state();
        events.into_iter().for_each(|event| {
            aggregator.process(event, &state);
        });
        state
    }

    /// Asserts that each of `fragments` occurs somewhere in `output`.
    fn assert_contains_all(output: &str, fragments: &[&str]) {
        for &fragment in fragments {
            assert!(
                output.contains(fragment),
                "missing `{fragment}` in rendered output:\n{output}"
            );
        }
    }

    #[test]
    fn renders_counter_and_gauge_groups_without_rules() {
        let events = vec![
            Event::Metric(Metric::counter(
                Context::from_static_parts("adp.requests_total", &["method:get"]),
                10.0,
            )),
            Event::Metric(Metric::counter(
                Context::from_static_parts("adp.requests_total", &["method:post"]),
                3.0,
            )),
            Event::Metric(Metric::gauge(
                Context::from_static_parts("adp.queue_depth", &["queue:work"]),
                5.0,
            )),
        ];
        let state = aggregate(events);

        let mut telemetry = TelemetryProcessor::new();
        let output = telemetry.process(&state);

        assert_contains_all(
            &output,
            &[
                "# TYPE adp__requests_total counter",
                "adp__requests_total{method=\"get\"} 10",
                "adp__requests_total{method=\"post\"} 3",
                "# TYPE adp__queue_depth gauge",
                "adp__queue_depth{queue=\"work\"} 5",
            ],
        );
    }

    #[test]
    fn renders_histogram_groups_without_rules() {
        let events = vec![
            Event::Metric(Metric::histogram(
                Context::from_static_parts("adp.latency_seconds", &["op:read"]),
                [0.001, 0.002, 0.5],
            )),
            Event::Metric(Metric::histogram(
                Context::from_static_parts("adp.latency_seconds", &["op:write"]),
                [0.01, 0.02],
            )),
        ];
        let state = aggregate(events);

        let mut telemetry = TelemetryProcessor::new();
        let output = telemetry.process(&state);

        assert_contains_all(
            &output,
            &[
                "# TYPE adp__latency_seconds histogram",
                "adp__latency_seconds_bucket{op=\"read\",",
                "adp__latency_seconds_bucket{op=\"write\",",
                "le=\"+Inf\"",
                "adp__latency_seconds_sum{op=\"read\"}",
                "adp__latency_seconds_count{op=\"read\"} 3",
                "adp__latency_seconds_count{op=\"write\"} 2",
            ],
        );
    }

    #[test]
    fn renders_only_matched_metrics_with_rules() {
        let events = vec![
            Event::Metric(Metric::counter(
                Context::from_static_parts("src.matched", &["component_id:x"]),
                42.0,
            )),
            Event::Metric(Metric::counter(
                Context::from_static_parts("src.unmatched", &["component_id:x"]),
                100.0,
            )),
        ];
        let state = aggregate(events);

        let rules =
            vec![RemapperRule::by_name("src.matched", "dst.renamed").with_help_text("Renamed counter help text")];

        let mut telemetry = TelemetryProcessor::new().with_remapper_rules(rules);
        let output = telemetry.process(&state);

        assert_contains_all(
            &output,
            &[
                "# HELP dst__renamed Renamed counter help text",
                "# TYPE dst__renamed counter",
                "dst__renamed 42",
            ],
        );
        // With rules configured, a metric that no rule matches must not be rendered at all.
        assert!(!output.contains("unmatched"));
    }

    #[test]
    fn renders_histograms_through_rules() {
        let events = vec![Event::Metric(Metric::histogram(
            Context::from_static_parts("src.latency_seconds", &["op:read"]),
            [0.001, 0.5],
        ))];
        let state = aggregate(events);

        let rules = vec![RemapperRule::by_name("src.latency_seconds", "dst.latency_seconds")
            .with_original_tags(["op"])
            .with_help_text("Remapped latency")];

        let mut telemetry = TelemetryProcessor::new().with_remapper_rules(rules);
        let output = telemetry.process(&state);

        assert_contains_all(
            &output,
            &[
                "# HELP dst__latency_seconds Remapped latency",
                "# TYPE dst__latency_seconds histogram",
                "dst__latency_seconds_bucket{op=\"read\",",
                "dst__latency_seconds_count{op=\"read\"} 2",
            ],
        );
    }
}