//! Aggregate transform.
//!
//! Source: `saluki_components/transforms/aggregate/mod.rs`.
1use std::{
2    num::NonZeroU64,
3    time::{Duration, Instant},
4};
5
6use async_trait::async_trait;
7use ddsketch::DDSketch;
8use hashbrown::{hash_map::Entry, HashMap};
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::time::get_unix_timestamp;
11use saluki_config::GenericConfiguration;
12use saluki_context::Context;
13use saluki_core::{
14    components::{transforms::*, ComponentContext},
15    data_model::event::{metric::*, Event, EventType},
16    observability::ComponentMetricsExt as _,
17    topology::{interconnect::BufferedDispatcher, OutputDefinition},
18    topology::{EventsBuffer, EventsDispatcher},
19};
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use smallvec::SmallVec;
24use tokio::{
25    select,
26    time::{interval, interval_at},
27};
28use tracing::{debug, error, info, trace, warn};
29
30mod telemetry;
31use self::telemetry::Telemetry;
32
33mod config;
34use self::config::HistogramConfiguration;
35
/// How often the transform checks whether buffered passthrough metrics have sat idle long enough to be flushed.
const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);
37
/// Default size of the aggregation window (10 seconds).
const fn default_window_duration() -> Duration {
    Duration::from_secs(10)
}

/// Default interval between flushes of aggregated buckets (15 seconds).
const fn default_primary_flush_interval() -> Duration {
    Duration::from_secs(15)
}

/// Default maximum number of contexts tracked per aggregation window (1,000,000).
const fn default_context_limit() -> usize {
    1_000_000
}

/// Default idle counter expiration (300 seconds / 5 minutes).
const fn default_counter_expiry_seconds() -> Option<u64> {
    Some(300)
}

/// Passthrough of timestamped metrics is enabled by default.
const fn default_passthrough_timestamped_metrics() -> bool {
    true
}

