1use std::{
2 num::NonZeroU64,
3 time::{Duration, Instant},
4};
5
6use async_trait::async_trait;
7use ddsketch::DDSketch;
8use hashbrown::{hash_map::Entry, HashMap};
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::time::get_unix_timestamp;
11use saluki_config::GenericConfiguration;
12use saluki_context::Context;
13use saluki_core::{
14 components::{transforms::*, ComponentContext},
15 data_model::event::{metric::*, Event, EventType},
16 observability::ComponentMetricsExt as _,
17 topology::{interconnect::BufferedDispatcher, OutputDefinition},
18 topology::{EventsBuffer, EventsDispatcher},
19};
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use smallvec::SmallVec;
24use tokio::{
25 select,
26 time::{interval, interval_at},
27};
28use tracing::{debug, error, info, trace, warn};
29
30mod telemetry;
31use self::telemetry::Telemetry;
32
33mod config;
34use self::config::HistogramConfiguration;
35
/// How often the passthrough batcher is checked for an idle-timeout flush.
const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);

/// Default width of an aggregation window (bucket): 10 seconds.
const fn default_window_duration() -> Duration {
    Duration::from_secs(10)
}

/// Default interval between primary flushes of the aggregation state: 15 seconds.
const fn default_primary_flush_interval() -> Duration {
    Duration::from_secs(15)
}

/// Default maximum number of distinct metric contexts tracked at once.
const fn default_context_limit() -> usize {
    1_000_000
}

/// Default idle expiration for counters, in seconds (5 minutes).
const fn default_counter_expiry_seconds() -> Option<u64> {
    Some(300)
}

/// By default, metrics that already carry timestamps bypass aggregation.
const fn default_passthrough_timestamped_metrics() -> bool {
    true
}

/// Default idle period after which a non-empty passthrough batch is flushed.
const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::from_secs(1)
}
61
/// Configuration for the aggregate transform.
#[derive(Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct AggregateConfiguration {
    /// Width of each aggregation window (bucket).
    #[serde(rename = "aggregate_window_duration", default = "default_window_duration")]
    window_duration: Duration,

    /// How often the aggregation state is flushed downstream.
    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
    primary_flush_interval: Duration,

    /// Maximum number of distinct metric contexts tracked at once; metrics for new contexts
    /// beyond this limit are dropped until a flush frees up room.
    #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
    context_limit: usize,

    /// Whether still-open windows are flushed during the final (shutdown) flush.
    #[serde(
        rename = "aggregate_flush_open_windows",
        alias = "dogstatsd_flush_incomplete_buckets",
        default
    )]
    flush_open_windows: bool,

    /// Seconds of inactivity before an idle counter context is removed; `0` (or `None`)
    /// means counters are kept indefinitely.
    #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
    counter_expiry_seconds: Option<u64>,

    /// Whether metrics whose values already carry timestamps bypass aggregation and are
    /// batched/forwarded as-is.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_passthrough_timestamped_metrics"
    )]
    passthrough_timestamped_metrics: bool,

    /// How long passthrough processing may sit idle before a non-empty batch is flushed.
    #[serde(
        rename = "aggregate_passthrough_idle_flush_timeout",
        default = "default_passthrough_idle_flush_timeout"
    )]
    passthrough_idle_flush_timeout: Duration,

    /// Histogram-specific aggregation settings (emitted statistics, distribution copy, etc.).
    #[serde(flatten)]
    hist_config: HistogramConfiguration,
}
183
impl AggregateConfiguration {
    /// Creates a new `AggregateConfiguration` by deserializing it from the given configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration cannot be deserialized into this type.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Creates a new `AggregateConfiguration` with default values for all fields.
    pub fn with_defaults() -> Self {
        Self {
            window_duration: default_window_duration(),
            primary_flush_interval: default_primary_flush_interval(),
            context_limit: default_context_limit(),
            // Open windows are not flushed on shutdown by default.
            flush_open_windows: false,
            counter_expiry_seconds: default_counter_expiry_seconds(),
            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
            hist_config: HistogramConfiguration::default(),
        }
    }
}
204
#[async_trait]
impl TransformBuilder for AggregateConfiguration {
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
        // Internal telemetry (flush counts, dropped events, etc.) scoped to this component.
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = Telemetry::new(&metrics_builder);

        let state = AggregationState::new(
            self.window_duration,
            self.context_limit,
            // A configured expiry of zero seconds means "never expire".
            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
            self.hist_config.clone(),
            telemetry.clone(),
        );

        // The passthrough batcher reuses the aggregation window width as the rate interval
        // for timestamped counters, keeping them consistent with aggregated counters.
        let passthrough_batcher = PassthroughBatcher::new(
            self.passthrough_idle_flush_timeout,
            self.window_duration,
            telemetry.clone(),
        )
        .await;

