1use std::{
2 num::NonZeroU64,
3 time::{Duration, Instant},
4};
5
6use async_trait::async_trait;
7use ddsketch::DDSketch;
8use hashbrown::{hash_map::Entry, HashMap};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::time::get_unix_timestamp;
11use saluki_config::GenericConfiguration;
12use saluki_context::Context;
13use saluki_core::{
14 components::{transforms::*, ComponentContext},
15 data_model::event::{metric::*, Event, EventType},
16 observability::ComponentMetricsExt as _,
17 topology::{interconnect::BufferedDispatcher, OutputDefinition},
18 topology::{EventsBuffer, EventsDispatcher},
19};
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use smallvec::SmallVec;
24use stringtheory::MetaString;
25use tokio::{
26 select,
27 time::{interval, interval_at},
28};
29use tracing::{debug, error, info, trace, warn};
30
31mod telemetry;
32use self::telemetry::Telemetry;
33
34mod config;
35use self::config::{HistogramConfiguration, HistogramStatistic};
36
/// Period of the timer in the transform's run loop that asks the passthrough batcher to
/// flush its buffer if it has been idle for long enough.
const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_millis(2_000);
38
/// Serde default for `aggregate_window_duration_seconds`: ten-second aggregation windows.
const fn default_window_duration_seconds() -> NonZeroU64 {
    match NonZeroU64::new(10) {
        Some(seconds) => seconds,
        None => panic!("not zero"),
    }
}
42
/// Serde default for `aggregate_flush_interval`: flush every fifteen seconds.
const fn default_primary_flush_interval() -> Duration {
    Duration::new(15, 0)
}
46
/// Serde default for `aggregate_context_limit`: one million contexts.
const fn default_context_limit() -> usize {
    1_000 * 1_000
}
50
/// Serde default for `counter_expiry_seconds`: 300 seconds.
const fn default_counter_expiry_seconds() -> Option<u64> {
    let seconds: u64 = 300;
    Some(seconds)
}
54
/// Serde default for `dogstatsd_no_aggregation_pipeline`: passthrough of timestamped
/// metrics is enabled.
const fn default_passthrough_timestamped_metrics() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
58
/// Serde default for `aggregate_passthrough_idle_flush_timeout`: one second.
const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::new(1, 0)
}
62
63#[derive(Deserialize)]
83#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
84pub struct AggregateConfiguration {
85 #[serde(
95 rename = "aggregate_window_duration_seconds",
96 default = "default_window_duration_seconds"
97 )]
98 window_duration_seconds: NonZeroU64,
99
100 #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
107 primary_flush_interval: Duration,
108
109 #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
120 context_limit: usize,
121
122 #[serde(
134 rename = "aggregate_flush_open_windows",
135 alias = "dogstatsd_flush_incomplete_buckets",
136 default
137 )]
138 flush_open_windows: bool,
139
140 #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
153 counter_expiry_seconds: Option<u64>,
154
155 #[serde(
164 rename = "dogstatsd_no_aggregation_pipeline",
165 default = "default_passthrough_timestamped_metrics"
166 )]
167 passthrough_timestamped_metrics: bool,
168
169 #[serde(
177 rename = "aggregate_passthrough_idle_flush_timeout",
178 default = "default_passthrough_idle_flush_timeout"
179 )]
180 passthrough_idle_flush_timeout: Duration,
181
182 #[serde(flatten)]
187 hist_config: HistogramConfiguration,
188}
189
190impl AggregateConfiguration {
191 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
193 Ok(config.as_typed()?)
194 }
195
196 pub fn with_defaults() -> Self {
198 Self {
199 window_duration_seconds: default_window_duration_seconds(),
200 primary_flush_interval: default_primary_flush_interval(),
201 context_limit: default_context_limit(),
202 flush_open_windows: false,
203 counter_expiry_seconds: default_counter_expiry_seconds(),
204 passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
205 passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
206 hist_config: HistogramConfiguration::default(),
207 }
208 }
209}
210
211#[async_trait]
212impl TransformBuilder for AggregateConfiguration {
213 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
214 let metrics_builder = MetricsBuilder::from_component_context(&context);
215 let telemetry = Telemetry::new(&metrics_builder);
216
217 let state = AggregationState::new(
218 self.window_duration_seconds,
219 self.context_limit,
220 self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
221 self.hist_config.clone(),
222 telemetry.clone(),
223 );
224
225 let passthrough_batcher = PassthroughBatcher::new(
226 self.passthrough_idle_flush_timeout,
227 self.window_duration_seconds,
228 telemetry.clone(),
229 )
230 .await;
231
232 Ok(Box::new(Aggregate {
233 state,
234 telemetry,
235 primary_flush_interval: self.primary_flush_interval,
236 flush_open_windows: self.flush_open_windows,
237 passthrough_batcher,
238 passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
239 }))
240 }
241
242 fn input_event_type(&self) -> EventType {
243 EventType::Metric
244 }
245
246 fn outputs(&self) -> &[OutputDefinition<EventType>] {
247 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
248 OUTPUTS
249 }
250}
251
252impl MemoryBounds for AggregateConfiguration {
253 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
254 builder
263 .minimum()
264 .with_single_value::<Aggregate>("component struct");
266 builder
267 .firm()
268 .with_expr(UsageExpr::product(
270 "aggregation state map",
271 UsageExpr::sum(
272 "context map entry",
273 UsageExpr::struct_size::<Context>("context"),
274 UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
275 ),
276 UsageExpr::config("aggregate_context_limit", self.context_limit),
277 ));
278 }
279}
280
281pub struct Aggregate {
282 state: AggregationState,
283 telemetry: Telemetry,
284 primary_flush_interval: Duration,
285 flush_open_windows: bool,
286 passthrough_batcher: PassthroughBatcher,
287 passthrough_timestamped_metrics: bool,
288}
289
290#[async_trait]
291impl Transform for Aggregate {
292 async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
293 let mut health = context.take_health_handle();
294
295 let mut primary_flush = interval_at(
296 tokio::time::Instant::now() + self.primary_flush_interval,
297 self.primary_flush_interval,
298 );
299 let mut final_primary_flush = false;
300
301 let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);
302
303 health.mark_ready();
304 debug!("Aggregation transform started.");
305
306 tokio::pin!(passthrough_flush);
307
308 loop {
309 select! {
310 _ = health.live() => continue,
311 _ = primary_flush.tick() => {
312 if !self.state.is_empty() {
316 debug!("Flushing aggregated metrics...");
317
318 let should_flush_open_windows = final_primary_flush && self.flush_open_windows;
319
320 let was_breached = self.state.context_limit_breached();
322
323 let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
324 if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
325 error!(error = %e, "Failed to flush aggregation state.");
326 }
327
328 self.telemetry.increment_flushes();
329
330 if was_breached && !self.state.context_limit_breached() {
332 info!("Context limit no longer exceeded, metrics are being accepted again.");
333 }
334
335 match dispatcher.flush().await {
336 Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
337 Err(e) => error!(error = %e, "Failed to flush aggregated events."),
338 }
339 }
340
341 if final_primary_flush {
343 debug!("All aggregation complete.");
344 break
345 }
346 },
347 _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
348 maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
349 Some(events) => {
350 trace!(events_len = events.len(), "Received events.");
351
352 let current_time = get_unix_timestamp();
353 let mut processed_passthrough_metrics = false;
354
355 for event in events {
356 if let Some(metric) = event.try_into_metric() {
357 let metric = if self.passthrough_timestamped_metrics {
358 let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);
362
363 if let Some(timestamped_metric) = maybe_timestamped_metric {
365 self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
366 processed_passthrough_metrics = true;
367 }
368
369 match maybe_nontimestamped_metric {
373 Some(metric) => metric,
374 None => continue,
375 }
376 } else {
377 metric
378 };
379
380 let was_breached = self.state.context_limit_breached();
381 if !self.state.insert(current_time, metric) {
382 trace!("Dropping metric due to context limit.");
383 if !was_breached {
384 warn!(context_limit = self.state.context_limit, "Context limit reached, \
386 dropping metrics. Consider increasing `aggregate_context_limit`.");
387 }
388 self.telemetry.increment_events_dropped();
389 }
390 }
391 }
392
393 if processed_passthrough_metrics {
394 self.passthrough_batcher.update_last_processed_at();
395 }
396 },
397 None => {
398 final_primary_flush = true;
401 primary_flush.reset_immediately();
402
403 debug!("Aggregation transform stopping...");
404 }
405 },
406 }
407 }
408
409 self.passthrough_batcher.try_flush(context.dispatcher()).await;
411
412 debug!("Aggregation transform stopped.");
413
414 Ok(())
415 }
416}
417
418fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
419 if metric.values().all_timestamped() {
420 (Some(metric), None)
421 } else if metric.values().any_timestamped() {
422 let new_metric_values = metric.values_mut().split_timestamped();
424 let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
425
426 (Some(new_metric), Some(metric))
427 } else {
428 (None, Some(metric))
430 }
431}
432
433struct PassthroughBatcher {
434 active_buffer: EventsBuffer,
435 active_buffer_start: Instant,
436 last_processed_at: Instant,
437 idle_flush_timeout: Duration,
438 bucket_width_secs: NonZeroU64,
439 telemetry: Telemetry,
440}
441
442impl PassthroughBatcher {
443 async fn new(idle_flush_timeout: Duration, bucket_width_secs: NonZeroU64, telemetry: Telemetry) -> Self {
444 let active_buffer = EventsBuffer::default();
445
446 Self {
447 active_buffer,
448 active_buffer_start: Instant::now(),
449 last_processed_at: Instant::now(),
450 idle_flush_timeout,
451 bucket_width_secs,
452 telemetry,
453 }
454 }
455
456 async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
457 let (context, values, metadata) = metric.into_parts();
463 let adjusted_values = counter_values_to_rate(values, self.bucket_width_secs);
464 let metric = Metric::from_parts(context, adjusted_values, metadata);
465
466 if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
470 debug!("Passthrough event buffer was full. Flushing...");
471 self.dispatch_events(dispatcher).await;
472
473 if self.active_buffer.try_push(event).is_some() {
474 error!("Event buffer is full even after dispatching events. Dropping event.");
475 self.telemetry.increment_events_dropped();
476 return;
477 }
478 }
479
480 if self.active_buffer.len() == 1 {
482 self.active_buffer_start = Instant::now();
483 }
484
485 self.telemetry.increment_passthrough_metrics();
486 }
487
488 fn update_last_processed_at(&mut self) {
489 self.last_processed_at = Instant::now();
493 }
494
495 async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
496 if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
498 debug!("Passthrough processing exceeded idle flush timeout. Flushing...");
499
500 self.dispatch_events(dispatcher).await;
501 }
502 }
503
504 async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
505 if !self.active_buffer.is_empty() {
506 let unaggregated_events = self.active_buffer.len();
507
508 let batch_duration = self.active_buffer_start.elapsed();
510 self.telemetry.record_passthrough_batch_duration(batch_duration);
511
512 self.telemetry.increment_passthrough_flushes();
513
514 let new_active_buffer = EventsBuffer::default();
516 let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);
517
518 match dispatcher.dispatch(old_active_buffer).await {
519 Ok(()) => debug!(unaggregated_events, "Dispatched events."),
520 Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
521 }
522 }
523 }
524}
525
526#[derive(Clone)]
527struct AggregatedMetric {
528 values: MetricValues,
529 metadata: MetricMetadata,
530 last_seen: u64,
531}
532
533struct AggregationState {
534 contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
535 contexts_remove_buf: Vec<Context>,
536 context_limit: usize,
537 bucket_width_secs: NonZeroU64,
538 counter_expire_secs: Option<NonZeroU64>,
539 last_flush: u64,
540 hist_config: HistogramConfiguration,
541 telemetry: Telemetry,
542 context_limit_breached: bool,
545}
546
547impl AggregationState {
548 fn new(
549 bucket_width_secs: NonZeroU64, context_limit: usize, counter_expiration: Option<Duration>,
550 hist_config: HistogramConfiguration, telemetry: Telemetry,
551 ) -> Self {
552 let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);
553
554 Self {
555 contexts: HashMap::default(),
556 contexts_remove_buf: Vec::new(),
557 context_limit,
558 bucket_width_secs,
559 counter_expire_secs,
560 last_flush: 0,
561 hist_config,
562 telemetry,
563 context_limit_breached: false,
564 }
565 }
566
567 fn is_empty(&self) -> bool {
568 self.contexts.is_empty()
569 }
570
571 fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
572 if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
574 self.context_limit_breached = true;
575 return false;
576 }
577
578 let (context, mut values, metadata) = metric.into_parts();
579
580 let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
585 values.collapse_non_timestamped(bucket_ts);
586
587 trace!(
588 bucket_ts,
589 kind = values.as_str(),
590 "Inserting metric into aggregation state."
591 );
592
593 match self.contexts.entry(context) {
596 Entry::Occupied(mut entry) => {
597 let aggregated = entry.get_mut();
598
599 aggregated.last_seen = timestamp;
601 aggregated.values.merge(values);
602 }
603 Entry::Vacant(entry) => {
604 self.telemetry.increment_contexts(entry.key(), &values);
605
606 entry.insert(AggregatedMetric {
607 values,
608 metadata,
609 last_seen: timestamp,
610 });
611 }
612 }
613
614 true
615 }
616
617 async fn flush(
618 &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
619 ) -> Result<(), GenericError> {
620 let bucket_width_secs = self.bucket_width_secs;
621 let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);
622
623 let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);
626
627 let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
632 if self.last_flush != 0 {
633 let start = align_to_bucket_start(self.last_flush, bucket_width_secs);
634
635 for bucket_start in (start..current_time).step_by(bucket_width_secs.get() as usize) {
636 if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
637 zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
638 }
639 }
640 }
641
642 debug!(timestamp = current_time, "Flushing buckets.");
644
645 for (context, am) in self.contexts.iter_mut() {
646 let should_expire_if_empty = match &am.values {
655 MetricValues::Counter(..) => {
656 counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
657 }
658 _ => true,
659 };
660
661 if let MetricValues::Counter(..) = &mut am.values {
667 let expires_at = am.last_seen + counter_expire_secs;
668 for (zv_bucket_start, zero_value) in &zero_value_buckets {
669 if expires_at > *zv_bucket_start {
670 am.values.merge(zero_value.clone());
671 } else {
672 break;
675 }
676 }
677 }
678
679 if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
688 self.telemetry.increment_flushed(&closed_bucket_values);
689
690 transform_and_push_metric(
692 context.clone(),
693 closed_bucket_values,
694 am.metadata.clone(),
695 bucket_width_secs,
696 &self.hist_config,
697 dispatcher,
698 )
699 .await?;
700 }
701
702 if am.values.is_empty() && should_expire_if_empty {
703 self.telemetry.decrement_contexts(context, &am.values);
704 self.contexts_remove_buf.push(context.clone());
705 }
706 }
707
708 let contexts_len_before = self.contexts.len();
710 for context in self.contexts_remove_buf.drain(..) {
711 self.contexts.remove(&context);
712 }
713 let contexts_len_after = self.contexts.len();
714
715 let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
716 let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
717 self.contexts.shrink_to(target_contexts_capacity);
718
719 if self.context_limit_breached && self.contexts.len() < self.context_limit {
720 self.context_limit_breached = false;
721 }
722
723 self.last_flush = current_time;
724
725 Ok(())
726 }
727
728 fn context_limit_breached(&self) -> bool {
729 self.context_limit_breached
730 }
731}
732
733async fn transform_and_push_metric(
734 context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: NonZeroU64,
735 hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
736) -> Result<(), GenericError> {
737 let bucket_width = Duration::from_secs(bucket_width_secs.get());
738
739 match values {
740 MetricValues::Histogram(ref mut points) => {
743 if hist_config.copy_to_distribution() {
745 let sketch_points = points
746 .into_iter()
747 .map(|(ts, hist)| {
748 let mut sketch = DDSketch::default();
749 for sample in hist.samples() {
750 sketch.insert_n(sample.value.into_inner(), sample.weight.0 as u64);
751 }
752 (ts, sketch)
753 })
754 .collect::<SketchPoints>();
755 let distribution_values = MetricValues::distribution(sketch_points);
756 let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
757 context.with_name(format!(
758 "{}{}",
759 hist_config.copy_to_distribution_prefix(),
760 context.name()
761 ))
762 } else {
763 context.clone()
764 };
765 let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
766 dispatcher.push(Event::Metric(new_metric)).await?;
767 }
768 let mut sorted_points = Vec::new();
773 for (ts, h) in points {
774 sorted_points.push((ts, h.summary_view()));
775 }
776
777 for statistic in hist_config.statistics() {
778 let new_points = sorted_points
779 .iter()
780 .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
781 .collect::<ScalarPoints>();
782
783 let new_values = if statistic.is_rate_statistic() {
784 MetricValues::rate(new_points, bucket_width)
785 } else {
786 MetricValues::gauge(new_points)
787 };
788
789 let new_metadata = if matches!(statistic, HistogramStatistic::Count) {
791 metadata.clone().with_unit(MetaString::empty())
792 } else {
793 metadata.clone()
794 };
795
796 let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
797 let new_metric = Metric::from_parts(new_context, new_values, new_metadata);
798 dispatcher.push(Event::Metric(new_metric)).await?;
799 }
800
801 Ok(())
802 }
803
804 values => {
807 let adjusted_values = counter_values_to_rate(values, bucket_width_secs);
808
809 let metric = Metric::from_parts(context, adjusted_values, metadata);
810 dispatcher.push(Event::Metric(metric)).await
811 }
812 }
813}
814
815fn counter_values_to_rate(values: MetricValues, interval_secs: NonZeroU64) -> MetricValues {
816 match values {
817 MetricValues::Counter(points) => MetricValues::rate(points, Duration::from_secs(interval_secs.get())),
818 values => values,
819 }
820}
821
/// Rounds `timestamp` down to the nearest multiple of `bucket_width_secs`, i.e. the start
/// of the bucket that contains it.
const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: NonZeroU64) -> u64 {
    let width = bucket_width_secs.get();
    (timestamp / width) * width
}
825
/// Returns `true` if the bucket starting at `bucket_start` should be treated as closed.
///
/// A bucket is closed once its last second (`bucket_start + bucket_width_secs - 1`) lies
/// strictly before `current_time`. When `flush_open_buckets` is set, every bucket is
/// reported as closed.
const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: NonZeroU64, flush_open_buckets: bool,
) -> bool {
    let last_second = bucket_start + bucket_width_secs.get() - 1;

    if last_second < current_time {
        true
    } else {
        flush_open_buckets
    }
}
849
850#[cfg(test)]
854mod tests {
855 use float_cmp::ApproxEqRatio as _;
856 use saluki_core::{
857 components::ComponentContext,
858 topology::{interconnect::Dispatcher, ComponentId, OutputName},
859 };
860 use saluki_metrics::test::TestRecorder;
861 use stringtheory::MetaString;
862 use tokio::sync::mpsc;
863
864 use super::config::HistogramStatistic;
865 use super::*;
866
867 const BUCKET_WIDTH_SECS: NonZeroU64 = NonZeroU64::new(10).expect("not zero");
868 const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS.get());
869 const COUNTER_EXPIRE_SECS: u64 = 20;
870 const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));
871
872 const fn bucket_ts(step: u64) -> u64 {
874 align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
875 }
876
877 const fn insert_ts(step: u64) -> u64 {
879 (BUCKET_WIDTH_SECS.get() * (step + 1)) - 2
880 }
881
882 const fn flush_ts(step: u64) -> u64 {
884 BUCKET_WIDTH_SECS.get() * (step + 1)
885 }
886
887 struct DispatcherReceiver {
888 receiver: mpsc::Receiver<EventsBuffer>,
889 }
890
891 impl DispatcherReceiver {
892 fn collect_next(&mut self) -> Vec<Metric> {
893 match self.receiver.try_recv() {
894 Ok(event_buffer) => {
895 let mut metrics = event_buffer
896 .into_iter()
897 .filter_map(|event| event.try_into_metric())
898 .collect::<Vec<Metric>>();
899
900 metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
901 metrics
902 }
903 Err(_) => Vec::new(),
904 }
905 }
906 }
907
908 fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
910 let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
911 let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));
912
913 let (buffer_tx, buffer_rx) = mpsc::channel(1);
914 dispatcher.add_output(OutputName::Default).unwrap();
915 dispatcher
916 .attach_sender_to_output(&OutputName::Default, buffer_tx)
917 .unwrap();
918
919 (dispatcher, DispatcherReceiver { receiver: buffer_rx })
920 }
921
922 async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
923 let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
924 let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");
925
926 state
928 .flush(timestamp, true, &mut buffered_dispatcher)
929 .await
930 .expect("should not fail to flush aggregation state");
931
932 buffered_dispatcher
935 .flush()
936 .await
937 .expect("should not fail to flush buffered sender");
938
939 dispatcher_receiver.collect_next()
940 }
941
942 macro_rules! compare_points {
943 (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
944 for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
945 let (expected_ts, expected_point) = expected_value;
946 let (actual_ts, actual_point) = actual_value;
947
948 assert_eq!(
949 expected_ts, actual_ts,
950 "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
951 idx, expected_ts, actual_ts
952 );
953 assert!(
954 expected_point.approx_eq_ratio(&actual_point, $error_ratio),
955 "point for value #{} does not match: {} (expected) vs {} (actual)",
956 idx,
957 expected_point,
958 actual_point
959 );
960 }
961 };
962 (distribution, $expected:expr, $actual:expr) => {
963 for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
964 let (expected_ts, expected_sketch) = expected_value;
965 let (actual_ts, actual_sketch) = actual_value;
966
967 assert_eq!(
968 expected_ts, actual_ts,
969 "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
970 idx, expected_ts, actual_ts
971 );
972 assert_eq!(
973 expected_sketch, actual_sketch,
974 "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
975 idx, expected_sketch, actual_sketch
976 );
977 }
978 };
979 }
980
981 macro_rules! assert_flushed_scalar_metric {
982 ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
983 assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
984 };
985 ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
986 let actual_metric = $actual;
987
988 assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());
989
990 let expected_points = ScalarPoints::from([$(($ts, $value)),+]);
991
992 match actual_metric.values() {
993 MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
994 assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
995 compare_points!(scalar, expected_points, actual_points, $error_ratio);
996 },
997 _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
998 }
999 };
1000 }
1001
1002 macro_rules! assert_flushed_distribution_metric {
1003 ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
1004 assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
1005 };
1006 ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
1007 let actual_metric = $actual;
1008
1009 assert_eq!($original.context(), actual_metric.context());
1010
1011 match actual_metric.values() {
1012 MetricValues::Distribution(ref actual_points) => {
1013 let expected_points = SketchPoints::from([$(($ts, $value)),+]);
1014 assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
1015
1016 compare_points!(distribution, &expected_points, actual_points);
1017 },
1018 _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
1019 }
1020 };
1021 }
1022
1023 #[test]
1024 fn bucket_is_closed() {
1025 let cases = [
1028 (1000, 995, BUCKET_WIDTH_SECS, false, false),
1030 (1000, 995, BUCKET_WIDTH_SECS, true, true),
1031 (1000, 1000, BUCKET_WIDTH_SECS, false, false),
1033 (1000, 1000, BUCKET_WIDTH_SECS, true, true),
1034 (1010, 1000, BUCKET_WIDTH_SECS, false, true),
1036 (1010, 1000, BUCKET_WIDTH_SECS, true, true),
1037 ];
1038
1039 for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
1040 let expected_reason = if expected {
1041 "closed, was open"
1042 } else {
1043 "open, was closed"
1044 };
1045
1046 assert_eq!(
1047 is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
1048 expected,
1049 "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
1050 expected_reason,
1051 current_time,
1052 bucket_start,
1053 bucket_width_secs,
1054 flush_open_buckets
1055 );
1056 }
1057 }
1058
1059 #[tokio::test]
1060 async fn context_limit() {
1061 let mut state = AggregationState::new(
1063 BUCKET_WIDTH_SECS,
1064 2,
1065 COUNTER_EXPIRE,
1066 HistogramConfiguration::default(),
1067 Telemetry::noop(),
1068 );
1069
1070 let input_metrics = [
1073 Metric::gauge("metric1", 1.0),
1074 Metric::gauge("metric2", 2.0),
1075 Metric::gauge("metric3", 3.0),
1076 Metric::gauge("metric4", 4.0),
1077 ];
1078
1079 assert!(!state.context_limit_breached());
1080
1081 assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
1082 assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
1083 assert!(!state.context_limit_breached());
1084
1085 assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
1086 assert!(state.context_limit_breached());
1087 assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));
1088 assert!(state.context_limit_breached());
1089
1090 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1093 assert_eq!(flushed_metrics.len(), 2);
1094 assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
1095 assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());
1096 assert!(!state.context_limit_breached());
1097
1098 assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
1101 assert!(state.insert(insert_ts(2), input_metrics[3].clone()));
1102
1103 let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
1104 assert_eq!(flushed_metrics.len(), 2);
1105 assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
1106 assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
1107 }
1108
1109 #[tokio::test]
1110 async fn context_limit_with_zero_value_counters() {
1111 let mut state = AggregationState::new(
1113 BUCKET_WIDTH_SECS,
1114 2,
1115 COUNTER_EXPIRE,
1116 HistogramConfiguration::default(),
1117 Telemetry::noop(),
1118 );
1119
1120 let input_metrics = [
1122 Metric::counter("metric1", 1.0),
1123 Metric::counter("metric2", 2.0),
1124 Metric::counter("metric3", 3.0),
1125 ];
1126
1127 assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
1128 assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
1129
1130 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1132 assert_eq!(flushed_metrics.len(), 2);
1133 assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
1134 assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);
1135
1136 let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
1138 assert_eq!(flushed_metrics.len(), 2);
1139 assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
1140 assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);
1141
1142 assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));
1144
1145 let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
1147 assert_eq!(flushed_metrics.len(), 2);
1148 assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
1149 assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);
1150
1151 let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
1154 assert_eq!(flushed_metrics.len(), 0);
1155
1156 assert!(state.insert(insert_ts(5), input_metrics[2].clone()));
1158
1159 let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
1160 assert_eq!(flushed_metrics.len(), 1);
1161 assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
1162 }
1163
1164 #[tokio::test]
1165 async fn zero_value_counters() {
1166 let mut state = AggregationState::new(
1168 BUCKET_WIDTH_SECS,
1169 10,
1170 COUNTER_EXPIRE,
1171 HistogramConfiguration::default(),
1172 Telemetry::noop(),
1173 );
1174
1175 let input_metrics = [Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];
1177
1178 assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
1179 assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
1180
1181 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1183 assert_eq!(flushed_metrics.len(), 2);
1184 assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
1185 assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);
1186
1187 let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
1189 assert_eq!(flushed_metrics.len(), 2);
1190 assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
1191 assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);
1192
1193 assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
1195 assert!(state.insert(insert_ts(4), input_metrics[1].clone()));
1196
1197 let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
1200 assert_eq!(flushed_metrics.len(), 2);
1201 assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
1202 assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);
1203
1204 let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
1208 assert_eq!(flushed_metrics.len(), 2);
1209 assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
1210 assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
1211 }
1212
1213 #[tokio::test]
1214 async fn merge_identical_timestamped_values_on_flush() {
1215 let mut state = AggregationState::new(
1217 BUCKET_WIDTH_SECS,
1218 10,
1219 COUNTER_EXPIRE,
1220 HistogramConfiguration::default(),
1221 Telemetry::noop(),
1222 );
1223
1224 let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
1226
1227 assert!(state.insert(insert_ts(1), input_metric.clone()));
1228
1229 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1232 assert_eq!(flushed_metrics.len(), 1);
1233 assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
1234 }
1235
1236 #[tokio::test]
1237 async fn histogram_statistics() {
1238 let hist_config = HistogramConfiguration::from_statistics(
1240 &[
1241 HistogramStatistic::Count,
1242 HistogramStatistic::Sum,
1243 HistogramStatistic::Percentile {
1244 q: 0.5,
1245 suffix: "p50".into(),
1246 },
1247 ],
1248 false,
1249 "".into(),
1250 );
1251 let mut state = AggregationState::new(BUCKET_WIDTH_SECS, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1252
1253 let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
1255 assert!(state.insert(insert_ts(1), input_metric.clone()));
1256
1257 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1260 assert_eq!(flushed_metrics.len(), 3);
1261
1262 let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS.get()));
1265 let sum_metric = Metric::gauge("metric1.sum", 0.0);
1266 let p50_metric = Metric::gauge("metric1.p50", 0.0);
1267
1268 assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
1271 assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
1272 assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
1273 }
1274
1275 #[tokio::test]
1276 async fn histogram_statistics_unit_propagation() {
1277 let hist_config = HistogramConfiguration::from_statistics(
1279 &[
1280 HistogramStatistic::Count,
1281 HistogramStatistic::Sum,
1282 HistogramStatistic::Percentile {
1283 q: 0.5,
1284 suffix: "p50".into(),
1285 },
1286 ],
1287 false,
1288 "".into(),
1289 );
1290 let mut state = AggregationState::new(BUCKET_WIDTH_SECS, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1291
1292 let context = Context::from_static_parts("metric1", &[]);
1294 let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
1295 let input_metric = Metric::from_parts(
1296 context,
1297 MetricValues::histogram([1.0_f64, 2.0, 3.0, 4.0, 5.0]),
1298 metadata,
1299 );
1300 assert!(state.insert(insert_ts(1), input_metric));
1301
1302 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1303 assert_eq!(flushed_metrics.len(), 3);
1304
1305 for metric in &flushed_metrics {
1308 let name = metric.context().name();
1309 if name.ends_with(".count") {
1310 assert_eq!(
1311 metric.metadata().unit(),
1312 None,
1313 "flushed metric '{}' should be dimensionless",
1314 name
1315 );
1316 } else {
1317 assert_eq!(
1318 metric.metadata().unit(),
1319 Some("millisecond"),
1320 "flushed metric '{}' should carry unit='millisecond'",
1321 name
1322 );
1323 }
1324 }
1325 }
1326
1327 #[tokio::test]
1328 async fn distributions() {
1329 let mut state = AggregationState::new(
1331 BUCKET_WIDTH_SECS,
1332 10,
1333 COUNTER_EXPIRE,
1334 HistogramConfiguration::default(),
1335 Telemetry::noop(),
1336 );
1337
1338 let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1340 let input_metric = Metric::distribution("metric1", &values[..]);
1341
1342 assert!(state.insert(insert_ts(1), input_metric.clone()));
1343
1344 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1346 assert_eq!(flushed_metrics.len(), 1);
1347
1348 assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1349 }
1350
1351 #[tokio::test]
1352 async fn histogram_copy_to_distribution() {
1353 let hist_config = HistogramConfiguration::from_statistics(
1354 &[
1355 HistogramStatistic::Count,
1356 HistogramStatistic::Sum,
1357 HistogramStatistic::Percentile {
1358 q: 0.5,
1359 suffix: "p50".into(),
1360 },
1361 ],
1362 true,
1363 "dist_prefix.".into(),
1364 );
1365 let mut state = AggregationState::new(BUCKET_WIDTH_SECS, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1366
1367 let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1369 let input_metric = Metric::histogram("metric1", values);
1370 assert!(state.insert(insert_ts(1), input_metric.clone()));
1371
1372 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1375 assert_eq!(flushed_metrics.len(), 4);
1376
1377 let count_metric = Metric::rate("metric1.count", 0.0, BUCKET_WIDTH);
1380 let sum_metric = Metric::gauge("metric1.sum", 0.0);
1381 let p50_metric = Metric::gauge("metric1.p50", 0.0);
1382 let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);
1383
1384 assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1387 assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
1388 assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
1389 assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
1390 }
1391
1392 #[tokio::test]
1393 async fn nonaggregated_counters_to_rate() {
1394 let counter_value = 42.0;
1395
1396 let mut state = AggregationState::new(
1398 BUCKET_WIDTH_SECS,
1399 10,
1400 COUNTER_EXPIRE,
1401 HistogramConfiguration::default(),
1402 Telemetry::noop(),
1403 );
1404
1405 let input_metric = Metric::counter("metric1", counter_value);
1407 assert!(state.insert(insert_ts(1), input_metric.clone()));
1408
1409 let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1412 assert_eq!(flushed_metrics.len(), 1);
1413 let flushed_metric = &flushed_metrics[0];
1414
1415 assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
1416 assert_eq!(flushed_metric.values().as_str(), "rate");
1417 }
1418
1419 #[tokio::test]
1420 async fn preaggregated_counters_to_rate() {
1421 let counter_value = 42.0;
1422 let timestamp = 123456;
1423
1424 let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), BUCKET_WIDTH_SECS, Telemetry::noop()).await;
1426 let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
1427
1428 let input_metric = Metric::counter("metric1", (timestamp, counter_value));
1430 batcher.push_metric(input_metric.clone(), &dispatcher).await;
1431
1432 batcher.try_flush(&dispatcher).await;
1435
1436 let mut flushed_metrics = dispatcher_receiver.collect_next();
1437 assert_eq!(flushed_metrics.len(), 1);
1438 assert_eq!(
1439 Metric::rate("metric1", (timestamp, counter_value), BUCKET_WIDTH),
1440 flushed_metrics.remove(0)
1441 );
1442 }
1443
1444 #[tokio::test]
1445 async fn telemetry() {
1446 let recorder = TestRecorder::default();
1453 let _local = metrics::set_default_local_recorder(&recorder);
1454
1455 let builder = MetricsBuilder::default();
1456 let telemetry = Telemetry::new(&builder);
1457
1458 let mut state = AggregationState::new(
1459 BUCKET_WIDTH_SECS,
1460 2,
1461 COUNTER_EXPIRE,
1462 HistogramConfiguration::default(),
1463 telemetry,
1464 );
1465
1466 assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
1468 assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1469 assert_eq!(
1470 recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
1471 Some(0)
1472 );
1473 for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
1474 assert_eq!(
1475 recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
1476 Some(0.0)
1477 );
1478 }
1479
1480 assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
1482 assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1483 assert_eq!(
1484 recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1485 Some(1.0)
1486 );
1487 assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1488
1489 assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
1491 assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1492 assert_eq!(
1493 recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1494 Some(1.0)
1495 );
1496
1497 assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
1499 assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1500 assert_eq!(
1501 recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1502 Some(1.0)
1503 );
1504
1505 let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
1508 assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1509 assert_eq!(
1510 recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1511 Some(1.0)
1512 );
1513 assert_eq!(
1514 recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1515 Some(0.0)
1516 );
1517 }
1518}
1519
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::AggregateConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    // Runs the shared configuration smoke-test harness against `AggregateConfiguration`, starting
    // from an empty JSON object and deserializing through `as_typed`.
    #[tokio::test]
    async fn smoke_test() {
        // Key paths handed to the harness as its second argument.
        // NOTE(review): presumably paths the harness should skip -- confirm against
        // `run_config_smoke_tests`.
        let key_paths = [
            "aggregate_flush_interval.nanos",
            "aggregate_passthrough_idle_flush_timeout.nanos",
        ];
        let base_config = json!({});

        run_config_smoke_tests(structs::AGGREGATE_CONFIGURATION, &key_paths, base_config, |config| {
            config
                .as_typed::<AggregateConfiguration>()
                .expect("AggregateConfiguration should deserialize")
        })
        .await
    }
}