Skip to main content

saluki_components/transforms/aggregate/
mod.rs

1use std::{
2    num::NonZeroU64,
3    time::{Duration, Instant},
4};
5
6use async_trait::async_trait;
7use ddsketch::DDSketch;
8use hashbrown::{hash_map::Entry, HashMap};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::time::get_unix_timestamp;
11use saluki_config::GenericConfiguration;
12use saluki_context::Context;
13use saluki_core::{
14    components::{transforms::*, ComponentContext},
15    data_model::event::{metric::*, Event, EventType},
16    observability::ComponentMetricsExt as _,
17    topology::{interconnect::BufferedDispatcher, OutputDefinition},
18    topology::{EventsBuffer, EventsDispatcher},
19};
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use smallvec::SmallVec;
24use stringtheory::MetaString;
25use tokio::{
26    select,
27    time::{interval, interval_at},
28};
29use tracing::{debug, error, info, trace, warn};
30
31mod telemetry;
32use self::telemetry::Telemetry;
33
34mod config;
35use self::config::{HistogramConfiguration, HistogramStatistic};
36
/// How often the transform's run loop wakes up to check whether buffered passthrough metrics have been idle for long
/// enough to be flushed.
const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);
38
/// Returns the default aggregation window duration: 10 seconds.
const fn default_window_duration_seconds() -> NonZeroU64 {
    // The literal is non-zero, so the `None` arm is never taken; it exists only to satisfy the type system in a
    // `const` context.
    match NonZeroU64::new(10) {
        Some(seconds) => seconds,
        None => panic!("not zero"),
    }
}
42
/// Returns the default interval between flushes of the aggregation state: 15 seconds.
const fn default_primary_flush_interval() -> Duration {
    Duration::from_secs(15)
}
46
/// Returns the default maximum number of contexts tracked by the aggregation state: 1,000,000.
const fn default_context_limit() -> usize {
    1_000_000
}
50
/// Returns the default idle counter expiry: 300 seconds (5 minutes).
const fn default_counter_expiry_seconds() -> Option<u64> {
    Some(300)
}
54
/// Returns the default for passing through timestamped metrics without aggregating them: enabled.
const fn default_passthrough_timestamped_metrics() -> bool {
    true
}
58
/// Returns the default idle timeout after which buffered passthrough metrics are flushed: 1 second.
const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::from_secs(1)
}
62
63/// Aggregate transform.
64///
65/// Aggregates metrics into fixed-size windows, flushing them at a regular interval.
66///
67/// ## Zero-value counters
68///
69/// When metrics are aggregated and then flushed, they're typically removed entirely from the aggregation state. Unless
70/// they're updated again, they won't be emitted again. However, for counters, a slightly different approach is
71/// taken by tracking "zero-value" counters.
72///
73/// Counters are aggregated and flushed normally. However, when flushed, counters are added to a list of "zero-value"
74/// counters, and if those counters aren't updated again, the transform emits a copy of the counter with a value of
75/// zero. It does this until the counter is updated again, or the zero-value counter expires (no updates), whichever
76/// comes first.
77///
78/// This provides a continuity in the output of a counter, from the perspective of a downstream system, when counters
79/// are otherwise sparse. The expiration period is configurable, and allows a trade-off in how sparse/infrequent the
80/// updates to counters can be versus how long it takes for counters that don't exist anymore to actually cease to be
81/// emitted.
82#[derive(Deserialize)]
83#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
84pub struct AggregateConfiguration {
85    /// Size of the aggregation window, in seconds.
86    ///
87    /// Metrics are aggregated into fixed-size windows, such that all updates to the same metric within a window are
88    /// aggregated into a single metric. The window size controls how efficiently metrics are aggregated, and in turn,
89    /// how many data points are emitted downstream.
90    ///
91    /// Window durations cannot be zero.
92    ///
93    /// Defaults to 10 seconds.
94    #[serde(
95        rename = "aggregate_window_duration_seconds",
96        default = "default_window_duration_seconds"
97    )]
98    window_duration_seconds: NonZeroU64,
99
100    /// How often to flush buckets.
101    ///
102    /// This represents a trade-off between the savings in network bandwidth (sending fewer requests to downstream
103    /// systems, etc) and the frequency of updates (how often updates to a metric are emitted).
104    ///
105    /// Defaults to 15 seconds.
106    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
107    primary_flush_interval: Duration,
108
109    /// Maximum number of contexts to aggregate per window.
110    ///
111    /// A context is the unique combination of a metric name and its set of tags. For example,
112    /// `metric.name.here{tag1=A,tag2=B}` represents a single context, and would be different than
113    /// `metric.name.here{tag1=A,tag2=C}`.
114    ///
115    /// When the maximum number of contexts is reached in the current aggregation window, additional metrics are dropped
116    /// until the next window starts.
117    ///
118    /// Defaults to 1,000,000.
119    #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
120    context_limit: usize,
121
122    /// Whether to flush open buckets when stopping the transform.
123    ///
124    /// Normally, open buckets (a bucket whose end hasn't yet occurred) aren't flushed when the transform is stopped.
125    /// This is done to avoid the chance of flushing a partial window, restarting the process, and then flushing the
126    /// same window again. Downstream systems sometimes can't cope with this gracefully, as there is no way to
127    /// determine that it's an incremental update, and so they treat it as an absolute update, overwriting the
128    /// previously flushed value.
129    ///
130    /// In cases where flushing all outstanding data is paramount, this can be enabled.
131    ///
132    /// Defaults to `false`.
133    #[serde(
134        rename = "aggregate_flush_open_windows",
135        alias = "dogstatsd_flush_incomplete_buckets",
136        default
137    )]
138    flush_open_windows: bool,
139
140    /// How long to keep idle counters alive after they've been flushed, in seconds.
141    ///
142    /// When metrics are flushed, they're removed from the aggregation state. However, if a counter expiration is set,
143    /// counters will be kept alive in an "idle" state. For as long as a counter is idle, but not yet expired, a zero
144    /// value will be emitted for it during each flush. This allows more gracefully handling sparse counters, where
145    /// updates are infrequent but leaving gaps in the time series would be undesirable from a user experience
146    /// perspective.
147    ///
148    /// After a counter has been idle (no updates) for longer than the expiry period, it will be completely removed and
149    /// no further zero values will be emitted.
150    ///
151    /// Defaults to 300 seconds (5 minutes). Setting a value of `0` disables idle counter keep-alive.
152    #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
153    counter_expiry_seconds: Option<u64>,
154
155    /// Whether or not to immediately forward (passthrough) metrics with pre-defined timestamps.
156    ///
157    /// When enabled, this causes the aggregator to immediately forward metrics that already have a timestamp present.
158    /// Only metrics without a timestamp will be aggregated. This can be useful when metrics are already pre-aggregated
159    /// client-side and both timeliness and memory efficiency are paramount, as it avoids the overhead of aggregating
160    /// within the pipeline.
161    ///
162    /// Defaults to `true`.
163    #[serde(
164        rename = "dogstatsd_no_aggregation_pipeline",
165        default = "default_passthrough_timestamped_metrics"
166    )]
167    passthrough_timestamped_metrics: bool,
168
169    /// How often to flush buffered passthrough metrics.
170    ///
171    /// While passthrough metrics aren't re-aggregated by the transform, they will still be temporarily buffered in
172    /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
173    /// amount of time that passthrough metrics will be buffered before being forwarded.
174    ///
175    /// Defaults to 1 seconds.
176    #[serde(
177        rename = "aggregate_passthrough_idle_flush_timeout",
178        default = "default_passthrough_idle_flush_timeout"
179    )]
180    passthrough_idle_flush_timeout: Duration,
181
182    /// Histogram aggregation configuration.
183    ///
184    /// Controls the aggregates/percentiles that are generated for distributions in "histogram" mode (client-side
185    /// distribution aggregation).
186    #[serde(flatten)]
187    hist_config: HistogramConfiguration,
188}
189
190impl AggregateConfiguration {
191    /// Creates a new `AggregateConfiguration` from the given configuration.
192    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
193        Ok(config.as_typed()?)
194    }
195
196    /// Creates a new `AggregateConfiguration` with default values.
197    pub fn with_defaults() -> Self {
198        Self {
199            window_duration_seconds: default_window_duration_seconds(),
200            primary_flush_interval: default_primary_flush_interval(),
201            context_limit: default_context_limit(),
202            flush_open_windows: false,
203            counter_expiry_seconds: default_counter_expiry_seconds(),
204            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
205            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
206            hist_config: HistogramConfiguration::default(),
207        }
208    }
209}
210
211#[async_trait]
212impl TransformBuilder for AggregateConfiguration {
213    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
214        let metrics_builder = MetricsBuilder::from_component_context(&context);
215        let telemetry = Telemetry::new(&metrics_builder);
216
217        let state = AggregationState::new(
218            self.window_duration_seconds,
219            self.context_limit,
220            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
221            self.hist_config.clone(),
222            telemetry.clone(),
223        );
224
225        let passthrough_batcher = PassthroughBatcher::new(
226            self.passthrough_idle_flush_timeout,
227            self.window_duration_seconds,
228            telemetry.clone(),
229        )
230        .await;
231
232        Ok(Box::new(Aggregate {
233            state,
234            telemetry,
235            primary_flush_interval: self.primary_flush_interval,
236            flush_open_windows: self.flush_open_windows,
237            passthrough_batcher,
238            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
239        }))
240    }
241
242    fn input_event_type(&self) -> EventType {
243        EventType::Metric
244    }
245
246    fn outputs(&self) -> &[OutputDefinition<EventType>] {
247        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
248        OUTPUTS
249    }
250}
251
252impl MemoryBounds for AggregateConfiguration {
253    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
254        // TODO: While we account for the aggregation state map accurately, what we don't currently account for is the
255        // fact that a metric could have multiple distinct values. For the common pipeline of metrics in via DogStatsD,
256        // this generally shouldn't be a problem because the values don't have a timestamp, so they get aggregated into
257        // the same bucket, leading to two values per `MetricValues` at most, which is already baked into the size of
258        // `MetricValues` due to using `SmallVec`.
259        //
260        // However, there could be many more values in a single metric, and we don't account for that.
261
262        builder
263            .minimum()
264            // Capture the size of the heap allocation when the component is built.
265            .with_single_value::<Aggregate>("component struct");
266        builder
267            .firm()
268            // Account for the aggregation state map, where we map contexts to the merged metric.
269            .with_expr(UsageExpr::product(
270                "aggregation state map",
271                UsageExpr::sum(
272                    "context map entry",
273                    UsageExpr::struct_size::<Context>("context"),
274                    UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
275                ),
276                UsageExpr::config("aggregate_context_limit", self.context_limit),
277            ));
278    }
279}
280
281pub struct Aggregate {
282    state: AggregationState,
283    telemetry: Telemetry,
284    primary_flush_interval: Duration,
285    flush_open_windows: bool,
286    passthrough_batcher: PassthroughBatcher,
287    passthrough_timestamped_metrics: bool,
288}
289
290#[async_trait]
291impl Transform for Aggregate {
292    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
293        let mut health = context.take_health_handle();
294
295        let mut primary_flush = interval_at(
296            tokio::time::Instant::now() + self.primary_flush_interval,
297            self.primary_flush_interval,
298        );
299        let mut final_primary_flush = false;
300
301        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);
302
303        health.mark_ready();
304        debug!("Aggregation transform started.");
305
306        tokio::pin!(passthrough_flush);
307
308        loop {
309            select! {
310                _ = health.live() => continue,
311                _ = primary_flush.tick() => {
312                    // We've reached the end of the current window. Flush our aggregation state and forward the metrics
313                    // onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
314                    // state to track the start time of the current aggregation window.
315                    if !self.state.is_empty() {
316                        debug!("Flushing aggregated metrics...");
317
318                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;
319
320                        // Remember if the context limit had been surpassed before this flush.
321                        let was_breached = self.state.context_limit_breached();
322
323                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
324                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
325                            error!(error = %e, "Failed to flush aggregation state.");
326                        }
327
328                        self.telemetry.increment_flushes();
329
330                        // If flush recovered us from a breach, log the recovery.
331                        if was_breached && !self.state.context_limit_breached() {
332                            info!("Context limit no longer exceeded, metrics are being accepted again.");
333                        }
334
335                        match dispatcher.flush().await {
336                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
337                            Err(e) => error!(error = %e, "Failed to flush aggregated events."),
338                        }
339                    }
340
341                    // If this is the final flush, we break out of the loop.
342                    if final_primary_flush {
343                        debug!("All aggregation complete.");
344                        break
345                    }
346                },
347                _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
348                maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
349                    Some(events) => {
350                        trace!(events_len = events.len(), "Received events.");
351
352                        let current_time = get_unix_timestamp();
353                        let mut processed_passthrough_metrics = false;
354
355                        for event in events {
356                            if let Some(metric) = event.try_into_metric() {
357                                let metric = if self.passthrough_timestamped_metrics {
358                                    // Try splitting out any timestamped values, and if we have any, we'll buffer them
359                                    // separately and process the remaining nontimestamped metric (if any) by
360                                    // aggregating it like normal.
361                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);
362
363                                    // If we have a timestamped metric, then batch it up out-of-band.
364                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
365                                        self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
366                                        processed_passthrough_metrics = true;
367                                    }
368
369                                    // If we have an nontimestamped metric, we'll process it like normal.
370                                    //
371                                    // Otherwise, continue to the next event.
372                                    match maybe_nontimestamped_metric {
373                                        Some(metric) => metric,
374                                        None => continue,
375                                    }
376                                } else {
377                                    metric
378                                };
379
380                                let was_breached = self.state.context_limit_breached();
381                                if !self.state.insert(current_time, metric) {
382                                    trace!("Dropping metric due to context limit.");
383                                    if !was_breached {
384                                        // First drop since the last recovery — emit a single warning.
385                                        warn!(context_limit = self.state.context_limit, "Context limit reached, \
386                                        dropping metrics. Consider increasing `aggregate_context_limit`.");
387                                    }
388                                    self.telemetry.increment_events_dropped();
389                                }
390                            }
391                        }
392
393                        if processed_passthrough_metrics {
394                            self.passthrough_batcher.update_last_processed_at();
395                        }
396                    },
397                    None => {
398                        // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
399                        // interval so it ticks immediately on the next loop iteration.
400                        final_primary_flush = true;
401                        primary_flush.reset_immediately();
402
403                        debug!("Aggregation transform stopping...");
404                    }
405                },
406            }
407        }
408
409        // Do a final flush of any timestamped metrics that we've buffered up.
410        self.passthrough_batcher.try_flush(context.dispatcher()).await;
411
412        debug!("Aggregation transform stopped.");
413
414        Ok(())
415    }
416}
417
418fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
419    if metric.values().all_timestamped() {
420        (Some(metric), None)
421    } else if metric.values().any_timestamped() {
422        // Only _some_ of the values are timestamped, so we'll split the timestamped values into a new metric.
423        let new_metric_values = metric.values_mut().split_timestamped();
424        let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
425
426        (Some(new_metric), Some(metric))
427    } else {
428        // No timestamped values, so we need to aggregate this metric.
429        (None, Some(metric))
430    }
431}
432
433struct PassthroughBatcher {
434    active_buffer: EventsBuffer,
435    active_buffer_start: Instant,
436    last_processed_at: Instant,
437    idle_flush_timeout: Duration,
438    bucket_width_secs: NonZeroU64,
439    telemetry: Telemetry,
440}
441
442impl PassthroughBatcher {
443    async fn new(idle_flush_timeout: Duration, bucket_width_secs: NonZeroU64, telemetry: Telemetry) -> Self {
444        let active_buffer = EventsBuffer::default();
445
446        Self {
447            active_buffer,
448            active_buffer_start: Instant::now(),
449            last_processed_at: Instant::now(),
450            idle_flush_timeout,
451            bucket_width_secs,
452            telemetry,
453        }
454    }
455
456    async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
457        // Convert counters to rates before we batch them up.
458        //
459        // This involves specifying the rate interval as the bucket width of the aggregate transform itself, which when
460        // you say it out loud is sort of confusing and nonsensical since the whole point is that these are
461        // _pre-aggregated_ metrics but we have to match the behavior of the Datadog Agent. ¯\_(ツ)_/¯
462        let (context, values, metadata) = metric.into_parts();
463        let adjusted_values = counter_values_to_rate(values, self.bucket_width_secs);
464        let metric = Metric::from_parts(context, adjusted_values, metadata);
465
466        // Try pushing the metric into our active buffer.
467        //
468        // If our active buffer is full, then we'll flush the buffer, grab a new one, and push the metric into it.
469        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
470            debug!("Passthrough event buffer was full. Flushing...");
471            self.dispatch_events(dispatcher).await;
472
473            if self.active_buffer.try_push(event).is_some() {
474                error!("Event buffer is full even after dispatching events. Dropping event.");
475                self.telemetry.increment_events_dropped();
476                return;
477            }
478        }
479
480        // If this is the first metric in the buffer, we've started a new batch, so track when it started.
481        if self.active_buffer.len() == 1 {
482            self.active_buffer_start = Instant::now();
483        }
484
485        self.telemetry.increment_passthrough_metrics();
486    }
487
488    fn update_last_processed_at(&mut self) {
489        // We expose this as a standalone method, rather than just doing it automatically in `push_metric`, because
490        // otherwise we might be calling this 10-20K times per second, instead of simply doing it after the end of each
491        // input event buffer in the transform's main loop, which should be much less frequent.
492        self.last_processed_at = Instant::now();
493    }
494
495    async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
496        // If our active buffer isn't empty, and we've exceeded our idle flush timeout, then flush the buffer.
497        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
498            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");
499
500            self.dispatch_events(dispatcher).await;
501        }
502    }
503
504    async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
505        if !self.active_buffer.is_empty() {
506            let unaggregated_events = self.active_buffer.len();
507
508            // Track how long this batch was alive for.
509            let batch_duration = self.active_buffer_start.elapsed();
510            self.telemetry.record_passthrough_batch_duration(batch_duration);
511
512            self.telemetry.increment_passthrough_flushes();
513
514            // Swap our active buffer with a new, empty one, and then forward the old one.
515            let new_active_buffer = EventsBuffer::default();
516            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);
517
518            match dispatcher.dispatch(old_active_buffer).await {
519                Ok(()) => debug!(unaggregated_events, "Dispatched events."),
520                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
521            }
522        }
523    }
524}
525
526#[derive(Clone)]
527struct AggregatedMetric {
528    values: MetricValues,
529    metadata: MetricMetadata,
530    last_seen: u64,
531}
532
533struct AggregationState {
534    contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
535    contexts_remove_buf: Vec<Context>,
536    context_limit: usize,
537    bucket_width_secs: NonZeroU64,
538    counter_expire_secs: Option<NonZeroU64>,
539    last_flush: u64,
540    hist_config: HistogramConfiguration,
541    telemetry: Telemetry,
542    /// Tracks whether the context limit has been breached. Starts out as `false`. Set to `true` on the first dropped
543    /// metric. Reset to `false` when the context count drops below the limit during flush.
544    context_limit_breached: bool,
545}
546
547impl AggregationState {
548    fn new(
549        bucket_width_secs: NonZeroU64, context_limit: usize, counter_expiration: Option<Duration>,
550        hist_config: HistogramConfiguration, telemetry: Telemetry,
551    ) -> Self {
552        let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);
553
554        Self {
555            contexts: HashMap::default(),
556            contexts_remove_buf: Vec::new(),
557            context_limit,
558            bucket_width_secs,
559            counter_expire_secs,
560            last_flush: 0,
561            hist_config,
562            telemetry,
563            context_limit_breached: false,
564        }
565    }
566
567    fn is_empty(&self) -> bool {
568        self.contexts.is_empty()
569    }
570
571    fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
572        // If we haven't seen this context yet, and it would put us over the limit to insert it, then return early.
573        if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
574            self.context_limit_breached = true;
575            return false;
576        }
577
578        let (context, mut values, metadata) = metric.into_parts();
579
580        // Collapse all non-timestamped values into a single timestamped value.
581        //
582        // We do this pre-aggregation step because unless we're merging into an existing context, we'll end up with
583        // however many values were in the original metric instead of full aggregated values.
584        let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
585        values.collapse_non_timestamped(bucket_ts);
586
587        trace!(
588            bucket_ts,
589            kind = values.as_str(),
590            "Inserting metric into aggregation state."
591        );
592
593        // If we're already tracking this context, update the last seen time and merge the new values into the existing
594        // values. Otherwise, create a new entry.
595        match self.contexts.entry(context) {
596            Entry::Occupied(mut entry) => {
597                let aggregated = entry.get_mut();
598
599                // We ignore metadata changes within a flush interval to keep things simple.
600                aggregated.last_seen = timestamp;
601                aggregated.values.merge(values);
602            }
603            Entry::Vacant(entry) => {
604                self.telemetry.increment_contexts(entry.key(), &values);
605
606                entry.insert(AggregatedMetric {
607                    values,
608                    metadata,
609                    last_seen: timestamp,
610                });
611            }
612        }
613
614        true
615    }
616
617    async fn flush(
618        &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
619    ) -> Result<(), GenericError> {
620        let bucket_width_secs = self.bucket_width_secs;
621        let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);
622
623        // We want our split timestamp to be before the start of the current bucket, which ensures any timestamp that is
624        // less than or equal to `split_timestamp` resides in a closed bucket.
625        let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);
626
627        // Calculate the buckets we need to potentially generate zero-value counters for.
628        //
629        // We only need to do this if we've flushed before, since we won't have any knowledge of which counters are idle
630        // or not until that happens.
631        let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
632        if self.last_flush != 0 {
633            let start = align_to_bucket_start(self.last_flush, bucket_width_secs);
634
635            for bucket_start in (start..current_time).step_by(bucket_width_secs.get() as usize) {
636                if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
637                    zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
638                }
639            }
640        }
641
642        // Iterate over each context we're tracking, and flush any values that are in buckets which are now closed.
643        debug!(timestamp = current_time, "Flushing buckets.");
644
645        for (context, am) in self.contexts.iter_mut() {
646            // Figure out if we should remove this metric or not if it has no values in open buckets.
647            //
648            // We have a special carve-out for counters here, which we have the ability to keep alive after they are
649            // flushed, based on a configured expiration period. This allows us to continue emitting a zero value for
650            // counters when they're idle, which can make them appear "live" in downstream systems, even when they're
651            // not.
652            //
653            // This is useful for sparsely-updated counters.
654            let should_expire_if_empty = match &am.values {
655                MetricValues::Counter(..) => {
656                    counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
657                }
658                _ => true,
659            };
660
661            // If we're dealing with a counter, we'll merge in our calculated set of zero values. We only merge in the
662            // values that represent now-closed buckets.
663            //
664            // This is also safe to do even when there are real values in those buckets since adding zero to anything is
665            // a no-op from the perspective of what we end up flushing, and it doesn't mess with the "last seen" time.
666            if let MetricValues::Counter(..) = &mut am.values {
667                let expires_at = am.last_seen + counter_expire_secs;
668                for (zv_bucket_start, zero_value) in &zero_value_buckets {
669                    if expires_at > *zv_bucket_start {
670                        am.values.merge(zero_value.clone());
671                    } else {
672                        // Since zero-value buckets are in order, we can break early if this bucket is past the
673                        // expiration cutoff of the counter. No other bucket will be within the expiration range.
674                        break;
675                    }
676                }
677            }
678
679            // Finally, figure out if the current metric can be removed.
680            //
681            // For any metric with values that are in open buckets, we split off the values that are in closed buckets
682            // and keep the metric alive. When all the values are in closed buckets, or there are no values, we'll
683            // remove the metric if `should_remove_if_empty` is `true`.
684            //
685            // This means we'll always remove all-closed/empty non-counter metrics, and we _may_ remove all-closed/empty
686            // counters.
687            if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
688                self.telemetry.increment_flushed(&closed_bucket_values);
689
690                // We got some closed bucket values, so flush those out.
691                transform_and_push_metric(
692                    context.clone(),
693                    closed_bucket_values,
694                    am.metadata.clone(),
695                    bucket_width_secs,
696                    &self.hist_config,
697                    dispatcher,
698                )
699                .await?;
700            }
701
702            if am.values.is_empty() && should_expire_if_empty {
703                self.telemetry.decrement_contexts(context, &am.values);
704                self.contexts_remove_buf.push(context.clone());
705            }
706        }
707
708        // Remove any contexts that were marked as needing to be removed.
709        let contexts_len_before = self.contexts.len();
710        for context in self.contexts_remove_buf.drain(..) {
711            self.contexts.remove(&context);
712        }
713        let contexts_len_after = self.contexts.len();
714
715        let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
716        let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
717        self.contexts.shrink_to(target_contexts_capacity);
718
719        if self.context_limit_breached && self.contexts.len() < self.context_limit {
720            self.context_limit_breached = false;
721        }
722
723        self.last_flush = current_time;
724
725        Ok(())
726    }
727
    /// Returns the current value of the stored `context_limit_breached` flag.
    ///
    /// This only reports the flag as it was last set; it does not itself compare the number of tracked contexts
    /// against the configured limit.
    fn context_limit_breached(&self) -> bool {
        self.context_limit_breached
    }
