Skip to main content

saluki_components/destinations/prometheus/
mod.rs

1use std::{
2    convert::Infallible,
3    num::NonZeroUsize,
4    sync::{Arc, LazyLock},
5};
6
7use async_trait::async_trait;
8use ddsketch::DDSketch;
9use http::{Request, Response};
10use hyper::{body::Incoming, service::service_fn};
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use prometheus_exposition::{MetricType, PrometheusRenderer};
13use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
14use saluki_context::{tags::Tag, Context};
15use saluki_core::components::{destinations::*, ComponentContext};
16use saluki_core::data_model::event::{
17    metric::{Histogram, Metric, MetricValues},
18    EventType,
19};
20use saluki_error::GenericError;
21use saluki_io::net::{
22    listener::ConnectionOrientedListener,
23    server::http::{ErrorHandle, HttpServer, ShutdownHandle},
24    ListenAddress,
25};
26use serde::Deserialize;
27use stringtheory::{
28    interning::{FixedSizeInterner, Interner as _},
29    MetaString,
30};
31use tokio::{select, sync::RwLock};
32use tracing::debug;
33
34const CONTEXT_LIMIT: usize = 10_000;
35const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
36const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;
37
38// Histogram-related constants and pre-calculated buckets.
39const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
40static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
41    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));
42
43const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
44static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
45    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));
46
47// SAFETY: This is obviously not zero.
48const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();
49
50/// Prometheus destination.
51///
52/// Exposes a Prometheus scrape endpoint that emits metrics in the Prometheus exposition format.
53///
54/// # Limits
55///
56/// - Number of contexts (unique series) is limited to 10,000.
57/// - Maximum size of scrape payload response is ~1MiB.
58///
59/// # Missing
60///
61/// - no support for expiring metrics (which we don't really need because the only use for this destination at the
62///   moment is internal metrics, which aren't dynamic since we don't use dynamic tags or have dynamic topology support,
63///   but... you know, we'll eventually need this)
64/// - full support for distributions (we can't convert a distribution to an aggregated histogram, and native histogram
65///   support is still too fresh for most clients, so we simply expose aggregated summaries as a stopgap)
66///
67#[derive(Deserialize)]
68pub struct PrometheusConfiguration {
69    #[serde(rename = "prometheus_listen_addr")]
70    listen_addr: ListenAddress,
71}
72
73impl PrometheusConfiguration {
74    /// Creates a new `PrometheusConfiguration` for the given listen address.
75    pub fn from_listen_address(listen_addr: ListenAddress) -> Self {
76        Self { listen_addr }
77    }
78}
79
80#[async_trait]
81impl DestinationBuilder for PrometheusConfiguration {
82    fn input_event_type(&self) -> EventType {
83        EventType::Metric
84    }
85
86    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
87        Ok(Box::new(Prometheus {
88            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
89            metrics: FastIndexMap::default(),
90            payload: Arc::new(RwLock::new(String::new())),
91            renderer: PrometheusRenderer::new(),
92            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
93        }))
94    }
95}
96
97impl MemoryBounds for PrometheusConfiguration {
98    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
99        builder
100            .minimum()
101            // Capture the size of the heap allocation when the component is built.
102            .with_single_value::<Prometheus>("component struct");
103
104        builder
105            .firm()
106            // Even though our context map is really the Prometheus context to a map of context/value pairs, we're just
107            // simplifying things here because the ratio of true "contexts" to Prometheus contexts should be very high,
108            // high enough to make this a reasonable approximation.
109            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
110            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
111            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
112    }
113}
114
115struct Prometheus {
116    listener: ConnectionOrientedListener,
117    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
118    payload: Arc<RwLock<String>>,
119    renderer: PrometheusRenderer,
120    interner: FixedSizeInterner<1>,
121}
122
123#[async_trait]
124impl Destination for Prometheus {
125    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
126        let Self {
127            listener,
128            mut metrics,
129            payload,
130            mut renderer,
131            interner,
132        } = *self;
133
134        let mut health = context.take_health_handle();
135
136        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
137        health.mark_ready();
138
139        debug!("Prometheus destination started.");
140
141        let mut contexts = 0;
142        let mut tags_deduplicator = ReusableDeduplicator::new();
143
144        loop {
145            select! {
146                _ = health.live() => continue,
147                maybe_events = context.events().next() => match maybe_events {
148                    Some(events) => {
149                        // Process each metric event in the batch, either merging it with the existing value or
150                        // inserting it for the first time.
151                        for event in events {
152                            if let Some(metric) = event.try_into_metric() {
153                                // Break apart our metric into its constituent parts, and then normalize it for
154                                // Prometheus: adjust the name if necessary, figuring out the equivalent Prometheus
155                                // metric type, and so on.
156                                let prom_context = match into_prometheus_metric(&metric, &mut renderer, &interner) {
157                                    Some(prom_context) => prom_context,
158                                    None => continue,
159                                };
160
161                                let (context, values, _) = metric.into_parts();
162
163                                // Create an entry for the context if we don't already have one, obeying our configured context limit.
164                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
165                                match existing_contexts.get_mut(&context) {
166                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
167                                    None => {
168                                        if contexts >= CONTEXT_LIMIT {
169                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
170                                            continue
171                                        }
172
173                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
174                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);
175
176                                        existing_contexts.insert(context, new_prom_value);
177                                        contexts += 1;
178                                    }
179                                }
180                            }
181                        }
182
183                        // Regenerate the scrape payload.
184                        regenerate_payload(&metrics, &payload, &mut renderer, &mut tags_deduplicator).await;
185                    },
186                    None => break,
187                },
188                error = &mut http_error => {
189                    if let Some(error) = error {
190                        debug!(%error, "HTTP server error.");
191                    }
192                    break;
193                },
194            }
195        }
196
197        // TODO: This should really be `DynamicShutdownCoordinator`-based so we can trigger shutdown _and_ wait until
198        // all HTTP connections and the listener have finished.
199        http_shutdown.shutdown();
200
201        debug!("Prometheus destination stopped.");
202
203        Ok(())
204    }
205}
206
207fn spawn_prom_scrape_service(
208    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
209) -> (ShutdownHandle, ErrorHandle) {
210    let service = service_fn(move |_: Request<Incoming>| {
211        let payload = Arc::clone(&payload);
212        async move {
213            let payload = payload.read().await;
214            Ok::<_, Infallible>(Response::new(axum::body::Body::from(payload.to_string())))
215        }
216    });
217
218    let http_server = HttpServer::from_listener(listener, service);
219    http_server.listen()
220}
221
222#[allow(clippy::mutable_key_type)]
223async fn regenerate_payload(
224    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
225    renderer: &mut PrometheusRenderer, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
226) {
227    renderer.clear();
228
229    for (prom_context, contexts) in metrics {
230        if !write_metrics(renderer, prom_context, contexts, tags_deduplicator) {
231            debug!("Failed to write metric to payload. Continuing...");
232            continue;
233        }
234
235        if renderer.output().len() > PAYLOAD_SIZE_LIMIT_BYTES {
236            debug!(
237                payload_len = renderer.output().len(),
238                "Payload size limit exceeded. Skipping remaining metrics."
239            );
240            break;
241        }
242    }
243
244    let mut payload = payload.write().await;
245    payload.clear();
246    payload.push_str(renderer.output());
247}
248
249fn write_metrics(
250    renderer: &mut PrometheusRenderer, prom_context: &PrometheusContext,
251    contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
252) -> bool {
253    if contexts.is_empty() {
254        debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
255        return true;
256    }
257
258    renderer.begin_group(&prom_context.metric_name, prom_context.metric_type, None);
259
260    for (context, values) in contexts {
261        let labels = match collect_tags(context, tags_deduplicator) {
262            Some(labels) => labels,
263            None => return false,
264        };
265
266        match values {
267            PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
268                renderer.write_gauge_or_counter_series(labels, *value);
269            }
270            PrometheusValue::Histogram(histogram) => {
271                renderer.write_histogram_series(labels, histogram.buckets(), histogram.sum, histogram.count);
272            }
273            PrometheusValue::Summary(sketch) => {
274                let quantiles = [0.1, 0.25, 0.5, 0.95, 0.99, 0.999]
275                    .into_iter()
276                    .map(|q| (q, sketch.quantile(q).unwrap_or_default()));
277
278                renderer.write_summary_series(labels, quantiles, sketch.sum().unwrap_or_default(), sketch.count());
279            }
280        }
281    }
282
283    renderer.finish_group();
284    true
285}
286
287/// Collects tags from a context into key-value pairs suitable for the renderer.
288fn collect_tags<'a>(
289    context: &'a Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
290) -> Option<Vec<(&'a str, &'a str)>> {
291    let mut labels = Vec::new();
292    let mut total_bytes = 0;
293
294    let chained_tags = context.tags().into_iter().chain(context.origin_tags());
295    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);
296
297    for tag in deduplicated_tags {
298        let tag_name = tag.name();
299        let tag_value = match tag.value() {
300            Some(value) => value,
301            None => {
302                debug!("Skipping bare tag.");
303                continue;
304            }
305        };
306
307        // Can't exceed the tags buffer size limit: we calculate the addition as tag name/value length plus three bytes
308        // to account for having to format it as `name="value",`.
309        total_bytes += tag_name.len() + tag_value.len() + 4;
310        if total_bytes > TAGS_BUFFER_SIZE_LIMIT_BYTES {
311            debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
312            return None;
313        }
314
315        labels.push((tag_name, tag_value));
316    }
317
318    Some(labels)
319}
320
321#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
322struct PrometheusContext {
323    metric_name: MetaString,
324    metric_type: MetricType,
325}
326
327enum PrometheusValue {
328    Counter(f64),
329    Gauge(f64),
330    Histogram(PrometheusHistogram),
331    Summary(DDSketch),
332}
333
334fn into_prometheus_metric(
335    metric: &Metric, renderer: &mut PrometheusRenderer, interner: &FixedSizeInterner<1>,
336) -> Option<PrometheusContext> {
337    // Normalize the metric name using the renderer, then intern it.
338    let normalized = renderer.normalize_metric_name(metric.context().name());
339    let metric_name = match interner.try_intern(normalized).map(MetaString::from) {
340        Some(name) => name,
341        None => {
342            debug!(
343                "Failed to intern normalized metric name. Skipping metric '{}'.",
344                metric.context().name()
345            );
346            return None;
347        }
348    };
349
350    let metric_type = match metric.values() {
351        MetricValues::Counter(_) => MetricType::Counter,
352        MetricValues::Gauge(_) | MetricValues::Set(_) => MetricType::Gauge,
353        MetricValues::Histogram(_) => MetricType::Histogram,
354        MetricValues::Distribution(_) => MetricType::Summary,
355        _ => return None,
356    };
357
358    Some(PrometheusContext {
359        metric_name,
360        metric_type,
361    })
362}
363
364fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
365    match prom_context.metric_type {
366        MetricType::Counter => PrometheusValue::Counter(0.0),
367        MetricType::Gauge => PrometheusValue::Gauge(0.0),
368        MetricType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
369        MetricType::Summary => PrometheusValue::Summary(DDSketch::default()),
370    }
371}
372
373fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
374    match (values, prom_value) {
375        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
376            for (_, value) in counter_values {
377                *prom_counter += value;
378            }
379        }
380        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
381            let latest_value = gauge_values
382                .into_iter()
383                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
384                .map(|(_, value)| value)
385                .unwrap_or_default();
386            *prom_gauge = latest_value;
387        }
388        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
389            let latest_value = set_values
390                .into_iter()
391                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
392                .map(|(_, value)| value)
393                .unwrap_or_default();
394            *prom_gauge = latest_value;
395        }
396        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
397            for (_, value) in histogram_values {
398                prom_histogram.merge_histogram(&value);
399            }
400        }
401        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
402            for (_, value) in distribution_values {
403                prom_summary.merge(&value);
404            }
405        }
406        _ => panic!("Mismatched metric types"),
407    }
408}
409
410#[derive(Clone)]
411struct PrometheusHistogram {
412    sum: f64,
413    count: u64,
414    buckets: Vec<(f64, &'static str, u64)>,
415}
416
417impl PrometheusHistogram {
418    fn new(metric_name: &str) -> Self {
419        // Super hacky but effective way to decide when to switch to the time-oriented buckets.
420        let base_buckets = if metric_name.ends_with("_seconds") {
421            &TIME_HISTOGRAM_BUCKETS[..]
422        } else {
423            &NON_TIME_HISTOGRAM_BUCKETS[..]
424        };
425
426        let buckets = base_buckets
427            .iter()
428            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
429            .collect();
430
431        Self {
432            sum: 0.0,
433            count: 0,
434            buckets,
435        }
436    }
437
438    fn merge_histogram(&mut self, histogram: &Histogram) {
439        for sample in histogram.samples() {
440            self.add_sample(sample.value.into_inner(), sample.weight);
441        }
442    }
443
444    fn add_sample(&mut self, value: f64, weight: u64) {
445        self.sum += value * weight as f64;
446        self.count += weight;
447
448        // Add the value to each bucket that it falls into, up to the maximum number of buckets.
449        for (upper_bound, _, count) in &mut self.buckets {
450            if value <= *upper_bound {
451                *count += weight;
452            }
453        }
454    }
455
456    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
457        self.buckets
458            .iter()
459            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
460    }
461}
462
463fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
464    // We generate a set of "log-linear" buckets: logarithmically spaced values which are then subdivided linearly.
465    //
466    // As an example, with base=2 and scale=4, we would get: 2, 5, 8, 20, 32, 80, 128, 320, 512, and so on.
467    //
468    // We calculate buckets in pairs, where the n-th pair is `i` and `j`, such that `i` is `base * scale^n` and `j` is
469    // the midpoint between `i` and the next `i` (`base * scale^(n+1)`).
470
471    let mut buckets = [(0.0, ""); N];
472
473    let log_linear_buckets = std::iter::repeat(base).enumerate().flat_map(|(i, base)| {
474        let pow = scale.powf(i as f64);
475        let value = base * pow;
476
477        let next_pow = scale.powf((i + 1) as f64);
478        let next_value = base * next_pow;
479        let midpoint = (value + next_value) / 2.0;
480
481        [value, midpoint]
482    });
483
484    for (i, current_le) in log_linear_buckets.enumerate().take(N) {
485        let (bucket_le, bucket_le_str) = &mut buckets[i];
486        let current_le_str = format!("{}", current_le);
487
488        *bucket_le = current_le;
489        *bucket_le_str = current_le_str.leak();
490    }
491
492    buckets
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498
499    #[test]
500    fn bucket_print() {
501        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
502        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
503    }
504
505    #[test]
506    fn prom_histogram_add_sample() {
507        let sample1 = (0.25, 1);
508        let sample2 = (1.0, 2);
509        let sample3 = (2.0, 3);
510
511        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
512        histogram.add_sample(sample1.0, sample1.1);
513        histogram.add_sample(sample2.0, sample2.1);
514        histogram.add_sample(sample3.0, sample3.1);
515
516        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
517        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
518        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
519        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
520        let expected_count = sample1.1 + sample2.1 + sample3.1;
521        assert_eq!(histogram.sum, expected_sum);
522        assert_eq!(histogram.count, expected_count);
523
524        // Go through and make sure we have things in the right buckets.
525        let mut expected_bucket_count = 0;
526        for sample in [sample1, sample2, sample3] {
527            for bucket in &histogram.buckets {
528                // If we've finally hit a bucket that includes our sample value, it's count should be equal to or
529                // greater than our expected bucket count when we account for the current sample.
530                if sample.0 <= bucket.0 {
531                    assert!(bucket.2 >= expected_bucket_count + sample.1);
532                }
533            }
534
535            // Adjust the expected bucket count to fully account for the current sample before moving on.
536            expected_bucket_count += sample.1;
537        }
538    }
539}