/// Default idle flush timeout for buffered passthrough metrics (1 second).
const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::from_secs(1)
}
61
/// Aggregate transform.
///
/// Aggregates metrics into fixed-size windows, flushing them at a regular interval.
///
/// ## Zero-value counters
///
/// When metrics are aggregated and then flushed, they are typically removed entirely from the aggregation state. Unless
/// they are updated again, they will not be emitted again. However, for counters, a slightly different approach is
/// taken by tracking "zero-value" counters.
///
/// Counters are aggregated and flushed normally. However, when flushed, counters are added to a list of "zero-value"
/// counters, and if those counters are not updated again, the transform emits a copy of the counter with a value of
/// zero. It does this until the counter is updated again, or the zero-value counter expires (no updates), whichever
/// comes first.
///
/// This provides a continuity in the output of a counter, from the perspective of a downstream system, when counters
/// are otherwise sparse. The expiration period is configurable, and allows a trade-off in how sparse/infrequent the
/// updates to counters can be versus how long it takes for counters that don't exist anymore to actually cease to be
/// emitted.
#[derive(Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct AggregateConfiguration {
    /// Size of the aggregation window.
    ///
    /// Metrics are aggregated into fixed-size windows, such that all updates to the same metric within a window are
    /// aggregated into a single metric. The window size controls how efficiently metrics are aggregated, and in turn,
    /// how many data points are emitted downstream.
    ///
    /// Defaults to 10 seconds.
    #[serde(rename = "aggregate_window_duration", default = "default_window_duration")]
    window_duration: Duration,

    /// How often to flush buckets.
    ///
    /// This represents a trade-off between the savings in network bandwidth (sending fewer requests to downstream
    /// systems, etc) and the frequency of updates (how often updates to a metric are emitted).
    ///
    /// Defaults to 15 seconds.
    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
    primary_flush_interval: Duration,

    /// Maximum number of contexts to aggregate per window.
    ///
    /// A context is the unique combination of a metric name and its set of tags. For example,
    /// `metric.name.here{tag1=A,tag2=B}` represents a single context, and would be different than
    /// `metric.name.here{tag1=A,tag2=C}`.
    ///
    /// When the maximum number of contexts is reached in the current aggregation window, additional metrics are dropped
    /// until the next window starts.
    ///
    /// Defaults to 1,000,000.
    #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
    context_limit: usize,

    /// Whether to flush open buckets when stopping the transform.
    ///
    /// Normally, open buckets (a bucket whose end has not yet occurred) are not flushed when the transform is stopped.
    /// This is done to avoid the chance of flushing a partial window, restarting the process, and then flushing the
    /// same window again. Downstream systems sometimes cannot cope with this gracefully, as there is no way to
    /// determine that it is an incremental update, and so they treat it as an absolute update, overwriting the
    /// previously flushed value.
    ///
    /// In cases where flushing all outstanding data is paramount, this can be enabled.
    ///
    /// Defaults to `false`.
    #[serde(
        rename = "aggregate_flush_open_windows",
        alias = "dogstatsd_flush_incomplete_buckets",
        default
    )]
    flush_open_windows: bool,

    /// How long to keep idle counters alive after they've been flushed, in seconds.
    ///
    /// When metrics are flushed, they are removed from the aggregation state. However, if a counter expiration is set,
    /// counters will be kept alive in an "idle" state. For as long as a counter is idle, but not yet expired, a zero
    /// value will be emitted for it during each flush. This allows more gracefully handling sparse counters, where
    /// updates are infrequent but leaving gaps in the time series would be undesirable from a user experience
    /// perspective.
    ///
    /// After a counter has been idle (no updates) for longer than the expiry period, it will be completely removed and
    /// no further zero values will be emitted.
    ///
    /// Defaults to 300 seconds (5 minutes). Setting a value of `0` disables idle counter keep-alive.
    #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
    counter_expiry_seconds: Option<u64>,

    /// Whether or not to immediately forward (passthrough) metrics with pre-defined timestamps.
    ///
    /// When enabled, this causes the aggregator to immediately forward metrics that already have a timestamp present.
    /// Only metrics without a timestamp will be aggregated. This can be useful when metrics are already pre-aggregated
    /// client-side and both timeliness and memory efficiency are paramount, as it avoids the overhead of aggregating
    /// within the pipeline.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_passthrough_timestamped_metrics"
    )]
    passthrough_timestamped_metrics: bool,

    /// How often to flush buffered passthrough metrics.
    ///
    /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in
    /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
    /// amount of time that passthrough metrics will be buffered before being forwarded.
    ///
    /// Defaults to 1 second.
    #[serde(
        rename = "aggregate_passthrough_idle_flush_timeout",
        default = "default_passthrough_idle_flush_timeout"
    )]
    passthrough_idle_flush_timeout: Duration,

    /// Histogram aggregation configuration.
    ///
    /// Controls the aggregates/percentiles that are generated for distributions in "histogram" mode (client-side
    /// distribution aggregation).
    #[serde(flatten)]
    hist_config: HistogramConfiguration,
}
183
184impl AggregateConfiguration {
185    /// Creates a new `AggregateConfiguration` from the given configuration.
186    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
187        Ok(config.as_typed()?)
188    }
189
190    /// Creates a new `AggregateConfiguration` with default values.
191    pub fn with_defaults() -> Self {
192        Self {
193            window_duration: default_window_duration(),
194            primary_flush_interval: default_primary_flush_interval(),
195            context_limit: default_context_limit(),
196            flush_open_windows: false,
197            counter_expiry_seconds: default_counter_expiry_seconds(),
198            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
199            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
200            hist_config: HistogramConfiguration::default(),
201        }
202    }
203}
204
205#[async_trait]
206impl TransformBuilder for AggregateConfiguration {
207    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
208        let metrics_builder = MetricsBuilder::from_component_context(&context);
209        let telemetry = Telemetry::new(&metrics_builder);
210
211        let state = AggregationState::new(
212            self.window_duration,
213            self.context_limit,
214            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
215            self.hist_config.clone(),
216            telemetry.clone(),
217        );
218
219        let passthrough_batcher = PassthroughBatcher::new(
220            self.passthrough_idle_flush_timeout,
221            self.window_duration,
222            telemetry.clone(),
223        )
224        .await;
225
226        Ok(Box::new(Aggregate {
227            state,
228            telemetry,
229            primary_flush_interval: self.primary_flush_interval,
230            flush_open_windows: self.flush_open_windows,
231            passthrough_batcher,
232            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
233        }))
234    }
235
236    fn input_event_type(&self) -> EventType {
237        EventType::Metric
238    }
239
240    fn outputs(&self) -> &[OutputDefinition<EventType>] {
241        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
242        OUTPUTS
243    }
244}
245
246impl MemoryBounds for AggregateConfiguration {
247    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
248        // TODO: While we account for the aggregation state map accurately, what we don't currently account for is the
249        // fact that a metric could have multiple distinct values. For the common pipeline of metrics in via DogStatsD,
250        // this generally shouldn't be a problem because the values don't have a timestamp, so they get aggregated into
251        // the same bucket, leading to two values per `MetricValues` at most, which is already baked into the size of
252        // `MetricValues` due to using `SmallVec`.
253        //
254        // However, there could be many more values in a single metric, and we don't account for that.
255
256        builder
257            .minimum()
258            // Capture the size of the heap allocation when the component is built.
259            .with_single_value::<Aggregate>("component struct");
260        builder
261            .firm()
262            // Account for the aggregation state map, where we map contexts to the merged metric.
263            .with_expr(UsageExpr::product(
264                "aggregation state map",
265                UsageExpr::sum(
266                    "context map entry",
267                    UsageExpr::struct_size::<Context>("context"),
268                    UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
269                ),
270                UsageExpr::config("aggregate_context_limit", self.context_limit),
271            ));
272    }
273}
274
/// Aggregate transform.
///
/// See [`AggregateConfiguration`] for a description of the transform's behavior.
pub struct Aggregate {
    // Aggregation state for non-timestamped metrics.
    state: AggregationState,
    // Component telemetry (flush counts, dropped events, etc.).
    telemetry: Telemetry,
    // How often aggregated buckets are flushed downstream.
    primary_flush_interval: Duration,
    // Whether open (not-yet-ended) buckets are flushed during the final shutdown flush.
    flush_open_windows: bool,
    // Batches timestamped metrics that bypass aggregation.
    passthrough_batcher: PassthroughBatcher,
    // Whether metrics carrying pre-set timestamps skip aggregation entirely.
    passthrough_timestamped_metrics: bool,
}
283
#[async_trait]
impl Transform for Aggregate {
    /// Runs the transform's main loop: aggregating incoming metrics, flushing aggregated buckets on an interval,
    /// batching/flushing passthrough (timestamped) metrics, and performing a final flush on shutdown.
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        // Delay the first primary flush by one full interval so we don't flush immediately on startup.
        let mut primary_flush = interval_at(
            tokio::time::Instant::now() + self.primary_flush_interval,
            self.primary_flush_interval,
        );
        let mut final_primary_flush = false;

        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);

        health.mark_ready();
        debug!("Aggregation transform started.");

        tokio::pin!(passthrough_flush);

        loop {
            select! {
                _ = health.live() => continue,
                _ = primary_flush.tick() => {
                    // We've reached the end of the current window. Flush our aggregation state and forward the metrics
                    // onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
                    // state to track the start time of the current aggregation window.
                    if !self.state.is_empty() {
                        debug!("Flushing aggregated metrics...");

                        // Open buckets are only ever flushed on the final (shutdown) flush, and only if configured.
                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;

                        // Remember if the context limit had been surpassed before this flush.
                        let was_breached = self.state.context_limit_breached();

                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
                            error!(error = %e, "Failed to flush aggregation state.");
                        }

                        self.telemetry.increment_flushes();

                        // If flush recovered us from a breach, log the recovery.
                        if was_breached && !self.state.context_limit_breached() {
                            info!("Context limit no longer exceeded, metrics are being accepted again.");
                        }

                        match dispatcher.flush().await {
                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
                            Err(e) => error!(error = %e, "Failed to flush aggregated events."),
                        }
                    }

                    // If this is the final flush, we break out of the loop.
                    if final_primary_flush {
                        debug!("All aggregation complete.");
                        break
                    }
                },
                _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
                maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
                    Some(events) => {
                        trace!(events_len = events.len(), "Received events.");

                        let current_time = get_unix_timestamp();
                        let mut processed_passthrough_metrics = false;

                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let metric = if self.passthrough_timestamped_metrics {
                                    // Try splitting out any timestamped values, and if we have any, we'll buffer them
                                    // separately and process the remaining non-timestamped metric (if any) by
                                    // aggregating it like normal.
                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);

                                    // If we have a timestamped metric, then batch it up out-of-band.
                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
                                        self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
                                        processed_passthrough_metrics = true;
                                    }

                                    // If we have a non-timestamped metric, we'll process it like normal.
                                    //
                                    // Otherwise, continue to the next event.
                                    match maybe_nontimestamped_metric {
                                        Some(metric) => metric,
                                        None => continue,
                                    }
                                } else {
                                    metric
                                };

                                let was_breached = self.state.context_limit_breached();
                                if !self.state.insert(current_time, metric) {
                                    trace!("Dropping metric due to context limit.");
                                    if !was_breached {
                                        // First drop since the last recovery — emit a single warning.
                                        warn!(context_limit = self.state.context_limit, "Context limit reached, \
                                        dropping metrics. Consider increasing `aggregate_context_limit`.");
                                    }
                                    self.telemetry.increment_events_dropped();
                                }
                            }
                        }

                        // Only bump the passthrough idle clock once per input buffer, not once per metric.
                        if processed_passthrough_metrics {
                            self.passthrough_batcher.update_last_processed_at();
                        }
                    },
                    None => {
                        // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
                        // interval so it ticks immediately on the next loop iteration.
                        final_primary_flush = true;
                        primary_flush.reset_immediately();

                        debug!("Aggregation transform stopping...");
                    }
                },
            }
        }

        // Do a final flush of any timestamped metrics that we've buffered up.
        self.passthrough_batcher.try_flush(context.dispatcher()).await;

        debug!("Aggregation transform stopped.");

        Ok(())
    }
}
411
412fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
413    if metric.values().all_timestamped() {
414        (Some(metric), None)
415    } else if metric.values().any_timestamped() {
416        // Only _some_ of the values are timestamped, so we'll split the timestamped values into a new metric.
417        let new_metric_values = metric.values_mut().split_timestamped();
418        let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
419
420        (Some(new_metric), Some(metric))
421    } else {
422        // No timestamped values, so we need to aggregate this metric.
423        (None, Some(metric))
424    }
425}
426
/// Batches passthrough (pre-timestamped) metrics before dispatching them downstream.
struct PassthroughBatcher {
    // Buffer of events waiting to be dispatched.
    active_buffer: EventsBuffer,
    // When the first event of the current batch was pushed.
    active_buffer_start: Instant,
    // Last time any passthrough metrics were processed; drives the idle flush timeout.
    last_processed_at: Instant,
    // How long processing may sit idle before a non-empty buffer is flushed.
    idle_flush_timeout: Duration,
    // Bucket width of the aggregate transform, used as the rate interval when converting counters to rates.
    bucket_width: Duration,
    telemetry: Telemetry,
}
435
436impl PassthroughBatcher {
437    async fn new(idle_flush_timeout: Duration, bucket_width: Duration, telemetry: Telemetry) -> Self {
438        let active_buffer = EventsBuffer::default();
439
440        Self {
441            active_buffer,
442            active_buffer_start: Instant::now(),
443            last_processed_at: Instant::now(),
444            idle_flush_timeout,
445            bucket_width,
446            telemetry,
447        }
448    }
449
450    async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
451        // Convert counters to rates before we batch them up.
452        //
453        // This involves specifying the rate interval as the bucket width of the aggregate transform itself, which when
454        // you say it out loud is sort of confusing and nonsensical since the whole point is that these are
455        // _pre-aggregated_ metrics but we have to match the behavior of the Datadog Agent. ¯\_(ツ)_/¯
456        let (context, values, metadata) = metric.into_parts();
457        let adjusted_values = counter_values_to_rate(values, self.bucket_width);
458        let metric = Metric::from_parts(context, adjusted_values, metadata);
459
460        // Try pushing the metric into our active buffer.
461        //
462        // If our active buffer is full, then we'll flush the buffer, grab a new one, and push the metric into it.
463        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
464            debug!("Passthrough event buffer was full. Flushing...");
465            self.dispatch_events(dispatcher).await;
466
467            if self.active_buffer.try_push(event).is_some() {
468                error!("Event buffer is full even after dispatching events. Dropping event.");
469                self.telemetry.increment_events_dropped();
470                return;
471            }
472        }
473
474        // If this is the first metric in the buffer, we've started a new batch, so track when it started.
475        if self.active_buffer.len() == 1 {
476            self.active_buffer_start = Instant::now();
477        }
478
479        self.telemetry.increment_passthrough_metrics();
480    }
481
482    fn update_last_processed_at(&mut self) {
483        // We expose this as a standalone method, rather than just doing it automatically in `push_metric`, because
484        // otherwise we might be calling this 10-20K times per second, instead of simply doing it after the end of each
485        // input event buffer in the transform's main loop, which should be much less frequent.
486        self.last_processed_at = Instant::now();
487    }
488
489    async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
490        // If our active buffer isn't empty, and we've exceeded our idle flush timeout, then flush the buffer.
491        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
492            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");
493
494            self.dispatch_events(dispatcher).await;
495        }
496    }
497
498    async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
499        if !self.active_buffer.is_empty() {
500            let unaggregated_events = self.active_buffer.len();
501
502            // Track how long this batch was alive for.
503            let batch_duration = self.active_buffer_start.elapsed();
504            self.telemetry.record_passthrough_batch_duration(batch_duration);
505
506            self.telemetry.increment_passthrough_flushes();
507
508            // Swap our active buffer with a new, empty one, and then forward the old one.
509            let new_active_buffer = EventsBuffer::default();
510            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);
511
512            match dispatcher.dispatch(old_active_buffer).await {
513                Ok(()) => debug!(unaggregated_events, "Dispatched events."),
514                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
515            }
516        }
517    }
518}
519
/// A metric's aggregated values and metadata, along with the last time it was updated.
#[derive(Clone)]
struct AggregatedMetric {
    values: MetricValues,
    metadata: MetricMetadata,
    // Unix timestamp of the last update to this metric; drives counter expiration.
    last_seen: u64,
}
526
/// Aggregation state for the transform: tracks contexts and their aggregated values between flushes.
struct AggregationState {
    // Maps each context to its aggregated values/metadata.
    contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
    // Scratch buffer of contexts to remove after a flush pass; reused across flushes to avoid reallocation.
    contexts_remove_buf: Vec<Context>,
    // Maximum number of contexts that may be tracked at once.
    context_limit: usize,
    // Width of an aggregation bucket, in seconds.
    bucket_width_secs: u64,
    // How long an idle counter is kept alive (emitting zero values) after flushing; `None` disables keep-alive.
    counter_expire_secs: Option<NonZeroU64>,
    // Unix timestamp of the last flush, or 0 if no flush has happened yet.
    last_flush: u64,
    // Controls the aggregates/percentiles generated for histogram-mode distributions.
    hist_config: HistogramConfiguration,
    telemetry: Telemetry,
    /// Tracks whether the context limit has been breached. Starts out as `false`. Set to `true` on the first dropped
    /// metric. Reset to `false` when the context count drops below the limit during flush.
    context_limit_breached: bool,
}
540
541impl AggregationState {
542    fn new(
543        bucket_width: Duration, context_limit: usize, counter_expiration: Option<Duration>,
544        hist_config: HistogramConfiguration, telemetry: Telemetry,
545    ) -> Self {
546        let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);
547
548        Self {
549            contexts: HashMap::default(),
550            contexts_remove_buf: Vec::new(),
551            context_limit,
552            bucket_width_secs: bucket_width.as_secs(),
553            counter_expire_secs,
554            last_flush: 0,
555            hist_config,
556            telemetry,
557            context_limit_breached: false,
558        }
559    }
560
    /// Returns `true` if no contexts are currently being tracked.
    fn is_empty(&self) -> bool {
        self.contexts.is_empty()
    }