731}
732
733async fn transform_and_push_metric(
734    context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: NonZeroU64,
735    hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
736) -> Result<(), GenericError> {
737    let bucket_width = Duration::from_secs(bucket_width_secs.get());
738
739    match values {
740        // If we're dealing with a histogram, we calculate a configured set of aggregates/percentiles from it, and emit
741        // them as individual metrics.
742        MetricValues::Histogram(ref mut points) => {
743            // Convert histogram to distribution
744            if hist_config.copy_to_distribution() {
745                let sketch_points = points
746                    .into_iter()
747                    .map(|(ts, hist)| {
748                        let mut sketch = DDSketch::default();
749                        for sample in hist.samples() {
750                            sketch.insert_n(sample.value.into_inner(), sample.weight.0 as u64);
751                        }
752                        (ts, sketch)
753                    })
754                    .collect::<SketchPoints>();
755                let distribution_values = MetricValues::distribution(sketch_points);
756                let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
757                    context.with_name(format!(
758                        "{}{}",
759                        hist_config.copy_to_distribution_prefix(),
760                        context.name()
761                    ))
762                } else {
763                    context.clone()
764                };
765                let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
766                dispatcher.push(Event::Metric(new_metric)).await?;
767            }
768            // We collect our histogram points in their "summary" view, which sorts the underlying samples allowing
769            // proper quantile queries to be answered, hence our "sorted" points. We do it this way because rather than
770            // sort every time we insert, or cloning the points, we only sort when a summary view is constructed, which
771            // requires mutable access to sort the samples in-place.
772            let mut sorted_points = Vec::new();
773            for (ts, h) in points {
774                sorted_points.push((ts, h.summary_view()));
775            }
776
777            for statistic in hist_config.statistics() {
778                let new_points = sorted_points
779                    .iter()
780                    .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
781                    .collect::<ScalarPoints>();
782
783                let new_values = if statistic.is_rate_statistic() {
784                    MetricValues::rate(new_points, bucket_width)
785                } else {
786                    MetricValues::gauge(new_points)
787                };
788
789                // Counts are dimensionless, so clear any unit inherited from the input histogram.
790                let new_metadata = if matches!(statistic, HistogramStatistic::Count) {
791                    metadata.clone().with_unit(MetaString::empty())
792                } else {
793                    metadata.clone()
794                };
795
796                let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
797                let new_metric = Metric::from_parts(new_context, new_values, new_metadata);
798                dispatcher.push(Event::Metric(new_metric)).await?;
799            }
800
801            Ok(())
802        }
803
804        // If we're not dealing with a histogram, then all we need to worry about is converting counters to rates before
805        // forwarding our single, aggregated metric.
806        values => {
807            let adjusted_values = counter_values_to_rate(values, bucket_width_secs);
808
809            let metric = Metric::from_parts(context, adjusted_values, metadata);
810            dispatcher.push(Event::Metric(metric)).await
811        }
812    }
813}
814
815fn counter_values_to_rate(values: MetricValues, interval_secs: NonZeroU64) -> MetricValues {
816    match values {
817        MetricValues::Counter(points) => MetricValues::rate(points, Duration::from_secs(interval_secs.get())),
818        values => values,
819    }
820}
821
/// Rounds `timestamp` down to the start of the bucket that contains it.
///
/// Bucket starts are the multiples of `bucket_width_secs`, so the result is the largest such multiple that does not
/// exceed `timestamp`.
const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: NonZeroU64) -> u64 {
    let width = bucket_width_secs.get();
    let whole_buckets = timestamp / width;
    whole_buckets * width
}
825
/// Returns `true` if the bucket starting at `bucket_start` should be treated as closed at `current_time`.
///
/// A bucket is closed once `current_time` is past the last second it covers, or unconditionally when
/// `flush_open_buckets` is `true`.
///
/// The end of the bucket is computed with saturating arithmetic, so a bucket whose end would lie beyond `u64::MAX`
/// is reported as open (unless `flush_open_buckets` is set) instead of overflowing.
const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: NonZeroU64, flush_open_buckets: bool,
) -> bool {
    // A bucket is considered "closed" if the current time is greater than the end of the bucket, or if
    // `flush_open_buckets` is `true`.
    //
    // Buckets represent a half-open interval, where the start is inclusive and the end is exclusive. This means that
    // for a bucket start of 10, and a width of 10, the bucket is 10 seconds "wide", and its start and end are 10 and
    // 20, with the 20 excluded, or [10, 20) in interval notation. Simply put, if we have a timestamp of 10, or anything
    // smaller than 20, we would consider it to fall within the bucket... but 20 or more would be outside of the bucket.
    //
    // We can also represent this visually:
    //
    // <--------- bucket 1 ----------> <--------- bucket 2 ----------> <--------- bucket 3 ---------->
    // [10 11 12 13 14 15 16 17 18 19] [20 21 22 23 24 25 26 27 28 29] [30 31 32 33 34 35 36 37 38 39]
    //
    // We can see that each bucket is 10 seconds wide (10 elements, one for each second), and that their ends are
    // effectively `start + width - 1`. This means that for any of these buckets to be considered "closed", the current
    // time has to be _greater_ than `start + width - 1`. For example, if the current time is 19, then no buckets are
    // closed, and if the current time is 29, then bucket 1 is closed but buckets 2 and 3 are still open, and if the
    // current time is 30, then both buckets 1 and 2 are closed, but bucket 3 is still open.
    if flush_open_buckets {
        return true;
    }

    // The width is non-zero, so `width - 1` cannot underflow. The addition saturates rather than overflowing: a
    // saturated end of `u64::MAX` can never be less than `current_time`, which correctly keeps such a bucket open.
    let last_second_in_bucket = bucket_start.saturating_add(bucket_width_secs.get() - 1);
    last_second_in_bucket < current_time
}
849
850// TODO: One thing we ought to consider is a property test, specifically a state machine property test, where we
851// generate a randomized offset to start time from, a bucket width, flush interval, and operations, and so on... and
852// then we run it to make sure that we are always generating sequential timestamps for data points, etc.
853#[cfg(test)]
854mod tests {
855    use float_cmp::ApproxEqRatio as _;
856    use saluki_core::{
857        components::ComponentContext,
858        topology::{interconnect::Dispatcher, ComponentId, OutputName},
859    };
860    use saluki_metrics::test::TestRecorder;
861    use stringtheory::MetaString;
862    use tokio::sync::mpsc;
863
864    use super::config::HistogramStatistic;
865    use super::*;
866
    // Bucket width used throughout this test module, in both raw-seconds and `Duration` form.
    const BUCKET_WIDTH_SECS: NonZeroU64 = NonZeroU64::new(10).expect("not zero");
    const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS.get());
    // Counter expiration used throughout this test module: 20 seconds, i.e. two bucket widths.
    const COUNTER_EXPIRE_SECS: u64 = 20;
    const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));