        Ok(Box::new(Aggregate {
            state,
            telemetry,
            primary_flush_interval: self.primary_flush_interval,
            flush_open_windows: self.flush_open_windows,
            passthrough_batcher,
            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
        }))
    }

    fn input_event_type(&self) -> EventType {
        // Only metric events are accepted.
        EventType::Metric
    }

    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        // A single default output carrying metrics.
        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
        OUTPUTS
    }
}
245
impl MemoryBounds for AggregateConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum: just the component struct itself.
        builder
            .minimum()
            .with_single_value::<Aggregate>("component struct");
        // Firm: the aggregation state map at its configured capacity, where each entry is a
        // context plus its aggregated metric. NOTE(review): this accounts for struct sizes
        // only, not heap data owned by each context/metric — confirm that's intentional.
        builder
            .firm()
            .with_expr(UsageExpr::product(
                "aggregation state map",
                UsageExpr::sum(
                    "context map entry",
                    UsageExpr::struct_size::<Context>("context"),
                    UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
                ),
                UsageExpr::config("aggregate_context_limit", self.context_limit),
            ));
    }
}
274
/// Aggregate transform.
///
/// Buckets incoming metrics into fixed-width windows, merging values per context, and
/// periodically flushes closed windows downstream. Metrics whose values already carry
/// timestamps can optionally bypass aggregation via the passthrough batcher.
pub struct Aggregate {
    // Per-context aggregation windows.
    state: AggregationState,
    // Component-scoped internal telemetry.
    telemetry: Telemetry,
    // How often the aggregation state is flushed.
    primary_flush_interval: Duration,
    // Whether still-open windows are flushed during the final shutdown flush.
    flush_open_windows: bool,
    // Batches timestamped metrics that skip aggregation.
    passthrough_batcher: PassthroughBatcher,
    // Whether timestamped values bypass aggregation entirely.
    passthrough_timestamped_metrics: bool,
}
283
#[async_trait]
impl Transform for Aggregate {
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        // Delay the first primary flush by one full interval so we don't flush an empty state
        // immediately at startup.
        let mut primary_flush = interval_at(
            tokio::time::Instant::now() + self.primary_flush_interval,
            self.primary_flush_interval,
        );
        // Set once the input stream ends; drives one last flush before shutting down.
        let mut final_primary_flush = false;

        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);

        health.mark_ready();
        debug!("Aggregation transform started.");

        tokio::pin!(passthrough_flush);

        loop {
            select! {
                // Liveness probe.
                _ = health.live() => continue,
                // Primary flush: dispatch all closed buckets downstream.
                _ = primary_flush.tick() => {
                    if !self.state.is_empty() {
                        debug!("Flushing aggregated metrics...");

                        // Open windows are only force-flushed on the final (shutdown) flush,
                        // and only when configured to do so.
                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;

                        // Capture whether we were over the context limit before flushing, so
                        // we can log a one-shot recovery message afterwards.
                        let was_breached = self.state.context_limit_breached();

                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
                            error!(error = %e, "Failed to flush aggregation state.");
                        }

                        self.telemetry.increment_flushes();

                        if was_breached && !self.state.context_limit_breached() {
                            info!("Context limit no longer exceeded, metrics are being accepted again.");
                        }

                        match dispatcher.flush().await {
                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
                            Err(e) => error!(error = %e, "Failed to flush aggregated events."),
                        }
                    }

                    // The final flush has completed: exit the run loop.
                    if final_primary_flush {
                        debug!("All aggregation complete.");
                        break
                    }
                },
                // Periodic check for an idle passthrough batch that needs flushing.
                _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
                // Input events; disabled once the stream has ended.
                maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
                    Some(events) => {
                        trace!(events_len = events.len(), "Received events.");

                        let current_time = get_unix_timestamp();
                        let mut processed_passthrough_metrics = false;

                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let metric = if self.passthrough_timestamped_metrics {
                                    // Timestamped values are routed to the passthrough batcher;
                                    // any remaining non-timestamped values continue to aggregation.
                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);

                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
                                        self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
                                        processed_passthrough_metrics = true;
                                    }

                                    match maybe_nontimestamped_metric {
                                        Some(metric) => metric,
                                        // Fully timestamped: nothing left to aggregate.
                                        None => continue,
                                    }
                                } else {
                                    metric
                                };

                                // Only warn on the transition into the breached state, so a
                                // flood of drops doesn't flood the logs.
                                let was_breached = self.state.context_limit_breached();
                                if !self.state.insert(current_time, metric) {
                                    trace!("Dropping metric due to context limit.");
                                    if !was_breached {
                                        warn!(context_limit = self.state.context_limit, "Context limit reached, \
                                            dropping metrics. Consider increasing `aggregate_context_limit`.");
                                    }
                                    self.telemetry.increment_events_dropped();
                                }
                            }
                        }

                        if processed_passthrough_metrics {
                            self.passthrough_batcher.update_last_processed_at();
                        }
                    },
                    None => {
                        // Input stream ended: schedule one final, immediate primary flush.
                        final_primary_flush = true;
                        primary_flush.reset_immediately();

                        debug!("Aggregation transform stopping...");
                    }
                },
            }
        }

        // Drain anything still sitting in the passthrough batch before exiting.
        self.passthrough_batcher.try_flush(context.dispatcher()).await;

        debug!("Aggregation transform stopped.");

        Ok(())
    }
}
411
412fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
413 if metric.values().all_timestamped() {
414 (Some(metric), None)
415 } else if metric.values().any_timestamped() {
416 let new_metric_values = metric.values_mut().split_timestamped();
418 let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
419
420 (Some(new_metric), Some(metric))
421 } else {
422 (None, Some(metric))
424 }
425}
426
/// Batches timestamped ("passthrough") metrics that bypass aggregation, dispatching them
/// when the active buffer fills or when processing has been idle past the configured timeout.
struct PassthroughBatcher {
    // Buffer currently being filled.
    active_buffer: EventsBuffer,
    // When the first event entered the active buffer (for batch-duration telemetry).
    active_buffer_start: Instant,
    // Last time any passthrough metrics were processed; drives idle flushing.
    last_processed_at: Instant,
    // Idle period after which a non-empty buffer is flushed.
    idle_flush_timeout: Duration,
    // Aggregation bucket width, used as the rate interval when converting counters.
    bucket_width: Duration,
    telemetry: Telemetry,
}
435
impl PassthroughBatcher {
    /// Creates a new `PassthroughBatcher`.
    ///
    /// NOTE(review): nothing here awaits — the `async` qualifier appears unnecessary, but
    /// removing it would change the (awaited) call site; confirm before cleaning up.
    async fn new(idle_flush_timeout: Duration, bucket_width: Duration, telemetry: Telemetry) -> Self {
        let active_buffer = EventsBuffer::default();

        Self {
            active_buffer,
            active_buffer_start: Instant::now(),
            last_processed_at: Instant::now(),
            idle_flush_timeout,
            bucket_width,
            telemetry,
        }
    }

