saluki_components/destinations/prometheus/mod.rs

use std::{
    convert::Infallible,
    fmt::Write as _,
    num::NonZeroUsize,
    sync::{Arc, LazyLock},
};

use async_trait::async_trait;
use ddsketch_agent::DDSketch;
use http::{Request, Response};
use hyper::{body::Incoming, service::service_fn};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
use saluki_context::{tags::Tag, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_core::data_model::event::{
    metric::{Histogram, Metric, MetricValues},
    EventType,
};
use saluki_error::GenericError;
use saluki_io::net::{
    listener::ConnectionOrientedListener,
    server::http::{ErrorHandle, HttpServer, ShutdownHandle},
    ListenAddress,
};
use serde::Deserialize;
use stringtheory::{
    interning::{FixedSizeInterner, Interner as _},
    MetaString,
};
use tokio::{select, sync::RwLock};
use tracing::debug;

const CONTEXT_LIMIT: usize = 10_000;
const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024;
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;
const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;

// Histogram-related constants and pre-calculated buckets.
const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));

const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));

// SAFETY: This is obviously not zero.
const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();

/// Prometheus destination.
///
/// Exposes a Prometheus scrape endpoint that emits metrics in the Prometheus exposition format.
///
/// # Limits
///
/// - Number of contexts (unique series) is limited to 10,000.
/// - Maximum size of the scrape payload response is ~1MiB.
///
/// # Missing
///
/// - No support for expiring metrics. This isn't needed yet, since the only current use of this destination is
///   internal metrics, which aren't dynamic (we don't use dynamic tags or have dynamic topology support), but we'll
///   eventually need it.
/// - Only partial support for distributions: we can't convert a distribution to an aggregated histogram, and native
///   histogram support is still too fresh for most clients, so we simply expose aggregated summaries as a stopgap.
///
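/// # Output example
///
/// A scrape returns plain text in the Prometheus exposition format: each metric is preceded by a `# TYPE` (and,
/// where help text is known, `# HELP`) header, followed by one line per series. As a sketch, a counter with a
/// hypothetical name and tag would be rendered roughly as:
///
/// ```text
/// example_component__events_processed{listener="uds"} 1042
/// ```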
#[derive(Deserialize)]
pub struct PrometheusConfiguration {
    #[serde(rename = "prometheus_listen_addr")]
    listen_addr: ListenAddress,
}

impl PrometheusConfiguration {
    /// Creates a new `PrometheusConfiguration` for the given listen address.
    pub fn from_listen_address(listen_addr: ListenAddress) -> Self {
        Self { listen_addr }
    }
}

#[async_trait]
impl DestinationBuilder for PrometheusConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
        Ok(Box::new(Prometheus {
            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
            metrics: FastIndexMap::default(),
            payload: Arc::new(RwLock::new(String::new())),
            payload_buffer: String::with_capacity(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES),
            tags_buffer: String::with_capacity(TAGS_BUFFER_SIZE_LIMIT_BYTES),
            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
        }))
    }
}

impl MemoryBounds for PrometheusConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<Prometheus>("component struct")
            // This isn't _really_ bounded, since the buffer can grow if a metric name is longer than the default
            // size, but the default is far larger than any typical metric name, so the buffer should almost never
            // grow beyond its initially allocated size.
            .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE);

        builder
            .firm()
            // Our metrics map is really a map of Prometheus contexts to maps of context/value pairs, but we simplify
            // things here because the ratio of true "contexts" to Prometheus contexts should be high enough to make
            // this a reasonable approximation.
            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
            .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES)
            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
    }
}

struct Prometheus {
    listener: ConnectionOrientedListener,
    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
    payload: Arc<RwLock<String>>,
    payload_buffer: String,
    tags_buffer: String,
    interner: FixedSizeInterner<1>,
}

#[async_trait]
impl Destination for Prometheus {
    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
        let Self {
            listener,
            mut metrics,
            payload,
            mut payload_buffer,
            mut tags_buffer,
            interner,
        } = *self;

        let mut health = context.take_health_handle();

        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
        health.mark_ready();

        debug!("Prometheus destination started.");

        let mut contexts = 0;
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);
        let mut tags_deduplicator = ReusableDeduplicator::new();

        loop {
            select! {
                _ = health.live() => continue,
                maybe_events = context.events().next() => match maybe_events {
                    Some(events) => {
                        // Process each metric event in the batch, either merging it with the existing value or
                        // inserting it for the first time.
                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                // Break apart our metric into its constituent parts, and then normalize it for
                                // Prometheus: adjusting the name if necessary, figuring out the equivalent Prometheus
                                // metric type, and so on.
                                let prom_context = match into_prometheus_metric(&metric, &mut name_buf, &interner) {
                                    Some(prom_context) => prom_context,
                                    None => continue,
                                };

                                let (context, values, _) = metric.into_parts();

                                // Create an entry for the context if we don't already have one, obeying our configured context limit.
                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
                                match existing_contexts.get_mut(&context) {
                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
                                    None => {
                                        if contexts >= CONTEXT_LIMIT {
                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
                                            continue
                                        }

                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);

                                        existing_contexts.insert(context, new_prom_value);
                                        contexts += 1;
                                    }
                                }
                            }
                        }

                        // Regenerate the scrape payload.
                        regenerate_payload(&metrics, &payload, &mut payload_buffer, &mut tags_buffer, &mut tags_deduplicator).await;
                    },
                    None => break,
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
            }
        }

        // TODO: This should really be `DynamicShutdownCoordinator`-based so we can trigger shutdown _and_ wait until
        // all HTTP connections and the listener have finished.
        http_shutdown.shutdown();

        debug!("Prometheus destination stopped.");

        Ok(())
    }
}

fn spawn_prom_scrape_service(
    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
) -> (ShutdownHandle, ErrorHandle) {
    let service = service_fn(move |_: Request<Incoming>| {
        let payload = Arc::clone(&payload);
        async move {
            let payload = payload.read().await;
            Ok::<Response<String>, Infallible>(Response::new(payload.to_string()))
        }
    });

    let http_server = HttpServer::from_listener(listener, service);
    http_server.listen()
}

#[allow(clippy::mutable_key_type)]
async fn regenerate_payload(
    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
    payload_buffer: &mut String, tags_buffer: &mut String, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) {
    let mut payload = payload.write().await;
    payload.clear();

    let mut metrics_written = 0;
    let metrics_total = metrics.len();

    for (prom_context, contexts) in metrics {
        if write_metrics(payload_buffer, tags_buffer, prom_context, contexts, tags_deduplicator) {
            if payload.len() + payload_buffer.len() > PAYLOAD_SIZE_LIMIT_BYTES {
                debug!(
                    metrics_written,
                    metrics_total,
                    payload_len = payload.len(),
                    "Writing additional metrics would exceed payload size limit. Skipping remaining metrics."
                );
                break;
            }

            // If we've already written some metrics, add a newline between each grouping.
            if metrics_written > 0 {
                payload.push('\n');
            }

            payload.push_str(payload_buffer);

            metrics_written += 1;
        } else {
            debug!("Failed to write metric to payload. Continuing...");
        }
    }
}

fn get_help_text(metric_name: &str) -> Option<&'static str> {
    // The HELP text for overlapping metrics MUST match the agent's HELP text exactly, or else an error will occur
    // on the agent's side when parsing the metrics.
    match metric_name {
        "no_aggregation__flush" => Some("Count the number of flushes done by the no-aggregation pipeline worker"),
        "no_aggregation__processed" => {
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        }
        "aggregator__dogstatsd_contexts_by_mtype" => {
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        }
        "aggregator__flush" => Some("Number of metrics/service checks/events flushed"),
        "aggregator__dogstatsd_contexts_bytes_by_mtype" => {
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        }
        "aggregator__dogstatsd_contexts" => Some("Count the number of dogstatsd contexts in the aggregator"),
        "aggregator__processed" => Some("Amount of metrics/services_checks/events processed by the aggregator"),
        "dogstatsd__processed" => Some("Count of service checks/events/metrics processed by dogstatsd"),
        "dogstatsd__packet_pool_get" => Some("Count of get done in the packet pool"),
        "dogstatsd__packet_pool_put" => Some("Count of put done in the packet pool"),
        "dogstatsd__packet_pool" => Some("Usage of the packet pool in dogstatsd"),
        _ => None,
    }
}

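// Illustrative note on the output shape produced below: for a histogram, we render one `<name>_bucket` line per
// pre-calculated bucket plus a final `+Inf` bucket, followed by `<name>_sum` and `<name>_count`, with any context
// tags spliced into the label set. For a hypothetical series, that looks roughly like:
//
//   example_latency_seconds_bucket{service="web",le="0.000000128"} 0
//   example_latency_seconds_bucket{service="web",le="+Inf"} 3
//   example_latency_seconds_sum{service="web"} 0.42
//   example_latency_seconds_count{service="web"} 3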
fn write_metrics(
    payload_buffer: &mut String, tags_buffer: &mut String, prom_context: &PrometheusContext,
    contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> bool {
    // Clear the buffer up front so a previous metric's output can't leak into this one if we bail out early.
    payload_buffer.clear();

    if contexts.is_empty() {
        debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
        return true;
    }

    // Write the HELP text, if available.
    if let Some(help_text) = get_help_text(prom_context.metric_name.as_ref()) {
        writeln!(payload_buffer, "# HELP {} {}", prom_context.metric_name, help_text).unwrap();
    }
    // Write the metric type header.
    writeln!(
        payload_buffer,
        "# TYPE {} {}",
        prom_context.metric_name,
        prom_context.metric_type.as_str()
    )
    .unwrap();

    for (context, values) in contexts {
        if payload_buffer.len() > PAYLOAD_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Payload buffer size limit exceeded. Additional contexts for this metric will be truncated.");
            break;
        }

        tags_buffer.clear();

        // Format/encode the tags.
        if !format_tags(tags_buffer, context, tags_deduplicator) {
            return false;
        }

        // Write the metric value itself.
        match values {
            PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
                // No metric type-specific tags for counters or gauges, so just write them straight out.
                payload_buffer.push_str(&prom_context.metric_name);
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", value).unwrap();
            }
            PrometheusValue::Histogram(histogram) => {
                // Write the histogram buckets.
                for (upper_bound_str, count) in histogram.buckets() {
                    write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
                }

                // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram.
                write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push(',');
                }
                writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();

                // Write the histogram sum and count.
                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.sum).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.count).unwrap();
            }
            PrometheusValue::Summary(sketch) => {
                // We take a fixed set of quantiles from the sketch, which is hard-coded but should represent the
                // quantiles people generally care about.
                for quantile in [0.1, 0.25, 0.5, 0.95, 0.99, 0.999] {
                    let q_value = sketch.quantile(quantile).unwrap_or_default();

                    write!(payload_buffer, "{}{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "quantile=\"{}\"}} {}", quantile, q_value).unwrap();
                }

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.sum().unwrap_or_default()).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.count()).unwrap();
            }
        }
    }

    true
}

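// Illustrative note: tags are rendered as a comma-separated list of `name="value"` pairs (e.g. the hypothetical
// `service="web",env="prod"`); the callers above are responsible for wrapping the result in `{` and `}`.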
fn format_tags(tags_buffer: &mut String, context: &Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> bool {
    let mut has_tags = false;

    let chained_tags = context.tags().into_iter().chain(context.origin_tags());
    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);

    for tag in deduplicated_tags {
        let tag_name = tag.name();
        let tag_value = match tag.value() {
            Some(value) => value,
            None => {
                debug!("Skipping bare tag.");
                continue;
            }
        };

        // If we're not the first tag to be written, add a comma to separate the tags. We only do this once we know
        // the tag will actually be written, so a skipped bare tag can't leave a dangling comma behind.
        if has_tags {
            tags_buffer.push(',');
        }
        has_tags = true;

        // Can't exceed the tags buffer size limit: we calculate the addition as the tag name/value length plus four
        // bytes to account for having to format it as `name="value",`.
        if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
            return false;
        }

        write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
    }

    true
}

#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum PrometheusType {
    Counter,
    Gauge,
    Histogram,
    Summary,
}

impl PrometheusType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Counter => "counter",
            Self::Gauge => "gauge",
            Self::Histogram => "histogram",
            Self::Summary => "summary",
        }
    }
}

#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct PrometheusContext {
    metric_name: MetaString,
    metric_type: PrometheusType,
}

enum PrometheusValue {
    Counter(f64),
    Gauge(f64),
    Histogram(PrometheusHistogram),
    Summary(DDSketch),
}

fn into_prometheus_metric(
    metric: &Metric, name_buf: &mut String, interner: &FixedSizeInterner<1>,
) -> Option<PrometheusContext> {
    // Normalize the metric name first, since we might fail due to the interner being full.
    let metric_name = match normalize_metric_name(metric.context().name(), name_buf, interner) {
        Some(name) => name,
        None => {
            debug!(
                "Failed to intern normalized metric name. Skipping metric '{}'.",
                metric.context().name()
            );
            return None;
        }
    };

    let metric_type = match metric.values() {
        MetricValues::Counter(_) => PrometheusType::Counter,
        MetricValues::Gauge(_) | MetricValues::Set(_) => PrometheusType::Gauge,
        MetricValues::Histogram(_) => PrometheusType::Histogram,
        MetricValues::Distribution(_) => PrometheusType::Summary,
        _ => return None,
    };

    Some(PrometheusContext {
        metric_name,
        metric_type,
    })
}

fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
    match prom_context.metric_type {
        PrometheusType::Counter => PrometheusValue::Counter(0.0),
        PrometheusType::Gauge => PrometheusValue::Gauge(0.0),
        PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
        PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()),
    }
}

fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
    match (values, prom_value) {
        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
            for (_, value) in counter_values {
                *prom_counter += value;
            }
        }
        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = gauge_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = set_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
            for (_, value) in histogram_values {
                prom_histogram.merge_histogram(&value);
            }
        }
        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
            for (_, value) in distribution_values {
                prom_summary.merge(&value);
            }
        }
        _ => panic!("Mismatched metric types"),
    }
}

fn normalize_metric_name(name: &str, name_buf: &mut String, interner: &FixedSizeInterner<1>) -> Option<MetaString> {
    name_buf.clear();

    // Normalize the metric name to a valid Prometheus metric name.
    for (i, c) in name.chars().enumerate() {
        if i == 0 && is_valid_name_start_char(c) || i != 0 && is_valid_name_char(c) {
            name_buf.push(c);
        } else {
            // Convert periods to a set of two underscores, and anything else to a single underscore.
            //
            // This lets us ensure that the normal separators we use in metrics (periods) are converted in a way
            // where they can be distinguished on the collector side to potentially reconstitute them back to their
            // original form.
            name_buf.push_str(if c == '.' { "__" } else { "_" });
        }
    }

    // Now try and intern the normalized name.
    interner.try_intern(name_buf).map(MetaString::from)
}

#[inline]
fn is_valid_name_start_char(c: char) -> bool {
    // Matches a regular expression of [a-zA-Z_:].
    c.is_ascii_alphabetic() || c == '_' || c == ':'
}

#[inline]
fn is_valid_name_char(c: char) -> bool {
    // Matches a regular expression of [a-zA-Z0-9_:].
    c.is_ascii_alphanumeric() || c == '_' || c == ':'
}

#[derive(Clone)]
struct PrometheusHistogram {
    sum: f64,
    count: u64,
    buckets: Vec<(f64, &'static str, u64)>,
}

impl PrometheusHistogram {
    fn new(metric_name: &str) -> Self {
        // Super hacky but effective way to decide when to switch to the time-oriented buckets.
        let base_buckets = if metric_name.ends_with("_seconds") {
            &TIME_HISTOGRAM_BUCKETS[..]
        } else {
            &NON_TIME_HISTOGRAM_BUCKETS[..]
        };

        let buckets = base_buckets
            .iter()
            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
            .collect();

        Self {
            sum: 0.0,
            count: 0,
            buckets,
        }
    }

    fn merge_histogram(&mut self, histogram: &Histogram) {
        for sample in histogram.samples() {
            self.add_sample(sample.value.into_inner(), sample.weight);
        }
    }

    fn add_sample(&mut self, value: f64, weight: u64) {
        self.sum += value * weight as f64;
        self.count += weight;

        // Buckets are cumulative: add the sample's weight to every bucket whose upper bound the value falls under.
        for (upper_bound, _, count) in &mut self.buckets {
            if value <= *upper_bound {
                *count += weight;
            }
        }
    }

    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
        self.buckets
            .iter()
            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
    }
}

fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    // We generate a set of "log-linear" buckets: logarithmically spaced values which are then subdivided linearly.
    //
    // As an example, with base=2 and scale=4, we would get: 2, 5, 8, 20, 32, 80, 128, 320, 512, and so on.
    //
    // We calculate buckets in pairs, where the n-th pair is `i` and `j`, such that `i` is `base * scale^n` and `j` is
    // the midpoint between `i` and the next `i` (`base * scale^(n+1)`).

    let mut buckets = [(0.0, ""); N];

    let log_linear_buckets = std::iter::repeat(base).enumerate().flat_map(|(i, base)| {
        let pow = scale.powf(i as f64);
        let value = base * pow;

        let next_pow = scale.powf((i + 1) as f64);
        let next_value = base * next_pow;
        let midpoint = (value + next_value) / 2.0;

        [value, midpoint]
    });

    for (i, current_le) in log_linear_buckets.enumerate().take(N) {
        let (bucket_le, bucket_le_str) = &mut buckets[i];
        let current_le_str = format!("{}", current_le);

        *bucket_le = current_le;
        *bucket_le_str = current_le_str.leak();
    }

    buckets
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_print() {
        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
    }

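    // A sketch of a check, added for illustration, pinning the log-linear bucket layout to the example given in
    // the comment on `histogram_buckets`: base=2, scale=4 should yield upper bounds of 2, 5, 8, 20, 32, 80, ...
    #[test]
    fn log_linear_bucket_example() {
        let buckets = histogram_buckets::<6>(2.0, 4.0);

        let upper_bounds: Vec<f64> = buckets.iter().map(|(le, _)| *le).collect();
        assert_eq!(upper_bounds, vec![2.0, 5.0, 8.0, 20.0, 32.0, 80.0]);

        // The pre-rendered label strings should match their numeric upper bounds.
        assert_eq!(buckets[1].1, "5");
        assert_eq!(buckets[3].1, "20");
    }
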
    #[test]
    fn prom_histogram_add_sample() {
        let sample1 = (0.25, 1);
        let sample2 = (1.0, 2);
        let sample3 = (2.0, 3);

        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
        histogram.add_sample(sample1.0, sample1.1);
        histogram.add_sample(sample2.0, sample2.1);
        histogram.add_sample(sample3.0, sample3.1);

        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
        let expected_count = sample1.1 + sample2.1 + sample3.1;
        assert_eq!(histogram.sum, expected_sum);
        assert_eq!(histogram.count, expected_count);

        // Go through and make sure we have things in the right buckets.
        let mut expected_bucket_count = 0;
        for sample in [sample1, sample2, sample3] {
            for bucket in &histogram.buckets {
                // If we've finally hit a bucket that includes our sample value, its count should be equal to or
                // greater than our expected bucket count when we account for the current sample.
                if sample.0 <= bucket.0 {
                    assert!(bucket.2 >= expected_bucket_count + sample.1);
                }
            }

            // Adjust the expected bucket count to fully account for the current sample before moving on.
            expected_bucket_count += sample.1;
        }
    }

    #[test]
    fn prom_get_help_text() {
        // Ensure that we catch when the help text changes for these metrics.
        assert_eq!(
            get_help_text("no_aggregation__flush"),
            Some("Count the number of flushes done by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("no_aggregation__processed"),
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_by_mtype"),
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__flush"),
            Some("Number of metrics/service checks/events flushed")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_bytes_by_mtype"),
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts"),
            Some("Count the number of dogstatsd contexts in the aggregator")
        );
        assert_eq!(
            get_help_text("aggregator__processed"),
            Some("Amount of metrics/services_checks/events processed by the aggregator")
        );
    }
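
    // A sketch of a test, added for illustration, covering the normalization rules in `normalize_metric_name`:
    // periods become double underscores, other invalid characters become a single underscore, and an invalid
    // leading character is replaced rather than dropped.
    #[test]
    fn normalize_metric_name_examples() {
        let interner: FixedSizeInterner<1> = FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES);
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);

        let normalized = normalize_metric_name("datadog.agent.running", &mut name_buf, &interner);
        assert_eq!(normalized.as_deref(), Some("datadog__agent__running"));

        let normalized = normalize_metric_name("http-requests.2xx", &mut name_buf, &interner);
        assert_eq!(normalized.as_deref(), Some("http_requests__2xx"));

        let normalized = normalize_metric_name("0invalid.start", &mut name_buf, &interner);
        assert_eq!(normalized.as_deref(), Some("_invalid__start"));
    }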
}