564
565    fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
566        // If we haven't seen this context yet, and it would put us over the limit to insert it, then return early.
567        if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
568            self.context_limit_breached = true;
569            return false;
570        }
571
572        let (context, mut values, metadata) = metric.into_parts();
573
574        // Collapse all non-timestamped values into a single timestamped value.
575        //
576        // We do this pre-aggregation step because unless we're merging into an existing context, we'll end up with
577        // however many values were in the original metric instead of full aggregated values.
578        let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
579        values.collapse_non_timestamped(bucket_ts);
580
581        trace!(
582            bucket_ts,
583            kind = values.as_str(),
584            "Inserting metric into aggregation state."
585        );
586
587        // If we're already tracking this context, update the last seen time and merge the new values into the existing
588        // values. Otherwise, create a new entry.
589        match self.contexts.entry(context) {
590            Entry::Occupied(mut entry) => {
591                let aggregated = entry.get_mut();
592
593                // We ignore metadata changes within a flush interval to keep things simple.
594                aggregated.last_seen = timestamp;
595                aggregated.values.merge(values);
596            }
597            Entry::Vacant(entry) => {
598                self.telemetry.increment_contexts(entry.key(), &values);
599
600                entry.insert(AggregatedMetric {
601                    values,
602                    metadata,
603                    last_seen: timestamp,
604                });
605            }
606        }
607
608        true
609    }
610
611    async fn flush(
612        &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
613    ) -> Result<(), GenericError> {
614        let bucket_width_secs = self.bucket_width_secs;
615        let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);
616
617        // We want our split timestamp to be before the start of the current bucket, which ensures any timestamp that is
618        // less than or equal to `split_timestamp` resides in a closed bucket.
619        let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);
620
621        // Calculate the buckets we need to potentially generate zero-value counters for.
622        //
623        // We only need to do this if we've flushed before, since we won't have any knowledge of which counters are idle
624        // or not until that happens.
625        let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
626        if self.last_flush != 0 {
627            let start = align_to_bucket_start(self.last_flush, bucket_width_secs);
628
629            for bucket_start in (start..current_time).step_by(bucket_width_secs as usize) {
630                if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
631                    zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
632                }
633            }
634        }
635
636        // Iterate over each context we're tracking, and flush any values that are in buckets which are now closed.
637        debug!(timestamp = current_time, "Flushing buckets.");
638
639        for (context, am) in self.contexts.iter_mut() {
640            // Figure out if we should remove this metric or not if it has no values in open buckets.
641            //
642            // We have a special carve-out for counters here, which we have the ability to keep alive after they are
643            // flushed, based on a configured expiration period. This allows us to continue emitting a zero value for
644            // counters when they're idle, which can make them appear "live" in downstream systems, even when they're
645            // not.
646            //
647            // This is useful for sparsely-updated counters.
648            let should_expire_if_empty = match &am.values {
649                MetricValues::Counter(..) => {
650                    counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
651                }
652                _ => true,
653            };
654
655            // If we're dealing with a counter, we'll merge in our calculated set of zero values. We only merge in the
656            // values that represent now-closed buckets.
657            //
658            // This is also safe to do even when there are real values in those buckets since adding zero to anything is
659            // a no-op from the perspective of what we end up flushing, and it doesn't mess with the "last seen" time.
660            if let MetricValues::Counter(..) = &mut am.values {
661                let expires_at = am.last_seen + counter_expire_secs;
662                for (zv_bucket_start, zero_value) in &zero_value_buckets {
663                    if expires_at > *zv_bucket_start {
664                        am.values.merge(zero_value.clone());
665                    } else {
666                        // Since zero-value buckets are in order, we can break early if this bucket is past the
667                        // expiration cutoff of the counter. No other bucket will be within the expiration range.
668                        break;
669                    }
670                }
671            }
672
673            // Finally, figure out if the current metric can be removed.
674            //
675            // For any metric with values that are in open buckets, we split off the values that are in closed buckets
676            // and keep the metric alive. When all the values are in closed buckets, or there are no values, we'll
677            // remove the metric if `should_remove_if_empty` is `true`.
678            //
679            // This means we'll always remove all-closed/empty non-counter metrics, and we _may_ remove all-closed/empty
680            // counters.
681            if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
682                self.telemetry.increment_flushed(&closed_bucket_values);
683
684                // We got some closed bucket values, so flush those out.
685                transform_and_push_metric(
686                    context.clone(),
687                    closed_bucket_values,
688                    am.metadata.clone(),
689                    bucket_width_secs,
690                    &self.hist_config,
691                    dispatcher,
692                )
693                .await?;
694            }
695
696            if am.values.is_empty() && should_expire_if_empty {
697                self.telemetry.decrement_contexts(context, &am.values);
698                self.contexts_remove_buf.push(context.clone());
699            }
700        }
701
702        // Remove any contexts that were marked as needing to be removed.
703        let contexts_len_before = self.contexts.len();
704        for context in self.contexts_remove_buf.drain(..) {
705            self.contexts.remove(&context);
706        }
707        let contexts_len_after = self.contexts.len();
708
709        let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
710        let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
711        self.contexts.shrink_to(target_contexts_capacity);
712
713        if self.context_limit_breached && self.contexts.len() < self.context_limit {
714            self.context_limit_breached = false;
715        }
716
717        self.last_flush = current_time;
718
719        Ok(())
720    }
721
    /// Returns `true` if the configured context limit has been reached, meaning that no new
    /// contexts can currently be tracked by the aggregation state.
    ///
    /// The flag is cleared during `flush` once the number of tracked contexts drops back below
    /// the limit.
    fn context_limit_breached(&self) -> bool {
        self.context_limit_breached
    }
