use std::{
    num::NonZeroU64,
    time::{Duration, Instant},
};

use async_trait::async_trait;
use ddsketch_agent::DDSketch;
use hashbrown::{hash_map::Entry, HashMap};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
use saluki_common::time::get_unix_timestamp;
use saluki_config::GenericConfiguration;
use saluki_context::Context;
use saluki_core::{
    components::{transforms::*, ComponentContext},
    data_model::event::{metric::*, Event, EventType},
    observability::ComponentMetricsExt as _,
    topology::{interconnect::BufferedDispatcher, EventsBuffer, EventsDispatcher, OutputDefinition},
};
use saluki_error::GenericError;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use smallvec::SmallVec;
use tokio::{
    select,
    time::{interval, interval_at},
};
use tracing::{debug, error, trace};

mod telemetry;
use self::telemetry::Telemetry;

mod config;
use self::config::HistogramConfiguration;

const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);

const fn default_window_duration() -> Duration {
    Duration::from_secs(10)
}

const fn default_primary_flush_interval() -> Duration {
    Duration::from_secs(15)
}

const fn default_context_limit() -> usize {
    5000
}

const fn default_counter_expiry_seconds() -> Option<u64> {
    Some(300)
}

const fn default_passthrough_timestamped_metrics() -> bool {
    true
}

const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::from_secs(1)
}

#[derive(Deserialize)]
pub struct AggregateConfiguration {
    #[serde(rename = "aggregate_window_duration", default = "default_window_duration")]
    window_duration: Duration,

    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
    primary_flush_interval: Duration,

    #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
    context_limit: usize,

    #[serde(rename = "aggregate_flush_open_windows", default)]
    flush_open_windows: bool,

    #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
    counter_expiry_seconds: Option<u64>,

    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_passthrough_timestamped_metrics"
    )]
    passthrough_timestamped_metrics: bool,

    #[serde(
        rename = "aggregate_passthrough_idle_flush_timeout",
        default = "default_passthrough_idle_flush_timeout"
    )]
    passthrough_idle_flush_timeout: Duration,

    #[serde(flatten)]
    hist_config: HistogramConfiguration,
}

impl AggregateConfiguration {
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    pub fn with_defaults() -> Self {
        Self {
            window_duration: default_window_duration(),
            primary_flush_interval: default_primary_flush_interval(),
            context_limit: default_context_limit(),
            flush_open_windows: false,
            counter_expiry_seconds: default_counter_expiry_seconds(),
            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
            hist_config: HistogramConfiguration::default(),
        }
    }
}
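
// Usage sketch (illustrative, not part of the component itself): a topology
// would typically deserialize this configuration from the process-wide
// `GenericConfiguration`, falling back to `with_defaults()` in tests:
//
//     let config = AggregateConfiguration::with_defaults();
//     // or: let config = AggregateConfiguration::from_configuration(&generic_config)?;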

#[async_trait]
impl TransformBuilder for AggregateConfiguration {
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = Telemetry::new(&metrics_builder);

        let state = AggregationState::new(
            self.window_duration,
            self.context_limit,
            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
            self.hist_config.clone(),
            telemetry.clone(),
        );

        let passthrough_batcher = PassthroughBatcher::new(
            self.passthrough_idle_flush_timeout,
            self.window_duration,
            telemetry.clone(),
        )
        .await;

        Ok(Box::new(Aggregate {
            state,
            telemetry,
            primary_flush_interval: self.primary_flush_interval,
            flush_open_windows: self.flush_open_windows,
            passthrough_batcher,
            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
        }))
    }

    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    fn outputs(&self) -> &[OutputDefinition] {
        static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Metric)];

        OUTPUTS
    }
}

impl MemoryBounds for AggregateConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Aggregate>("component struct");
        builder
            .firm()
            .with_expr(UsageExpr::product(
                "aggregation state map",
                UsageExpr::sum(
                    "context map entry",
                    UsageExpr::struct_size::<Context>("context"),
                    UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
                ),
                UsageExpr::config("aggregate_context_limit", self.context_limit),
            ));
    }
}
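
// Illustrative arithmetic for the firm bound above (struct sizes here are
// hypothetical): if `Context` were 64 bytes, `AggregatedMetric` 96 bytes, and
// `aggregate_context_limit` 5000, the bound would work out to
// (64 + 96) * 5000 = 800,000 bytes (~800 KB) for the aggregation state map.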

pub struct Aggregate {
    state: AggregationState,
    telemetry: Telemetry,
    primary_flush_interval: Duration,
    flush_open_windows: bool,
    passthrough_batcher: PassthroughBatcher,
    passthrough_timestamped_metrics: bool,
}

#[async_trait]
impl Transform for Aggregate {
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        let mut primary_flush = interval_at(
            tokio::time::Instant::now() + self.primary_flush_interval,
            self.primary_flush_interval,
        );
        let mut final_primary_flush = false;

        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);

        health.mark_ready();
        debug!("Aggregation transform started.");

        tokio::pin!(passthrough_flush);

        loop {
            select! {
                _ = health.live() => continue,
                _ = primary_flush.tick() => {
                    if !self.state.is_empty() {
                        debug!("Flushing aggregated metrics...");

                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;

                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
                            error!(error = %e, "Failed to flush aggregation state.");
                        }

                        self.telemetry.increment_flushes();

                        match dispatcher.flush().await {
                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
                            Err(e) => error!(error = %e, "Failed to flush aggregated events."),
                        }
                    }

                    if final_primary_flush {
                        debug!("All aggregation complete.");
                        break
                    }
                },
                _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
                maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
                    Some(events) => {
                        trace!(events_len = events.len(), "Received events.");

                        let current_time = get_unix_timestamp();
                        let mut processed_passthrough_metrics = false;

                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let metric = if self.passthrough_timestamped_metrics {
                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);

                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
                                        self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
                                        processed_passthrough_metrics = true;
                                    }

                                    match maybe_nontimestamped_metric {
                                        Some(metric) => metric,
                                        None => continue,
                                    }
                                } else {
                                    metric
                                };

                                if !self.state.insert(current_time, metric) {
                                    trace!("Dropping metric due to context limit.");
                                    self.telemetry.increment_events_dropped();
                                }
                            }
                        }

                        if processed_passthrough_metrics {
                            self.passthrough_batcher.update_last_processed_at();
                        }
                    },
                    None => {
                        final_primary_flush = true;
                        primary_flush.reset_immediately();

                        debug!("Aggregation transform stopping...");
                    }
                },
            }
        }

        self.passthrough_batcher.try_flush(context.dispatcher()).await;

        debug!("Aggregation transform stopped.");

        Ok(())
    }
}

fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
    if metric.values().all_timestamped() {
        (Some(metric), None)
    } else if metric.values().any_timestamped() {
        let new_metric_values = metric.values_mut().split_timestamped();
        let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());

        (Some(new_metric), Some(metric))
    } else {
        (None, Some(metric))
    }
}
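
// Illustrative behavior of the split (point values here are hypothetical):
// a metric carrying [(None, 1.0), (Some(1700000000), 2.0)] comes back as
// (Some(metric with the timestamped point), Some(metric with the remaining
// non-timestamped point)); an all-timestamped input returns (Some(..), None),
// and a fully non-timestamped input returns (None, Some(..)).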

struct PassthroughBatcher {
    active_buffer: EventsBuffer,
    active_buffer_start: Instant,
    last_processed_at: Instant,
    idle_flush_timeout: Duration,
    bucket_width: Duration,
    telemetry: Telemetry,
}

impl PassthroughBatcher {
    async fn new(idle_flush_timeout: Duration, bucket_width: Duration, telemetry: Telemetry) -> Self {
        let active_buffer = EventsBuffer::default();

        Self {
            active_buffer,
            active_buffer_start: Instant::now(),
            last_processed_at: Instant::now(),
            idle_flush_timeout,
            bucket_width,
            telemetry,
        }
    }

    async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
        let (context, values, metadata) = metric.into_parts();
        let adjusted_values = counter_values_to_rate(values, self.bucket_width);
        let metric = Metric::from_parts(context, adjusted_values, metadata);

        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
            debug!("Passthrough event buffer was full. Flushing...");
            self.dispatch_events(dispatcher).await;

            if self.active_buffer.try_push(event).is_some() {
                error!("Event buffer is full even after dispatching events. Dropping event.");
                self.telemetry.increment_events_dropped();
                return;
            }
        }

        if self.active_buffer.len() == 1 {
            self.active_buffer_start = Instant::now();
        }

        self.telemetry.increment_passthrough_metrics();
    }

    fn update_last_processed_at(&mut self) {
        self.last_processed_at = Instant::now();
    }

    async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");

            self.dispatch_events(dispatcher).await;
        }
    }

    async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
        if !self.active_buffer.is_empty() {
            let unaggregated_events = self.active_buffer.len();

            let batch_duration = self.active_buffer_start.elapsed();
            self.telemetry.record_passthrough_batch_duration(batch_duration);

            self.telemetry.increment_passthrough_flushes();

            let new_active_buffer = EventsBuffer::default();
            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);

            match dispatcher.dispatch(old_active_buffer).await {
                Ok(()) => debug!(unaggregated_events, "Dispatched events."),
                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
            }
        }
    }
}

#[derive(Clone)]
struct AggregatedMetric {
    values: MetricValues,
    metadata: MetricMetadata,
    last_seen: u64,
}

struct AggregationState {
    contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
    contexts_remove_buf: Vec<Context>,
    context_limit: usize,
    bucket_width_secs: u64,
    counter_expire_secs: Option<NonZeroU64>,
    last_flush: u64,
    hist_config: HistogramConfiguration,
    telemetry: Telemetry,
}

impl AggregationState {
    fn new(
        bucket_width: Duration, context_limit: usize, counter_expiration: Option<Duration>,
        hist_config: HistogramConfiguration, telemetry: Telemetry,
    ) -> Self {
        let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);

        Self {
            contexts: HashMap::default(),
            contexts_remove_buf: Vec::new(),
            context_limit,
            bucket_width_secs: bucket_width.as_secs(),
            counter_expire_secs,
            last_flush: 0,
            hist_config,
            telemetry,
        }
    }

    fn is_empty(&self) -> bool {
        self.contexts.is_empty()
    }

    fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
        if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
            return false;
        }

        let (context, mut values, metadata) = metric.into_parts();

        let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
        values.collapse_non_timestamped(bucket_ts);

        trace!(
            bucket_ts,
            kind = values.as_str(),
            "Inserting metric into aggregation state."
        );

        match self.contexts.entry(context) {
            Entry::Occupied(mut entry) => {
                let aggregated = entry.get_mut();

                aggregated.last_seen = timestamp;
                aggregated.values.merge(values);
            }
            Entry::Vacant(entry) => {
                self.telemetry.increment_contexts(entry.key(), &values);

                entry.insert(AggregatedMetric {
                    values,
                    metadata,
                    last_seen: timestamp,
                });
            }
        }

        true
    }
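
    // Illustrative example for `insert` (numbers are hypothetical): with a
    // 10s bucket width, a non-timestamped value inserted at timestamp 1234 is
    // collapsed onto the bucket starting at 1230 before being merged into any
    // existing values for the same context.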

    async fn flush(
        &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
    ) -> Result<(), GenericError> {
        let bucket_width_secs = self.bucket_width_secs;
        let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);

        let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);

        let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
        if self.last_flush != 0 {
            let start = align_to_bucket_start(self.last_flush, bucket_width_secs);

            for bucket_start in (start..current_time).step_by(bucket_width_secs as usize) {
                if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
                    zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
                }
            }
        }

        debug!(timestamp = current_time, "Flushing buckets.");

        for (context, am) in self.contexts.iter_mut() {
            let should_expire_if_empty = match &am.values {
                MetricValues::Counter(..) => {
                    counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
                }
                _ => true,
            };

            if let MetricValues::Counter(..) = &mut am.values {
                let expires_at = am.last_seen + counter_expire_secs;
                for (zv_bucket_start, zero_value) in &zero_value_buckets {
                    if expires_at > *zv_bucket_start {
                        am.values.merge(zero_value.clone());
                    } else {
                        break;
                    }
                }
            }

            if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
                self.telemetry.increment_flushed(&closed_bucket_values);

                transform_and_push_metric(
                    context.clone(),
                    closed_bucket_values,
                    am.metadata.clone(),
                    bucket_width_secs,
                    &self.hist_config,
                    dispatcher,
                )
                .await?;
            }

            if am.values.is_empty() && should_expire_if_empty {
                self.telemetry.decrement_contexts(context, &am.values);
                self.contexts_remove_buf.push(context.clone());
            }
        }

        let contexts_len_before = self.contexts.len();
        for context in self.contexts_remove_buf.drain(..) {
            self.contexts.remove(&context);
        }
        let contexts_len_after = self.contexts.len();

        let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
        let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
        self.contexts.shrink_to(target_contexts_capacity);

        self.last_flush = current_time;

        Ok(())
    }
}
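
// Illustrative flush timeline (numbers are hypothetical): with a 10s bucket
// width, last_flush = 20, and current_time = 50, the loop above synthesizes
// zero-value counter points for the buckets starting at 20, 30, and 40, so
// idle counters keep reporting 0.0 until their expiry window lapses.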

async fn transform_and_push_metric(
    context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: u64,
    hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
) -> Result<(), GenericError> {
    let bucket_width = Duration::from_secs(bucket_width_secs);

    match values {
        MetricValues::Histogram(ref mut points) => {
            if hist_config.copy_to_distribution() {
                let sketch_points = points
                    .into_iter()
                    .map(|(ts, hist)| {
                        let mut sketch = DDSketch::default();
                        for sample in hist.samples() {
                            sketch.insert_n(sample.value.into_inner(), sample.weight as u32);
                        }
                        (ts, sketch)
                    })
                    .collect::<SketchPoints>();
                let distribution_values = MetricValues::distribution(sketch_points);
                let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
                    context.with_name(format!(
                        "{}{}",
                        hist_config.copy_to_distribution_prefix(),
                        context.name()
                    ))
                } else {
                    context.clone()
                };
                let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }

            let mut sorted_points = Vec::new();
            for (ts, h) in points {
                sorted_points.push((ts, h.summary_view()));
            }

            for statistic in hist_config.statistics() {
                let new_points = sorted_points
                    .iter()
                    .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
                    .collect::<ScalarPoints>();

                let new_values = if statistic.is_rate_statistic() {
                    MetricValues::rate(new_points, bucket_width)
                } else {
                    MetricValues::gauge(new_points)
                };

                let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
                let new_metric = Metric::from_parts(new_context, new_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }

            Ok(())
        }

        values => {
            let adjusted_values = counter_values_to_rate(values, bucket_width);

            let metric = Metric::from_parts(context, adjusted_values, metadata);
            dispatcher.push(Event::Metric(metric)).await
        }
    }
}

fn counter_values_to_rate(values: MetricValues, interval: Duration) -> MetricValues {
    match values {
        MetricValues::Counter(points) => MetricValues::rate(points, interval),
        values => values,
    }
}
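
// Illustrative conversion (values are hypothetical): a counter point
// (1230, 5.0) with a 10s interval becomes a rate carrying the same point plus
// the interval, which a downstream consumer can render as 0.5/s; non-counter
// values pass through untouched.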

const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: u64) -> u64 {
    timestamp - (timestamp % bucket_width_secs)
}
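
// Worked example: with a 10s bucket width, timestamps 1230 through 1239 all
// map to the bucket starting at 1230:
//   align_to_bucket_start(1234, 10) == 1234 - (1234 % 10) == 1230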

const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: u64, flush_open_buckets: bool,
) -> bool {
    (bucket_start + bucket_width_secs - 1) < current_time || flush_open_buckets
}
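
// Worked example: a bucket starting at 1000 with a 10s width covers
// timestamps 1000..=1009, so it only closes once `current_time` reaches 1010:
//   is_bucket_closed(1009, 1000, 10, false) == false
//   is_bucket_closed(1010, 1000, 10, false) == true
// Passing `flush_open_buckets = true` forces the result to `true` regardless.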

#[cfg(test)]
mod tests {
    use float_cmp::ApproxEqRatio as _;
    use saluki_core::{
        components::ComponentContext,
        topology::{interconnect::Dispatcher, ComponentId, OutputName},
    };
    use saluki_metrics::test::TestRecorder;
    use tokio::sync::mpsc;

    use super::config::HistogramStatistic;
    use super::*;

    const BUCKET_WIDTH_SECS: u64 = 10;
    const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS);
    const COUNTER_EXPIRE_SECS: u64 = 20;
    const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));

    const fn bucket_ts(step: u64) -> u64 {
        align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
    }

    const fn insert_ts(step: u64) -> u64 {
        (BUCKET_WIDTH_SECS * (step + 1)) - 2
    }

    const fn flush_ts(step: u64) -> u64 {
        BUCKET_WIDTH_SECS * (step + 1)
    }
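
    // Worked example of the step helpers above, with BUCKET_WIDTH_SECS = 10:
    //   insert_ts(1) == 18 (a write 2s before the bucket boundary)
    //   bucket_ts(1) == 10 (the bucket that write lands in)
    //   flush_ts(1)  == 20 (a flush right at the bucket boundary)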

    struct DispatcherReceiver {
        receiver: mpsc::Receiver<EventsBuffer>,
    }

    impl DispatcherReceiver {
        fn collect_next(&mut self) -> Vec<Metric> {
            match self.receiver.try_recv() {
                Ok(event_buffer) => {
                    let mut metrics = event_buffer
                        .into_iter()
                        .filter_map(|event| event.try_into_metric())
                        .collect::<Vec<Metric>>();

                    metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
                    metrics
                }
                Err(_) => Vec::new(),
            }
        }
    }

    fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
        let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
        let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));

        let (buffer_tx, buffer_rx) = mpsc::channel(1);
        dispatcher.add_output(OutputName::Default).unwrap();
        dispatcher
            .attach_sender_to_output(&OutputName::Default, buffer_tx)
            .unwrap();

        (dispatcher, DispatcherReceiver { receiver: buffer_rx })
    }

    async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
        let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");

        state
            .flush(timestamp, true, &mut buffered_dispatcher)
            .await
            .expect("should not fail to flush aggregation state");

        buffered_dispatcher
            .flush()
            .await
            .expect("should not fail to flush buffered sender");

        dispatcher_receiver.collect_next()
    }

    macro_rules! compare_points {
        (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_point) = expected_value;
                let (actual_ts, actual_point) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert!(
                    expected_point.approx_eq_ratio(&actual_point, $error_ratio),
                    "point for value #{} does not match: {} (expected) vs {} (actual)",
                    idx,
                    expected_point,
                    actual_point
                );
            }
        };
        (distribution, $expected:expr, $actual:expr) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_sketch) = expected_value;
                let (actual_ts, actual_sketch) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert_eq!(
                    expected_sketch, actual_sketch,
                    "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_sketch, actual_sketch
                );
            }
        };
    }

    macro_rules! assert_flushed_scalar_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());

            let expected_points = ScalarPoints::from([$(($ts, $value)),+]);

            match actual_metric.values() {
                MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
                    compare_points!(scalar, expected_points, actual_points, $error_ratio);
                },
                _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
            }
        };
    }

    macro_rules! assert_flushed_distribution_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context());

            match actual_metric.values() {
                MetricValues::Distribution(ref actual_points) => {
                    let expected_points = SketchPoints::from([$(($ts, $value)),+]);
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");

                    compare_points!(distribution, &expected_points, actual_points);
                },
                _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
            }
        };
    }

    #[test]
    fn bucket_is_closed() {
        let cases = [
            (1000, 995, 10, false, false),
            (1000, 995, 10, true, true),
            (1000, 1000, 10, false, false),
            (1000, 1000, 10, true, true),
            (1010, 1000, 10, false, true),
            (1010, 1000, 10, true, true),
        ];

        for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
            let expected_reason = if expected {
                "closed, was open"
            } else {
                "open, was closed"
            };

            assert_eq!(
                is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
                expected,
                "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
                expected_reason,
                current_time,
                bucket_start,
                bucket_width_secs,
                flush_open_buckets
            );
        }
    }

    #[tokio::test]
    async fn context_limit() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = vec![
            Metric::gauge("metric1", 1.0),
            Metric::gauge("metric2", 2.0),
            Metric::gauge("metric3", 3.0),
            Metric::gauge("metric4", 4.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());

        assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
        assert!(state.insert(insert_ts(2), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
    }

    #[tokio::test]
    async fn context_limit_with_zero_value_counters() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = vec![
            Metric::counter("metric1", 1.0),
            Metric::counter("metric2", 2.0),
            Metric::counter("metric3", 3.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 0);

        assert!(state.insert(insert_ts(5), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
    }

    #[tokio::test]
    async fn zero_value_counters() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = vec![Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(4), input_metrics[1].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
    }

    #[tokio::test]
    async fn merge_identical_timestamped_values_on_flush() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);

        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
    }

    #[tokio::test]
    async fn histogram_statistics() {
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            false,
            "".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 3);

        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);

        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
    }

    #[tokio::test]
    async fn distributions() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let input_metric = Metric::distribution("metric1", &values[..]);

        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);

        assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
    }

    #[tokio::test]
    async fn histogram_copy_to_distribution() {
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            true,
            "dist_prefix.".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let input_metric = Metric::histogram("metric1", values);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 4);

        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);
        let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);

        assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
    }

    #[tokio::test]
    async fn nonaggregated_counters_to_rate() {
        let counter_value = 42.0;
        let bucket_width = BUCKET_WIDTH;

        let mut state = AggregationState::new(
            bucket_width,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metric = Metric::counter("metric1", counter_value);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        let flushed_metric = &flushed_metrics[0];

        assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
        assert_eq!(flushed_metric.values().as_str(), "rate");
    }

    #[tokio::test]
    async fn preaggregated_counters_to_rate() {
        let counter_value = 42.0;
        let timestamp = 123456;
        let bucket_width = BUCKET_WIDTH;

        let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await;
        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();

        let input_metric = Metric::counter("metric1", (timestamp, counter_value));
        batcher.push_metric(input_metric.clone(), &dispatcher).await;

        batcher.try_flush(&dispatcher).await;

        let mut flushed_metrics = dispatcher_receiver.collect_next();
        assert_eq!(flushed_metrics.len(), 1);
        assert_eq!(
            Metric::rate("metric1", (timestamp, counter_value), bucket_width),
            flushed_metrics.remove(0)
        );
    }

    #[tokio::test]
    async fn telemetry() {
        let recorder = TestRecorder::default();
        let _local = metrics::set_default_local_recorder(&recorder);

        let builder = MetricsBuilder::default();
        let telemetry = Telemetry::new(&builder);

        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            telemetry,
        );

        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
        assert_eq!(
            recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
            Some(0)
        );
        for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
            assert_eq!(
                recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
                Some(0.0)
            );
        }

        assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );
        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));

        assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
            Some(1.0)
        );

        assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );

        let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
            Some(0.0)
        );
    }
}