saluki_core/observability/metrics/
aggregated.rs1use std::sync::Arc;
8
9use futures::stream::StreamExt as _;
10use papaya::HashMap;
11use saluki_context::Context;
12use tokio::sync::OnceCell;
13
14use super::histogram::AggregatedHistogram;
15use super::reflector::{Processor, Reflector};
16use super::MetricsStream;
17use crate::data_model::event::{metric::MetricValues, Event};
18
19#[derive(Clone, Debug)]
21pub enum AggregatedMetricValue {
22 Counter(f64),
24
25 Gauge(f64),
27
28 Histogram(AggregatedHistogram),
30}
31
32impl AggregatedMetricValue {
33 pub fn value(&self) -> f64 {
37 match self {
38 AggregatedMetricValue::Counter(value) => *value,
39 AggregatedMetricValue::Gauge(value) => *value,
40 AggregatedMetricValue::Histogram(_) => 0.0,
41 }
42 }
43
44 pub fn merge(&mut self, incoming: &AggregatedMetricValue) {
49 match (self, incoming) {
50 (Self::Counter(a), Self::Counter(b)) => {
51 *a += *b;
52 }
53 (Self::Histogram(a), Self::Histogram(b)) => {
54 a.merge(b);
55 }
56 (Self::Gauge(a), Self::Gauge(b)) => *a = *b,
57 (existing, incoming) => *existing = incoming.clone(),
59 }
60 }
61}
62
63#[derive(Clone)]
64pub(crate) struct AggregatedMetric {
65 pub(crate) timestamp: Option<u64>,
66 pub(crate) value: AggregatedMetricValue,
67}
68
69impl AggregatedMetric {
70 fn counter(value: f64) -> Self {
71 Self {
72 timestamp: None,
73 value: AggregatedMetricValue::Counter(value),
74 }
75 }
76
77 fn gauge(timestamp: u64, value: f64) -> Self {
78 Self {
79 timestamp: Some(timestamp),
80 value: AggregatedMetricValue::Gauge(value),
81 }
82 }
83
84 fn histogram(histogram: AggregatedHistogram) -> Self {
85 Self {
86 timestamp: None,
87 value: AggregatedMetricValue::Histogram(histogram),
88 }
89 }
90
91 fn merge(&self, other: Self) -> Self {
92 match (&self.value, other.value) {
93 (AggregatedMetricValue::Counter(a), AggregatedMetricValue::Counter(b)) => Self {
94 timestamp: None,
95 value: AggregatedMetricValue::Counter(a + b),
96 },
97 (AggregatedMetricValue::Gauge(a), AggregatedMetricValue::Gauge(b)) => {
98 let ts_a = self.timestamp.unwrap_or(0);
99 let ts_b = other.timestamp.unwrap_or(0);
100 let (new_ts, new_value) = if ts_a > ts_b { (ts_a, *a) } else { (ts_b, b) };
101
102 Self {
103 timestamp: Some(new_ts),
104 value: AggregatedMetricValue::Gauge(new_value),
105 }
106 }
107 (AggregatedMetricValue::Histogram(a), AggregatedMetricValue::Histogram(b)) => {
108 let mut merged = a.clone();
109 merged.merge(&b);
110 Self {
111 timestamp: None,
112 value: AggregatedMetricValue::Histogram(merged),
113 }
114 }
115 (_, other_value) => Self {
116 timestamp: other.timestamp,
117 value: other_value,
118 },
119 }
120 }
121}
122
123struct Inner {
124 metrics: HashMap<Context, AggregatedMetric>,
125}
126
127pub struct AggregatedMetricsState {
129 inner: Arc<Inner>,
130}
131
132impl AggregatedMetricsState {
133 pub fn visit_metrics<F>(&self, mut visitor: F)
135 where
136 F: FnMut(&Context, &AggregatedMetricValue),
137 {
138 self.inner
139 .metrics
140 .pin()
141 .iter()
142 .for_each(|(context, value)| visitor(context, &value.value));
143 }
144
145 pub fn find_single_with_tags(&self, name: &str, tags: &[&str]) -> Option<f64> {
153 let mut had_existing = false;
154 let mut maybe_metric = None;
155
156 self.visit_metrics(|context, value| {
157 if context.name() == name {
158 for tag in tags {
159 if !context.tags().has_tag(tag) {
160 return;
161 }
162 }
163
164 match value {
165 AggregatedMetricValue::Counter(v) | AggregatedMetricValue::Gauge(v) => {
166 had_existing = maybe_metric.is_some();
167 maybe_metric = Some(*v);
168 }
169 AggregatedMetricValue::Histogram(_) => {}
170 }
171 }
172 });
173
174 if had_existing {
175 None
176 } else {
177 maybe_metric
178 }
179 }
180
181 pub fn get_aggregated_with_tags(&self, name: &str, tags: &[&str]) -> f64 {
191 let mut total = 0.0;
192
193 self.visit_metrics(|context, value| {
194 if context.name() == name {
195 for tag in tags {
196 if !context.tags().has_tag(tag) {
197 return;
198 }
199 }
200
201 if let AggregatedMetricValue::Counter(value) = value {
202 total += *value;
203 }
204 }
205 });
206
207 total
208 }
209}
210
211#[derive(Clone)]
226pub struct AggregatedMetricsProcessor;
227
228impl Processor for AggregatedMetricsProcessor {
229 type Input = Event;
230 type State = AggregatedMetricsState;
231
232 fn build_initial_state(&self) -> Self::State {
233 AggregatedMetricsState {
234 inner: Arc::new(Inner {
235 metrics: HashMap::new(),
236 }),
237 }
238 }
239
240 fn process(&self, input: Self::Input, state: &Self::State) {
241 if let Some(metric) = input.try_into_metric() {
242 let (context, values, _) = metric.into_parts();
243 if let Some(agg_metric) = metric_values_to_aggregated(context.name(), values) {
244 state.inner.metrics.pin().update_or_insert_with(
245 context,
246 |existing| existing.merge(agg_metric.clone()),
247 || agg_metric.clone(),
248 );
249 }
250 }
251 }
252}
253
254fn metric_values_to_aggregated(metric_name: &str, values: MetricValues) -> Option<AggregatedMetric> {
255 match values {
256 MetricValues::Counter(points) => {
257 let value = points.into_iter().map(|(_, value)| value).sum();
259 Some(AggregatedMetric::counter(value))
260 }
261 MetricValues::Gauge(points) => {
262 points
264 .into_iter()
265 .last()
266 .map(|(ts, value)| AggregatedMetric::gauge(ts.map(|ts| ts.get()).unwrap_or(0), value))
267 }
268 MetricValues::Histogram(points) => {
269 let mut aggregated = AggregatedHistogram::new(metric_name);
270 for (_, histogram) in points {
271 aggregated.merge_histogram(&histogram);
272 }
273 if aggregated.count() == 0 {
274 None
275 } else {
276 Some(AggregatedMetric::histogram(aggregated))
277 }
278 }
279 _ => None,
280 }
281}
282
283pub async fn get_shared_metrics_state() -> Reflector<AggregatedMetricsProcessor> {
287 static REFLECTOR: OnceCell<Reflector<AggregatedMetricsProcessor>> = OnceCell::const_new();
288 REFLECTOR
289 .get_or_init(|| async {
290 let metrics_stream = MetricsStream::register().map(Arc::unwrap_or_clone);
291 Reflector::new(metrics_stream, AggregatedMetricsProcessor).await
292 })
293 .await
294 .clone()
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use crate::data_model::event::metric::Metric;
301
302 fn process_metrics(metrics: Vec<Event>) -> Vec<(String, AggregatedMetricValue)> {
303 let processor = AggregatedMetricsProcessor;
304 let state = processor.build_initial_state();
305
306 for metric in metrics {
307 processor.process(metric, &state);
308 }
309
310 let mut result = Vec::new();
311 state.visit_metrics(|context, value| {
312 result.push((context.name().to_string(), value.clone()));
313 });
314
315 result.sort_by(|(name_a, _), (name_b, _)| name_a.cmp(name_b));
316
317 result
318 }
319
320 fn assert_counter(value: &AggregatedMetricValue, expected: f64) {
321 match value {
322 AggregatedMetricValue::Counter(v) => assert_eq!(*v, expected),
323 other => panic!("expected counter, got {other:?}"),
324 }
325 }
326
327 fn assert_gauge(value: &AggregatedMetricValue, expected: f64) {
328 match value {
329 AggregatedMetricValue::Gauge(v) => assert_eq!(*v, expected),
330 other => panic!("expected gauge, got {other:?}"),
331 }
332 }
333
334 fn assert_histogram<F: FnOnce(&AggregatedHistogram)>(value: &AggregatedMetricValue, check: F) {
335 match value {
336 AggregatedMetricValue::Histogram(h) => check(h),
337 other => panic!("expected histogram, got {other:?}"),
338 }
339 }
340
341 #[test]
342 fn test_aggregate_multiple() {
343 let input_metrics = vec![
344 Event::Metric(Metric::counter("counter", 14.0)),
345 Event::Metric(Metric::gauge("gauge", 28.0)),
346 ];
347
348 let aggregated_metrics = process_metrics(input_metrics);
349 assert_eq!(aggregated_metrics.len(), 2);
350 assert_eq!(aggregated_metrics[0].0, "counter");
351 assert_counter(&aggregated_metrics[0].1, 14.0);
352 assert_eq!(aggregated_metrics[1].0, "gauge");
353 assert_gauge(&aggregated_metrics[1].1, 28.0);
354 }
355
356 #[test]
357 fn test_aggregate_counters() {
358 let input_metrics = vec![
359 Event::Metric(Metric::counter("counter", 14.0)),
360 Event::Metric(Metric::counter("counter", [(123456, 22.0)])),
361 Event::Metric(Metric::counter("counter", [(123456, 67.0), (123457, 44.0)])),
362 ];
363
364 let aggregated_metrics = process_metrics(input_metrics);
365 assert_eq!(aggregated_metrics.len(), 1);
366 assert_eq!(aggregated_metrics[0].0, "counter");
367 assert_counter(&aggregated_metrics[0].1, 147.0);
368 }
369
370 #[test]
371 fn test_aggregate_gauges() {
372 let input_metrics = vec![
373 Event::Metric(Metric::gauge("gauge", 14.0)),
374 Event::Metric(Metric::gauge("gauge", [(123458, 44.0)])),
375 Event::Metric(Metric::gauge("gauge", [(123455, 67.0), (123457, 88.0)])),
376 ];
377
378 let aggregated_metrics = process_metrics(input_metrics);
379 assert_eq!(aggregated_metrics.len(), 1);
380 assert_gauge(&aggregated_metrics[0].1, 44.0);
381 }
382
383 #[test]
384 fn test_aggregate_gauges_bias_incoming() {
385 let input_metrics = vec![
386 Event::Metric(Metric::gauge("gauge", [(123456, 33.0)])),
387 Event::Metric(Metric::gauge("gauge", [(123456, 66.0)])),
388 ];
389
390 let aggregated_metrics = process_metrics(input_metrics);
391 assert_eq!(aggregated_metrics.len(), 1);
392 assert_gauge(&aggregated_metrics[0].1, 66.0);
393 }
394
395 #[test]
396 fn test_aggregate_type_change() {
397 let input_metrics = vec![
400 Event::Metric(Metric::gauge("my_metric", 33.0)),
401 Event::Metric(Metric::counter("my_metric", 42.0)),
402 ];
403
404 let aggregated_metrics = process_metrics(input_metrics);
405 assert_eq!(aggregated_metrics.len(), 1);
406 assert_counter(&aggregated_metrics[0].1, 42.0);
407 }
408
409 #[test]
410 fn test_aggregate_histograms() {
411 let input_metrics = vec![
413 Event::Metric(Metric::histogram("h", [1.0, 2.0, 3.0])),
414 Event::Metric(Metric::histogram("h", [4.0, 5.0])),
415 ];
416
417 let aggregated_metrics = process_metrics(input_metrics);
418 assert_eq!(aggregated_metrics.len(), 1);
419 assert_histogram(&aggregated_metrics[0].1, |hist| {
420 assert_eq!(hist.count(), 5);
421 assert_eq!(hist.sum(), 15.0);
422 });
423 }
424}