saluki_components/destinations/prometheus/mod.rs

use std::{
    convert::Infallible,
    fmt::Write as _,
    num::NonZeroUsize,
    sync::{Arc, LazyLock},
};

use async_trait::async_trait;
use ddsketch_agent::DDSketch;
use http::{Request, Response};
use hyper::{body::Incoming, service::service_fn};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
use saluki_config::GenericConfiguration;
use saluki_context::{tags::Tag, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_core::data_model::event::{
    metric::{Histogram, Metric, MetricValues},
    EventType,
};
use saluki_error::GenericError;
use saluki_io::net::{
    listener::ConnectionOrientedListener,
    server::http::{ErrorHandle, HttpServer, ShutdownHandle},
    ListenAddress,
};
use serde::Deserialize;
use stringtheory::{
    interning::{FixedSizeInterner, Interner as _},
    MetaString,
};
use tokio::{select, sync::RwLock};
use tracing::debug;

const CONTEXT_LIMIT: usize = 10_000;
const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024;
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;
const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;

// Histogram-related constants and pre-calculated buckets.
const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));

const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));

// SAFETY: This is obviously not zero.
const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();

/// Prometheus destination.
///
/// Exposes a Prometheus scrape endpoint that emits metrics in the Prometheus exposition format.
///
/// # Limits
///
/// - Number of contexts (unique series) is limited to 10,000.
/// - Maximum size of scrape payload response is ~1MiB.
///
/// # Missing
///
/// - no support for expiring metrics (which we don't really need because the only use for this destination at the
///   moment is internal metrics, which aren't dynamic since we don't use dynamic tags or have dynamic topology support,
///   but... you know, we'll eventually need this)
/// - full support for distributions (we can't convert a distribution to an aggregated histogram, and native histogram
///   support is still too fresh for most clients, so we simply expose aggregated summaries as a stopgap)
///
#[derive(Deserialize)]
pub struct PrometheusConfiguration {
    #[serde(rename = "prometheus_listen_addr")]
    listen_addr: ListenAddress,
}

impl PrometheusConfiguration {
    /// Creates a new `PrometheusConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Returns the listen address for the Prometheus scrape endpoint.
    pub fn listen_address(&self) -> &ListenAddress {
        &self.listen_addr
    }
}

#[async_trait]
impl DestinationBuilder for PrometheusConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
        Ok(Box::new(Prometheus {
            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
            metrics: FastIndexMap::default(),
            payload: Arc::new(RwLock::new(String::new())),
            payload_buffer: String::with_capacity(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES),
            tags_buffer: String::with_capacity(TAGS_BUFFER_SIZE_LIMIT_BYTES),
            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
        }))
    }
}

impl MemoryBounds for PrometheusConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<Prometheus>("component struct")
            // This isn't _really_ bounded, since the string buffer could grow larger if a metric name were larger, but
            // the default buffer size is far larger than any typical metric name, so it should almost never grow
            // beyond this initially allocated size.
            .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE);

        builder
            .firm()
            // Even though our metrics map really maps a Prometheus context to a map of context/value pairs, we're just
            // simplifying things here because the ratio of true "contexts" to Prometheus contexts should be very high,
            // high enough to make this a reasonable approximation.
            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
            .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES)
            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
    }
}

struct Prometheus {
    listener: ConnectionOrientedListener,
    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
    payload: Arc<RwLock<String>>,
    payload_buffer: String,
    tags_buffer: String,
    interner: FixedSizeInterner<1>,
}

#[async_trait]
impl Destination for Prometheus {
    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
        let Self {
            listener,
            mut metrics,
            payload,
            mut payload_buffer,
            mut tags_buffer,
            interner,
        } = *self;

        let mut health = context.take_health_handle();

        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
        health.mark_ready();

        debug!("Prometheus destination started.");

        let mut contexts = 0;
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);
        let mut tags_deduplicator = ReusableDeduplicator::new();

        loop {
            select! {
                _ = health.live() => continue,
                maybe_events = context.events().next() => match maybe_events {
                    Some(events) => {
                        // Process each metric event in the batch, either merging it with the existing value or
                        // inserting it for the first time.
                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                // Break apart our metric into its constituent parts, and then normalize it for
                                // Prometheus: adjusting the name if necessary, figuring out the equivalent Prometheus
                                // metric type, and so on.
                                let prom_context = match into_prometheus_metric(&metric, &mut name_buf, &interner) {
                                    Some(prom_context) => prom_context,
                                    None => continue,
                                };

                                let (context, values, _) = metric.into_parts();

                                // Create an entry for the context if we don't already have one, obeying our configured context limit.
                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
                                match existing_contexts.get_mut(&context) {
                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
                                    None => {
                                        if contexts >= CONTEXT_LIMIT {
                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
                                            continue
                                        }

                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);

                                        existing_contexts.insert(context, new_prom_value);
                                        contexts += 1;
                                    }
                                }
                            }
                        }

                        // Regenerate the scrape payload.
                        regenerate_payload(&metrics, &payload, &mut payload_buffer, &mut tags_buffer, &mut tags_deduplicator).await;
                    },
                    None => break,
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
            }
        }

        // TODO: This should really be `DynamicShutdownCoordinator`-based so we can trigger shutdown _and_ wait until
        // all HTTP connections and the listener have finished.
        http_shutdown.shutdown();

        debug!("Prometheus destination stopped.");

        Ok(())
    }
}

fn spawn_prom_scrape_service(
    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
) -> (ShutdownHandle, ErrorHandle) {
    let service = service_fn(move |_: Request<Incoming>| {
        let payload = Arc::clone(&payload);
        async move {
            let payload = payload.read().await;
            Ok::<Response<String>, Infallible>(Response::new(payload.to_string()))
        }
    });

    let http_server = HttpServer::from_listener(listener, service);
    http_server.listen()
}

#[allow(clippy::mutable_key_type)]
async fn regenerate_payload(
    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
    payload_buffer: &mut String, tags_buffer: &mut String, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) {
    let mut payload = payload.write().await;
    payload.clear();

    let mut metrics_written = 0;
    let metrics_total = metrics.len();

    for (prom_context, contexts) in metrics {
        if write_metrics(payload_buffer, tags_buffer, prom_context, contexts, tags_deduplicator) {
            if payload.len() + payload_buffer.len() > PAYLOAD_SIZE_LIMIT_BYTES {
                debug!(
                    metrics_written,
                    metrics_total,
                    payload_len = payload.len(),
                    "Writing additional metrics would exceed payload size limit. Skipping remaining metrics."
                );
                break;
            }

            // If we've already written some metrics, add a newline between each grouping.
            if metrics_written > 0 {
                payload.push('\n');
            }

            payload.push_str(payload_buffer);

            metrics_written += 1;
        } else {
            debug!("Failed to write metric to payload. Continuing...");
        }
    }
}

fn get_help_text(metric_name: &str) -> Option<&'static str> {
    // The HELP text for overlapped metrics MUST match the agent's HELP text exactly or else an error will occur on the
    // agent's side when parsing the metrics.
    match metric_name {
        "no_aggregation__flush" => Some("Count the number of flushes done by the no-aggregation pipeline worker"),
        "no_aggregation__processed" => {
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        }
        "aggregator__dogstatsd_contexts_by_mtype" => {
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        }
        "aggregator__flush" => Some("Number of metrics/service checks/events flushed"),
        "aggregator__dogstatsd_contexts_bytes_by_mtype" => {
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        }
        "aggregator__dogstatsd_contexts" => Some("Count the number of dogstatsd contexts in the aggregator"),
        "aggregator__processed" => Some("Amount of metrics/services_checks/events processed by the aggregator"),
        "dogstatsd__processed" => Some("Count of service checks/events/metrics processed by dogstatsd"),
        "dogstatsd__packet_pool_get" => Some("Count of get done in the packet pool"),
        "dogstatsd__packet_pool_put" => Some("Count of put done in the packet pool"),
        "dogstatsd__packet_pool" => Some("Usage of the packet pool in dogstatsd"),
        _ => None,
    }
}

fn write_metrics(
    payload_buffer: &mut String, tags_buffer: &mut String, prom_context: &PrometheusContext,
    contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> bool {
    if contexts.is_empty() {
        debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
        return true;
    }

    payload_buffer.clear();

    // Write HELP if available.
    if let Some(help_text) = get_help_text(prom_context.metric_name.as_ref()) {
        writeln!(payload_buffer, "# HELP {} {}", prom_context.metric_name, help_text).unwrap();
    }
    // Write the metric header.
    writeln!(
        payload_buffer,
        "# TYPE {} {}",
        prom_context.metric_name,
        prom_context.metric_type.as_str()
    )
    .unwrap();

    for (context, values) in contexts {
        if payload_buffer.len() > PAYLOAD_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Payload buffer size limit exceeded. Additional contexts for this metric will be truncated.");
            break;
        }

        tags_buffer.clear();

        // Format/encode the tags.
        if !format_tags(tags_buffer, context, tags_deduplicator) {
            return false;
        }

        // Write the metric value itself.
        match values {
            PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
                // No metric type-specific tags for counters or gauges, so just write them straight out.
                payload_buffer.push_str(&prom_context.metric_name);
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", value).unwrap();
            }
            PrometheusValue::Histogram(histogram) => {
                // Write the histogram buckets.
                for (upper_bound_str, count) in histogram.buckets() {
                    write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
                }

                // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram.
                write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push(',');
                }
                writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();

                // Write the histogram sum and count.
                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.sum).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.count).unwrap();
            }
            PrometheusValue::Summary(sketch) => {
                // We take a fixed set of quantiles from the sketch, which is hard-coded but should cover the quantiles
                // that people generally care about.
                for quantile in [0.1, 0.25, 0.5, 0.95, 0.99, 0.999] {
                    let q_value = sketch.quantile(quantile).unwrap_or_default();

                    write!(payload_buffer, "{}{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "quantile=\"{}\"}} {}", quantile, q_value).unwrap();
                }

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.sum().unwrap_or_default()).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.count()).unwrap();
            }
        }
    }

    true
}

fn format_tags(tags_buffer: &mut String, context: &Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> bool {
    let mut has_tags = false;

    let chained_tags = context.tags().into_iter().chain(context.origin_tags());
    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);

    for tag in deduplicated_tags {
        let tag_name = tag.name();
        let tag_value = match tag.value() {
            Some(value) => value,
            None => {
                debug!("Skipping bare tag.");
                continue;
            }
        };

        // If we're not the first tag to be written, add a comma to separate the tags. We only do this once we know the
        // tag will actually be written, so that skipped bare tags don't leave stray commas behind.
        if has_tags {
            tags_buffer.push(',');
        }

        has_tags = true;

        // Can't exceed the tags buffer size limit: we calculate the addition as tag name/value length plus four bytes
        // to account for having to format it as `name="value",`.
        if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
            return false;
        }

        write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
    }

    true
}

#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum PrometheusType {
    Counter,
    Gauge,
    Histogram,
    Summary,
}

impl PrometheusType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Counter => "counter",
            Self::Gauge => "gauge",
            Self::Histogram => "histogram",
            Self::Summary => "summary",
        }
    }
}

#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct PrometheusContext {
    metric_name: MetaString,
    metric_type: PrometheusType,
}

enum PrometheusValue {
    Counter(f64),
    Gauge(f64),
    Histogram(PrometheusHistogram),
    Summary(DDSketch),
}

fn into_prometheus_metric(
    metric: &Metric, name_buf: &mut String, interner: &FixedSizeInterner<1>,
) -> Option<PrometheusContext> {
    // Normalize the metric name first, since we might fail due to the interner being full.
    let metric_name = match normalize_metric_name(metric.context().name(), name_buf, interner) {
        Some(name) => name,
        None => {
            debug!(
                "Failed to intern normalized metric name. Skipping metric '{}'.",
                metric.context().name()
            );
            return None;
        }
    };

    let metric_type = match metric.values() {
        MetricValues::Counter(_) => PrometheusType::Counter,
        MetricValues::Gauge(_) | MetricValues::Set(_) => PrometheusType::Gauge,
        MetricValues::Histogram(_) => PrometheusType::Histogram,
        MetricValues::Distribution(_) => PrometheusType::Summary,
        _ => return None,
    };

    Some(PrometheusContext {
        metric_name,
        metric_type,
    })
}

fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
    match prom_context.metric_type {
        PrometheusType::Counter => PrometheusValue::Counter(0.0),
        PrometheusType::Gauge => PrometheusValue::Gauge(0.0),
        PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
        PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()),
    }
}

fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
    match (values, prom_value) {
        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
            for (_, value) in counter_values {
                *prom_counter += value;
            }
        }
        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = gauge_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = set_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
            for (_, value) in histogram_values {
                prom_histogram.merge_histogram(&value);
            }
        }
        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
            for (_, value) in distribution_values {
                prom_summary.merge(&value);
            }
        }
        _ => panic!("Mismatched metric types"),
    }
}

fn normalize_metric_name(name: &str, name_buf: &mut String, interner: &FixedSizeInterner<1>) -> Option<MetaString> {
    name_buf.clear();

    // Normalize the metric name to a valid Prometheus metric name.
    for (i, c) in name.chars().enumerate() {
        if i == 0 && is_valid_name_start_char(c) || i != 0 && is_valid_name_char(c) {
            name_buf.push(c);
        } else {
            // Convert periods to a set of two underscores, and anything else to a single underscore.
            //
            // This lets us ensure that the normal separators we use in metrics (periods) are converted in a way
            // where they can be distinguished on the collector side to potentially reconstitute them back to their
            // original form.
            name_buf.push_str(if c == '.' { "__" } else { "_" });
        }
    }

    // Now try and intern the normalized name.
    interner.try_intern(name_buf).map(MetaString::from)
}

#[inline]
fn is_valid_name_start_char(c: char) -> bool {
    // Matches a regular expression of [a-zA-Z_:].
    c.is_ascii_alphabetic() || c == '_' || c == ':'
}

#[inline]
fn is_valid_name_char(c: char) -> bool {
    // Matches a regular expression of [a-zA-Z0-9_:].
    c.is_ascii_alphanumeric() || c == '_' || c == ':'
}

#[derive(Clone)]
struct PrometheusHistogram {
    sum: f64,
    count: u64,
    buckets: Vec<(f64, &'static str, u64)>,
}

impl PrometheusHistogram {
    fn new(metric_name: &str) -> Self {
        // Super hacky but effective way to decide when to switch to the time-oriented buckets.
        let base_buckets = if metric_name.ends_with("_seconds") {
            &TIME_HISTOGRAM_BUCKETS[..]
        } else {
            &NON_TIME_HISTOGRAM_BUCKETS[..]
        };

        let buckets = base_buckets
            .iter()
            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
            .collect();

        Self {
            sum: 0.0,
            count: 0,
            buckets,
        }
    }

    fn merge_histogram(&mut self, histogram: &Histogram) {
        for sample in histogram.samples() {
            self.add_sample(sample.value.into_inner(), sample.weight);
        }
    }

    fn add_sample(&mut self, value: f64, weight: u64) {
        self.sum += value * weight as f64;
        self.count += weight;

        // Buckets are cumulative, so add the sample's weight to every bucket whose upper bound the value falls under.
        for (upper_bound, _, count) in &mut self.buckets {
            if value <= *upper_bound {
                *count += weight;
            }
        }
    }

    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
        self.buckets
            .iter()
            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
    }
}

fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    // We generate a set of "log-linear" buckets: logarithmically spaced values which are then subdivided linearly.
    //
    // As an example, with base=2 and scale=4, we would get: 2, 5, 8, 20, 32, 80, 128, 320, 512, and so on.
    //
    // We calculate buckets in pairs, where the n-th pair is `i` and `j`, such that `i` is `base * scale^n` and `j` is
    // the midpoint between `i` and the next `i` (`base * scale^(n+1)`).

    let mut buckets = [(0.0, ""); N];

    let log_linear_buckets = std::iter::repeat(base).enumerate().flat_map(|(i, base)| {
        let pow = scale.powf(i as f64);
        let value = base * pow;

        let next_pow = scale.powf((i + 1) as f64);
        let next_value = base * next_pow;
        let midpoint = (value + next_value) / 2.0;

        [value, midpoint]
    });

    for (i, current_le) in log_linear_buckets.enumerate().take(N) {
        let (bucket_le, bucket_le_str) = &mut buckets[i];
        let current_le_str = format!("{}", current_le);

        *bucket_le = current_le;
        *bucket_le_str = current_le_str.leak();
    }

    buckets
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_print() {
        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
    }
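
    // A small sketch of a test for `histogram_buckets`, checking the log-linear example given in the comment on that
    // function: with base=2 and scale=4, the first bucket upper bounds should be 2, 5, 8, 20, 32, 80, and so on.
    #[test]
    fn histogram_bucket_generation_log_linear_example() {
        let buckets = histogram_buckets::<6>(2.0, 4.0);

        let upper_bounds = buckets.iter().map(|(le, _)| *le).collect::<Vec<_>>();
        assert_eq!(upper_bounds, vec![2.0, 5.0, 8.0, 20.0, 32.0, 80.0]);

        // The pre-rendered upper bound strings should match their floating-point counterparts.
        assert_eq!(buckets[1].1, "5");
    }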

    #[test]
    fn prom_histogram_add_sample() {
        let sample1 = (0.25, 1);
        let sample2 = (1.0, 2);
        let sample3 = (2.0, 3);

        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
        histogram.add_sample(sample1.0, sample1.1);
        histogram.add_sample(sample2.0, sample2.1);
        histogram.add_sample(sample3.0, sample3.1);

        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
        let expected_count = sample1.1 + sample2.1 + sample3.1;
        assert_eq!(histogram.sum, expected_sum);
        assert_eq!(histogram.count, expected_count);

        // Go through and make sure we have things in the right buckets.
        let mut expected_bucket_count = 0;
        for sample in [sample1, sample2, sample3] {
            for bucket in &histogram.buckets {
                // If we've finally hit a bucket that includes our sample value, its count should be equal to or
                // greater than our expected bucket count when we account for the current sample.
                if sample.0 <= bucket.0 {
                    assert!(bucket.2 >= expected_bucket_count + sample.1);
                }
            }

            // Adjust the expected bucket count to fully account for the current sample before moving on.
            expected_bucket_count += sample.1;
        }
    }
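
    // A sketch checking which pre-calculated bucket set `PrometheusHistogram::new` picks: metric names ending in
    // `_seconds` get the time-oriented buckets (which start well below one second), while everything else gets the
    // general-purpose buckets (which start at 1.0). The metric names used here are arbitrary test inputs.
    #[test]
    fn prom_histogram_bucket_selection() {
        let time_histogram = PrometheusHistogram::new("request_duration_seconds");
        let non_time_histogram = PrometheusHistogram::new("payload_size_bytes");

        assert_eq!(time_histogram.buckets.len(), TIME_HISTOGRAM_BUCKET_COUNT);
        assert_eq!(non_time_histogram.buckets.len(), NON_TIME_HISTOGRAM_BUCKET_COUNT);
        assert!(time_histogram.buckets[0].0 < 1.0);
        assert_eq!(non_time_histogram.buckets[0].0, 1.0);
    }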

    #[test]
    fn prom_get_help_text() {
        // Ensure that we catch when the help text changes for these metrics.
        assert_eq!(
            get_help_text("no_aggregation__flush"),
            Some("Count the number of flushes done by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("no_aggregation__processed"),
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_by_mtype"),
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__flush"),
            Some("Number of metrics/service checks/events flushed")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_bytes_by_mtype"),
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts"),
            Some("Count the number of dogstatsd contexts in the aggregator")
        );
        assert_eq!(
            get_help_text("aggregator__processed"),
            Some("Amount of metrics/services_checks/events processed by the aggregator")
        );
    }
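
    // A sketch exercising `normalize_metric_name` against the rules documented on that function: periods become a
    // double underscore, any other invalid character becomes a single underscore, and valid characters pass through
    // untouched. The input name is an arbitrary example.
    #[test]
    fn prom_normalize_metric_name() {
        let interner = FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES);
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);

        let normalized = normalize_metric_name("dogstatsd.packet-pool", &mut name_buf, &interner);
        assert!(normalized.is_some());
        assert_eq!(name_buf, "dogstatsd__packet_pool");
    }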
}