725}
726
/// Transforms aggregated metric values into their final flushable form and pushes the resulting
/// metric(s) to the dispatcher.
///
/// Histograms are expanded into one metric per configured statistic (a rate or a gauge, depending
/// on the statistic), and can optionally also be copied out as a distribution. Counters are
/// converted to rates over the bucket width. All other metric types are forwarded as-is.
///
/// # Errors
///
/// If pushing any resulting event to the dispatcher fails, an error is returned.
async fn transform_and_push_metric(
    context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: u64,
    hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
) -> Result<(), GenericError> {
    let bucket_width = Duration::from_secs(bucket_width_secs);

    match values {
        // If we're dealing with a histogram, we calculate a configured set of aggregates/percentiles from it, and emit
        // them as individual metrics.
        MetricValues::Histogram(ref mut points) => {
            // If configured, emit a copy of the histogram as a distribution by folding each bucket's samples into a
            // sketch, optionally renaming the metric with the configured prefix.
            if hist_config.copy_to_distribution() {
                let sketch_points = points
                    .into_iter()
                    .map(|(ts, hist)| {
                        let mut sketch = DDSketch::default();
                        for sample in hist.samples() {
                            sketch.insert_n(sample.value.into_inner(), sample.weight);
                        }
                        (ts, sketch)
                    })
                    .collect::<SketchPoints>();
                let distribution_values = MetricValues::distribution(sketch_points);
                let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
                    context.with_name(format!(
                        "{}{}",
                        hist_config.copy_to_distribution_prefix(),
                        context.name()
                    ))
                } else {
                    context.clone()
                };
                let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }
            // We collect our histogram points in their "summary" view, which sorts the underlying samples allowing
            // proper quantile queries to be answered, hence our "sorted" points. We do it this way because rather than
            // sort every time we insert, or cloning the points, we only sort when a summary view is constructed, which
            // requires mutable access to sort the samples in-place.
            let mut sorted_points = Vec::new();
            for (ts, h) in points {
                sorted_points.push((ts, h.summary_view()));
            }

            for statistic in hist_config.statistics() {
                let new_points = sorted_points
                    .iter()
                    .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
                    .collect::<ScalarPoints>();

                // Rate statistics (e.g. count) are emitted as rates over the bucket width; all others are gauges.
                let new_values = if statistic.is_rate_statistic() {
                    MetricValues::rate(new_points, bucket_width)
                } else {
                    MetricValues::gauge(new_points)
                };

                // Each statistic becomes its own metric, named with the statistic's suffix (e.g. `.count`, `.p50`).
                let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
                let new_metric = Metric::from_parts(new_context, new_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }

            Ok(())
        }

        // If we're not dealing with a histogram, then all we need to worry about is converting counters to rates before
        // forwarding our single, aggregated metric.
        values => {
            let adjusted_values = counter_values_to_rate(values, bucket_width);

            let metric = Metric::from_parts(context, adjusted_values, metadata);
            dispatcher.push(Event::Metric(metric)).await
        }
    }
}
801
802fn counter_values_to_rate(values: MetricValues, interval: Duration) -> MetricValues {
803    match values {
804        MetricValues::Counter(points) => MetricValues::rate(points, interval),
805        values => values,
806    }
807}
808
/// Aligns the given Unix timestamp to the start of the bucket which contains it.
///
/// Bucket starts are multiples of the bucket width: with a width of 10 seconds, a timestamp of 17
/// aligns to 10, while a timestamp of 20 aligns to 20.
const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: u64) -> u64 {
    // Integer division truncates, rounding the timestamp down to the nearest multiple of the
    // bucket width.
    (timestamp / bucket_width_secs) * bucket_width_secs
}
812
/// Determines whether the bucket starting at `bucket_start` is closed at `current_time`.
///
/// Buckets are half-open intervals: a bucket starting at 10 with a width of 10 covers [10, 20),
/// so its last contained second is `start + width - 1`. Visually:
///
/// <--------- bucket 1 ----------> <--------- bucket 2 ----------> <--------- bucket 3 ---------->
/// [10 11 12 13 14 15 16 17 18 19] [20 21 22 23 24 25 26 27 28 29] [30 31 32 33 34 35 36 37 38 39]
///
/// A bucket is therefore closed once the current time has moved strictly past its last contained
/// second: at time 19 none of the buckets above are closed, at time 29 only bucket 1 is closed,
/// and at time 30 buckets 1 and 2 are closed while bucket 3 remains open.
///
/// When `flush_open_buckets` is `true`, every bucket is treated as closed regardless of time.
const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: u64, flush_open_buckets: bool,
) -> bool {
    current_time > (bucket_start + bucket_width_secs - 1) || flush_open_buckets
}
836
837// TODO: One thing we ought to consider is a property test, specifically a state machine property test, where we
838// generate a randomized offset to start time from, a bucket width, flush interval, and operations, and so on... and
839// then we run it to make sure that we are always generating sequential timestamps for data points, etc.
840#[cfg(test)]
841mod tests {
842    use float_cmp::ApproxEqRatio as _;
843    use saluki_core::{
844        components::ComponentContext,
845        topology::{interconnect::Dispatcher, ComponentId, OutputName},
846    };
847    use saluki_metrics::test::TestRecorder;
848    use stringtheory::MetaString;
849    use tokio::sync::mpsc;
850
851    use super::config::HistogramStatistic;
852    use super::*;
853
    /// Width of each aggregation bucket used by the tests, in seconds.
    const BUCKET_WIDTH_SECS: u64 = 10;
    /// Width of each aggregation bucket used by the tests, as a `Duration`.
    const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS);
    /// How long idle counters are kept alive (emitting zero values) before expiring, in seconds.
    const COUNTER_EXPIRE_SECS: u64 = 20;
    /// Counter expiration in the `Option<Duration>` form passed to `AggregationState::new`.
    const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));
