use std::{
    convert::Infallible,
    fmt::Write as _,
    num::NonZeroUsize,
    sync::{Arc, LazyLock},
};

use async_trait::async_trait;
use ddsketch_agent::DDSketch;
use http::{Request, Response};
use hyper::{body::Incoming, service::service_fn};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
use saluki_context::{tags::Tag, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_core::data_model::event::{
    metric::{Histogram, Metric, MetricValues},
    EventType,
};
use saluki_error::GenericError;
use saluki_io::net::{
    listener::ConnectionOrientedListener,
    server::http::{ErrorHandle, HttpServer, ShutdownHandle},
    ListenAddress,
};
use serde::Deserialize;
use stringtheory::{
    interning::{FixedSizeInterner, Interner as _},
    MetaString,
};
use tokio::{select, sync::RwLock};
use tracing::debug;

const CONTEXT_LIMIT: usize = 10_000;
const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024;
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;
const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;

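// Histogram bucket layouts, generated log-linearly by `histogram_buckets` below: each power of
// `scale` (starting at `base`) contributes the value itself plus the midpoint to the next power.
// Time-based metrics (`*_seconds`) start at 128 nanoseconds with a scale of 4; everything else
// starts at 1 with a scale of 2.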
const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));

const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));

const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();

#[derive(Deserialize)]
pub struct PrometheusConfiguration {
    #[serde(rename = "prometheus_listen_addr")]
    listen_addr: ListenAddress,
}

impl PrometheusConfiguration {
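    /// Creates a new `PrometheusConfiguration` that will listen on the given address.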
    pub fn from_listen_address(listen_addr: ListenAddress) -> Self {
        Self { listen_addr }
    }
}

#[async_trait]
impl DestinationBuilder for PrometheusConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
        Ok(Box::new(Prometheus {
            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
            metrics: FastIndexMap::default(),
            payload: Arc::new(RwLock::new(String::new())),
            payload_buffer: String::with_capacity(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES),
            tags_buffer: String::with_capacity(TAGS_BUFFER_SIZE_LIMIT_BYTES),
            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
        }))
    }
}

impl MemoryBounds for PrometheusConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Prometheus>("component struct")
            .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE);

        builder
            .firm()
            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
            .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES)
            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
    }
}

struct Prometheus {
    listener: ConnectionOrientedListener,
    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
    payload: Arc<RwLock<String>>,
    payload_buffer: String,
    tags_buffer: String,
    interner: FixedSizeInterner<1>,
}

#[async_trait]
impl Destination for Prometheus {
    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
        let Self {
            listener,
            mut metrics,
            payload,
            mut payload_buffer,
            mut tags_buffer,
            interner,
        } = *self;

        let mut health = context.take_health_handle();

        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
        health.mark_ready();

        debug!("Prometheus destination started.");

        let mut contexts = 0;
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);
        let mut tags_deduplicator = ReusableDeduplicator::new();

        loop {
            select! {
                _ = health.live() => continue,
                maybe_events = context.events().next() => match maybe_events {
                    Some(events) => {
                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let prom_context = match into_prometheus_metric(&metric, &mut name_buf, &interner) {
                                    Some(prom_context) => prom_context,
                                    None => continue,
                                };

                                let (context, values, _) = metric.into_parts();

                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
                                match existing_contexts.get_mut(&context) {
                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
                                    None => {
                                        if contexts >= CONTEXT_LIMIT {
                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
                                            continue
                                        }

                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);

                                        existing_contexts.insert(context, new_prom_value);
                                        contexts += 1;
                                    }
                                }
                            }
                        }

                        // Rebuild the scrape payload after each processed batch of events.
                        regenerate_payload(&metrics, &payload, &mut payload_buffer, &mut tags_buffer, &mut tags_deduplicator).await;
                    },
                    None => break,
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
            }
        }

        // Tear down the scrape endpoint before exiting.
        http_shutdown.shutdown();

        debug!("Prometheus destination stopped.");

        Ok(())
    }
}

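// Spawns the HTTP server that answers Prometheus scrapes. Note that the service returns the full
// current payload for any request it receives, regardless of method or path; it does not implement
// content negotiation or compression.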
fn spawn_prom_scrape_service(
    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
) -> (ShutdownHandle, ErrorHandle) {
    let service = service_fn(move |_: Request<Incoming>| {
        let payload = Arc::clone(&payload);
        async move {
            let payload = payload.read().await;
            Ok::<Response<String>, Infallible>(Response::new(payload.to_string()))
        }
    });

    let http_server = HttpServer::from_listener(listener, service);
    http_server.listen()
}

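// Rewrites the shared scrape payload from scratch based on the current aggregated state, enforcing
// the overall payload size limit. The write lock is held for the duration of the rebuild, so
// concurrent scrapes block until it completes.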
#[allow(clippy::mutable_key_type)]
async fn regenerate_payload(
    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
    payload_buffer: &mut String, tags_buffer: &mut String, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) {
    let mut payload = payload.write().await;
    payload.clear();

    let mut metrics_written = 0;
    let metrics_total = metrics.len();

    for (prom_context, contexts) in metrics {
        if write_metrics(payload_buffer, tags_buffer, prom_context, contexts, tags_deduplicator) {
            if payload.len() + payload_buffer.len() > PAYLOAD_SIZE_LIMIT_BYTES {
                debug!(
                    metrics_written,
                    metrics_total,
                    payload_len = payload.len(),
                    "Writing additional metrics would exceed payload size limit. Skipping remaining metrics."
                );
                break;
            }

            // Separate each metric family with a blank line.
            if metrics_written > 0 {
                payload.push('\n');
            }

            payload.push_str(payload_buffer);

            metrics_written += 1;
        } else {
            debug!("Failed to write metric to payload. Continuing...");
        }
    }
}

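// Maps well-known internal telemetry metric names to their HELP text. The list is hand-curated;
// metrics without an entry here are simply emitted without a HELP line.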
fn get_help_text(metric_name: &str) -> Option<&'static str> {
    match metric_name {
        "no_aggregation__flush" => Some("Count the number of flushes done by the no-aggregation pipeline worker"),
        "no_aggregation__processed" => {
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        }
        "aggregator__dogstatsd_contexts_by_mtype" => {
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        }
        "aggregator__flush" => Some("Number of metrics/service checks/events flushed"),
        "aggregator__dogstatsd_contexts_bytes_by_mtype" => {
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        }
        "aggregator__dogstatsd_contexts" => Some("Count the number of dogstatsd contexts in the aggregator"),
        "aggregator__processed" => Some("Amount of metrics/services_checks/events processed by the aggregator"),
        "dogstatsd__processed" => Some("Count of service checks/events/metrics processed by dogstatsd"),
        "dogstatsd__packet_pool_get" => Some("Count of get done in the packet pool"),
        "dogstatsd__packet_pool_put" => Some("Count of put done in the packet pool"),
        "dogstatsd__packet_pool" => Some("Usage of the packet pool in dogstatsd"),
        _ => None,
    }
}

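// Renders a single metric family into `payload_buffer` in the Prometheus exposition format. As an
// illustration (metric name, tags, and values here are made up), a histogram family renders as:
//
//   # TYPE request_duration_seconds histogram
//   request_duration_seconds_bucket{service="api",le="0.25"} 1
//   ...
//   request_duration_seconds_bucket{service="api",le="+Inf"} 6
//   request_duration_seconds_sum{service="api"} 4.25
//   request_duration_seconds_count{service="api"} 6
//
// Returns `false` if the tags for any context cannot be formatted, in which case the caller skips
// the entire family.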
fn write_metrics(
    payload_buffer: &mut String, tags_buffer: &mut String, prom_context: &PrometheusContext,
    contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> bool {
    if contexts.is_empty() {
        debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
        return true;
    }

    payload_buffer.clear();

    if let Some(help_text) = get_help_text(prom_context.metric_name.as_ref()) {
        writeln!(payload_buffer, "# HELP {} {}", prom_context.metric_name, help_text).unwrap();
    }

    writeln!(
        payload_buffer,
        "# TYPE {} {}",
        prom_context.metric_name,
        prom_context.metric_type.as_str()
    )
    .unwrap();

    for (context, values) in contexts {
        if payload_buffer.len() > PAYLOAD_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Payload buffer size limit exceeded. Additional contexts for this metric will be truncated.");
            break;
        }

        tags_buffer.clear();

        if !format_tags(tags_buffer, context, tags_deduplicator) {
            return false;
        }

        match values {
            PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
                payload_buffer.push_str(&prom_context.metric_name);
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", value).unwrap();
            }
            PrometheusValue::Histogram(histogram) => {
                for (upper_bound_str, count) in histogram.buckets() {
                    write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
                }

                // Explicit "+Inf" bucket, which always equals the total sample count.
                write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push(',');
                }
                writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.sum).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.count).unwrap();
            }
            PrometheusValue::Summary(sketch) => {
                for quantile in [0.1, 0.25, 0.5, 0.95, 0.99, 0.999] {
                    let q_value = sketch.quantile(quantile).unwrap_or_default();

                    write!(payload_buffer, "{}{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "quantile=\"{}\"}} {}", quantile, q_value).unwrap();
                }

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.sum().unwrap_or_default()).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.count()).unwrap();
            }
        }
    }

    true
}

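// Renders a context's tags into `tags_buffer` as comma-separated `name="value"` pairs, e.g.
// `service="api",env="prod"` (illustrative values). Tags from the context and its origin are
// deduplicated first, and bare tags (ones without a value) are skipped.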
fn format_tags(tags_buffer: &mut String, context: &Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> bool {
    let mut has_tags = false;

    let chained_tags = context.tags().into_iter().chain(context.origin_tags());
    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);

    for tag in deduplicated_tags {
        let tag_name = tag.name();
        let tag_value = match tag.value() {
            Some(value) => value,
            None => {
                debug!("Skipping bare tag.");
                continue;
            }
        };

        // Account for the tag name and value plus the `="` and `"` delimiters and a leading comma.
        if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
            return false;
        }

        // Only emit the separator once we know the tag will actually be written, so that skipped
        // bare tags can't leave a dangling or doubled comma behind.
        if has_tags {
            tags_buffer.push(',');
        }
        has_tags = true;

        write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
    }

    true
}

#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum PrometheusType {
    Counter,
    Gauge,
    Histogram,
    Summary,
}

impl PrometheusType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Counter => "counter",
            Self::Gauge => "gauge",
            Self::Histogram => "histogram",
            Self::Summary => "summary",
        }
    }
}

#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct PrometheusContext {
    metric_name: MetaString,
    metric_type: PrometheusType,
}

enum PrometheusValue {
    Counter(f64),
    Gauge(f64),
    Histogram(PrometheusHistogram),
    Summary(DDSketch),
}

fn into_prometheus_metric(
    metric: &Metric, name_buf: &mut String, interner: &FixedSizeInterner<1>,
) -> Option<PrometheusContext> {
    let metric_name = match normalize_metric_name(metric.context().name(), name_buf, interner) {
        Some(name) => name,
        None => {
            debug!(
                "Failed to intern normalized metric name. Skipping metric '{}'.",
                metric.context().name()
            );
            return None;
        }
    };

    let metric_type = match metric.values() {
        MetricValues::Counter(_) => PrometheusType::Counter,
        MetricValues::Gauge(_) | MetricValues::Set(_) => PrometheusType::Gauge,
        MetricValues::Histogram(_) => PrometheusType::Histogram,
        MetricValues::Distribution(_) => PrometheusType::Summary,
        _ => return None,
    };

    Some(PrometheusContext {
        metric_name,
        metric_type,
    })
}

fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
    match prom_context.metric_type {
        PrometheusType::Counter => PrometheusValue::Counter(0.0),
        PrometheusType::Gauge => PrometheusValue::Gauge(0.0),
        PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
        PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()),
    }
}

fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
    match (values, prom_value) {
        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
            for (_, value) in counter_values {
                *prom_counter += value;
            }
        }
        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
            // Keep only the most recent value, using the timestamp to pick it.
            let latest_value = gauge_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = set_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
            for (_, value) in histogram_values {
                prom_histogram.merge_histogram(&value);
            }
        }
        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
            for (_, value) in distribution_values {
                prom_summary.merge(&value);
            }
        }
        // `into_prometheus_metric` fixes the value type for each context up front, so a mismatch
        // here can only be a logic bug.
        _ => panic!("Mismatched metric types"),
    }
}

fn normalize_metric_name(name: &str, name_buf: &mut String, interner: &FixedSizeInterner<1>) -> Option<MetaString> {
    name_buf.clear();

    for (i, c) in name.chars().enumerate() {
        if (i == 0 && is_valid_name_start_char(c)) || (i != 0 && is_valid_name_char(c)) {
            name_buf.push(c);
        } else {
            // Periods are mapped to a double underscore (`foo.bar` becomes `foo__bar`); every
            // other invalid character is mapped to a single underscore.
            name_buf.push_str(if c == '.' { "__" } else { "_" });
        }
    }

    interner.try_intern(name_buf).map(MetaString::from)
}

#[inline]
fn is_valid_name_start_char(c: char) -> bool {
    // First character of a Prometheus metric name: `[a-zA-Z_:]`.
    c.is_ascii_alphabetic() || c == '_' || c == ':'
}

#[inline]
fn is_valid_name_char(c: char) -> bool {
    // Subsequent characters of a Prometheus metric name: `[a-zA-Z0-9_:]`.
    c.is_ascii_alphanumeric() || c == '_' || c == ':'
}

#[derive(Clone)]
struct PrometheusHistogram {
    sum: f64,
    count: u64,
    buckets: Vec<(f64, &'static str, u64)>,
}

impl PrometheusHistogram {
    fn new(metric_name: &str) -> Self {
        // Use the nanosecond-oriented bucket layout for timing metrics, and the generic layout
        // for everything else.
        let base_buckets = if metric_name.ends_with("_seconds") {
            &TIME_HISTOGRAM_BUCKETS[..]
        } else {
            &NON_TIME_HISTOGRAM_BUCKETS[..]
        };

        let buckets = base_buckets
            .iter()
            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
            .collect();

        Self {
            sum: 0.0,
            count: 0,
            buckets,
        }
    }

    fn merge_histogram(&mut self, histogram: &Histogram) {
        for sample in histogram.samples() {
            self.add_sample(sample.value.into_inner(), sample.weight);
        }
    }

    fn add_sample(&mut self, value: f64, weight: u64) {
        self.sum += value * weight as f64;
        self.count += weight;

        // Buckets are cumulative: a sample increments every bucket whose upper bound it fits under.
        for (upper_bound, _, count) in &mut self.buckets {
            if value <= *upper_bound {
                *count += weight;
            }
        }
    }

    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
        self.buckets
            .iter()
            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
    }
}

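// Generates `N` log-linear bucket upper bounds: for each power of `scale` starting at `base`, it
// emits the value itself followed by the midpoint between it and the next power. As a worked
// example (hypothetical inputs), `histogram_buckets::<6>(1.0, 2.0)` yields 1, 1.5, 2, 3, 4, 6.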
fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    let mut buckets = [(0.0, ""); N];

    let log_linear_buckets = std::iter::repeat(base).enumerate().flat_map(|(i, base)| {
        let pow = scale.powf(i as f64);
        let value = base * pow;

        let next_pow = scale.powf((i + 1) as f64);
        let next_value = base * next_pow;
        let midpoint = (value + next_value) / 2.0;

        [value, midpoint]
    });

    for (i, current_le) in log_linear_buckets.enumerate().take(N) {
        let (bucket_le, bucket_le_str) = &mut buckets[i];
        let current_le_str = format!("{}", current_le);

        // Leaking the rendered upper bound gives us a `&'static str` for the exposition formatter.
        // This is a bounded, one-time cost: the bucket arrays are only generated once per process.
        *bucket_le = current_le;
        *bucket_le_str = current_le_str.leak();
    }

    buckets
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_print() {
        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
    }

    #[test]
    fn prom_histogram_add_sample() {
        let sample1 = (0.25, 1);
        let sample2 = (1.0, 2);
        let sample3 = (2.0, 3);

        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
        histogram.add_sample(sample1.0, sample1.1);
        histogram.add_sample(sample2.0, sample2.1);
        histogram.add_sample(sample3.0, sample3.1);

        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
        let expected_count = sample1.1 + sample2.1 + sample3.1;
        assert_eq!(histogram.sum, expected_sum);
        assert_eq!(histogram.count, expected_count);

        let mut expected_bucket_count = 0;
        for sample in [sample1, sample2, sample3] {
            for bucket in &histogram.buckets {
                // Buckets are cumulative, so every bucket that can hold this sample must contain
                // at least this sample's weight plus the weight of all smaller samples.
                if sample.0 <= bucket.0 {
                    assert!(bucket.2 >= expected_bucket_count + sample.1);
                }
            }

            expected_bucket_count += sample.1;
        }
    }

    #[test]
    fn prom_get_help_text() {
        assert_eq!(
            get_help_text("no_aggregation__flush"),
            Some("Count the number of flushes done by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("no_aggregation__processed"),
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_by_mtype"),
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__flush"),
            Some("Number of metrics/service checks/events flushed")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_bytes_by_mtype"),
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts"),
            Some("Count the number of dogstatsd contexts in the aggregator")
        );
        assert_eq!(
            get_help_text("aggregator__processed"),
            Some("Amount of metrics/services_checks/events processed by the aggregator")
        );
    }
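
    // A few additional sketch tests for the pure helpers above; they only assert behavior that is
    // directly visible in this module's code.
    #[test]
    fn normalize_name_replaces_invalid_chars() {
        let interner = FixedSizeInterner::<1>::new(METRIC_NAME_STRING_INTERNER_BYTES);
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);

        // Periods become a double underscore, other invalid characters a single underscore.
        let normalized = normalize_metric_name("dogstatsd.packet_pool-get", &mut name_buf, &interner).unwrap();
        let normalized: &str = normalized.as_ref();
        assert_eq!(normalized, "dogstatsd__packet_pool_get");

        // Already-valid names pass through unchanged.
        let normalized = normalize_metric_name("valid_name:total", &mut name_buf, &interner).unwrap();
        let normalized: &str = normalized.as_ref();
        assert_eq!(normalized, "valid_name:total");
    }

    #[test]
    fn histogram_buckets_strictly_increasing() {
        // Upper bounds must be strictly increasing for the cumulative bucket counts to make sense.
        let buckets = histogram_buckets::<8>(1.0, 2.0);
        for pair in buckets.windows(2) {
            assert!(pair[0].0 < pair[1].0);
        }
    }

    #[test]
    fn prom_histogram_picks_bucket_layout_by_name() {
        let time_histogram = PrometheusHistogram::new("request_duration_seconds");
        let other_histogram = PrometheusHistogram::new("request_size_bytes");

        assert_eq!(time_histogram.buckets[0].0, (*TIME_HISTOGRAM_BUCKETS)[0].0);
        assert_eq!(other_histogram.buckets[0].0, (*NON_TIME_HISTOGRAM_BUCKETS)[0].0);
    }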
}