    /// Pushes a metric into the active buffer, dispatching the buffer first if it is full.
    ///
    /// Counter values are converted to rates over the bucket width so they match the shape
    /// of aggregated counters.
    async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
        let (context, values, metadata) = metric.into_parts();
        let adjusted_values = counter_values_to_rate(values, self.bucket_width);
        let metric = Metric::from_parts(context, adjusted_values, metadata);

        // `try_push` hands the event back when the buffer is full: flush and retry once.
        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
            debug!("Passthrough event buffer was full. Flushing...");
            self.dispatch_events(dispatcher).await;

            if self.active_buffer.try_push(event).is_some() {
                // A freshly-dispatched buffer should never still be full; drop the event
                // rather than block.
                error!("Event buffer is full even after dispatching events. Dropping event.");
                self.telemetry.increment_events_dropped();
                return;
            }
        }

        // The first event in a fresh buffer marks the start of the batch, used for
        // batch-duration telemetry at dispatch time.
        if self.active_buffer.len() == 1 {
            self.active_buffer_start = Instant::now();
        }

        self.telemetry.increment_passthrough_metrics();
    }

    /// Records that passthrough metrics were just processed, resetting the idle-flush clock.
    fn update_last_processed_at(&mut self) {
        self.last_processed_at = Instant::now();
    }

    /// Flushes the active buffer if it is non-empty and processing has been idle for at
    /// least the configured idle-flush timeout.
    async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");

            self.dispatch_events(dispatcher).await;
        }
    }

    /// Dispatches the active buffer (if non-empty), replacing it with a fresh one and
    /// recording batch telemetry.
    async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
        if !self.active_buffer.is_empty() {
            let unaggregated_events = self.active_buffer.len();

            // How long this batch was open, from first push to dispatch.
            let batch_duration = self.active_buffer_start.elapsed();
            self.telemetry.record_passthrough_batch_duration(batch_duration);

            self.telemetry.increment_passthrough_flushes();

            // Swap in an empty buffer so we can hand the full one to the dispatcher.
            let new_active_buffer = EventsBuffer::default();
            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);

            match dispatcher.dispatch(old_active_buffer).await {
                Ok(()) => debug!(unaggregated_events, "Dispatched events."),
                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
            }
        }
    }
}
519
/// Aggregated values for a single metric context, along with the metric's metadata and the
/// last time (Unix seconds) a value was merged in.
#[derive(Clone)]
struct AggregatedMetric {
    values: MetricValues,
    metadata: MetricMetadata,
    // Drives counter expiry: counters idle past their expiration are removed.
    last_seen: u64,
}
526
/// Core aggregation state: per-context aggregated values plus the bookkeeping needed to
/// flush closed buckets and enforce the context limit.
struct AggregationState {
    // Aggregated values, keyed by metric context.
    contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
    // Scratch buffer of contexts to remove at the end of each flush pass.
    contexts_remove_buf: Vec<Context>,
    // Maximum number of contexts tracked at once.
    context_limit: usize,
    // Width of each aggregation bucket, in seconds.
    bucket_width_secs: u64,
    // Idle expiration for counters; `None` means counters never expire.
    counter_expire_secs: Option<NonZeroU64>,
    // Unix timestamp of the last flush (0 before the first flush).
    last_flush: u64,
    hist_config: HistogramConfiguration,
    telemetry: Telemetry,
    // Whether inserts are currently being rejected due to the context limit.
    context_limit_breached: bool,
}
540
impl AggregationState {
    /// Creates a new `AggregationState`.
    fn new(
        bucket_width: Duration, context_limit: usize, counter_expiration: Option<Duration>,
        hist_config: HistogramConfiguration, telemetry: Telemetry,
    ) -> Self {
        // A zero-second expiration collapses to `None`: counters never expire.
        let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);