871
872    /// Gets the bucket start timestamp for the given step.
873    const fn bucket_ts(step: u64) -> u64 {
874        align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
875    }
876
877    /// Gets the insert timestamp for the given step.
878    const fn insert_ts(step: u64) -> u64 {
879        (BUCKET_WIDTH_SECS.get() * (step + 1)) - 2
880    }
881
882    /// Gets the flush timestamp for the given step.
883    const fn flush_ts(step: u64) -> u64 {
884        BUCKET_WIDTH_SECS.get() * (step + 1)
885    }
886
    /// Receiving half of a test dispatcher's output channel, used to read back the event buffers that were
    /// dispatched.
    struct DispatcherReceiver {
        // Channel end on which dispatched `EventsBuffer`s arrive.
        receiver: mpsc::Receiver<EventsBuffer>,
    }
890
891    impl DispatcherReceiver {
892        fn collect_next(&mut self) -> Vec<Metric> {
893            match self.receiver.try_recv() {
894                Ok(event_buffer) => {
895                    let mut metrics = event_buffer
896                        .into_iter()
897                        .filter_map(|event| event.try_into_metric())
898                        .collect::<Vec<Metric>>();
899
900                    metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
901                    metrics
902                }
903                Err(_) => Vec::new(),
904            }
905        }
906    }
907
908    /// Constructs a basic `Dispatcher` with a fixed-size event buffer.
909    fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
910        let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
911        let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));
912
913        let (buffer_tx, buffer_rx) = mpsc::channel(1);
914        dispatcher.add_output(OutputName::Default).unwrap();
915        dispatcher
916            .attach_sender_to_output(&OutputName::Default, buffer_tx)
917            .unwrap();
918
919        (dispatcher, DispatcherReceiver { receiver: buffer_rx })
920    }
921
922    async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
923        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
924        let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");
925
926        // Flush the metrics to an event buffer.
927        state
928            .flush(timestamp, true, &mut buffered_dispatcher)
929            .await
930            .expect("should not fail to flush aggregation state");
931
932        // Flush our buffered dispatcher, which should ensure that the event buffer is sent out, and then read it from the
933        // receiver:
934        buffered_dispatcher
935            .flush()
936            .await
937            .expect("should not fail to flush buffered sender");
938
939        dispatcher_receiver.collect_next()
940    }
941
    // Compares two point collections pairwise, asserting on both the timestamp and the value of each pair.
    //
    // The pairs are produced with `zip`, which stops at the shorter side, so callers should assert that both
    // collections have the same length before invoking this macro.
    macro_rules! compare_points {
        // `scalar` arm: timestamps must match exactly, while values only need to be approximately equal within
        // `$error_ratio` (via `approx_eq_ratio`).
        (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_point) = expected_value;
                let (actual_ts, actual_point) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert!(
                    expected_point.approx_eq_ratio(&actual_point, $error_ratio),
                    "point for value #{} does not match: {} (expected) vs {} (actual)",
                    idx,
                    expected_point,
                    actual_point
                );
            }
        };
        // `distribution` arm: both the timestamps and the sketches must be exactly equal.
        (distribution, $expected:expr, $actual:expr) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_sketch) = expected_value;
                let (actual_ts, actual_sketch) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert_eq!(
                    expected_sketch, actual_sketch,
                    "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_sketch, actual_sketch
                );
            }
        };
    }
