use std::{
    convert::Infallible,
    fmt::Write as _,
    num::NonZeroUsize,
    sync::{Arc, LazyLock},
};

use async_trait::async_trait;
use ddsketch_agent::DDSketch;
use http::{Request, Response};
use hyper::{body::Incoming, service::service_fn};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::{collections::FastIndexMap, iter::ReusableDeduplicator};
use saluki_config::GenericConfiguration;
use saluki_context::{tags::Tag, Context};
use saluki_core::components::{destinations::*, ComponentContext};
use saluki_core::data_model::event::{
    metric::{Histogram, Metric, MetricValues},
    EventType,
};
use saluki_error::GenericError;
use saluki_io::net::{
    listener::ConnectionOrientedListener,
    server::http::{ErrorHandle, HttpServer, ShutdownHandle},
    ListenAddress,
};
use serde::Deserialize;
use stringtheory::{
    interning::{FixedSizeInterner, Interner as _},
    MetaString,
};
use tokio::{select, sync::RwLock};
use tracing::debug;

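// Limits that bound the number of tracked contexts, the rendered payload, and
// the intermediate buffers used while building it.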
const CONTEXT_LIMIT: usize = 10_000;
const PAYLOAD_SIZE_LIMIT_BYTES: usize = 1024 * 1024;
const PAYLOAD_BUFFER_SIZE_LIMIT_BYTES: usize = 128 * 1024;
const TAGS_BUFFER_SIZE_LIMIT_BYTES: usize = 2048;
const NAME_NORMALIZATION_BUFFER_SIZE: usize = 512;

const TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<TIME_HISTOGRAM_BUCKET_COUNT>(0.000000128, 4.0));

const NON_TIME_HISTOGRAM_BUCKET_COUNT: usize = 30;
static NON_TIME_HISTOGRAM_BUCKETS: LazyLock<[(f64, &'static str); NON_TIME_HISTOGRAM_BUCKET_COUNT]> =
    LazyLock::new(|| histogram_buckets::<NON_TIME_HISTOGRAM_BUCKET_COUNT>(1.0, 2.0));

const METRIC_NAME_STRING_INTERNER_BYTES: NonZeroUsize = NonZeroUsize::new(65536).unwrap();

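/// Prometheus destination.
///
/// Exposes metrics over HTTP in the Prometheus exposition format, suitable for
/// scraping by a Prometheus server.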
#[derive(Deserialize)]
pub struct PrometheusConfiguration {
    #[serde(rename = "prometheus_listen_addr")]
    listen_addr: ListenAddress,
}

impl PrometheusConfiguration {
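    /// Creates a new `PrometheusConfiguration` from the given configuration.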
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

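    /// Returns the address the Prometheus scrape endpoint listens on.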
    pub fn listen_address(&self) -> &ListenAddress {
        &self.listen_addr
    }
}

#[async_trait]
impl DestinationBuilder for PrometheusConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
        Ok(Box::new(Prometheus {
            listener: ConnectionOrientedListener::from_listen_address(self.listen_addr.clone()).await?,
            metrics: FastIndexMap::default(),
            payload: Arc::new(RwLock::new(String::new())),
            payload_buffer: String::with_capacity(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES),
            tags_buffer: String::with_capacity(TAGS_BUFFER_SIZE_LIMIT_BYTES),
            interner: FixedSizeInterner::new(METRIC_NAME_STRING_INTERNER_BYTES),
        }))
    }
}

impl MemoryBounds for PrometheusConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Prometheus>("component struct")
            .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE);

        builder
            .firm()
            .with_map::<Context, PrometheusValue>("state map", CONTEXT_LIMIT)
            .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES)
            .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES)
            .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES);
    }
}

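/// Destination state: aggregated metrics keyed by Prometheus context, plus the
/// rendered scrape payload shared with the HTTP server task.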
struct Prometheus {
    listener: ConnectionOrientedListener,
    metrics: FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>,
    payload: Arc<RwLock<String>>,
    payload_buffer: String,
    tags_buffer: String,
    interner: FixedSizeInterner<1>,
}

#[async_trait]
impl Destination for Prometheus {
    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
        let Self {
            listener,
            mut metrics,
            payload,
            mut payload_buffer,
            mut tags_buffer,
            interner,
        } = *self;

        let mut health = context.take_health_handle();

        let (http_shutdown, mut http_error) = spawn_prom_scrape_service(listener, Arc::clone(&payload));
        health.mark_ready();

        debug!("Prometheus destination started.");

        let mut contexts = 0;
        let mut name_buf = String::with_capacity(NAME_NORMALIZATION_BUFFER_SIZE);
        let mut tags_deduplicator = ReusableDeduplicator::new();

        loop {
            select! {
                _ = health.live() => continue,
                maybe_events = context.events().next() => match maybe_events {
                    Some(events) => {
                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let prom_context = match into_prometheus_metric(&metric, &mut name_buf, &interner) {
                                    Some(prom_context) => prom_context,
                                    None => continue,
                                };

                                let (context, values, _) = metric.into_parts();

                                let existing_contexts = metrics.entry(prom_context.clone()).or_default();
                                match existing_contexts.get_mut(&context) {
                                    Some(existing_prom_value) => merge_metric_values_with_prom_value(values, existing_prom_value),
                                    None => {
                                        if contexts >= CONTEXT_LIMIT {
                                            debug!("Prometheus destination reached context limit. Skipping metric '{}'.", context.name());
                                            continue
                                        }

                                        let mut new_prom_value = get_prom_value_for_prom_context(&prom_context);
                                        merge_metric_values_with_prom_value(values, &mut new_prom_value);

                                        existing_contexts.insert(context, new_prom_value);
                                        contexts += 1;
                                    }
                                }
                            }
                        }

                        regenerate_payload(&metrics, &payload, &mut payload_buffer, &mut tags_buffer, &mut tags_deduplicator).await;
                    },
                    None => break,
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
            }
        }

        http_shutdown.shutdown();

        debug!("Prometheus destination stopped.");

        Ok(())
    }
}

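/// Spawns the HTTP server backing the scrape endpoint. Every request is
/// answered with a copy of the current rendered payload.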
fn spawn_prom_scrape_service(
    listener: ConnectionOrientedListener, payload: Arc<RwLock<String>>,
) -> (ShutdownHandle, ErrorHandle) {
    let service = service_fn(move |_: Request<Incoming>| {
        let payload = Arc::clone(&payload);
        async move {
            let payload = payload.read().await;
            Ok::<Response<String>, Infallible>(Response::new(payload.to_string()))
        }
    });

    let http_server = HttpServer::from_listener(listener, service);
    http_server.listen()
}

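/// Re-renders the shared scrape payload from the aggregated metrics, skipping
/// any remaining metrics once the payload size limit would be exceeded.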
#[allow(clippy::mutable_key_type)]
async fn regenerate_payload(
    metrics: &FastIndexMap<PrometheusContext, FastIndexMap<Context, PrometheusValue>>, payload: &Arc<RwLock<String>>,
    payload_buffer: &mut String, tags_buffer: &mut String, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) {
    let mut payload = payload.write().await;
    payload.clear();

    let mut metrics_written = 0;
    let metrics_total = metrics.len();

    for (prom_context, contexts) in metrics {
        if write_metrics(payload_buffer, tags_buffer, prom_context, contexts, tags_deduplicator) {
            if payload.len() + payload_buffer.len() > PAYLOAD_SIZE_LIMIT_BYTES {
                debug!(
                    metrics_written,
                    metrics_total,
                    payload_len = payload.len(),
                    "Writing additional metrics would exceed payload size limit. Skipping remaining metrics."
                );
                break;
            }

            if metrics_written > 0 {
                payload.push('\n');
            }

            payload.push_str(payload_buffer);

            metrics_written += 1;
        } else {
            debug!("Failed to write metric to payload. Continuing...");
        }
    }
}

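/// Returns static `# HELP` text for a known set of internal metric names.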
fn get_help_text(metric_name: &str) -> Option<&'static str> {
    match metric_name {
        "no_aggregation__flush" => Some("Count the number of flushes done by the no-aggregation pipeline worker"),
        "no_aggregation__processed" => {
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        }
        "aggregator__dogstatsd_contexts_by_mtype" => {
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        }
        "aggregator__flush" => Some("Number of metrics/service checks/events flushed"),
        "aggregator__dogstatsd_contexts_bytes_by_mtype" => {
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        }
        "aggregator__dogstatsd_contexts" => Some("Count the number of dogstatsd contexts in the aggregator"),
        "aggregator__processed" => Some("Amount of metrics/services_checks/events processed by the aggregator"),
        "dogstatsd__processed" => Some("Count of service checks/events/metrics processed by dogstatsd"),
        "dogstatsd__packet_pool_get" => Some("Count of get done in the packet pool"),
        "dogstatsd__packet_pool_put" => Some("Count of put done in the packet pool"),
        "dogstatsd__packet_pool" => Some("Usage of the packet pool in dogstatsd"),
        _ => None,
    }
}

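/// Renders a single metric into `payload_buffer`: an optional `# HELP` line and
/// a `# TYPE` line, followed by one series per context. Returns `false` if the
/// tags for any context could not be formatted.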
fn write_metrics(
    payload_buffer: &mut String, tags_buffer: &mut String, prom_context: &PrometheusContext,
    contexts: &FastIndexMap<Context, PrometheusValue>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> bool {
    if contexts.is_empty() {
        debug!("No contexts for metric '{}'. Skipping.", prom_context.metric_name);
        return true;
    }

    payload_buffer.clear();

    if let Some(help_text) = get_help_text(prom_context.metric_name.as_ref()) {
        writeln!(payload_buffer, "# HELP {} {}", prom_context.metric_name, help_text).unwrap();
    }
    writeln!(
        payload_buffer,
        "# TYPE {} {}",
        prom_context.metric_name,
        prom_context.metric_type.as_str()
    )
    .unwrap();

    for (context, values) in contexts {
        if payload_buffer.len() > PAYLOAD_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Payload buffer size limit exceeded. Additional contexts for this metric will be truncated.");
            break;
        }

        tags_buffer.clear();

        if !format_tags(tags_buffer, context, tags_deduplicator) {
            return false;
        }

        match values {
            PrometheusValue::Counter(value) | PrometheusValue::Gauge(value) => {
                payload_buffer.push_str(&prom_context.metric_name);
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", value).unwrap();
            }
            PrometheusValue::Histogram(histogram) => {
                for (upper_bound_str, count) in histogram.buckets() {
                    write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "le=\"{}\"}} {}", upper_bound_str, count).unwrap();
                }

                write!(payload_buffer, "{}_bucket{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push(',');
                }
                writeln!(payload_buffer, "le=\"+Inf\"}} {}", histogram.count).unwrap();

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.sum).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", histogram.count).unwrap();
            }
            PrometheusValue::Summary(sketch) => {
                for quantile in [0.1, 0.25, 0.5, 0.95, 0.99, 0.999] {
                    let q_value = sketch.quantile(quantile).unwrap_or_default();

                    write!(payload_buffer, "{}{{{}", &prom_context.metric_name, tags_buffer).unwrap();
                    if !tags_buffer.is_empty() {
                        payload_buffer.push(',');
                    }
                    writeln!(payload_buffer, "quantile=\"{}\"}} {}", quantile, q_value).unwrap();
                }

                write!(payload_buffer, "{}_sum", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.sum().unwrap_or_default()).unwrap();

                write!(payload_buffer, "{}_count", &prom_context.metric_name).unwrap();
                if !tags_buffer.is_empty() {
                    payload_buffer.push('{');
                    payload_buffer.push_str(tags_buffer);
                    payload_buffer.push('}');
                }
                writeln!(payload_buffer, " {}", sketch.count()).unwrap();
            }
        }
    }

    true
}

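/// Formats the deduplicated tags of `context` into `tags_buffer` as
/// comma-separated `name="value"` pairs. Returns `false` if writing another tag
/// would exceed the tags buffer size limit.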
fn format_tags(tags_buffer: &mut String, context: &Context, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> bool {
    let mut has_tags = false;

    let chained_tags = context.tags().into_iter().chain(context.origin_tags());
    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);

    for tag in deduplicated_tags {
        let tag_name = tag.name();
        let tag_value = match tag.value() {
            Some(value) => value,
            None => {
                debug!("Skipping bare tag.");
                continue;
            }
        };

        if tags_buffer.len() + tag_name.len() + tag_value.len() + 4 > TAGS_BUFFER_SIZE_LIMIT_BYTES {
            debug!("Tags buffer size limit exceeded. Tags may be missing from this metric.");
            return false;
        }

        // Only emit the separator once we know the tag will actually be written,
        // so a skipped bare tag can't leave a dangling comma behind.
        if has_tags {
            tags_buffer.push(',');
        }
        has_tags = true;

        write!(tags_buffer, "{}=\"{}\"", tag_name, tag_value).unwrap();
    }

    true
}

#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum PrometheusType {
    Counter,
    Gauge,
    Histogram,
    Summary,
}

impl PrometheusType {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Counter => "counter",
            Self::Gauge => "gauge",
            Self::Histogram => "histogram",
            Self::Summary => "summary",
        }
    }
}

#[derive(Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct PrometheusContext {
    metric_name: MetaString,
    metric_type: PrometheusType,
}

enum PrometheusValue {
    Counter(f64),
    Gauge(f64),
    Histogram(PrometheusHistogram),
    Summary(DDSketch),
}

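/// Derives the Prometheus context (normalized name plus mapped type) for the
/// given metric, returning `None` if the name cannot be interned or the metric
/// type has no Prometheus mapping.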
fn into_prometheus_metric(
    metric: &Metric, name_buf: &mut String, interner: &FixedSizeInterner<1>,
) -> Option<PrometheusContext> {
    let metric_name = match normalize_metric_name(metric.context().name(), name_buf, interner) {
        Some(name) => name,
        None => {
            debug!(
                "Failed to intern normalized metric name. Skipping metric '{}'.",
                metric.context().name()
            );
            return None;
        }
    };

    let metric_type = match metric.values() {
        MetricValues::Counter(_) => PrometheusType::Counter,
        MetricValues::Gauge(_) | MetricValues::Set(_) => PrometheusType::Gauge,
        MetricValues::Histogram(_) => PrometheusType::Histogram,
        MetricValues::Distribution(_) => PrometheusType::Summary,
        _ => return None,
    };

    Some(PrometheusContext {
        metric_name,
        metric_type,
    })
}

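/// Creates an empty `PrometheusValue` matching the metric type of the given
/// Prometheus context.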
fn get_prom_value_for_prom_context(prom_context: &PrometheusContext) -> PrometheusValue {
    match prom_context.metric_type {
        PrometheusType::Counter => PrometheusValue::Counter(0.0),
        PrometheusType::Gauge => PrometheusValue::Gauge(0.0),
        PrometheusType::Histogram => PrometheusValue::Histogram(PrometheusHistogram::new(&prom_context.metric_name)),
        PrometheusType::Summary => PrometheusValue::Summary(DDSketch::default()),
    }
}

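/// Merges incoming metric values into an existing Prometheus value: counters
/// accumulate, gauges and sets keep the most recently timestamped value, and
/// histograms/distributions merge their samples.
///
/// Panics if the metric value type does not match the Prometheus value type.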
fn merge_metric_values_with_prom_value(values: MetricValues, prom_value: &mut PrometheusValue) {
    match (values, prom_value) {
        (MetricValues::Counter(counter_values), PrometheusValue::Counter(prom_counter)) => {
            for (_, value) in counter_values {
                *prom_counter += value;
            }
        }
        (MetricValues::Gauge(gauge_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = gauge_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Set(set_values), PrometheusValue::Gauge(prom_gauge)) => {
            let latest_value = set_values
                .into_iter()
                .max_by_key(|(ts, _)| ts.map(|v| v.get()).unwrap_or_default())
                .map(|(_, value)| value)
                .unwrap_or_default();
            *prom_gauge = latest_value;
        }
        (MetricValues::Histogram(histogram_values), PrometheusValue::Histogram(prom_histogram)) => {
            for (_, value) in histogram_values {
                prom_histogram.merge_histogram(&value);
            }
        }
        (MetricValues::Distribution(distribution_values), PrometheusValue::Summary(prom_summary)) => {
            for (_, value) in distribution_values {
                prom_summary.merge(&value);
            }
        }
        _ => panic!("Mismatched metric types"),
    }
}

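/// Normalizes a metric name to valid Prometheus characters and interns the
/// result, returning `None` if interning fails.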
fn normalize_metric_name(name: &str, name_buf: &mut String, interner: &FixedSizeInterner<1>) -> Option<MetaString> {
    name_buf.clear();

    for (i, c) in name.chars().enumerate() {
        if (i == 0 && is_valid_name_start_char(c)) || (i != 0 && is_valid_name_char(c)) {
            name_buf.push(c);
        } else {
            // Dots map to a double underscore (matching the `__` separators in the
            // help-text keys above); any other invalid character maps to a single
            // underscore.
            name_buf.push_str(if c == '.' { "__" } else { "_" });
        }
    }

    interner.try_intern(name_buf).map(MetaString::from)
}

#[inline]
fn is_valid_name_start_char(c: char) -> bool {
    c.is_ascii_alphabetic() || c == '_' || c == ':'
}

#[inline]
fn is_valid_name_char(c: char) -> bool {
    c.is_ascii_alphanumeric() || c == '_' || c == ':'
}

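/// A fixed-bucket cumulative histogram in the Prometheus style.
///
/// Metric names ending in `_seconds` get time-oriented buckets; all other
/// metrics get general-purpose buckets.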
#[derive(Clone)]
struct PrometheusHistogram {
    sum: f64,
    count: u64,
    buckets: Vec<(f64, &'static str, u64)>,
}

impl PrometheusHistogram {
    fn new(metric_name: &str) -> Self {
        let base_buckets = if metric_name.ends_with("_seconds") {
            &TIME_HISTOGRAM_BUCKETS[..]
        } else {
            &NON_TIME_HISTOGRAM_BUCKETS[..]
        };

        let buckets = base_buckets
            .iter()
            .map(|(upper_bound, upper_bound_str)| (*upper_bound, *upper_bound_str, 0))
            .collect();

        Self {
            sum: 0.0,
            count: 0,
            buckets,
        }
    }

    fn merge_histogram(&mut self, histogram: &Histogram) {
        for sample in histogram.samples() {
            self.add_sample(sample.value.into_inner(), sample.weight);
        }
    }

    fn add_sample(&mut self, value: f64, weight: u64) {
        self.sum += value * weight as f64;
        self.count += weight;

        // Buckets are cumulative: the sample counts toward every bucket whose
        // upper bound it does not exceed.
        for (upper_bound, _, count) in &mut self.buckets {
            if value <= *upper_bound {
                *count += weight;
            }
        }
    }

    fn buckets(&self) -> impl Iterator<Item = (&'static str, u64)> + '_ {
        self.buckets
            .iter()
            .map(|(_, upper_bound_str, count)| (*upper_bound_str, *count))
    }
}

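/// Generates `N` log-linear bucket upper bounds: for each power of `scale`
/// applied to `base`, both the value and the midpoint to the next power are
/// emitted. Bound strings are leaked to obtain `'static` lifetimes, which is
/// acceptable since the buckets live in process-wide statics.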
fn histogram_buckets<const N: usize>(base: f64, scale: f64) -> [(f64, &'static str); N] {
    let mut buckets = [(0.0, ""); N];

    let log_linear_buckets = std::iter::repeat(base).enumerate().flat_map(|(i, base)| {
        let pow = scale.powf(i as f64);
        let value = base * pow;

        let next_pow = scale.powf((i + 1) as f64);
        let next_value = base * next_pow;
        let midpoint = (value + next_value) / 2.0;

        [value, midpoint]
    });

    for (i, current_le) in log_linear_buckets.enumerate().take(N) {
        let (bucket_le, bucket_le_str) = &mut buckets[i];
        let current_le_str = format!("{}", current_le);

        *bucket_le = current_le;
        *bucket_le_str = current_le_str.leak();
    }

    buckets
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_print() {
        println!("time buckets: {:?}", *TIME_HISTOGRAM_BUCKETS);
        println!("non-time buckets: {:?}", *NON_TIME_HISTOGRAM_BUCKETS);
    }

    #[test]
    fn prom_histogram_add_sample() {
        let sample1 = (0.25, 1);
        let sample2 = (1.0, 2);
        let sample3 = (2.0, 3);

        let mut histogram = PrometheusHistogram::new("time_metric_seconds");
        histogram.add_sample(sample1.0, sample1.1);
        histogram.add_sample(sample2.0, sample2.1);
        histogram.add_sample(sample3.0, sample3.1);

        let sample1_weighted_value = sample1.0 * sample1.1 as f64;
        let sample2_weighted_value = sample2.0 * sample2.1 as f64;
        let sample3_weighted_value = sample3.0 * sample3.1 as f64;
        let expected_sum = sample1_weighted_value + sample2_weighted_value + sample3_weighted_value;
        let expected_count = sample1.1 + sample2.1 + sample3.1;
        assert_eq!(histogram.sum, expected_sum);
        assert_eq!(histogram.count, expected_count);

        // Samples are ordered by value, so every bucket wide enough to hold a
        // sample must also contain the cumulative weight of the samples before it.
        let mut expected_bucket_count = 0;
        for sample in [sample1, sample2, sample3] {
            for bucket in &histogram.buckets {
                if sample.0 <= bucket.0 {
                    assert!(bucket.2 >= expected_bucket_count + sample.1);
                }
            }

            expected_bucket_count += sample.1;
        }
    }

    #[test]
    fn prom_get_help_text() {
        assert_eq!(
            get_help_text("no_aggregation__flush"),
            Some("Count the number of flushes done by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("no_aggregation__processed"),
            Some("Count the number of samples processed by the no-aggregation pipeline worker")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_by_mtype"),
            Some("Count the number of dogstatsd contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__flush"),
            Some("Number of metrics/service checks/events flushed")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts_bytes_by_mtype"),
            Some("Estimated count of bytes taken by contexts in the aggregator, by metric type")
        );
        assert_eq!(
            get_help_text("aggregator__dogstatsd_contexts"),
            Some("Count the number of dogstatsd contexts in the aggregator")
        );
        assert_eq!(
            get_help_text("aggregator__processed"),
            Some("Amount of metrics/services_checks/events processed by the aggregator")
        );
    }
}