        Self {
            contexts: HashMap::default(),
            contexts_remove_buf: Vec::new(),
            context_limit,
            bucket_width_secs: bucket_width.as_secs(),
            counter_expire_secs,
            last_flush: 0,
            hist_config,
            telemetry,
            context_limit_breached: false,
        }
    }

    /// Returns `true` if no contexts are currently being tracked.
    fn is_empty(&self) -> bool {
        self.contexts.is_empty()
    }

    /// Inserts a metric, merging its values into any existing entry for the same context.
    ///
    /// Returns `false` (dropping the metric) when the context is new and the context limit
    /// has already been reached; existing contexts are always accepted.
    fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
        // Only reject metrics for contexts we aren't already tracking.
        if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
            self.context_limit_breached = true;
            return false;
        }

        let (context, mut values, metadata) = metric.into_parts();

        // Values without a timestamp are assigned to the bucket containing `timestamp`.
        let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
        values.collapse_non_timestamped(bucket_ts);

        trace!(
            bucket_ts,
            kind = values.as_str(),
            "Inserting metric into aggregation state."
        );

        match self.contexts.entry(context) {
            Entry::Occupied(mut entry) => {
                let aggregated = entry.get_mut();

                aggregated.last_seen = timestamp;
                aggregated.values.merge(values);
            }
            Entry::Vacant(entry) => {
                self.telemetry.increment_contexts(entry.key(), &values);

                entry.insert(AggregatedMetric {
                    values,
                    metadata,
                    last_seen: timestamp,
                });
            }
        }

        true
    }

    /// Flushes all closed buckets to the dispatcher, backfilling zero values for idle
    /// (unexpired) counters and removing contexts that have emptied out.
    ///
    /// # Errors
    ///
    /// Returns an error if pushing a flushed metric to the dispatcher fails.
    async fn flush(
        &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
    ) -> Result<(), GenericError> {
        let bucket_width_secs = self.bucket_width_secs;
        // Zero here means "expiry disabled" (see `new`, which filters out zero durations).
        let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);

        // Everything strictly before the current bucket is considered closed.
        let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);

        // Build zero-value counter points for each closed bucket since the last flush, so
        // idle counters keep emitting zeroes until they expire. Skipped on the very first
        // flush (last_flush == 0).
        let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
        if self.last_flush != 0 {
            let start = align_to_bucket_start(self.last_flush, bucket_width_secs);

            for bucket_start in (start..current_time).step_by(bucket_width_secs as usize) {
                if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
                    zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
                }
            }
        }

        debug!(timestamp = current_time, "Flushing buckets.");

        for (context, am) in self.contexts.iter_mut() {
            // Counters linger (emitting zeroes) until their expiry elapses; every other
            // metric type is removed as soon as it has no values left.
            let should_expire_if_empty = match &am.values {
                MetricValues::Counter(..) => {
                    counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
                }
                _ => true,
            };

            if let MetricValues::Counter(..) = &mut am.values {
                // Backfill zeroes for closed buckets this counter hasn't expired out of.
                // `zero_value_buckets` is in ascending bucket order, so we can stop at the
                // first bucket at/past the expiry point.
                let expires_at = am.last_seen + counter_expire_secs;
                for (zv_bucket_start, zero_value) in &zero_value_buckets {
                    if expires_at > *zv_bucket_start {
                        am.values.merge(zero_value.clone());
                    } else {
                        break;
                    }
                }
            }

            // Split off the values belonging to closed buckets and dispatch them.
            if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
                self.telemetry.increment_flushed(&closed_bucket_values);

                transform_and_push_metric(
                    context.clone(),
                    closed_bucket_values,
                    am.metadata.clone(),
                    bucket_width_secs,
                    &self.hist_config,
                    dispatcher,
                )
                .await?;
            }

            if am.values.is_empty() && should_expire_if_empty {
                self.telemetry.decrement_contexts(context, &am.values);
                self.contexts_remove_buf.push(context.clone());
            }
        }

        // Remove expired contexts, then opportunistically shrink the map towards the new
        // size (reclaiming half the freed capacity) to release memory after large drops.
        let contexts_len_before = self.contexts.len();
        for context in self.contexts_remove_buf.drain(..) {
            self.contexts.remove(&context);
        }
        let contexts_len_after = self.contexts.len();

        let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
        let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
        self.contexts.shrink_to(target_contexts_capacity);

        // If the flush freed room below the limit, start accepting new contexts again.
        if self.context_limit_breached && self.contexts.len() < self.context_limit {
            self.context_limit_breached = false;
        }

        self.last_flush = current_time;

        Ok(())
    }

    /// Returns `true` if inserts are currently being rejected due to the context limit.
    fn context_limit_breached(&self) -> bool {
        self.context_limit_breached
    }
}
726
/// Transforms aggregated values into their final representation(s) and pushes the resulting
/// metric(s) to the dispatcher.
///
/// Histograms are expanded into one derived metric per configured statistic (and optionally
/// copied to a sketch-based distribution); counters become rates over the bucket width; all
/// other value types pass through unchanged.
///
/// # Errors
///
/// Returns an error if pushing any resulting metric to the dispatcher fails.
async fn transform_and_push_metric(
    context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: u64,
    hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
) -> Result<(), GenericError> {
    let bucket_width = Duration::from_secs(bucket_width_secs);

    match values {
        MetricValues::Histogram(ref mut points) => {
            // Optionally emit the histogram as a distribution (DDSketch per point), under an
            // optional name prefix.
            if hist_config.copy_to_distribution() {
                let sketch_points = points
                    .into_iter()
                    .map(|(ts, hist)| {
                        let mut sketch = DDSketch::default();
                        for sample in hist.samples() {
                            sketch.insert_n(sample.value.into_inner(), sample.weight);
                        }
                        (ts, sketch)
                    })
                    .collect::<SketchPoints>();
                let distribution_values = MetricValues::distribution(sketch_points);
                let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
                    context.with_name(format!(
                        "{}{}",
                        hist_config.copy_to_distribution_prefix(),
                        context.name()
                    ))
                } else {
                    context.clone()
                };
                let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }

            // Precompute a summary view per point so each statistic can be derived without
            // re-summarizing the histogram.
            let mut sorted_points = Vec::new();
            for (ts, h) in points {
                sorted_points.push((ts, h.summary_view()));
            }

            // One derived metric per configured statistic, with the statistic's suffix
            // appended to the metric name (e.g. `.count`, `.sum`, `.p50`).
            for statistic in hist_config.statistics() {
                let new_points = sorted_points
                    .iter()
                    .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
                    .collect::<ScalarPoints>();

                // Count-like statistics are rates over the bucket width; the rest are gauges.
                let new_values = if statistic.is_rate_statistic() {
                    MetricValues::rate(new_points, bucket_width)
                } else {
                    MetricValues::gauge(new_points)
                };

                let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
                let new_metric = Metric::from_parts(new_context, new_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }

            Ok(())
        }

        // Non-histogram values: counters become rates, everything else is forwarded as-is.
        values => {
            let adjusted_values = counter_values_to_rate(values, bucket_width);

            let metric = Metric::from_parts(context, adjusted_values, metadata);
            dispatcher.push(Event::Metric(metric)).await
        }
    }
}
801
802fn counter_values_to_rate(values: MetricValues, interval: Duration) -> MetricValues {
803 match values {
804 MetricValues::Counter(points) => MetricValues::rate(points, interval),
805 values => values,
806 }
807}
808
/// Aligns `timestamp` down to the start of the bucket that contains it.
///
/// Buckets are `bucket_width_secs` wide and aligned to multiples of the width. Panics (in
/// debug builds; division by zero) if `bucket_width_secs` is zero, matching integer `%`
/// semantics.
const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: u64) -> u64 {
    let offset_into_bucket = timestamp % bucket_width_secs;
    timestamp - offset_into_bucket
}
812
/// Determines whether the bucket starting at `bucket_start` is closed as of `current_time`.
///
/// A bucket covers `[bucket_start, bucket_start + bucket_width_secs)` and is closed once
/// `current_time` has moved past the last second it covers. When `flush_open_buckets` is
/// true, every bucket is treated as closed.
const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: u64, flush_open_buckets: bool,
) -> bool {
    // Last second covered by this bucket.
    let bucket_last_sec = bucket_start + bucket_width_secs - 1;
    bucket_last_sec < current_time || flush_open_buckets
}
836
837#[cfg(test)]
841mod tests {
842 use float_cmp::ApproxEqRatio as _;
843 use saluki_core::{
844 components::ComponentContext,
845 topology::{interconnect::Dispatcher, ComponentId, OutputName},
846 };
847 use saluki_metrics::test::TestRecorder;
848 use stringtheory::MetaString;
849 use tokio::sync::mpsc;
850
851 use super::config::HistogramStatistic;
852 use super::*;
853
    // Common fixture values: 10s buckets, 20s counter expiry.
    const BUCKET_WIDTH_SECS: u64 = 10;
    const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS);
    const COUNTER_EXPIRE_SECS: u64 = 20;
    const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));

    /// Start of the bucket that `insert_ts(step)` falls into.
    const fn bucket_ts(step: u64) -> u64 {
        align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
    }

    /// Insertion timestamp for a step: two seconds before the end of that step's bucket.
    const fn insert_ts(step: u64) -> u64 {
        (BUCKET_WIDTH_SECS * (step + 1)) - 2
    }

    /// Flush timestamp for a step: the first second after that step's bucket has closed.
    const fn flush_ts(step: u64) -> u64 {
        BUCKET_WIDTH_SECS * (step + 1)
    }
