use std::{
    num::NonZeroU64,
    time::{Duration, Instant},
};

use async_trait::async_trait;
use ddsketch_agent::DDSketch;
use hashbrown::{hash_map::Entry, HashMap};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
use saluki_common::time::get_unix_timestamp;
use saluki_config::GenericConfiguration;
use saluki_context::Context;
use saluki_core::{
    components::{transforms::*, ComponentContext},
    data_model::event::{metric::*, Event, EventType},
    observability::ComponentMetricsExt as _,
    topology::{interconnect::BufferedDispatcher, EventsBuffer, EventsDispatcher, OutputDefinition},
};
use saluki_error::GenericError;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use smallvec::SmallVec;
use tokio::{
    select,
    time::{interval, interval_at},
};
use tracing::{debug, error, trace};

mod telemetry;
use self::telemetry::Telemetry;

mod config;
use self::config::HistogramConfiguration;

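/// How often to check whether the passthrough batcher has been idle long enough to warrant a flush.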
const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);

const fn default_window_duration() -> Duration {
    Duration::from_secs(10)
}

const fn default_primary_flush_interval() -> Duration {
    Duration::from_secs(15)
}

const fn default_context_limit() -> usize {
    5000
}

const fn default_counter_expiry_seconds() -> Option<u64> {
    Some(300)
}

const fn default_passthrough_timestamped_metrics() -> bool {
    true
}

const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::from_secs(1)
}

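/// Configuration for the aggregate transform.
///
/// This transform buckets metrics into fixed-width time windows and flushes them on an interval,
/// optionally letting metrics that already carry timestamps bypass aggregation entirely.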
#[derive(Deserialize)]
pub struct AggregateConfiguration {
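    /// Width of each aggregation window (bucket).
    ///
    /// Defaults to 10 seconds.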
    #[serde(rename = "aggregate_window_duration", default = "default_window_duration")]
    window_duration: Duration,

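    /// How often closed aggregation windows are flushed downstream.
    ///
    /// Defaults to 15 seconds.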
    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
    primary_flush_interval: Duration,

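    /// Maximum number of unique metric contexts that can be aggregated at any one time.
    ///
    /// Metrics that would create a new context beyond this limit are dropped. Defaults to 5,000.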
    #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
    context_limit: usize,

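    /// Whether still-open aggregation windows should be flushed during shutdown.
    ///
    /// Defaults to `false`.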
    #[serde(rename = "aggregate_flush_open_windows", default)]
    flush_open_windows: bool,

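    /// How long an idle counter is kept alive (emitting zero values) before it expires, in seconds.
    ///
    /// A value of zero disables expiration. Defaults to 300 seconds.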
    #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
    counter_expiry_seconds: Option<u64>,

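    /// Whether metrics with pre-existing timestamps should bypass aggregation and be forwarded
    /// (in batches) as-is.
    ///
    /// Defaults to `true`.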
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_passthrough_timestamped_metrics"
    )]
    passthrough_timestamped_metrics: bool,

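    /// How long the passthrough batcher can go without seeing new metrics before its buffer is
    /// flushed.
    ///
    /// Defaults to 1 second.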
    #[serde(
        rename = "aggregate_passthrough_idle_flush_timeout",
        default = "default_passthrough_idle_flush_timeout"
    )]
    passthrough_idle_flush_timeout: Duration,

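    /// Histogram aggregation settings, such as which statistics to emit and whether to copy
    /// histogram samples into a distribution.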
    #[serde(flatten)]
    hist_config: HistogramConfiguration,
}

impl AggregateConfiguration {
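    /// Creates a new `AggregateConfiguration` from the given configuration.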
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

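    /// Creates a new `AggregateConfiguration` with default values.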
    pub fn with_defaults() -> Self {
        Self {
            window_duration: default_window_duration(),
            primary_flush_interval: default_primary_flush_interval(),
            context_limit: default_context_limit(),
            flush_open_windows: false,
            counter_expiry_seconds: default_counter_expiry_seconds(),
            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
            hist_config: HistogramConfiguration::default(),
        }
    }
}

#[async_trait]
impl TransformBuilder for AggregateConfiguration {
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = Telemetry::new(&metrics_builder);

        let state = AggregationState::new(
            self.window_duration,
            self.context_limit,
            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
            self.hist_config.clone(),
            telemetry.clone(),
        );

        let passthrough_batcher = PassthroughBatcher::new(
            self.passthrough_idle_flush_timeout,
            self.window_duration,
            telemetry.clone(),
        )
        .await;

        Ok(Box::new(Aggregate {
            state,
            telemetry,
            primary_flush_interval: self.primary_flush_interval,
            flush_open_windows: self.flush_open_windows,
            passthrough_batcher,
            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
        }))
    }

    fn input_event_type(&self) -> EventType {
        EventType::Metric
    }

    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
        OUTPUTS
    }
}

impl MemoryBounds for AggregateConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
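        // Account for the transform's own struct up front, and take a firm bound on the
        // aggregation state map, which scales with the configured context limit.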
        builder
            .minimum()
            .with_single_value::<Aggregate>("component struct");

        builder
            .firm()
            .with_expr(UsageExpr::product(
                "aggregation state map",
                UsageExpr::sum(
                    "context map entry",
                    UsageExpr::struct_size::<Context>("context"),
                    UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
                ),
                UsageExpr::config("aggregate_context_limit", self.context_limit),
            ));
    }
}

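/// Metrics aggregation transform.
///
/// Buckets metrics into fixed-width time windows, emitting zero values for idle counters until
/// they expire, and optionally forwards already-timestamped metrics without aggregating them.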
pub struct Aggregate {
    state: AggregationState,
    telemetry: Telemetry,
    primary_flush_interval: Duration,
    flush_open_windows: bool,
    passthrough_batcher: PassthroughBatcher,
    passthrough_timestamped_metrics: bool,
}

#[async_trait]
impl Transform for Aggregate {
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        let mut primary_flush = interval_at(
            tokio::time::Instant::now() + self.primary_flush_interval,
            self.primary_flush_interval,
        );
        let mut final_primary_flush = false;

        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);

        health.mark_ready();
        debug!("Aggregation transform started.");

        tokio::pin!(passthrough_flush);

        loop {
            select! {
                _ = health.live() => continue,
                _ = primary_flush.tick() => {
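                    // Flush all closed aggregation windows, additionally flushing any still-open
                    // windows when this is the final flush and we're configured to do so.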
                    if !self.state.is_empty() {
                        debug!("Flushing aggregated metrics...");

                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;

                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
                            error!(error = %e, "Failed to flush aggregation state.");
                        }

                        self.telemetry.increment_flushes();

                        match dispatcher.flush().await {
                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
                            Err(e) => error!(error = %e, "Failed to flush aggregated events."),
                        }
                    }

                    if final_primary_flush {
                        debug!("All aggregation complete.");
                        break
                    }
                },
                _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
                maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
                    Some(events) => {
                        trace!(events_len = events.len(), "Received events.");

                        let current_time = get_unix_timestamp();
                        let mut processed_passthrough_metrics = false;

                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let metric = if self.passthrough_timestamped_metrics {
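                                    // Split out any timestamped values so they can bypass
                                    // aggregation; whatever remains is aggregated normally.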
                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);

                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
                                        self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
                                        processed_passthrough_metrics = true;
                                    }

                                    match maybe_nontimestamped_metric {
                                        Some(metric) => metric,
                                        None => continue,
                                    }
                                } else {
                                    metric
                                };

                                if !self.state.insert(current_time, metric) {
                                    trace!("Dropping metric due to context limit.");
                                    self.telemetry.increment_events_dropped();
                                }
                            }
                        }

                        if processed_passthrough_metrics {
                            self.passthrough_batcher.update_last_processed_at();
                        }
                    },
                    None => {
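                        // The event stream has ended, so schedule one final primary flush before
                        // shutting down.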
                        final_primary_flush = true;
                        primary_flush.reset_immediately();

                        debug!("Aggregation transform stopping...");
                    }
                },
            }
        }

        self.passthrough_batcher.try_flush(context.dispatcher()).await;

        debug!("Aggregation transform stopped.");

        Ok(())
    }
}

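/// Splits a metric into its timestamped and non-timestamped portions.
///
/// Returns a `(timestamped, non_timestamped)` tuple, where each element is present only if the
/// metric had values of that kind. When a metric has a mixture of both, the timestamped values are
/// moved into a new metric that shares the original context and metadata.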
fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
    if metric.values().all_timestamped() {
        (Some(metric), None)
    } else if metric.values().any_timestamped() {
        let new_metric_values = metric.values_mut().split_timestamped();
        let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());

        (Some(new_metric), Some(metric))
    } else {
        (None, Some(metric))
    }
}

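/// Batches together timestamped metrics that bypass aggregation, dispatching them when the active
/// buffer fills up or when no new passthrough metrics have arrived within the idle flush timeout.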
struct PassthroughBatcher {
    active_buffer: EventsBuffer,
    active_buffer_start: Instant,
    last_processed_at: Instant,
    idle_flush_timeout: Duration,
    bucket_width: Duration,
    telemetry: Telemetry,
}

impl PassthroughBatcher {
    async fn new(idle_flush_timeout: Duration, bucket_width: Duration, telemetry: Telemetry) -> Self {
        let active_buffer = EventsBuffer::default();

        Self {
            active_buffer,
            active_buffer_start: Instant::now(),
            last_processed_at: Instant::now(),
            idle_flush_timeout,
            bucket_width,
            telemetry,
        }
    }

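    /// Pushes a metric into the active buffer, dispatching the buffered events first if the buffer
    /// is full.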
    async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
        let (context, values, metadata) = metric.into_parts();
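        // Counters are normalized to rates over the bucket width, matching how they would have
        // been emitted if they had gone through normal aggregation.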
        let adjusted_values = counter_values_to_rate(values, self.bucket_width);
        let metric = Metric::from_parts(context, adjusted_values, metadata);

        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
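            // The buffer was full, so dispatch its contents and retry against the fresh buffer.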
            debug!("Passthrough event buffer was full. Flushing...");
            self.dispatch_events(dispatcher).await;

            if self.active_buffer.try_push(event).is_some() {
                error!("Event buffer is full even after dispatching events. Dropping event.");
                self.telemetry.increment_events_dropped();
                return;
            }
        }

        if self.active_buffer.len() == 1 {
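            // This is the first event of a new batch, so mark when the batch started.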
            self.active_buffer_start = Instant::now();
        }

        self.telemetry.increment_passthrough_metrics();
    }

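    /// Records that passthrough metrics were just processed, resetting the idle flush clock.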
    fn update_last_processed_at(&mut self) {
        self.last_processed_at = Instant::now();
    }

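    /// Dispatches the buffered events if the batcher has been idle for longer than the configured
    /// idle flush timeout.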
    async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");

            self.dispatch_events(dispatcher).await;
        }
    }

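    /// Dispatches all buffered events, swapping in a fresh active buffer.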
    async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
        if !self.active_buffer.is_empty() {
            let unaggregated_events = self.active_buffer.len();

            let batch_duration = self.active_buffer_start.elapsed();
            self.telemetry.record_passthrough_batch_duration(batch_duration);

            self.telemetry.increment_passthrough_flushes();

            let new_active_buffer = EventsBuffer::default();
            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);

            match dispatcher.dispatch(old_active_buffer).await {
                Ok(()) => debug!(unaggregated_events, "Dispatched events."),
                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
            }
        }
    }
}

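/// A metric being aggregated, along with the last time it was updated.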
#[derive(Clone)]
struct AggregatedMetric {
    values: MetricValues,
    metadata: MetricMetadata,
    last_seen: u64,
}

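/// Core aggregation state, mapping metric contexts to their in-flight aggregated values.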
struct AggregationState {
    contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
    contexts_remove_buf: Vec<Context>,
    context_limit: usize,
    bucket_width_secs: u64,
    counter_expire_secs: Option<NonZeroU64>,
    last_flush: u64,
    hist_config: HistogramConfiguration,
    telemetry: Telemetry,
}

impl AggregationState {
    fn new(
        bucket_width: Duration, context_limit: usize, counter_expiration: Option<Duration>,
        hist_config: HistogramConfiguration, telemetry: Telemetry,
    ) -> Self {
        let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);

        Self {
            contexts: HashMap::default(),
            contexts_remove_buf: Vec::new(),
            context_limit,
            bucket_width_secs: bucket_width.as_secs(),
            counter_expire_secs,
            last_flush: 0,
            hist_config,
            telemetry,
        }
    }

    fn is_empty(&self) -> bool {
        self.contexts.is_empty()
    }

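    /// Inserts a metric into the aggregation state.
    ///
    /// Returns `false` if the metric would create a new context while the state is already at the
    /// configured context limit.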
    fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
        if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
            return false;
        }

        let (context, mut values, metadata) = metric.into_parts();

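        // Collapse any non-timestamped values into a single point at the start of the current
        // bucket so that everything merges into the same window.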
        let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
        values.collapse_non_timestamped(bucket_ts);

        trace!(
            bucket_ts,
            kind = values.as_str(),
            "Inserting metric into aggregation state."
        );

        match self.contexts.entry(context) {
            Entry::Occupied(mut entry) => {
                let aggregated = entry.get_mut();

                aggregated.last_seen = timestamp;
                aggregated.values.merge(values);
            }
            Entry::Vacant(entry) => {
                self.telemetry.increment_contexts(entry.key(), &values);

                entry.insert(AggregatedMetric {
                    values,
                    metadata,
                    last_seen: timestamp,
                });
            }
        }

        true
    }

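    /// Flushes all closed buckets to the given dispatcher.
    ///
    /// Unexpired counters additionally emit zero values for any closed buckets they were idle
    /// during, and contexts left with no values are removed from the state. When
    /// `flush_open_buckets` is `true`, still-open buckets are flushed as well.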
    async fn flush(
        &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
    ) -> Result<(), GenericError> {
        let bucket_width_secs = self.bucket_width_secs;
        let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);

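        // Values with a timestamp at or before this point belong to closed buckets.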
        let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);

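        // Build a zero-value counter point for each bucket that has closed since the last flush,
        // so that idle counters keep reporting zeroes until they expire.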
        let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
        if self.last_flush != 0 {
            let start = align_to_bucket_start(self.last_flush, bucket_width_secs);

            for bucket_start in (start..current_time).step_by(bucket_width_secs as usize) {
                if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
                    zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
                }
            }
        }

        debug!(timestamp = current_time, "Flushing buckets.");

        for (context, am) in self.contexts.iter_mut() {
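            // Counters only expire after going `counter_expire_secs` without an update; every
            // other metric type is removed as soon as it has no values left after flushing.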
            let should_expire_if_empty = match &am.values {
                MetricValues::Counter(..) => {
                    counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
                }
                _ => true,
            };

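            // Backfill zero values for each closed bucket this counter was idle during, stopping
            // at the first bucket at or past the counter's expiry time.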
            if let MetricValues::Counter(..) = &mut am.values {
                let expires_at = am.last_seen + counter_expire_secs;
                for (zv_bucket_start, zero_value) in &zero_value_buckets {
                    if expires_at > *zv_bucket_start {
                        am.values.merge(zero_value.clone());
                    } else {
                        break;
                    }
                }
            }

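            // Split off all values belonging to closed buckets and emit them downstream.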
            if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
                self.telemetry.increment_flushed(&closed_bucket_values);

                transform_and_push_metric(
                    context.clone(),
                    closed_bucket_values,
                    am.metadata.clone(),
                    bucket_width_secs,
                    &self.hist_config,
                    dispatcher,
                )
                .await?;
            }

            if am.values.is_empty() && should_expire_if_empty {
                self.telemetry.decrement_contexts(context, &am.values);
                self.contexts_remove_buf.push(context.clone());
            }
        }

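        // Remove expired contexts, then shrink the map toward the new size (plus some headroom) so
        // that a temporary spike in contexts doesn't pin that memory indefinitely.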
        let contexts_len_before = self.contexts.len();
        for context in self.contexts_remove_buf.drain(..) {
            self.contexts.remove(&context);
        }
        let contexts_len_after = self.contexts.len();

        let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
        let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
        self.contexts.shrink_to(target_contexts_capacity);

        self.last_flush = current_time;

        Ok(())
    }
}

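/// Transforms aggregated values into their final output form and pushes them to the dispatcher.
///
/// Histograms are exploded into one metric per configured statistic, optionally copying the raw
/// samples into a sketch that is emitted as a distribution. Counters are converted to rates over
/// the bucket width, and all other values pass through unchanged.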
async fn transform_and_push_metric(
    context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: u64,
    hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
) -> Result<(), GenericError> {
    let bucket_width = Duration::from_secs(bucket_width_secs);

    match values {
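        // Histograms aren't emitted directly: each configured statistic (count, sum, percentiles,
        // and so on) becomes its own metric.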
        MetricValues::Histogram(ref mut points) => {
            if hist_config.copy_to_distribution() {
                let sketch_points = points
                    .into_iter()
                    .map(|(ts, hist)| {
                        let mut sketch = DDSketch::default();
                        for sample in hist.samples() {
                            sketch.insert_n(sample.value.into_inner(), sample.weight as u32);
                        }
                        (ts, sketch)
                    })
                    .collect::<SketchPoints>();
                let distribution_values = MetricValues::distribution(sketch_points);
                let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
                    context.with_name(format!(
                        "{}{}",
                        hist_config.copy_to_distribution_prefix(),
                        context.name()
                    ))
                } else {
                    context.clone()
                };
                let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }
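            // Compute the summary view for each point once, then derive every configured
            // statistic from it.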
            let mut sorted_points = Vec::new();
            for (ts, h) in points {
                sorted_points.push((ts, h.summary_view()));
            }

            for statistic in hist_config.statistics() {
                let new_points = sorted_points
                    .iter()
                    .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
                    .collect::<ScalarPoints>();

                let new_values = if statistic.is_rate_statistic() {
                    MetricValues::rate(new_points, bucket_width)
                } else {
                    MetricValues::gauge(new_points)
                };

                let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
                let new_metric = Metric::from_parts(new_context, new_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }

            Ok(())
        }

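        // Everything else passes through as-is, aside from counters being converted to rates.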
        values => {
            let adjusted_values = counter_values_to_rate(values, bucket_width);

            let metric = Metric::from_parts(context, adjusted_values, metadata);
            dispatcher.push(Event::Metric(metric)).await
        }
    }
}

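/// Converts counter values into rate values over the given interval, leaving all other value types
/// untouched.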
fn counter_values_to_rate(values: MetricValues, interval: Duration) -> MetricValues {
    match values {
        MetricValues::Counter(points) => MetricValues::rate(points, interval),
        values => values,
    }
}

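/// Aligns the given timestamp to the start of its bucket.
///
/// For example, with a bucket width of 10 seconds, timestamps 1000 through 1009 all align to a
/// bucket start of 1000.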
const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: u64) -> u64 {
    timestamp - (timestamp % bucket_width_secs)
}

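/// Determines whether the bucket starting at `bucket_start` is closed from the perspective of
/// `current_time`, treating all buckets as closed when `flush_open_buckets` is `true`.
///
/// A bucket is only closed once the current time has passed the bucket's last second: with a width
/// of 10, the bucket starting at 1000 is still open at 1009 and closed at 1010.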
const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: u64, flush_open_buckets: bool,
) -> bool {
    (bucket_start + bucket_width_secs - 1) < current_time || flush_open_buckets
}

#[cfg(test)]
mod tests {
    use float_cmp::ApproxEqRatio as _;
    use saluki_core::{
        components::ComponentContext,
        topology::{interconnect::Dispatcher, ComponentId, OutputName},
    };
    use saluki_metrics::test::TestRecorder;
    use tokio::sync::mpsc;

    use super::config::HistogramStatistic;
    use super::*;

    const BUCKET_WIDTH_SECS: u64 = 10;
    const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS);
    const COUNTER_EXPIRE_SECS: u64 = 20;
    const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));

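    /// Returns the bucket-aligned timestamp for the given step.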
    const fn bucket_ts(step: u64) -> u64 {
        align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
    }

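    /// Returns a timestamp that falls within the given step's bucket, two seconds before the next
    /// bucket starts.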
    const fn insert_ts(step: u64) -> u64 {
        (BUCKET_WIDTH_SECS * (step + 1)) - 2
    }

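    /// Returns a flush timestamp aligned to the exact moment the given step's bucket closes.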
    const fn flush_ts(step: u64) -> u64 {
        BUCKET_WIDTH_SECS * (step + 1)
    }

    struct DispatcherReceiver {
        receiver: mpsc::Receiver<EventsBuffer>,
    }

    impl DispatcherReceiver {
        fn collect_next(&mut self) -> Vec<Metric> {
            match self.receiver.try_recv() {
                Ok(event_buffer) => {
                    let mut metrics = event_buffer
                        .into_iter()
                        .filter_map(|event| event.try_into_metric())
                        .collect::<Vec<Metric>>();

                    metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
                    metrics
                }
                Err(_) => Vec::new(),
            }
        }
    }

    fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
        let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
        let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));

        let (buffer_tx, buffer_rx) = mpsc::channel(1);
        dispatcher.add_output(OutputName::Default).unwrap();
        dispatcher
            .attach_sender_to_output(&OutputName::Default, buffer_tx)
            .unwrap();

        (dispatcher, DispatcherReceiver { receiver: buffer_rx })
    }

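    /// Flushes the given aggregation state (including open buckets) and returns the dispatched
    /// metrics, sorted by metric name.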
    async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
        let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");

        state
            .flush(timestamp, true, &mut buffered_dispatcher)
            .await
            .expect("should not fail to flush aggregation state");

        buffered_dispatcher
            .flush()
            .await
            .expect("should not fail to flush buffered sender");

        dispatcher_receiver.collect_next()
    }

    macro_rules! compare_points {
        (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_point) = expected_value;
                let (actual_ts, actual_point) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert!(
                    expected_point.approx_eq_ratio(&actual_point, $error_ratio),
                    "point for value #{} does not match: {} (expected) vs {} (actual)",
                    idx,
                    expected_point,
                    actual_point
                );
            }
        };
        (distribution, $expected:expr, $actual:expr) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_sketch) = expected_value;
                let (actual_ts, actual_sketch) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert_eq!(
                    expected_sketch, actual_sketch,
                    "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_sketch, actual_sketch
                );
            }
        };
    }

    macro_rules! assert_flushed_scalar_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());

            let expected_points = ScalarPoints::from([$(($ts, $value)),+]);

            match actual_metric.values() {
                MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
                    compare_points!(scalar, expected_points, actual_points, $error_ratio);
                },
                _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
            }
        };
    }

    macro_rules! assert_flushed_distribution_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context());

            match actual_metric.values() {
                MetricValues::Distribution(ref actual_points) => {
                    let expected_points = SketchPoints::from([$(($ts, $value)),+]);
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");

                    compare_points!(distribution, &expected_points, actual_points);
                },
                _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
            }
        };
    }

    #[test]
    fn bucket_is_closed() {
        let cases = [
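            // (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected_closed)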
            (1000, 995, 10, false, false),
            (1000, 995, 10, true, true),
            (1000, 1000, 10, false, false),
            (1000, 1000, 10, true, true),
            (1010, 1000, 10, false, true),
            (1010, 1000, 10, true, true),
        ];

        for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
            let expected_reason = if expected {
                "closed, was open"
            } else {
                "open, was closed"
            };

            assert_eq!(
                is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
                expected,
                "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
                expected_reason,
                current_time,
                bucket_start,
                bucket_width_secs,
                flush_open_buckets
            );
        }
    }

    #[tokio::test]
    async fn context_limit() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = vec![
            Metric::gauge("metric1", 1.0),
            Metric::gauge("metric2", 2.0),
            Metric::gauge("metric3", 3.0),
            Metric::gauge("metric4", 4.0),
        ];

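        // With a context limit of two, only the first two inserts are accepted.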
        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());

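        // Flushing drained the gauge contexts entirely, freeing room for the remaining metrics.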
        assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
        assert!(state.insert(insert_ts(2), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
    }

    #[tokio::test]
    async fn context_limit_with_zero_value_counters() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = vec![
            Metric::counter("metric1", 1.0),
            Metric::counter("metric2", 2.0),
            Metric::counter("metric3", 3.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 0);

        assert!(state.insert(insert_ts(5), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
    }

    #[tokio::test]
    async fn zero_value_counters() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metrics = vec![Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(4), input_metrics[1].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);

        let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
    }

    #[tokio::test]
    async fn merge_identical_timestamped_values_on_flush() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);

        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
    }

    #[tokio::test]
    async fn histogram_statistics() {
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            false,
            "".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 3);

        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);

        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
    }

    #[tokio::test]
    async fn distributions() {
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let input_metric = Metric::distribution("metric1", &values[..]);

        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);

        assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
    }

    #[tokio::test]
    async fn histogram_copy_to_distribution() {
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            true,
            "dist_prefix.".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let input_metric = Metric::histogram("metric1", values);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 4);

        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);
        let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);

        assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
    }

    #[tokio::test]
    async fn nonaggregated_counters_to_rate() {
        let counter_value = 42.0;
        let bucket_width = BUCKET_WIDTH;

        let mut state = AggregationState::new(
            bucket_width,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        let input_metric = Metric::counter("metric1", counter_value);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        let flushed_metric = &flushed_metrics[0];

        assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
        assert_eq!(flushed_metric.values().as_str(), "rate");
    }

    #[tokio::test]
    async fn preaggregated_counters_to_rate() {
        let counter_value = 42.0;
        let timestamp = 123456;
        let bucket_width = BUCKET_WIDTH;

        let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await;
        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();

        let input_metric = Metric::counter("metric1", (timestamp, counter_value));
        batcher.push_metric(input_metric.clone(), &dispatcher).await;

        batcher.try_flush(&dispatcher).await;

        let mut flushed_metrics = dispatcher_receiver.collect_next();
        assert_eq!(flushed_metrics.len(), 1);
        assert_eq!(
            Metric::rate("metric1", (timestamp, counter_value), bucket_width),
            flushed_metrics.remove(0)
        );
    }

    #[tokio::test]
    async fn telemetry() {
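        // Install a local metrics recorder so we can observe the telemetry emitted by the
        // aggregation state.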
        let recorder = TestRecorder::default();
        let _local = metrics::set_default_local_recorder(&recorder);

        let builder = MetricsBuilder::default();
        let telemetry = Telemetry::new(&builder);

        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            telemetry,
        );

        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
        assert_eq!(
            recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
            Some(0)
        );
        for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
            assert_eq!(
                recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
                Some(0.0)
            );
        }

        assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );
        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));

        assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
            Some(1.0)
        );

        assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );

        let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
            Some(1.0)
        );
        assert_eq!(
            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
            Some(0.0)
        );
    }
}