use std::{
    convert::Infallible,
    fmt::Write as _,
    num::NonZeroUsize,
    sync::{Arc, LazyLock},
};

use async_trait::async_trait;
use ddsketch::DDSketch;
use http::{Request, Response};
use hyper::{body::Incoming, service::service_fn};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
use saluki_context::{tags::Tag, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_core::data_model::event::{
    metric::{Histogram, Metric, MetricValues},
    EventType,
};
use saluki_error::GenericError;
use saluki_io::net::{
    listener::ConnectionOrientedListener,
    server::http::{ErrorHandle, HttpServer, ShutdownHandle},
    ListenAddress,
};
use serde::Deserialize;
use stringtheory::{
    interning::{FixedSizeInterner, Interner as _},
    MetaString,
};
use tokio::{select, sync::RwLock};
use tracing::debug;

const CONTEXT_LIMIT: usize = 10_000;
const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024;
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;
const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;

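// Histogram bucket upper bounds are generated log-linearly (see `histogram_buckets`):
// each power of `scale` contributes the value itself plus the midpoint to the next power.
// Time-based histograms (metric names ending in `_seconds`) start at 128ns and grow 4x
// per step; all other histograms start at 1 and grow 2x per step.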
const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));

const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));

const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();

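/// Prometheus destination.
///
/// Exposes aggregated metrics over HTTP, in the Prometheus text exposition format, on the
/// address configured via `prometheus_listen_addr`.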
#[derive(Deserialize)]
pub struct PrometheusConfiguration {
    #[serde(rename = "prometheus_listen_addr")]
    listen_addr: ListenAddress,
}

impl PrometheusConfiguration {
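    /// Creates a new `PrometheusConfiguration` that listens on the given address.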
    pub fn from_listen_address(listen_addr: ListenAddress) -> Self {
        Self { listen_addr }
    }
}

#[async_trait]
impl DestinationBuilder for PrometheusConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
        Ok(Box::new(Prometheus {
            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
            metrics: FastIndexMap::default(),
            payload: Arc::new(RwLock::new(String::new())),
            payload_buffer: String::with_capacity(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES),
            tags_buffer: String::with_capacity(TAGS_BUFFER_SIZE_LIMIT_BYTES),
            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
        }))
    }
}

impl MemoryBounds for PrometheusConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Prometheus>("component struct")
            .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE);

        builder
            .firm()
            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
            .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES)
            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
    }
}

struct Prometheus {
    listener: ConnectionOrientedListener,
    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
    payload: Arc<RwLock<String>>,
    payload_buffer: String,
    tags_buffer: String,
    interner: FixedSizeInterner<1>,
}

#[async_trait]
impl Destination for Prometheus {
    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
        let Self {
            listener,
            mut metrics,
            payload,
            mut payload_buffer,
            mut tags_buffer,
            interner,
        } = *self;

        let mut health = context.take_health_handle();

        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
        health.mark_ready();

        debug!("Prometheus destination started.");

        let mut contexts = 0;
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);
        let mut tags_deduplicator = ReusableDeduplicator::new();

        loop {
            select! {
                _ = health.live() => continue,
                maybe_events = context.events().next() => match maybe_events {
                    Some(events) => {
                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let prom_context = match into_prometheus_metric(&metric, &mut name_buf, &interner) {
                                    Some(prom_context) => prom_context,
                                    None => continue,
                                };

                                let (context, values, _) = metric.into_parts();

                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
                                match existing_contexts.get_mut(&context) {
                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
                                    None => {
                                        if contexts >= CONTEXT_LIMIT {
                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
                                            continue;
                                        }

                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);

                                        existing_contexts.insert(context, new_prom_value);
                                        contexts += 1;
                                    }
                                }
                            }
                        }

                        regenerate_payload(&metrics, &payload, &mut payload_buffer, &mut tags_buffer, &mut tags_deduplicator).await;
                    },
                    None => break,
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
            }
        }

        http_shutdown.shutdown();

        debug!("Prometheus destination stopped.");

        Ok(())
    }
}

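/// Spawns an HTTP server that serves the current scrape payload to every request (the
/// request path is not inspected), returning handles for shutting the server down and for
/// observing server errors.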
fn spawn_prom_scrape_service(
    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
) -> (ShutdownHandle, ErrorHandle) {
    let service = service_fn(move |_: Request<Incoming>| {
        let payload = Arc::clone(&payload);
        async move {
            let payload = payload.read().await;
            Ok::<Response<String>, Infallible>(Response::new(payload.to_string()))
        }
    });

    let http_server = HttpServer::from_listener(listener, service);
    http_server.listen()
}

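/// Rewrites the scrape payload from the current metric state, holding the payload write
/// lock for the duration. Metric families that would push the payload past
/// `PAYLOAD_SIZE_LIMIT_BYTES` are skipped.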
#[allow(clippy::mutable_key_type)]
async fn regenerate_payload(
    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
    payload_buffer: &mut String, tags_buffer: &mut String, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) {
    let mut payload = payload.write().await;
    payload.clear();

    let mut metrics_written = 0;
    let metrics_total = metrics.len();

    for (prom_context, contexts) in metrics {
        if write_metrics(payload_buffer, tags_buffer, prom_context, contexts, tags_deduplicator) {
            if payload.len() + payload_buffer.len() > PAYLOAD_SIZE_LIMIT_BYTES {
                debug!(
                    metrics_written,
                    metrics_total,
                    payload_len = payload.len(),
                    "Writing additional metrics would exceed payload size limit. Skipping remaining metrics."
                );
                break;
            }

            if metrics_written > 0 {
                payload.push('\n');
            }

            payload.push_str(payload_buffer);

            metrics_written += 1;
        } else {
            debug!("Failed to write metric to payload. Continuing...");
        }
    }
}

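/// Returns the `# HELP` description for a known internal metric name, if one exists.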
fn get_help_text(metric_name: &str) -> Option<&'static str> {
    match metric_name {
        "no_aggregation__flush" => Some("Count the number of flushes done by the no-aggregation pipeline worker"),
        "no_aggregation__processed" => {
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        }
        "aggregator__dogstatsd_contexts_by_mtype" => {
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        }
        "aggregator__flush" => Some("Number of metrics/service checks/events flushed"),
        "aggregator__dogstatsd_contexts_bytes_by_mtype" => {
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        }
        "aggregator__dogstatsd_contexts" => Some("Count the number of dogstatsd contexts in the aggregator"),
        "aggregator__processed" => Some("Amount of metrics/services_checks/events processed by the aggregator"),
        "dogstatsd__processed" => Some("Count of service checks/events/metrics processed by dogstatsd"),
        "dogstatsd__packet_pool_get" => Some("Count of get done in the packet pool"),
        "dogstatsd__packet_pool_put" => Some("Count of put done in the packet pool"),
        "dogstatsd__packet_pool" => Some("Usage of the packet pool in dogstatsd"),
        "transactions__errors" => Some("Count of transactions errored grouped by type of error"),
        "transactions__http_errors" => Some("Count of transactions http errors per http code"),
        "transactions__dropped" => Some("Transaction drop count"),
        "transactions__success" => Some("Successful transaction count"),
        "transactions__success_bytes" => Some("Successful transaction sizes in bytes"),
        _ => None,
    }
}

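/// Renders a single metric family into `payload_buffer` in the Prometheus text exposition
/// format, e.g.:
///
/// ```text
/// # TYPE metric_name counter
/// metric_name{tag="value"} 42
/// ```
///
/// Returns `false` if rendering failed, such as when tag formatting overflows the tags
/// buffer.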
fn write_metrics(
    payload_buffer: &mut String, tags_buffer: &mut String, prom_context: &PrometheusContext,
    contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> bool {
    if contexts.is_empty() {
        debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
        return true;
    }

    payload_buffer.clear();

    if let Some(help_text) = get_help_text(prom_context.metric_name.as_ref()) {
        writeln!(payload_buffer, "# HELP {} {}", prom_context.metric_name, help_text).unwrap();
    }
    writeln!(
        payload_buffer,
        "# TYPE {} {}",
        prom_context.metric_name,
        prom_context.metric_type.as_str()
    )
    .unwrap();

    for (context, values) in contexts {
        if payload_buffer.len() > PAYLOAD_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Payload buffer size limit exceeded. Additional contexts for this metric will be truncated.");
            break;
        }

        tags_buffer.clear();

        if !format_tags(tags_buffer, context, tags_deduplicator) {
            return false;
        }

        match values {
            PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
                payload_buffer.push_str(&prom_context.metric_name);
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", value).unwrap();
            }
            PrometheusValue::Histogram(histogram) => {
                for (upper_bound_str, count) in histogram.buckets() {
                    write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
                }

                write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push(',');
                }
                writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.sum).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.count).unwrap();
            }
            PrometheusValue::Summary(sketch) => {
                for quantile in [0.1, 0.25, 0.5, 0.95, 0.99, 0.999] {
                    let q_value = sketch.quantile(quantile).unwrap_or_default();

                    write!(payload_buffer, "{}{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "quantile=\"{}\"}} {}", quantile, q_value).unwrap();
                }

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.sum().unwrap_or_default()).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.count()).unwrap();
            }
        }
    }

    true
}

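/// Formats the context's tags (origin tags included, deduplicated) into `tags_buffer` as
/// comma-separated `name="value"` pairs. Bare tags without a value are skipped. Returns
/// `false` if the formatted tags would exceed `TAGS_BUFFER_SIZE_LIMIT_BYTES`.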
fn format_tags(tags_buffer: &mut String, context: &Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> bool {
    let mut has_tags = false;

    let chained_tags = context.tags().into_iter().chain(context.origin_tags());
    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);

    for tag in deduplicated_tags {
        let tag_name = tag.name();
        let tag_value = match tag.value() {
            Some(value) => value,
            None => {
                debug!("Skipping bare tag.");
                continue;
            }
        };

        // Only emit the separator once we know the tag will actually be written, so a
        // skipped bare tag can't leave behind a dangling or doubled comma.
        if has_tags {
            tags_buffer.push(',');
        }
        has_tags = true;

        if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
            return false;
        }

        write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
    }

    true
}

#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum PrometheusType {
    Counter,
    Gauge,
    Histogram,
    Summary,
}

impl PrometheusType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Counter => "counter",
            Self::Gauge => "gauge",
            Self::Histogram => "histogram",
            Self::Summary => "summary",
        }
    }
}

#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct PrometheusContext {
    metric_name: MetaString,
    metric_type: PrometheusType,
}

enum PrometheusValue {
    Counter(f64),
    Gauge(f64),
    Histogram(PrometheusHistogram),
    Summary(DDSketch),
}

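/// Maps a metric to its Prometheus context: normalized name plus exposition type.
///
/// Counters map to counters, gauges and sets to gauges, histograms to histograms, and
/// distributions to summaries. Returns `None` if the metric type is unsupported or the
/// normalized name cannot be interned.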
fn into_prometheus_metric(
    metric: &Metric, name_buf: &mut String, interner: &FixedSizeInterner<1>,
) -> Option<PrometheusContext> {
    let metric_name = match normalize_metric_name(metric.context().name(), name_buf, interner) {
        Some(name) => name,
        None => {
            debug!(
                "Failed to intern normalized metric name. Skipping metric '{}'.",
                metric.context().name()
            );
            return None;
        }
    };

    let metric_type = match metric.values() {
        MetricValues::Counter(_) => PrometheusType::Counter,
        MetricValues::Gauge(_) | MetricValues::Set(_) => PrometheusType::Gauge,
        MetricValues::Histogram(_) => PrometheusType::Histogram,
        MetricValues::Distribution(_) => PrometheusType::Summary,
        _ => return None,
    };

    Some(PrometheusContext {
        metric_name,
        metric_type,
    })
}

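/// Creates an empty `PrometheusValue` matching the context's metric type.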
fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
    match prom_context.metric_type {
        PrometheusType::Counter => PrometheusValue::Counter(0.0),
        PrometheusType::Gauge => PrometheusValue::Gauge(0.0),
        PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
        PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()),
    }
}

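/// Merges incoming metric values into an existing `PrometheusValue`: counters accumulate,
/// gauges and sets keep the value with the latest timestamp, and histograms/distributions
/// merge their samples.
///
/// Panics on a type mismatch, which should not occur in practice since the state map is
/// keyed by `PrometheusContext`, which includes the metric type.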
fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
    match (values, prom_value) {
        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
            for (_, value) in counter_values {
                *prom_counter += value;
            }
        }
        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = gauge_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = set_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
            for (_, value) in histogram_values {
                prom_histogram.merge_histogram(&value);
            }
        }
        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
            for (_, value) in distribution_values {
                prom_summary.merge(&value);
            }
        }
        _ => panic!("Mismatched metric types"),
    }
}

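/// Normalizes a metric name to the Prometheus character set: invalid characters are
/// replaced with `_`, except `.`, which becomes `__`. Returns `None` if the normalized
/// name cannot be interned.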
fn normalize_metric_name(name: &str, name_buf: &mut String, interner: &FixedSizeInterner<1>) -> Option<MetaString> {
    name_buf.clear();

    for (i, c) in name.chars().enumerate() {
        if (i == 0 && is_valid_name_start_char(c)) || (i != 0 && is_valid_name_char(c)) {
            name_buf.push(c);
        } else {
            name_buf.push_str(if c == '.' { "__" } else { "_" });
        }
    }

    interner.try_intern(name_buf).map(MetaString::from)
}

#[inline]
fn is_valid_name_start_char(c: char) -> bool {
    c.is_ascii_alphabetic() || c == '_' || c == ':'
}

#[inline]
fn is_valid_name_char(c: char) -> bool {
    c.is_ascii_alphanumeric() || c == '_' || c == ':'
}

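/// A fixed-bucket Prometheus histogram.
///
/// Bucket counts are cumulative, matching the exposition format: a sample increments
/// every bucket whose upper bound it does not exceed. Metrics whose names end in
/// `_seconds` use the finer-grained time buckets.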
#[derive(Clone)]
struct PrometheusHistogram {
    sum: f64,
    count: u64,
    buckets: Vec<(f64, &'static str, u64)>,
}

impl PrometheusHistogram {
    fn new(metric_name: &str) -> Self {
        let base_buckets = if metric_name.ends_with("_seconds") {
            &TIME_HISTOGRAM_BUCKETS[..]
        } else {
            &NON_TIME_HISTOGRAM_BUCKETS[..]
        };

        let buckets = base_buckets
            .iter()
            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
            .collect();

        Self {
            sum: 0.0,
            count: 0,
            buckets,
        }
    }

    fn merge_histogram(&mut self, histogram: &Histogram) {
        for sample in histogram.samples() {
            self.add_sample(sample.value.into_inner(), sample.weight);
        }
    }

    fn add_sample(&mut self, value: f64, weight: u64) {
        self.sum += value * weight as f64;
        self.count += weight;

        for (upper_bound, _, count) in &mut self.buckets {
            if value <= *upper_bound {
                *count += weight;
            }
        }
    }

    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
        self.buckets
            .iter()
            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
    }
}

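/// Generates `N` log-linear bucket upper bounds, starting at `base` and growing by
/// `scale` each step, with the midpoint between consecutive steps interleaved. For
/// example, `histogram_buckets::<6>(1.0, 2.0)` yields the bounds 1, 1.5, 2, 3, 4, 6.
///
/// Bound strings are formatted once and leaked, since the bucket arrays live for the
/// lifetime of the process.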
fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    let mut buckets = [(0.0, ""); N];

    let log_linear_buckets = std::iter::repeat(base).enumerate().flat_map(|(i, base)| {
        let pow = scale.powf(i as f64);
        let value = base * pow;

        let next_pow = scale.powf((i + 1) as f64);
        let next_value = base * next_pow;
        let midpoint = (value + next_value) / 2.0;

        [value, midpoint]
    });

    for (i, current_le) in log_linear_buckets.enumerate().take(N) {
        let (bucket_le, bucket_le_str) = &mut buckets[i];
        let current_le_str = format!("{}", current_le);

        *bucket_le = current_le;
        *bucket_le_str = current_le_str.leak();
    }

    buckets
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_print() {
        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
    }

    #[test]
    fn prom_histogram_add_sample() {
        let sample1 = (0.25, 1);
        let sample2 = (1.0, 2);
        let sample3 = (2.0, 3);

        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
        histogram.add_sample(sample1.0, sample1.1);
        histogram.add_sample(sample2.0, sample2.1);
        histogram.add_sample(sample3.0, sample3.1);

        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
        let expected_count = sample1.1 + sample2.1 + sample3.1;
        assert_eq!(histogram.sum, expected_sum);
        assert_eq!(histogram.count, expected_count);

        // Samples are ordered by value, so any bucket that admits a sample must hold at
        // least that sample's weight plus the weight of every smaller sample before it.
        let mut expected_bucket_count = 0;
        for sample in [sample1, sample2, sample3] {
            for bucket in &histogram.buckets {
                if sample.0 <= bucket.0 {
                    assert!(bucket.2 >= expected_bucket_count + sample.1);
                }
            }

            expected_bucket_count += sample.1;
        }
    }

    #[test]
    fn prom_get_help_text() {
        assert_eq!(
            get_help_text("no_aggregation__flush"),
            Some("Count the number of flushes done by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("no_aggregation__processed"),
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_by_mtype"),
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__flush"),
            Some("Number of metrics/service checks/events flushed")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_bytes_by_mtype"),
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts"),
            Some("Count the number of dogstatsd contexts in the aggregator")
        );
        assert_eq!(
            get_help_text("aggregator__processed"),
            Some("Amount of metrics/services_checks/events processed by the aggregator")
        );
    }
}
756}