873
    /// Test helper wrapping the receiving side of a dispatcher output.
    struct DispatcherReceiver {
        receiver: mpsc::Receiver<EventsBuffer>,
    }

    impl DispatcherReceiver {
        /// Receives the next dispatched event buffer, if any, returning its metrics sorted by
        /// name so assertions are deterministic. Returns an empty vector when nothing has
        /// been dispatched.
        fn collect_next(&mut self) -> Vec<Metric> {
            match self.receiver.try_recv() {
                Ok(event_buffer) => {
                    let mut metrics = event_buffer
                        .into_iter()
                        .filter_map(|event| event.try_into_metric())
                        .collect::<Vec<Metric>>();

                    metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
                    metrics
                }
                Err(_) => Vec::new(),
            }
        }
    }
894
    /// Builds a dispatcher with a single default output plus a receiver for inspecting what
    /// gets dispatched to it.
    fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
        let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
        let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));

        // Capacity of one buffer is enough: tests dispatch and drain synchronously.
        let (buffer_tx, buffer_rx) = mpsc::channel(1);
        dispatcher.add_output(OutputName::Default).unwrap();
        dispatcher
            .attach_sender_to_output(&OutputName::Default, buffer_tx)
            .unwrap();

        (dispatcher, DispatcherReceiver { receiver: buffer_rx })
    }