858
    /// Gets the bucket start timestamp for the given step.
    ///
    /// This is the start of the bucket that `insert_ts(step)` falls into, so flushed values for
    /// inserts made at a given step are expected to carry this timestamp.
    const fn bucket_ts(step: u64) -> u64 {
        align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
    }
863
    /// Gets the insert timestamp for the given step.
    ///
    /// This lands two seconds before the exclusive end of step N's bucket, which guarantees the
    /// insert falls strictly inside the bucket rather than on its end boundary.
    const fn insert_ts(step: u64) -> u64 {
        (BUCKET_WIDTH_SECS * (step + 1)) - 2
    }
868
    /// Gets the flush timestamp for the given step.
    ///
    /// This is the exclusive end of step N's bucket, i.e. the first timestamp at which that bucket
    /// is considered closed (see `is_bucket_closed`), so flushing here drains step N's inserts.
    const fn flush_ts(step: u64) -> u64 {
        BUCKET_WIDTH_SECS * (step + 1)
    }
873
    /// Test helper wrapping the receiving side of a dispatcher output, used to read back the event
    /// buffers that were dispatched during a flush.
    struct DispatcherReceiver {
        // Receives the event buffers sent through the dispatcher's default output.
        receiver: mpsc::Receiver<EventsBuffer>,
    }