980
    // Asserts that a flushed counter, gauge, or rate metric has the same context as `$original` and exactly the
    // listed `timestamp => value` points.
    //
    // Values are compared approximately; the error ratio defaults to 0.000001 and can be overridden with a trailing
    // `error_ratio => <literal>`. Any other metric type causes a panic.
    macro_rules! assert_flushed_scalar_metric {
        // Shorthand arm: forwards to the full arm with the default error ratio.
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());

            let expected_points = ScalarPoints::from([$(($ts, $value)),+]);

            match actual_metric.values() {
                MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
                    // The length check comes first because the pairwise comparison zips the two collections and
                    // would otherwise silently ignore extra points on either side.
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
                    compare_points!(scalar, expected_points, actual_points, $error_ratio);
                },
                _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
            }
        };
    }
1001
    // Asserts that a flushed distribution metric has the same context as `$original` and exactly the listed
    // `timestamp => sketch` points. Any other metric type causes a panic.
    //
    // NOTE: the `error_ratio => <literal>` form is accepted for symmetry with `assert_flushed_scalar_metric`, but
    // `$error_ratio` is never used in the expansion: sketches are compared for exact equality.
    macro_rules! assert_flushed_distribution_metric {
        // Shorthand arm: forwards to the full arm with a placeholder error ratio.
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context());

            match actual_metric.values() {
                MetricValues::Distribution(ref actual_points) => {
                    let expected_points = SketchPoints::from([$(($ts, $value)),+]);
                    // The length check comes first because the pairwise comparison zips the two collections and
                    // would otherwise silently ignore extra points on either side.
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");

                    compare_points!(distribution, &expected_points, actual_points);
                },
                _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
            }
        };
    }