908
    /// Flushes `state` at `timestamp` (including open buckets) and returns every metric that
    /// was dispatched as a result.
    async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
        let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");

        // Flush with `flush_open_buckets = true` so tests see everything accumulated so far.
        state
            .flush(timestamp, true, &mut buffered_dispatcher)
            .await
            .expect("should not fail to flush aggregation state");

        // Push whatever the buffered dispatcher is still holding out to the channel.
        buffered_dispatcher
            .flush()
            .await
            .expect("should not fail to flush buffered sender");

        dispatcher_receiver.collect_next()
    }
928
    // Compares expected vs. actual points pairwise: timestamps must match exactly; scalar
    // values are compared with an approximate-equality ratio, sketches with exact equality.
    macro_rules! compare_points {
        (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_point) = expected_value;
                let (actual_ts, actual_point) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert!(
                    expected_point.approx_eq_ratio(&actual_point, $error_ratio),
                    "point for value #{} does not match: {} (expected) vs {} (actual)",
                    idx,
                    expected_point,
                    actual_point
                );
            }
        };
        (distribution, $expected:expr, $actual:expr) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_sketch) = expected_value;
                let (actual_ts, actual_sketch) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert_eq!(
                    expected_sketch, actual_sketch,
                    "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_sketch, actual_sketch
                );
            }
        };
    }
967
    // Asserts that a flushed scalar metric (counter/gauge/rate) has the expected context and
    // the given `timestamp => value` points, with an optional approximate-equality ratio.
    macro_rules! assert_flushed_scalar_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());

            let expected_points = ScalarPoints::from([$(($ts, $value)),+]);

            match actual_metric.values() {
                MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
                    compare_points!(scalar, expected_points, actual_points, $error_ratio);
                },
                _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
            }
        };
    }
988
    // Asserts that a flushed distribution metric has the expected context and the given
    // `timestamp => sketch` points.
    macro_rules! assert_flushed_distribution_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context());

            match actual_metric.values() {
                MetricValues::Distribution(ref actual_points) => {
                    let expected_points = SketchPoints::from([$(($ts, $value)),+]);
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");

                    compare_points!(distribution, &expected_points, actual_points);
                },
                _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
            }
        };
    }
1009
    /// Table-driven check of `is_bucket_closed` for open and closed buckets, with and without
    /// forced flushing of open buckets.
    #[test]
    fn bucket_is_closed() {
        // (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected)
        let cases = [
            (1000, 995, 10, false, false),
            (1000, 995, 10, true, true),
            (1000, 1000, 10, false, false),
            (1000, 1000, 10, true, true),
            (1010, 1000, 10, false, true),
            (1010, 1000, 10, true, true),
        ];

        for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
            let expected_reason = if expected {
                "closed, was open"
            } else {
                "open, was closed"
            };

            assert_eq!(
                is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
                expected,
                "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
                expected_reason,
                current_time,
                bucket_start,
                bucket_width_secs,
                flush_open_buckets
            );
        }
    }