877
878    impl DispatcherReceiver {
879        fn collect_next(&mut self) -> Vec<Metric> {
880            match self.receiver.try_recv() {
881                Ok(event_buffer) => {
882                    let mut metrics = event_buffer
883                        .into_iter()
884                        .filter_map(|event| event.try_into_metric())
885                        .collect::<Vec<Metric>>();
886
887                    metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
888                    metrics
889                }
890                Err(_) => Vec::new(),
891            }
892        }
893    }
894
895    /// Constructs a basic `Dispatcher` with a fixed-size event buffer.
896    fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
897        let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
898        let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));
899
900        let (buffer_tx, buffer_rx) = mpsc::channel(1);
901        dispatcher.add_output(OutputName::Default).unwrap();
902        dispatcher
903            .attach_sender_to_output(&OutputName::Default, buffer_tx)
904            .unwrap();
905
906        (dispatcher, DispatcherReceiver { receiver: buffer_rx })
907    }
908
909    async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
910        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
911        let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");
912
913        // Flush the metrics to an event buffer.
914        state
915            .flush(timestamp, true, &mut buffered_dispatcher)
916            .await
917            .expect("should not fail to flush aggregation state");
918
919        // Flush our buffered dispatcher, which should ensure that the event buffer is sent out, and then read it from the
920        // receiver:
921        buffered_dispatcher
922            .flush()
923            .await
924            .expect("should not fail to flush buffered sender");
925
926        dispatcher_receiver.collect_next()
927    }
928
    /// Compares expected and actual point sequences pairwise, asserting that both the timestamp
    /// and the value of each point match.
    ///
    /// The `scalar` arm compares floating-point values approximately, within the given error
    /// ratio, while the `distribution` arm compares sketches for exact equality. Both arms iterate
    /// over `zip`-ped pairs, so callers are expected to assert equal lengths beforehand.
    macro_rules! compare_points {
        (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_point) = expected_value;
                let (actual_ts, actual_point) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert!(
                    expected_point.approx_eq_ratio(&actual_point, $error_ratio),
                    "point for value #{} does not match: {} (expected) vs {} (actual)",
                    idx,
                    expected_point,
                    actual_point
                );
            }
        };
        (distribution, $expected:expr, $actual:expr) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_sketch) = expected_value;
                let (actual_ts, actual_sketch) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert_eq!(
                    expected_sketch, actual_sketch,
                    "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_sketch, actual_sketch
                );
            }
        };
    }
967
    /// Asserts that a flushed scalar metric (counter, gauge, or rate) has the same context as the
    /// original metric and exactly the expected set of `timestamp => value` points.
    ///
    /// Values are compared approximately; the default error ratio can be overridden via
    /// `error_ratio => ...` for lossy statistics such as percentiles.
    macro_rules! assert_flushed_scalar_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());

            let expected_points = ScalarPoints::from([$(($ts, $value)),+]);

            match actual_metric.values() {
                MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
                    compare_points!(scalar, expected_points, actual_points, $error_ratio);
                },
                _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
            }
        };
    }
988
    /// Asserts that a flushed distribution metric has the same context as the original metric and
    /// exactly the expected set of `timestamp => sketch` points.
    ///
    /// NOTE(review): the `error_ratio` parameter is accepted for call-site symmetry with
    /// `assert_flushed_scalar_metric!` but is unused here, since sketches are compared for exact
    /// equality via `compare_points!(distribution, ...)`.
    macro_rules! assert_flushed_distribution_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context());

            match actual_metric.values() {
                MetricValues::Distribution(ref actual_points) => {
                    let expected_points = SketchPoints::from([$(($ts, $value)),+]);
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");

                    compare_points!(distribution, &expected_points, actual_points);
                },
                _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
            }
        };
    }
