saluki_components/destinations/prometheus/mod.rs

use std::{
    convert::Infallible,
    fmt::Write as _,
    num::NonZeroUsize,
    sync::{Arc, LazyLock},
};

use async_trait::async_trait;
use ddsketch::DDSketch;
use http::{Request, Response};
use hyper::{body::Incoming, service::service_fn};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
use saluki_context::{tags::Tag, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_core::data_model::event::{
    metric::{Histogram, Metric, MetricValues},
    EventType,
};
use saluki_error::GenericError;
use saluki_io::net::{
    listener::ConnectionOrientedListener,
    server::http::{ErrorHandle, HttpServer, ShutdownHandle},
    ListenAddress,
};
use serde::Deserialize;
use stringtheory::{
    interning::{FixedSizeInterner, Interner as _},
    MetaString,
};
use tokio::{select, sync::RwLock};
use tracing::debug;

const CONTEXT_LIMIT: usize = 10_000;
const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024;
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;
const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;

// Histogram-related constants and pre-calculated buckets.
const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));

const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));

// This will never panic, as the value is clearly non-zero.
const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();

/// Prometheus destination.
///
/// Exposes a Prometheus scrape endpoint that emits metrics in the Prometheus exposition format.
///
/// # Limits
///
/// - Number of contexts (unique series) is limited to 10,000.
/// - Maximum size of scrape payload response is ~1MiB.
///
/// # Missing
///
/// - no support for expiring metrics (which we don't really need because the only use for this destination at the
///   moment is internal metrics, which aren't dynamic since we don't use dynamic tags or have dynamic topology support,
///   but... you know, we'll eventually need this)
/// - full support for distributions (we can't convert a distribution to an aggregated histogram, and native histogram
///   support is still too fresh for most clients, so we simply expose aggregated summaries as a stopgap)
///
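/// # Example
///
/// A minimal construction sketch (the helper function below is hypothetical and only illustrates the builder; it
/// assumes a `ListenAddress` value is already available, e.g. parsed from configuration elsewhere):
///
/// ```ignore
/// use saluki_io::net::ListenAddress;
///
/// // Hypothetical helper: wraps an already-obtained listen address in a destination configuration.
/// fn prometheus_destination_config(listen_addr: ListenAddress) -> PrometheusConfiguration {
///     PrometheusConfiguration::from_listen_address(listen_addr)
/// }
/// ```
///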
#[derive(Deserialize)]
pub struct PrometheusConfiguration {
    #[serde(rename = "prometheus_listen_addr")]
    listen_addr: ListenAddress,
}

impl PrometheusConfiguration {
    /// Creates a new `PrometheusConfiguration` for the given listen address.
    pub fn from_listen_address(listen_addr: ListenAddress) -> Self {
        Self { listen_addr }
    }
}

#[async_trait]
impl DestinationBuilder for PrometheusConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
        Ok(Box::new(Prometheus {
            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
            metrics: FastIndexMap::default(),
            payload: Arc::new(RwLock::new(String::new())),
            payload_buffer: String::with_capacity(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES),
            tags_buffer: String::with_capacity(TAGS_BUFFER_SIZE_LIMIT_BYTES),
            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
        }))
    }
}

impl MemoryBounds for PrometheusConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<Prometheus>("component struct")
            // This isn't _really_ bounded, since the string buffer could grow larger if a metric name was larger, but
            // the default buffer size is far larger than any typical metric name, so it should almost never grow
            // beyond its initially allocated size.
            .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE);

        builder
            .firm()
            // Even though our state map really maps each Prometheus context to a map of context/value pairs, we
            // simplify things here because the ratio of true "contexts" to Prometheus contexts should be high enough
            // to make this a reasonable approximation.
            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
            .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES)
            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
    }
}

struct Prometheus {
    listener: ConnectionOrientedListener,
    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
    payload: Arc<RwLock<String>>,
    payload_buffer: String,
    tags_buffer: String,
    interner: FixedSizeInterner<1>,
}

#[async_trait]
impl Destination for Prometheus {
    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
        let Self {
            listener,
            mut metrics,
            payload,
            mut payload_buffer,
            mut tags_buffer,
            interner,
        } = *self;

        let mut health = context.take_health_handle();

        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
        health.mark_ready();

        debug!("Prometheus destination started.");

        let mut contexts = 0;
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);
        let mut tags_deduplicator = ReusableDeduplicator::new();

        loop {
            select! {
                _ = health.live() => continue,
                maybe_events = context.events().next() => match maybe_events {
                    Some(events) => {
                        // Process each metric event in the batch, either merging it with the existing value or
                        // inserting it for the first time.
                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                // Break apart our metric into its constituent parts, and then normalize it for
                                // Prometheus: adjusting the name if necessary, figuring out the equivalent Prometheus
                                // metric type, and so on.
                                let prom_context = match into_prometheus_metric(&metric, &mut name_buf, &interner) {
                                    Some(prom_context) => prom_context,
                                    None => continue,
                                };

                                let (context, values, _) = metric.into_parts();

                                // Create an entry for the context if we don't already have one, obeying our configured context limit.
                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
                                match existing_contexts.get_mut(&context) {
                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
                                    None => {
                                        if contexts >= CONTEXT_LIMIT {
                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
                                            continue
                                        }

                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);

                                        existing_contexts.insert(context, new_prom_value);
                                        contexts += 1;
                                    }
                                }
                            }
                        }

                        // Regenerate the scrape payload.
                        regenerate_payload(&metrics, &payload, &mut payload_buffer, &mut tags_buffer, &mut tags_deduplicator).await;
                    },
                    None => break,
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
            }
        }

        // TODO: This should really be `DynamicShutdownCoordinator`-based so we can trigger shutdown _and_ wait until
        // all HTTP connections and the listener have finished.
        http_shutdown.shutdown();

        debug!("Prometheus destination stopped.");

        Ok(())
    }
}

fn spawn_prom_scrape_service(
    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
) -> (ShutdownHandle, ErrorHandle) {
    let service = service_fn(move |_: Request<Incoming>| {
        let payload = Arc::clone(&payload);
        async move {
            let payload = payload.read().await;
            Ok::<Response<String>, Infallible>(Response::new(payload.to_string()))
        }
    });

    let http_server = HttpServer::from_listener(listener, service);
    http_server.listen()
}

#[allow(clippy::mutable_key_type)]
async fn regenerate_payload(
    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
    payload_buffer: &mut String, tags_buffer: &mut String, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) {
    let mut payload = payload.write().await;
    payload.clear();

    let mut metrics_written = 0;
    let metrics_total = metrics.len();

    for (prom_context, contexts) in metrics {
        if write_metrics(payload_buffer, tags_buffer, prom_context, contexts, tags_deduplicator) {
            if payload.len() + payload_buffer.len() > PAYLOAD_SIZE_LIMIT_BYTES {
                debug!(
                    metrics_written,
                    metrics_total,
                    payload_len = payload.len(),
                    "Writing additional metrics would exceed payload size limit. Skipping remaining metrics."
                );
                break;
            }

            // If we've already written some metrics, add a newline between each grouping.
            if metrics_written > 0 {
                payload.push('\n');
            }

            payload.push_str(payload_buffer);

            metrics_written += 1;
        } else {
            debug!("Failed to write metric to payload. Continuing...");
        }
    }
}

fn get_help_text(metric_name: &str) -> Option<&'static str> {
    // The HELP text for overlapped metrics MUST match the agent's HELP text exactly or else an error will occur on the
    // agent's side when parsing the metrics.
    match metric_name {
        "no_aggregation__flush" => Some("Count the number of flushes done by the no-aggregation pipeline worker"),
        "no_aggregation__processed" => {
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        }
        "aggregator__dogstatsd_contexts_by_mtype" => {
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        }
        "aggregator__flush" => Some("Number of metrics/service checks/events flushed"),
        "aggregator__dogstatsd_contexts_bytes_by_mtype" => {
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        }
        "aggregator__dogstatsd_contexts" => Some("Count the number of dogstatsd contexts in the aggregator"),
        "aggregator__processed" => Some("Amount of metrics/services_checks/events processed by the aggregator"),
        "dogstatsd__processed" => Some("Count of service checks/events/metrics processed by dogstatsd"),
        "dogstatsd__packet_pool_get" => Some("Count of get done in the packet pool"),
        "dogstatsd__packet_pool_put" => Some("Count of put done in the packet pool"),
        "dogstatsd__packet_pool" => Some("Usage of the packet pool in dogstatsd"),
        "transactions__errors" => Some("Count of transactions errored grouped by type of error"),
        "transactions__http_errors" => Some("Count of transactions http errors per http code"),
        "transactions__dropped" => Some("Transaction drop count"),
        "transactions__success" => Some("Successful transaction count"),
        "transactions__success_bytes" => Some("Successful transaction sizes in bytes"),
        _ => None,
    }
}

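// For reference, each metric grouping is written in the Prometheus exposition format. As an illustrative example
// (not captured from real output; the tag shown is hypothetical), a counter with a single tagged context would be
// rendered roughly as:
//
//   # HELP transactions__success Successful transaction count
//   # TYPE transactions__success counter
//   transactions__success{domain="example"} 42
//
// Summaries additionally emit per-quantile lines plus `_sum`/`_count`, and histograms emit cumulative `_bucket`
// lines plus `_sum`/`_count`.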
fn write_metrics(
    payload_buffer: &mut String, tags_buffer: &mut String, prom_context: &PrometheusContext,
    contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> bool {
    if contexts.is_empty() {
        debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
        return true;
    }

    payload_buffer.clear();

    // Write HELP if available
    if let Some(help_text) = get_help_text(prom_context.metric_name.as_ref()) {
        writeln!(payload_buffer, "# HELP {} {}", prom_context.metric_name, help_text).unwrap();
    }
    // Write the metric header.
    writeln!(
        payload_buffer,
        "# TYPE {} {}",
        prom_context.metric_name,
        prom_context.metric_type.as_str()
    )
    .unwrap();

    for (context, values) in contexts {
        if payload_buffer.len() > PAYLOAD_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Payload buffer size limit exceeded. Additional contexts for this metric will be truncated.");
            break;
        }

        tags_buffer.clear();

        // Format/encode the tags.
        if !format_tags(tags_buffer, context, tags_deduplicator) {
            return false;
        }

        // Write the metric value itself.
        match values {
            PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
                // No metric type-specific tags for counters or gauges, so just write them straight out.
                payload_buffer.push_str(&prom_context.metric_name);
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", value).unwrap();
            }
            PrometheusValue::Histogram(histogram) => {
                // Write the histogram buckets.
                for (upper_bound_str, count) in histogram.buckets() {
                    write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
                }

                // Write the final bucket -- the +Inf bucket -- which is just equal to the count of the histogram.
                write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push(',');
                }
                writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();

                // Write the histogram sum and count.
                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.sum).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.count).unwrap();
            }
            PrometheusValue::Summary(sketch) => {
                // We take a fixed set of quantiles from the sketch, which is hard-coded but should cover the
                // quantiles people generally care about.
                for quantile in [0.1, 0.25, 0.5, 0.95, 0.99, 0.999] {
                    let q_value = sketch.quantile(quantile).unwrap_or_default();

                    write!(payload_buffer, "{}{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "quantile=\"{}\"}} {}", quantile, q_value).unwrap();
                }

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.sum().unwrap_or_default()).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.count()).unwrap();
            }
        }
    }

    true
}

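// Formats the context's tags (including origin tags, deduplicated) as a Prometheus label list. As an illustrative
// example, a context tagged with `env:prod` and `service:dogstatsd` would be rendered as
// `env="prod",service="dogstatsd"`.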
fn format_tags(tags_buffer: &mut String, context: &Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> bool {
    let mut has_tags = false;

    let chained_tags = context.tags().into_iter().chain(context.origin_tags());
    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);

    for tag in deduplicated_tags {
        let tag_name = tag.name();
        let tag_value = match tag.value() {
            Some(value) => value,
            None => {
                debug!("Skipping bare tag.");
                continue;
            }
        };

        // Can't exceed the tags buffer size limit: we calculate the addition as tag name/value length plus four bytes
        // to account for having to format it as `name="value",`.
        if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
            return false;
        }

        // If we're not the first tag to be written, add a comma to separate the tags. We only do this once we know
        // the tag will actually be written, so that skipped bare tags don't leave a stray separator behind.
        if has_tags {
            tags_buffer.push(',');
        }
        has_tags = true;

        write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
    }

    true
}

#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum PrometheusType {
    Counter,
    Gauge,
    Histogram,
    Summary,
}

impl PrometheusType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Counter => "counter",
            Self::Gauge => "gauge",
            Self::Histogram => "histogram",
            Self::Summary => "summary",
        }
    }
}

#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct PrometheusContext {
    metric_name: MetaString,
    metric_type: PrometheusType,
}

enum PrometheusValue {
    Counter(f64),
    Gauge(f64),
    Histogram(PrometheusHistogram),
    Summary(DDSketch),
}

fn into_prometheus_metric(
    metric: &Metric, name_buf: &mut String, interner: &FixedSizeInterner<1>,
) -> Option<PrometheusContext> {
    // Normalize the metric name first, since we might fail due to the interner being full.
    let metric_name = match normalize_metric_name(metric.context().name(), name_buf, interner) {
        Some(name) => name,
        None => {
            debug!(
                "Failed to intern normalized metric name. Skipping metric '{}'.",
                metric.context().name()
            );
            return None;
        }
    };

    let metric_type = match metric.values() {
        MetricValues::Counter(_) => PrometheusType::Counter,
        MetricValues::Gauge(_) | MetricValues::Set(_) => PrometheusType::Gauge,
        MetricValues::Histogram(_) => PrometheusType::Histogram,
        MetricValues::Distribution(_) => PrometheusType::Summary,
        _ => return None,
    };

    Some(PrometheusContext {
        metric_name,
        metric_type,
    })
}

fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
    match prom_context.metric_type {
        PrometheusType::Counter => PrometheusValue::Counter(0.0),
        PrometheusType::Gauge => PrometheusValue::Gauge(0.0),
        PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
        PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()),
    }
}

fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
    match (values, prom_value) {
        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
            for (_, value) in counter_values {
                *prom_counter += value;
            }
        }
        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = gauge_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = set_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
            for (_, value) in histogram_values {
                prom_histogram.merge_histogram(&value);
            }
        }
        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
            for (_, value) in distribution_values {
                prom_summary.merge(&value);
            }
        }
        _ => panic!("Mismatched metric types"),
    }
}

fn normalize_metric_name(name: &str, name_buf: &mut String, interner: &FixedSizeInterner<1>) -> Option<MetaString> {
    name_buf.clear();

    // Normalize the metric name to a valid Prometheus metric name.
    for (i, c) in name.chars().enumerate() {
        if i == 0 && is_valid_name_start_char(c) || i != 0 && is_valid_name_char(c) {
            name_buf.push(c);
        } else {
            // Convert periods to a set of two underscores, and anything else to a single underscore.
            //
            // This lets us ensure that the normal separators we use in metrics (periods) are converted in a way
            // where they can be distinguished on the collector side to potentially reconstitute them back to their
            // original form.
            name_buf.push_str(if c == '.' { "__" } else { "_" });
        }
    }

    // Now try and intern the normalized name.
    interner.try_intern(name_buf).map(MetaString::from)
}

#[inline]
fn is_valid_name_start_char(c: char) -> bool {
    // Matches a regular expression of [a-zA-Z_:].
    c.is_ascii_alphabetic() || c == '_' || c == ':'
}

#[inline]
fn is_valid_name_char(c: char) -> bool {
    // Matches a regular expression of [a-zA-Z0-9_:].
    c.is_ascii_alphanumeric() || c == '_' || c == ':'
}

#[derive(Clone)]
struct PrometheusHistogram {
    sum: f64,
    count: u64,
    buckets: Vec<(f64, &'static str, u64)>,
}

impl PrometheusHistogram {
    fn new(metric_name: &str) -> Self {
        // Super hacky but effective way to decide when to switch to the time-oriented buckets.
        let base_buckets = if metric_name.ends_with("_seconds") {
            &TIME_HISTOGRAM_BUCKETS[..]
        } else {
            &NON_TIME_HISTOGRAM_BUCKETS[..]
        };

        let buckets = base_buckets
            .iter()
            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
            .collect();

        Self {
            sum: 0.0,
            count: 0,
            buckets,
        }
    }

    fn merge_histogram(&mut self, histogram: &Histogram) {
        for sample in histogram.samples() {
            self.add_sample(sample.value.into_inner(), sample.weight);
        }
    }

    fn add_sample(&mut self, value: f64, weight: u64) {
        self.sum += value * weight as f64;
        self.count += weight;

        // Add the weight to every bucket whose upper bound the value falls under, since the buckets are cumulative.
        for (upper_bound, _, count) in &mut self.buckets {
            if value <= *upper_bound {
                *count += weight;
            }
        }
    }

    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
        self.buckets
            .iter()
            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
    }
}

fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    // We generate a set of "log-linear" buckets: logarithmically spaced values which are then subdivided linearly.
    //
    // As an example, with base=2 and scale=4, we would get: 2, 5, 8, 20, 32, 80, 128, 320, 512, and so on.
    //
    // We calculate buckets in pairs, where the n-th pair is `i` and `j`, such that `i` is `base * scale^n` and `j` is
    // the midpoint between `i` and the next `i` (`base * scale^(n+1)`).

    let mut buckets = [(0.0, ""); N];

    let log_linear_buckets = std::iter::repeat(base).enumerate().flat_map(|(i, base)| {
        let pow = scale.powf(i as f64);
        let value = base * pow;

        let next_pow = scale.powf((i + 1) as f64);
        let next_value = base * next_pow;
        let midpoint = (value + next_value) / 2.0;

        [value, midpoint]
    });

    for (i, current_le) in log_linear_buckets.enumerate().take(N) {
        let (bucket_le, bucket_le_str) = &mut buckets[i];
        let current_le_str = format!("{}", current_le);

        *bucket_le = current_le;
        *bucket_le_str = current_le_str.leak();
    }

    buckets
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_print() {
        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
    }

    #[test]
    fn prom_histogram_add_sample() {
        let sample1 = (0.25, 1);
        let sample2 = (1.0, 2);
        let sample3 = (2.0, 3);

        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
        histogram.add_sample(sample1.0, sample1.1);
        histogram.add_sample(sample2.0, sample2.1);
        histogram.add_sample(sample3.0, sample3.1);

        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
        let expected_count = sample1.1 + sample2.1 + sample3.1;
        assert_eq!(histogram.sum, expected_sum);
        assert_eq!(histogram.count, expected_count);

        // Go through and make sure we have things in the right buckets.
        let mut expected_bucket_count = 0;
        for sample in [sample1, sample2, sample3] {
            for bucket in &histogram.buckets {
                // If we've finally hit a bucket that includes our sample value, its count should be equal to or
                // greater than our expected bucket count once we account for the current sample.
                if sample.0 <= bucket.0 {
                    assert!(bucket.2 >= expected_bucket_count + sample.1);
                }
            }

            // Adjust the expected bucket count to fully account for the current sample before moving on.
            expected_bucket_count += sample.1;
        }
    }
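
    #[test]
    fn log_linear_bucket_generation() {
        // Spot-check of the log-linear bucket scheme documented on `histogram_buckets`: with base=2 and scale=4,
        // the upper bounds should follow 2, 5, 8, 20, 32, 80, and so on. (Illustrative sketch added here; an
        // approximate comparison is used since the bounds are derived via `powf`.)
        let buckets = histogram_buckets::<6>(2.0, 4.0);
        let expected_upper_bounds = [2.0, 5.0, 8.0, 20.0, 32.0, 80.0];

        for ((upper_bound, _), expected_upper_bound) in buckets.iter().zip(expected_upper_bounds) {
            assert!((upper_bound - expected_upper_bound).abs() < 1e-9);
        }
    }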

    #[test]
    fn prom_get_help_text() {
        // Ensure that we catch when the help text changes for these metrics.
        assert_eq!(
            get_help_text("no_aggregation__flush"),
            Some("Count the number of flushes done by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("no_aggregation__processed"),
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_by_mtype"),
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__flush"),
            Some("Number of metrics/service checks/events flushed")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_bytes_by_mtype"),
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts"),
            Some("Count the number of dogstatsd contexts in the aggregator")
        );
        assert_eq!(
            get_help_text("aggregator__processed"),
            Some("Amount of metrics/services_checks/events processed by the aggregator")
        );
    }
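
    #[test]
    fn normalize_metric_name_handles_periods_and_invalid_chars() {
        // Illustrative sketch of the documented normalization rules: periods become double underscores (so they can
        // be distinguished and potentially reconstituted on the collector side), while any other character outside
        // [a-zA-Z0-9_:] becomes a single underscore. The metric name used here is hypothetical.
        let interner = FixedSizeInterner::<1>::new(METRIC_NAME_STRING_INTERNER_BYTES);
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);

        let normalized = normalize_metric_name("dogstatsd.packet-pool.get", &mut name_buf, &interner);
        assert!(normalized.is_some());
        assert_eq!(name_buf, "dogstatsd__packet_pool__get");
    }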
}