1045
    /// Inserting beyond the context limit rejects new contexts until a flush drains the
    /// tracked contexts, after which the previously-rejected contexts are accepted.
    #[tokio::test]
    async fn context_limit() {
        // Limit of two contexts.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = [
            Metric::gauge("metric1", 1.0),
            Metric::gauge("metric2", 2.0),
            Metric::gauge("metric3", 3.0),
            Metric::gauge("metric4", 4.0),
        ];

        assert!(!state.context_limit_breached());

        // First two contexts fit under the limit.
        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
        assert!(!state.context_limit_breached());

        // Third and fourth are rejected and mark the limit as breached.
        assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
        assert!(state.context_limit_breached());
        assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));
        assert!(state.context_limit_breached());

        // Flushing removes the gauge contexts, clearing the breached flag.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());
        assert!(!state.context_limit_breached());

        // With room available again, the rejected contexts are now accepted.
        assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
        assert!(state.insert(insert_ts(2), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
    }
1095
    /// Idle counters emitting zero values still count against the context limit until they
    /// expire, after which new contexts are accepted again.
    #[tokio::test]
    async fn context_limit_with_zero_value_counters() {
        // Limit of two contexts.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = [
            Metric::counter("metric1", 1.0),
            Metric::counter("metric2", 2.0),
            Metric::counter("metric3", 3.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // First flush emits the real values.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Idle (unexpired) counters keep emitting zero values.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // Still at the limit, so a third context is rejected.
        assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);

        // Past the expiry window, the idle counters are removed and emit nothing.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 0);

        // With room freed up, the third context is now accepted.
        assert!(state.insert(insert_ts(5), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
    }
1150
    /// Idle counters emit zero values for closed buckets until they expire, including
    /// backfilling zeroes for buckets skipped between flushes.
    #[tokio::test]
    async fn zero_value_counters() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = [Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // First flush emits the real values.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Idle counters emit zeroes for the next bucket.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // New values at step 4; step 3's bucket was skipped between flushes.
        assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(4), input_metrics[1].clone()));

        // The skipped bucket is backfilled with zeroes alongside the new values.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);

        // Flushing well past the last activity only emits zeroes for buckets inside the
        // expiry window.
        let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
    }
1199
1200 #[tokio::test]
1201 async fn merge_identical_timestamped_values_on_flush() {
1202 let mut state = AggregationState::new(
1204 BUCKET_WIDTH,
1205 10,
1206 COUNTER_EXPIRE,
1207 HistogramConfiguration::default(),
1208 Telemetry::noop(),
1209 );
1210
1211 let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
1213
1214 assert!(state.insert(insert_ts(1), input_metric.clone()));
1215
1216 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1219 assert_eq!(flushed_metrics.len(), 1);
1220 assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
1221 }
1222
1223 #[tokio::test]
1224 async fn histogram_statistics() {
1225 let hist_config = HistogramConfiguration::from_statistics(
1227 &[
1228 HistogramStatistic::Count,
1229 HistogramStatistic::Sum,
1230 HistogramStatistic::Percentile {
1231 q: 0.5,
1232 suffix: "p50".into(),
1233 },
1234 ],
1235 false,
1236 "".into(),
1237 );
1238 let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1239
1240 let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
1242 assert!(state.insert(insert_ts(1), input_metric.clone()));
1243
1244 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1247 assert_eq!(flushed_metrics.len(), 3);
1248
1249 let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
1252 let sum_metric = Metric::gauge("metric1.sum", 0.0);
1253 let p50_metric = Metric::gauge("metric1.p50", 0.0);
1254
1255 assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
1258 assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
1259 assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
1260 }
1261
1262 #[tokio::test]
1263 async fn histogram_statistics_unit_propagation() {
1264 let hist_config = HistogramConfiguration::from_statistics(
1266 &[
1267 HistogramStatistic::Count,
1268 HistogramStatistic::Sum,
1269 HistogramStatistic::Percentile {
1270 q: 0.5,
1271 suffix: "p50".into(),
1272 },
1273 ],
1274 false,
1275 "".into(),
1276 );
1277 let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1278
1279 let context = Context::from_static_parts("metric1", &[]);
1281 let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
1282 let input_metric = Metric::from_parts(
1283 context,
1284 MetricValues::histogram([1.0_f64, 2.0, 3.0, 4.0, 5.0]),
1285 metadata,
1286 );
1287 assert!(state.insert(insert_ts(1), input_metric));
1288
1289 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1290 assert_eq!(flushed_metrics.len(), 3);
1291
1292 for metric in &flushed_metrics {
1294 assert_eq!(
1295 metric.metadata().unit(),
1296 Some("millisecond"),
1297 "flushed metric '{}' should carry unit='millisecond'",
1298 metric.context().name()
1299 );
1300 }
1301 }
1302
1303 #[tokio::test]
1304 async fn distributions() {
1305 let mut state = AggregationState::new(
1307 BUCKET_WIDTH,
1308 10,
1309 COUNTER_EXPIRE,
1310 HistogramConfiguration::default(),
1311 Telemetry::noop(),
1312 );
1313
1314 let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1316 let input_metric = Metric::distribution("metric1", &values[..]);
1317
1318 assert!(state.insert(insert_ts(1), input_metric.clone()));
1319
1320 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1322 assert_eq!(flushed_metrics.len(), 1);
1323
1324 assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1325 }
1326
1327 #[tokio::test]
1328 async fn histogram_copy_to_distribution() {
1329 let hist_config = HistogramConfiguration::from_statistics(
1330 &[
1331 HistogramStatistic::Count,
1332 HistogramStatistic::Sum,
1333 HistogramStatistic::Percentile {
1334 q: 0.5,
1335 suffix: "p50".into(),
1336 },
1337 ],
1338 true,
1339 "dist_prefix.".into(),
1340 );
1341 let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1342
1343 let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1345 let input_metric = Metric::histogram("metric1", values);
1346 assert!(state.insert(insert_ts(1), input_metric.clone()));
1347
1348 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1351 assert_eq!(flushed_metrics.len(), 4);
1352
1353 let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
1356 let sum_metric = Metric::gauge("metric1.sum", 0.0);
1357 let p50_metric = Metric::gauge("metric1.p50", 0.0);
1358 let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);
1359
1360 assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1363 assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
1364 assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
1365 assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
1366 }
1367
1368 #[tokio::test]
1369 async fn nonaggregated_counters_to_rate() {
1370 let counter_value = 42.0;
1371 let bucket_width = BUCKET_WIDTH;
1372
1373 let mut state = AggregationState::new(
1375 bucket_width,
1376 10,
1377 COUNTER_EXPIRE,
1378 HistogramConfiguration::default(),
1379 Telemetry::noop(),
1380 );
1381
1382 let input_metric = Metric::counter("metric1", counter_value);
1384 assert!(state.insert(insert_ts(1), input_metric.clone()));
1385
1386 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1389 assert_eq!(flushed_metrics.len(), 1);
1390 let flushed_metric = &flushed_metrics[0];
1391
1392 assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
1393 assert_eq!(flushed_metric.values().as_str(), "rate");
1394 }
1395
1396 #[tokio::test]
1397 async fn preaggregated_counters_to_rate() {
1398 let counter_value = 42.0;
1399 let timestamp = 123456;
1400 let bucket_width = BUCKET_WIDTH;
1401
1402 let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await;
1404 let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
1405
1406 let input_metric = Metric::counter("metric1", (timestamp, counter_value));
1408 batcher.push_metric(input_metric.clone(), &dispatcher).await;
1409
1410 batcher.try_flush(&dispatcher).await;
1413
1414 let mut flushed_metrics = dispatcher_receiver.collect_next();
1415 assert_eq!(flushed_metrics.len(), 1);
1416 assert_eq!(
1417 Metric::rate("metric1", (timestamp, counter_value), bucket_width),
1418 flushed_metrics.remove(0)
1419 );
1420 }
1421
    #[tokio::test]
    async fn telemetry() {
        // Exercises the internal telemetry of `AggregationState`: the active-context
        // gauges (total and per metric type) and related counters, across inserts,
        // a context-limit rejection, and a flush.
        let recorder = TestRecorder::default();
        let _local = metrics::set_default_local_recorder(&recorder);

        let builder = MetricsBuilder::default();
        let telemetry = Telemetry::new(&builder);

        // Context limit of 2 so the third distinct context inserted below is rejected.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            telemetry,
        );

        // All telemetry starts at zero before anything is inserted.
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
        assert_eq!(
            recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
            Some(0)
        );
        for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
            assert_eq!(
                recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
                Some(0.0)
            );
        }

        // First insert: one active context overall, counted under "counter".
        assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );
        // A non-timestamped counter does not count as a passthrough metric.
        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));

        // Second insert (a timestamped gauge): second active context, under "gauge".
        assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
            Some(1.0)
        );

        // Third distinct context exceeds the limit of 2: insert returns false and
        // the active-context telemetry is unchanged.
        assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );

        // After a flush, the gauge context is dropped while the counter context
        // remains active (counters keep zero-value flushing — see
        // `zero_value_counters` — presumably until expiry).
        let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
            Some(0.0)
        );
    }
1496}
1497
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::AggregateConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Smoke test: `AggregateConfiguration` should deserialize from an empty JSON
    /// object (i.e. all fields have working defaults).
    ///
    /// NOTE(review): the `.nanos` entries are presumably fields the smoke-test
    /// harness should skip — confirm against `run_config_smoke_tests`.
    #[tokio::test]
    async fn smoke_test() {
        let duration_nanos_fields = [
            "aggregate_flush_interval.nanos",
            "aggregate_passthrough_idle_flush_timeout.nanos",
            "aggregate_window_duration.nanos",
        ];
        run_config_smoke_tests(
            structs::AGGREGATE_CONFIGURATION,
            &duration_nanos_fields,
            json!({}),
            |cfg| {
                cfg.as_typed::<AggregateConfiguration>()
                    .expect("AggregateConfiguration should deserialize")
            },
        )
        .await
    }
}