1009
1010    #[test]
1011    fn bucket_is_closed() {
1012        // Cases are defined as:
1013        // (current time, bucket start, bucket width, flush open buckets, expected result)
1014        let cases = [
1015            // Bucket goes from [995, 1005), current time of 1000, so bucket is open.
1016            (1000, 995, 10, false, false),
1017            (1000, 995, 10, true, true),
1018            // Bucket goes from [1000, 1010), current time of 1000, so bucket is open.
1019            (1000, 1000, 10, false, false),
1020            (1000, 1000, 10, true, true),
1021            // Bucket goes from [1000, 1010), current time of 1010, so bucket is closed.
1022            (1010, 1000, 10, false, true),
1023            (1010, 1000, 10, true, true),
1024        ];
1025
1026        for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
1027            let expected_reason = if expected {
1028                "closed, was open"
1029            } else {
1030                "open, was closed"
1031            };
1032
1033            assert_eq!(
1034                is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
1035                expected,
1036                "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
1037                expected_reason,
1038                current_time,
1039                bucket_start,
1040                bucket_width_secs,
1041                flush_open_buckets
1042            );
1043        }
1044    }
1045
    #[tokio::test]
    async fn context_limit() {
        // Verifies that inserts beyond the configured context limit are rejected, that the breach
        // flag tracks whether the limit has been hit, and that flushing frees capacity again.

        // Create our aggregation state with a context limit of 2.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create four unique gauges, and insert all of them. The third and fourth should fail because we've reached
        // the context limit.
        let input_metrics = [
            Metric::gauge("metric1", 1.0),
            Metric::gauge("metric2", 2.0),
            Metric::gauge("metric3", 3.0),
            Metric::gauge("metric4", 4.0),
        ];

        assert!(!state.context_limit_breached());

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
        assert!(!state.context_limit_breached());

        assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
        assert!(state.context_limit_breached());
        assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));
        assert!(state.context_limit_breached());

        // We should only see the first two gauges after flushing. The flush should also clear the breached flag since
        // contexts drop below the limit.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());
        assert!(!state.context_limit_breached());

        // We should be able to insert the third and fourth gauges now as the first two have been flushed, and along
        // with them, their contexts should no longer be tracked in the aggregation state:
        assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
        assert!(state.insert(insert_ts(2), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
    }
1095
    #[tokio::test]
    async fn context_limit_with_zero_value_counters() {
        // We test here to ensure that zero-value counters contribute to the context limit: an idle
        // counter still occupies a context slot until it fully expires (COUNTER_EXPIRE_SECS, i.e.
        // two bucket widths here).
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create our input metrics.
        let input_metrics = [
            Metric::counter("metric1", 1.0),
            Metric::counter("metric2", 2.0),
            Metric::counter("metric3", 3.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // Flush the aggregation state, and observe they're both present.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Flush _again_ to ensure that we then emit zero-value variants for both counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // Now try to insert a third counter, which should fail because we've reached the context limit.
        assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));

        // Flush the aggregation state, and observe that we only see the two original counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);

        // With a fourth flush interval, the two counters should now have expired, and thus be dropped and no longer
        // contributing to the context limit.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 0);

        // Now we should be able to insert the third counter, and it should be the only one present after flushing.
        assert!(state.insert(insert_ts(5), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
    }
1150
    #[tokio::test]
    async fn zero_value_counters() {
        // We're testing that we properly emit and expire zero-value counters in all relevant scenarios.
        // The counter expiration here is COUNTER_EXPIRE_SECS = 20s (two bucket widths), which drives
        // the expiry math at the end of the test.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create two unique counters, and insert both of them.
        let input_metrics = [Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // Flush the aggregation state, and observe they're both present.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Perform our second flush, which should have them as zero-value counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // Now, we'll pretend to skip a flush period and add updates to them again after that.
        assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(4), input_metrics[1].clone()));

        // Flush the aggregation state, and observe that we have two zero-value counters for the flush period we
        // skipped, but that we see them appear again in the fourth flush period.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);

        // Now we'll skip multiple flush periods and ensure that we emit zero-value counters up until the point they
        // expire. As our zero-value counter expiration is 20 seconds, this is two flush periods, so we skip by three
        // flush periods, and we should only see the counters emitted for the first two.
        let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
    }
1199
    #[tokio::test]
    async fn merge_identical_timestamped_values_on_flush() {
        // We're testing that multiple values landing in the same bucket are merged into a single
        // aggregated value when flushed. (The previous comment here was copy-pasted from the
        // zero-value counters test and did not describe this test.)
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create one multi-value counter, and insert it.
        let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);

        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe the metric is present _and_ that we've properly merged all of the
        // values within the same timestamp.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
    }
1222
    #[tokio::test]
    async fn histogram_statistics() {
        // We're testing that we properly emit individual metrics (min, max, sum, etc) for a histogram.
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            false,
            "".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        // Create one multi-value histogram and insert it.
        let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe that we've emitted all of the configured distribution statistics in
        // the form of three metrics: count, sum, and p50.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 3);

        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
        // matter here, but we do need a `Metric` for it to compare the context to.
        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);

        // Flushed metrics are sorted by context name (see `DispatcherReceiver::collect_next`),
        // hence the assertion order below: `.count`, `.p50`, `.sum`.
        //
        // We use a less strict error ratio (how much the expected vs actual) for the percentile check, as we generally
        // expect the value to be somewhat off the exact value due to the lossy nature of `DDSketch`.
        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
    }
1261
    #[tokio::test]
    async fn histogram_statistics_unit_propagation() {
        // We're testing that the unit from the input histogram metadata propagates to all flushed output metrics
        // (count, p50, and sum), since each output metric is built from a clone of the input's metadata.
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            false,
            "".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        // Build a histogram with unit = "millisecond", simulating what arrives from a DogStatsD `ms` metric.
        let context = Context::from_static_parts("metric1", &[]);
        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
        let input_metric = Metric::from_parts(
            context,
            MetricValues::histogram([1.0_f64, 2.0, 3.0, 4.0, 5.0]),
            metadata,
        );
        assert!(state.insert(insert_ts(1), input_metric));

        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 3);

        // Every output metric (count, p50, sum) must carry the unit from the input histogram.
        for metric in &flushed_metrics {
            assert_eq!(
                metric.metadata().unit(),
                Some("millisecond"),
                "flushed metric '{}' should carry unit='millisecond'",
                metric.context().name()
            );
        }
    }