1022
1023    #[test]
1024    fn bucket_is_closed() {
1025        // Cases are defined as:
1026        // (current time, bucket start, bucket width, flush open buckets, expected result)
1027        let cases = [
1028            // Bucket goes from [995, 1005), current time of 1000, so bucket is open.
1029            (1000, 995, BUCKET_WIDTH_SECS, false, false),
1030            (1000, 995, BUCKET_WIDTH_SECS, true, true),
1031            // Bucket goes from [1000, 1010), current time of 1000, so bucket is open.
1032            (1000, 1000, BUCKET_WIDTH_SECS, false, false),
1033            (1000, 1000, BUCKET_WIDTH_SECS, true, true),
1034            // Bucket goes from [1000, 1010), current time of 1010, so bucket is closed.
1035            (1010, 1000, BUCKET_WIDTH_SECS, false, true),
1036            (1010, 1000, BUCKET_WIDTH_SECS, true, true),
1037        ];
1038
1039        for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
1040            let expected_reason = if expected {
1041                "closed, was open"
1042            } else {
1043                "open, was closed"
1044            };
1045
1046            assert_eq!(
1047                is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
1048                expected,
1049                "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
1050                expected_reason,
1051                current_time,
1052                bucket_start,
1053                bucket_width_secs,
1054                flush_open_buckets
1055            );
1056        }
1057    }
1058
1059    #[tokio::test]
1060    async fn context_limit() {
1061        // Create our aggregation state with a context limit of 2.
1062        let mut state = AggregationState::new(
1063            BUCKET_WIDTH_SECS,
1064            2,
1065            COUNTER_EXPIRE,
1066            HistogramConfiguration::default(),
1067            Telemetry::noop(),
1068        );
1069
1070        // Create four unique gauges, and insert all of them. The third and fourth should fail because we've reached
1071        // the context limit.
1072        let input_metrics = [
1073            Metric::gauge("metric1", 1.0),
1074            Metric::gauge("metric2", 2.0),
1075            Metric::gauge("metric3", 3.0),
1076            Metric::gauge("metric4", 4.0),
1077        ];
1078
1079        assert!(!state.context_limit_breached());
1080
1081        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
1082        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
1083        assert!(!state.context_limit_breached());
1084
1085        assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
1086        assert!(state.context_limit_breached());
1087        assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));
1088        assert!(state.context_limit_breached());
1089
1090        // We should only see the first two gauges after flushing. The flush should also clear the breached flag since
1091        // contexts drop below the limit.
1092        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1093        assert_eq!(flushed_metrics.len(), 2);
1094        assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
1095        assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());
1096        assert!(!state.context_limit_breached());
1097
1098        // We should be able to insert the third and fourth gauges now as the first two have been flushed, and along
1099        // with them, their contexts should no longer be tracked in the aggregation state:
1100        assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
1101        assert!(state.insert(insert_ts(2), input_metrics[3].clone()));
1102
1103        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
1104        assert_eq!(flushed_metrics.len(), 2);
1105        assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
1106        assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
1107    }
1108
1109    #[tokio::test]
1110    async fn context_limit_with_zero_value_counters() {
1111        // We test here to ensure that zero-value counters contribute to the context limit.
1112        let mut state = AggregationState::new(
1113            BUCKET_WIDTH_SECS,
1114            2,
1115            COUNTER_EXPIRE,
1116            HistogramConfiguration::default(),
1117            Telemetry::noop(),
1118        );
1119
1120        // Create our input metrics.
1121        let input_metrics = [
1122            Metric::counter("metric1", 1.0),
1123            Metric::counter("metric2", 2.0),
1124            Metric::counter("metric3", 3.0),
1125        ];
1126
1127        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
1128        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
1129
1130        // Flush the aggregation state, and observe they're both present.
1131        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1132        assert_eq!(flushed_metrics.len(), 2);
1133        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
1134        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);
1135
1136        // Flush _again_ to ensure that we then emit zero-value variants for both counters.
1137        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
1138        assert_eq!(flushed_metrics.len(), 2);
1139        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
1140        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);
1141
1142        // Now try to insert a third counter, which should fail because we've reached the context limit.
1143        assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));
1144
1145        // Flush the aggregation state, and observe that we only see the two original counters.
1146        let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
1147        assert_eq!(flushed_metrics.len(), 2);
1148        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
1149        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);
1150
1151        // With a fourth flush interval, the two counters should now have expired, and thus be dropped and no longer
1152        // contributing to the context limit.
1153        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
1154        assert_eq!(flushed_metrics.len(), 0);
1155
1156        // Now we should be able to insert the third counter, and it should be the only one present after flushing.
1157        assert!(state.insert(insert_ts(5), input_metrics[2].clone()));
1158
1159        let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
1160        assert_eq!(flushed_metrics.len(), 1);
1161        assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
1162    }
1163
1164    #[tokio::test]
1165    async fn zero_value_counters() {
1166        // We're testing that we properly emit and expire zero-value counters in all relevant scenarios.
1167        let mut state = AggregationState::new(
1168            BUCKET_WIDTH_SECS,
1169            10,
1170            COUNTER_EXPIRE,
1171            HistogramConfiguration::default(),
1172            Telemetry::noop(),
1173        );
1174
1175        // Create two unique counters, and insert both of them.
1176        let input_metrics = [Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];
1177
1178        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
1179        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
1180
1181        // Flush the aggregation state, and observe they're both present.
1182        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1183        assert_eq!(flushed_metrics.len(), 2);
1184        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
1185        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);
1186
1187        // Perform our second flush, which should have them as zero-value counters.
1188        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
1189        assert_eq!(flushed_metrics.len(), 2);
1190        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
1191        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);
1192
1193        // Now, we'll pretend to skip a flush period and add updates to them again after that.
1194        assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
1195        assert!(state.insert(insert_ts(4), input_metrics[1].clone()));
1196
1197        // Flush the aggregation state, and observe that we have two zero-value counters for the flush period we
1198        // skipped, but that we see them appear again in the fourth flush period.
1199        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
1200        assert_eq!(flushed_metrics.len(), 2);
1201        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
1202        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);
1203
1204        // Now we'll skip multiple flush periods and ensure that we emit zero-value counters up until the point they
1205        // expire. As our zero-value counter expiration is 20 seconds, this is two flush periods, so we skip by three
1206        // flush periods, and we should only see the counters emitted for the first two.
1207        let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
1208        assert_eq!(flushed_metrics.len(), 2);
1209        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
1210        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
1211    }
1212
1213    #[tokio::test]
1214    async fn merge_identical_timestamped_values_on_flush() {
1215        // We're testing that we properly emit and expire zero-value counters in all relevant scenarios.
1216        let mut state = AggregationState::new(
1217            BUCKET_WIDTH_SECS,
1218            10,
1219            COUNTER_EXPIRE,
1220            HistogramConfiguration::default(),
1221            Telemetry::noop(),
1222        );
1223
1224        // Create one multi-value counter, and insert it.
1225        let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
1226
1227        assert!(state.insert(insert_ts(1), input_metric.clone()));
1228
1229        // Flush the aggregation state, and observe the metric is present _and_ that we've properly merged all of the
1230        // values within the same timestamp.
1231        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1232        assert_eq!(flushed_metrics.len(), 1);
1233        assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
1234    }
1235
1236    #[tokio::test]
1237    async fn histogram_statistics() {
1238        // We're testing that we properly emit individual metrics (min, max, sum, etc) for a histogram.
1239        let hist_config = HistogramConfiguration::from_statistics(
1240            &[
1241                HistogramStatistic::Count,
1242                HistogramStatistic::Sum,
1243                HistogramStatistic::Percentile {
1244                    q: 0.5,
1245                    suffix: "p50".into(),
1246                },
1247            ],
1248            false,
1249            "".into(),
1250        );
1251        let mut state = AggregationState::new(BUCKET_WIDTH_SECS, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1252
1253        // Create one multi-value histogram and insert it.
1254        let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
1255        assert!(state.insert(insert_ts(1), input_metric.clone()));
1256
1257        // Flush the aggregation state, and observe that we've emitted all of the configured distribution statistics in
1258        // the form of three metrics: count, sum, and p50.
1259        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1260        assert_eq!(flushed_metrics.len(), 3);
1261
1262        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
1263        // matter here, but we do need a `Metric` for it to compare the context to.
1264        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS.get()));
1265        let sum_metric = Metric::gauge("metric1.sum", 0.0);
1266        let p50_metric = Metric::gauge("metric1.p50", 0.0);
1267
1268        // We use a less strict error ratio (how much the expected vs actual) for the percentile check, as we generally
1269        // expect the value to be somewhat off the exact value due to the lossy nature of `DDSketch`.
1270        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
1271        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
1272        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
1273    }
1274
1275    #[tokio::test]
1276    async fn histogram_statistics_unit_propagation() {
1277        // We're testing that the unit from the input histogram metadata propagates to all flushed output metrics.
1278        let hist_config = HistogramConfiguration::from_statistics(
1279            &[
1280                HistogramStatistic::Count,
1281                HistogramStatistic::Sum,
1282                HistogramStatistic::Percentile {
1283                    q: 0.5,
1284                    suffix: "p50".into(),
1285                },
1286            ],
1287            false,
1288            "".into(),
1289        );
1290        let mut state = AggregationState::new(BUCKET_WIDTH_SECS, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1291
1292        // Build a histogram with unit = "millisecond", simulating what arrives from a DogStatsD `ms` metric.
1293        let context = Context::from_static_parts("metric1", &[]);
1294        let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
1295        let input_metric = Metric::from_parts(
1296            context,
1297            MetricValues::histogram([1.0_f64, 2.0, 3.0, 4.0, 5.0]),
1298            metadata,
1299        );
1300        assert!(state.insert(insert_ts(1), input_metric));
1301
1302        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1303        assert_eq!(flushed_metrics.len(), 3);
1304
1305        // Counts are dimensionless: the `.count` series must drop the unit while all other
1306        // aggregate series carry the unit from the input histogram.
1307        for metric in &flushed_metrics {
1308            let name = metric.context().name();
1309            if name.ends_with(".count") {
1310                assert_eq!(
1311                    metric.metadata().unit(),
1312                    None,
1313                    "flushed metric '{}' should be dimensionless",
1314                    name
1315                );
1316            } else {
1317                assert_eq!(
1318                    metric.metadata().unit(),
1319                    Some("millisecond"),
1320                    "flushed metric '{}' should carry unit='millisecond'",
1321                    name
1322                );
1323            }
1324        }
1325    }
1326
1327    #[tokio::test]
1328    async fn distributions() {
1329        // We're testing that we pass through distributions untouched.
1330        let mut state = AggregationState::new(
1331            BUCKET_WIDTH_SECS,
1332            10,
1333            COUNTER_EXPIRE,
1334            HistogramConfiguration::default(),
1335            Telemetry::noop(),
1336        );
1337
1338        // Create one multi-value distribution, with server-side aggregation, and insert it.
1339        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1340        let input_metric = Metric::distribution("metric1", &values[..]);
1341
1342        assert!(state.insert(insert_ts(1), input_metric.clone()));
1343
1344        // Flush the aggregation state, and observe that we've emitted the original distribution.
1345        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1346        assert_eq!(flushed_metrics.len(), 1);
1347
1348        assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1349    }
1350
1351    #[tokio::test]
1352    async fn histogram_copy_to_distribution() {
1353        let hist_config = HistogramConfiguration::from_statistics(
1354            &[
1355                HistogramStatistic::Count,
1356                HistogramStatistic::Sum,
1357                HistogramStatistic::Percentile {
1358                    q: 0.5,
1359                    suffix: "p50".into(),
1360                },
1361            ],
1362            true,
1363            "dist_prefix.".into(),
1364        );
1365        let mut state = AggregationState::new(BUCKET_WIDTH_SECS, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());
1366
1367        // Create one multi-value histogram and insert it.
1368        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1369        let input_metric = Metric::histogram("metric1", values);
1370        assert!(state.insert(insert_ts(1), input_metric.clone()));
1371
1372        // Flush the aggregation state, and observe that we've emitted all of the configured distribution statistics in
1373        // the form of three metrics: count, sum, and p50 as well as the additional metric from copying the histogram.
1374        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1375        assert_eq!(flushed_metrics.len(), 4);
1376
1377        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
1378        // matter here, but we do need a `Metric` for it to compare the context to.
1379        let count_metric = Metric::rate("metric1.count", 0.0, BUCKET_WIDTH);
1380        let sum_metric = Metric::gauge("metric1.sum", 0.0);
1381        let p50_metric = Metric::gauge("metric1.p50", 0.0);
1382        let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);
1383
1384        // We use a less strict error ratio (how much the expected vs actual) for the percentile check, as we generally
1385        // expect the value to be somewhat off the exact value due to the lossy nature of `DDSketch`.
1386        assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1387        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
1388        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
1389        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
1390    }
1391
1392    #[tokio::test]
1393    async fn nonaggregated_counters_to_rate() {
1394        let counter_value = 42.0;
1395
1396        // Create a basic aggregation state.
1397        let mut state = AggregationState::new(
1398            BUCKET_WIDTH_SECS,
1399            10,
1400            COUNTER_EXPIRE,
1401            HistogramConfiguration::default(),
1402            Telemetry::noop(),
1403        );
1404
1405        // Create a simple non-aggregated counter, and insert it.
1406        let input_metric = Metric::counter("metric1", counter_value);
1407        assert!(state.insert(insert_ts(1), input_metric.clone()));
1408
1409        // Flush the aggregation state, and observe that we've emitted the expected counter and that it has the right
1410        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
1411        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1412        assert_eq!(flushed_metrics.len(), 1);
1413        let flushed_metric = &flushed_metrics[0];
1414
1415        assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
1416        assert_eq!(flushed_metric.values().as_str(), "rate");
1417    }
1418
1419    #[tokio::test]
1420    async fn preaggregated_counters_to_rate() {
1421        let counter_value = 42.0;
1422        let timestamp = 123456;
1423
1424        // Create a basic passthrough batcher and forwarder.
1425        let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), BUCKET_WIDTH_SECS, Telemetry::noop()).await;
1426        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
1427
1428        // Create a simple pre-aggregated counter, and batch it.
1429        let input_metric = Metric::counter("metric1", (timestamp, counter_value));
1430        batcher.push_metric(input_metric.clone(), &dispatcher).await;
1431
1432        // Flush the batcher, and observe that we've emitted the expected counter and that it has the right
1433        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
1434        batcher.try_flush(&dispatcher).await;
1435
1436        let mut flushed_metrics = dispatcher_receiver.collect_next();
1437        assert_eq!(flushed_metrics.len(), 1);
1438        assert_eq!(
1439            Metric::rate("metric1", (timestamp, counter_value), BUCKET_WIDTH),
1440            flushed_metrics.remove(0)
1441        );
1442    }
1443
1444    #[tokio::test]
1445    async fn telemetry() {
1446        // TODO: We don't check `component_events_dropped_total` or `aggregate_passthrough_metrics_total` here as
1447        // they're set directly in the aggregate component future rather than `AggregationState`, which is harder to
1448        // drive overall and would have required even more boilerplate.
1449        //
1450        // Leaving that as a future improvement.
1451
1452        let recorder = TestRecorder::default();
1453        let _local = metrics::set_default_local_recorder(&recorder);
1454
1455        let builder = MetricsBuilder::default();
1456        let telemetry = Telemetry::new(&builder);
1457
1458        let mut state = AggregationState::new(
1459            BUCKET_WIDTH_SECS,
1460            2,
1461            COUNTER_EXPIRE,
1462            HistogramConfiguration::default(),
1463            telemetry,
1464        );
1465
1466        // Make sure our telemetry is registered at default values.
1467        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
1468        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1469        assert_eq!(
1470            recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
1471            Some(0)
1472        );
1473        for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
1474            assert_eq!(
1475                recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
1476                Some(0.0)
1477            );
1478        }
1479
1480        // Insert a counter with a non-timestamped value.
1481        assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
1482        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1483        assert_eq!(
1484            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1485            Some(1.0)
1486        );
1487        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1488
1489        // Insert a gauge with a timestamped value.
1490        assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
1491        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1492        assert_eq!(
1493            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1494            Some(1.0)
1495        );
1496
1497        // We've reached our context limit at this point, so the next metric should not be inserted.
1498        assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
1499        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1500        assert_eq!(
1501            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1502            Some(1.0)
1503        );
1504
1505        // Now let's flush the state which should flush the gauge entirely, reducing the context count, but not flush
1506        // the counter, since it'll be in zero-value mode.
1507        let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
1508        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1509        assert_eq!(
1510            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1511            Some(1.0)
1512        );
1513        assert_eq!(
1514            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1515            Some(0.0)
1516        );
1517    }
1518}
1519
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::AggregateConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// `nanos` sub-fields of the duration-typed settings.
    ///
    /// Duration fields serialize as `{secs, nanos}`. Whole-second values are injected, so the `nanos` sub-fields are
    /// always 0 and are not independently configurable.
    // NOTE(review): presumably this is the list of fields the smoke test harness skips; confirm against
    // `run_config_smoke_tests`.
    const DURATION_NANOS_FIELDS: [&str; 2] = [
        "aggregate_flush_interval.nanos",
        "aggregate_passthrough_idle_flush_timeout.nanos",
    ];

    /// Runs the shared configuration smoke tests against `AggregateConfiguration`, requiring that the configuration
    /// handed to the callback deserializes into it.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::AGGREGATE_CONFIGURATION,
            &DURATION_NANOS_FIELDS,
            json!({}),
            |raw_config| {
                raw_config
                    .as_typed::<AggregateConfiguration>()
                    .expect("AggregateConfiguration should deserialize")
            },
        )
        .await;
    }
}