1302
1303    #[tokio::test]
1304    async fn distributions() {
1305        // We're testing that we pass through distributions untouched.
1306        let mut state = AggregationState::new(
1307            BUCKET_WIDTH,
1308            10,
1309            COUNTER_EXPIRE,
1310            HistogramConfiguration::default(),
1311            Telemetry::noop(),
1312        );
1313
1314        // Create one multi-value distribution, with server-side aggregation, and insert it.
1315        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1316        let input_metric = Metric::distribution("metric1", &values[..]);
1317
1318        assert!(state.insert(insert_ts(1), input_metric.clone()));
1319
1320        // Flush the aggregation state, and observe that we've emitted the original distribution.
1321        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1322        assert_eq!(flushed_metrics.len(), 1);
1323
1324        assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1325    }
1326
1327    #[tokio::test]
1328    async fn histogram_copy_to_distribution() {
1329        let hist_config = HistogramConfiguration::from_statistics(
1330            &[
1331                HistogramStatistic::Count,
1332                HistogramStatistic::Sum,
1333                HistogramStatistic::Percentile {
1334                    q: 0.5,
1335                    suffix: "p50".into(),
1336                },
1337            ],
1338            true,
1339            "dist_prefix.".into(),
1340        );
1341        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1342
1343        // Create one multi-value histogram and insert it.
1344        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1345        let input_metric = Metric::histogram("metric1", values);
1346        assert!(state.insert(insert_ts(1), input_metric.clone()));
1347
1348        // Flush the aggregation state, and observe that we've emitted all of the configured distribution statistics in
1349        // the form of three metrics: count, sum, and p50 as well as the additional metric from copying the histogram.
1350        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1351        assert_eq!(flushed_metrics.len(), 4);
1352
1353        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
1354        // matter here, but we do need a `Metric` for it to compare the context to.
1355        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
1356        let sum_metric = Metric::gauge("metric1.sum", 0.0);
1357        let p50_metric = Metric::gauge("metric1.p50", 0.0);
1358        let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);
1359
1360        // We use a less strict error ratio (how much the expected vs actual) for the percentile check, as we generally
1361        // expect the value to be somewhat off the exact value due to the lossy nature of `DDSketch`.
1362        assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1363        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
1364        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
1365        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
1366    }
1367
1368    #[tokio::test]
1369    async fn nonaggregated_counters_to_rate() {
1370        let counter_value = 42.0;
1371        let bucket_width = BUCKET_WIDTH;
1372
1373        // Create a basic aggregation state.
1374        let mut state = AggregationState::new(
1375            bucket_width,
1376            10,
1377            COUNTER_EXPIRE,
1378            HistogramConfiguration::default(),
1379            Telemetry::noop(),
1380        );
1381
1382        // Create a simple non-aggregated counter, and insert it.
1383        let input_metric = Metric::counter("metric1", counter_value);
1384        assert!(state.insert(insert_ts(1), input_metric.clone()));
1385
1386        // Flush the aggregation state, and observe that we've emitted the expected counter and that it has the right
1387        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
1388        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1389        assert_eq!(flushed_metrics.len(), 1);
1390        let flushed_metric = &flushed_metrics[0];
1391
1392        assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
1393        assert_eq!(flushed_metric.values().as_str(), "rate");
1394    }
1395
1396    #[tokio::test]
1397    async fn preaggregated_counters_to_rate() {
1398        let counter_value = 42.0;
1399        let timestamp = 123456;
1400        let bucket_width = BUCKET_WIDTH;
1401
1402        // Create a basic passthrough batcher and forwarder.
1403        let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await;
1404        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
1405
1406        // Create a simple pre-aggregated counter, and batch it.
1407        let input_metric = Metric::counter("metric1", (timestamp, counter_value));
1408        batcher.push_metric(input_metric.clone(), &dispatcher).await;
1409
1410        // Flush the batcher, and observe that we've emitted the expected counter and that it has the right
1411        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
1412        batcher.try_flush(&dispatcher).await;
1413
1414        let mut flushed_metrics = dispatcher_receiver.collect_next();
1415        assert_eq!(flushed_metrics.len(), 1);
1416        assert_eq!(
1417            Metric::rate("metric1", (timestamp, counter_value), bucket_width),
1418            flushed_metrics.remove(0)
1419        );
1420    }
1421
1422    #[tokio::test]
1423    async fn telemetry() {
1424        // TODO: We don't check `component_events_dropped_total` or `aggregate_passthrough_metrics_total` here as
1425        // they're set directly in the aggregate component future rather than `AggregationState`, which is harder to
1426        // drive overall and would have required even more boilerplate.
1427        //
1428        // Leaving that as a future improvement.
1429
1430        let recorder = TestRecorder::default();
1431        let _local = metrics::set_default_local_recorder(&recorder);
1432
1433        let builder = MetricsBuilder::default();
1434        let telemetry = Telemetry::new(&builder);
1435
1436        let mut state = AggregationState::new(
1437            BUCKET_WIDTH,
1438            2,
1439            COUNTER_EXPIRE,
1440            HistogramConfiguration::default(),
1441            telemetry,
1442        );
1443
1444        // Make sure our telemetry is registered at default values.
1445        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
1446        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1447        assert_eq!(
1448            recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
1449            Some(0)
1450        );
1451        for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
1452            assert_eq!(
1453                recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
1454                Some(0.0)
1455            );
1456        }
1457
1458        // Insert a counter with a non-timestamped value.
1459        assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
1460        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1461        assert_eq!(
1462            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1463            Some(1.0)
1464        );
1465        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1466
1467        // Insert a gauge with a timestamped value.
1468        assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
1469        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1470        assert_eq!(
1471            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1472            Some(1.0)
1473        );
1474
1475        // We've reached our context limit at this point, so the next metric should not be inserted.
1476        assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
1477        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1478        assert_eq!(
1479            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1480            Some(1.0)
1481        );
1482
1483        // Now let's flush the state which should flush the gauge entirely, reducing the context count, but not flush
1484        // the counter, since it'll be in zero-value mode.
1485        let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
1486        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1487        assert_eq!(
1488            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1489            Some(1.0)
1490        );
1491        assert_eq!(
1492            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1493            Some(0.0)
1494        );
1495    }
1496}
1497
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::AggregateConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    #[tokio::test]
    async fn smoke_test() {
        // Duration fields serialize as {secs, nanos}. Whole-second values are injected, so the
        // nanos sub-fields are always 0 — they are not independently configurable — and are
        // therefore excluded from the smoke test.
        let ignored_fields = [
            "aggregate_flush_interval.nanos",
            "aggregate_passthrough_idle_flush_timeout.nanos",
            "aggregate_window_duration.nanos",
        ];

        run_config_smoke_tests(structs::AGGREGATE_CONFIGURATION, &ignored_fields, json!({}), |cfg| {
            cfg.as_typed::<AggregateConfiguration>()
                .expect("AggregateConfiguration should deserialize")
        })
        .await
    }
}