saluki_components/transforms/aggregate/
mod.rs

1use std::{
2    num::NonZeroU64,
3    time::{Duration, Instant},
4};
5
6use async_trait::async_trait;
7use ddsketch_agent::DDSketch;
8use hashbrown::{hash_map::Entry, HashMap};
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::time::get_unix_timestamp;
11use saluki_config::GenericConfiguration;
12use saluki_context::Context;
13use saluki_core::{
14    components::{transforms::*, ComponentContext},
15    data_model::event::{metric::*, Event, EventType},
16    observability::ComponentMetricsExt as _,
17    topology::{interconnect::BufferedDispatcher, OutputDefinition},
18    topology::{EventsBuffer, EventsDispatcher},
19};
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use smallvec::SmallVec;
24use tokio::{
25    select,
26    time::{interval, interval_at},
27};
28use tracing::{debug, error, trace};
29
30mod telemetry;
31use self::telemetry::Telemetry;
32
33mod config;
34use self::config::HistogramConfiguration;
35
/// How often the transform's main loop wakes up to check whether buffered passthrough metrics have been idle for long
/// enough to be flushed.
const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_millis(2_000);
37
/// Default size of the aggregation window (`aggregate_window_duration`): 10 seconds.
const fn default_window_duration() -> Duration {
    Duration::from_millis(10_000)
}
41
/// Default interval between primary flushes (`aggregate_flush_interval`): 15 seconds.
const fn default_primary_flush_interval() -> Duration {
    Duration::from_millis(15_000)
}
45
/// Default maximum number of contexts tracked by the aggregation state (`aggregate_context_limit`).
const fn default_context_limit() -> usize {
    5_000
}
49
/// Default idle counter expiration, in seconds: five minutes.
const fn default_counter_expiry_seconds() -> Option<u64> {
    Some(5 * 60)
}
53
/// Default for whether metrics that already carry a timestamp bypass aggregation: enabled.
const fn default_passthrough_timestamped_metrics() -> bool {
    true
}
57
/// Default idle timeout before buffered passthrough metrics are flushed
/// (`aggregate_passthrough_idle_flush_timeout`): 1 second.
const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::from_millis(1_000)
}
61
62/// Aggregate transform.
63///
64/// Aggregates metrics into fixed-size windows, flushing them at a regular interval.
65///
66/// ## Zero-value counters
67///
68/// When metrics are aggregated and then flushed, they are typically removed entirely from the aggregation state. Unless
69/// they are updated again, they will not be emitted again. However, for counters, a slightly different approach is
70/// taken by tracking "zero-value" counters.
71///
72/// Counters are aggregated and flushed normally. However, when flushed, counters are added to a list of "zero-value"
73/// counters, and if those counters are not updated again, the transform emits a copy of the counter with a value of
74/// zero. It does this until the counter is updated again, or the zero-value counter expires (no updates), whichever
75/// comes first.
76///
77/// This provides a continuity in the output of a counter, from the perspective of a downstream system, when counters
78/// are otherwise sparse. The expiration period is configurable, and allows a trade-off in how sparse/infrequent the
79/// updates to counters can be versus how long it takes for counters that don't exist anymore to actually cease to be
80/// emitted.
81#[derive(Deserialize)]
82pub struct AggregateConfiguration {
83    /// Size of the aggregation window.
84    ///
85    /// Metrics are aggregated into fixed-size windows, such that all updates to the same metric within a window are
86    /// aggregated into a single metric. The window size controls how efficiently metrics are aggregated, and in turn,
87    /// how many data points are emitted downstream.
88    ///
89    /// Defaults to 10 seconds.
90    #[serde(rename = "aggregate_window_duration", default = "default_window_duration")]
91    window_duration: Duration,
92
93    /// How often to flush buckets.
94    ///
95    /// This represents a trade-off between the savings in network bandwidth (sending fewer requests to downstream
96    /// systems, etc) and the frequency of updates (how often updates to a metric are emitted).
97    ///
98    /// Defaults to 15 seconds.
99    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
100    primary_flush_interval: Duration,
101
102    /// Maximum number of contexts to aggregate per window.
103    ///
104    /// A context is the unique combination of a metric name and its set of tags. For example,
105    /// `metric.name.here{tag1=A,tag2=B}` represents a single context, and would be different than
106    /// `metric.name.here{tag1=A,tag2=C}`.
107    ///
108    /// When the maximum number of contexts is reached in the current aggregation window, additional metrics are dropped
109    /// until the next window starts.
110    ///
111    /// Defaults to 1000.
112    #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
113    context_limit: usize,
114
115    /// Whether to flush open buckets when stopping the transform.
116    ///
117    /// Normally, open buckets (a bucket whose end has not yet occurred) are not flushed when the transform is stopped.
118    /// This is done to avoid the chance of flushing a partial window, restarting the process, and then flushing the
119    /// same window again. Downstream systems sometimes cannot cope with this gracefully, as there is no way to
120    /// determine that it is an incremental update, and so they treat it as an absolute update, overwriting the
121    /// previously flushed value.
122    ///
123    /// In cases where flushing all outstanding data is paramount, this can be enabled.
124    ///
125    /// Defaults to `false`.
126    #[serde(rename = "aggregate_flush_open_windows", default)]
127    flush_open_windows: bool,
128
129    /// How long to keep idle counters alive after they've been flushed, in seconds.
130    ///
131    /// When metrics are flushed, they are removed from the aggregation state. However, if a counter expiration is set,
132    /// counters will be kept alive in an "idle" state. For as long as a counter is idle, but not yet expired, a zero
133    /// value will be emitted for it during each flush. This allows more gracefully handling sparse counters, where
134    /// updates are infrequent but leaving gaps in the time series would be undesirable from a user experience
135    /// perspective.
136    ///
137    /// After a counter has been idle (no updates) for longer than the expiry period, it will be completely removed and
138    /// no further zero values will be emitted.
139    ///
140    /// Defaults to 300 seconds (5 minutes). Setting a value of `0` disables idle counter keep-alive.
141    #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
142    counter_expiry_seconds: Option<u64>,
143
144    /// Whether or not to immediately forward (passthrough) metrics with pre-defined timestamps.
145    ///
146    /// When enabled, this causes the aggregator to immediately forward metrics that already have a timestamp present.
147    /// Only metrics without a timestamp will be aggregated. This can be useful when metrics are already pre-aggregated
148    /// client-side and both timeliness and memory efficiency are paramount, as it avoids the overhead of aggregating
149    /// within the pipeline.
150    ///
151    /// Defaults to `true`.
152    #[serde(
153        rename = "dogstatsd_no_aggregation_pipeline",
154        default = "default_passthrough_timestamped_metrics"
155    )]
156    passthrough_timestamped_metrics: bool,
157
158    /// How often to flush buffered passthrough metrics.
159    ///
160    /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in
161    /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
162    /// amount of time that passthrough metrics will be buffered before being forwarded.
163    ///
164    /// Defaults to 1 seconds.
165    #[serde(
166        rename = "aggregate_passthrough_idle_flush_timeout",
167        default = "default_passthrough_idle_flush_timeout"
168    )]
169    passthrough_idle_flush_timeout: Duration,
170
171    /// Histogram aggregation configuration.
172    ///
173    /// Controls the aggregates/percentiles that are generated for distributions in "histogram" mode (client-side
174    /// distribution aggregation).
175    #[serde(flatten)]
176    hist_config: HistogramConfiguration,
177}
178
179impl AggregateConfiguration {
180    /// Creates a new `AggregateConfiguration` from the given configuration.
181    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
182        Ok(config.as_typed()?)
183    }
184
185    /// Creates a new `AggregateConfiguration` with default values.
186    pub fn with_defaults() -> Self {
187        Self {
188            window_duration: default_window_duration(),
189            primary_flush_interval: default_primary_flush_interval(),
190            context_limit: default_context_limit(),
191            flush_open_windows: false,
192            counter_expiry_seconds: default_counter_expiry_seconds(),
193            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
194            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
195            hist_config: HistogramConfiguration::default(),
196        }
197    }
198}
199
200#[async_trait]
201impl TransformBuilder for AggregateConfiguration {
202    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
203        let metrics_builder = MetricsBuilder::from_component_context(&context);
204        let telemetry = Telemetry::new(&metrics_builder);
205
206        let state = AggregationState::new(
207            self.window_duration,
208            self.context_limit,
209            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
210            self.hist_config.clone(),
211            telemetry.clone(),
212        );
213
214        let passthrough_batcher = PassthroughBatcher::new(
215            self.passthrough_idle_flush_timeout,
216            self.window_duration,
217            telemetry.clone(),
218        )
219        .await;
220
221        Ok(Box::new(Aggregate {
222            state,
223            telemetry,
224            primary_flush_interval: self.primary_flush_interval,
225            flush_open_windows: self.flush_open_windows,
226            passthrough_batcher,
227            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
228        }))
229    }
230
231    fn input_event_type(&self) -> EventType {
232        EventType::Metric
233    }
234
235    fn outputs(&self) -> &[OutputDefinition] {
236        static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Metric)];
237
238        OUTPUTS
239    }
240}
241
242impl MemoryBounds for AggregateConfiguration {
243    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
244        // TODO: While we account for the aggregation state map accurately, what we don't currently account for is the
245        // fact that a metric could have multiple distinct values. For the common pipeline of metrics in via DogStatsD,
246        // this generally shouldn't be a problem because the values don't have a timestamp, so they get aggregated into
247        // the same bucket, leading to two values per `MetricValues` at most, which is already baked into the size of
248        // `MetricValues` due to using `SmallVec`.
249        //
250        // However, there could be many more values in a single metric, and we don't account for that.
251
252        builder
253            .minimum()
254            // Capture the size of the heap allocation when the component is built.
255            .with_single_value::<Aggregate>("component struct");
256        builder
257            .firm()
258            // Account for the aggregation state map, where we map contexts to the merged metric.
259            .with_expr(UsageExpr::product(
260                "aggregation state map",
261                UsageExpr::sum(
262                    "context map entry",
263                    UsageExpr::struct_size::<Context>("context"),
264                    UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
265                ),
266                UsageExpr::config("aggregate_context_limit", self.context_limit),
267            ));
268    }
269}
270
271pub struct Aggregate {
272    state: AggregationState,
273    telemetry: Telemetry,
274    primary_flush_interval: Duration,
275    flush_open_windows: bool,
276    passthrough_batcher: PassthroughBatcher,
277    passthrough_timestamped_metrics: bool,
278}
279
280#[async_trait]
281impl Transform for Aggregate {
282    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
283        let mut health = context.take_health_handle();
284
285        let mut primary_flush = interval_at(
286            tokio::time::Instant::now() + self.primary_flush_interval,
287            self.primary_flush_interval,
288        );
289        let mut final_primary_flush = false;
290
291        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);
292
293        health.mark_ready();
294        debug!("Aggregation transform started.");
295
296        tokio::pin!(passthrough_flush);
297
298        loop {
299            select! {
300                _ = health.live() => continue,
301                _ = primary_flush.tick() => {
302                    // We've reached the end of the current window. Flush our aggregation state and forward the metrics
303                    // onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
304                    // state to track the start time of the current aggregation window.
305                    if !self.state.is_empty() {
306                        debug!("Flushing aggregated metrics...");
307
308                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;
309
310                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
311                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
312                            error!(error = %e, "Failed to flush aggregation state.");
313                        }
314
315                        self.telemetry.increment_flushes();
316
317                        match dispatcher.flush().await {
318                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
319                            Err(e) => error!(error = %e, "Failed to flush aggregated events."),
320                        }
321                    }
322
323                    // If this is the final flush, we break out of the loop.
324                    if final_primary_flush {
325                        debug!("All aggregation complete.");
326                        break
327                    }
328                },
329                _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
330                maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
331                    Some(events) => {
332                        trace!(events_len = events.len(), "Received events.");
333
334                        let current_time = get_unix_timestamp();
335                        let mut processed_passthrough_metrics = false;
336
337                        for event in events {
338                            if let Some(metric) = event.try_into_metric() {
339                                let metric = if self.passthrough_timestamped_metrics {
340                                    // Try splitting out any timestamped values, and if we have any, we'll buffer them
341                                    // separately and process the remaining nontimestamped metric (if any) by
342                                    // aggregating it like normal.
343                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);
344
345                                    // If we have a timestamped metric, then batch it up out-of-band.
346                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
347                                        self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
348                                        processed_passthrough_metrics = true;
349                                    }
350
351                                    // If we have an nontimestamped metric, we'll process it like normal.
352                                    //
353                                    // Otherwise, continue to the next event.
354                                    match maybe_nontimestamped_metric {
355                                        Some(metric) => metric,
356                                        None => continue,
357                                    }
358                                } else {
359                                    metric
360                                };
361
362                                if !self.state.insert(current_time, metric) {
363                                    trace!("Dropping metric due to context limit.");
364                                    self.telemetry.increment_events_dropped();
365                                }
366                            }
367                        }
368
369                        if processed_passthrough_metrics {
370                            self.passthrough_batcher.update_last_processed_at();
371                        }
372                    },
373                    None => {
374                        // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
375                        // interval so it ticks immediately on the next loop iteration.
376                        final_primary_flush = true;
377                        primary_flush.reset_immediately();
378
379                        debug!("Aggregation transform stopping...");
380                    }
381                },
382            }
383        }
384
385        // Do a final flush of any timestamped metrics that we've buffered up.
386        self.passthrough_batcher.try_flush(context.dispatcher()).await;
387
388        debug!("Aggregation transform stopped.");
389
390        Ok(())
391    }
392}
393
394fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
395    if metric.values().all_timestamped() {
396        (Some(metric), None)
397    } else if metric.values().any_timestamped() {
398        // Only _some_ of the values are timestamped, so we'll split the timestamped values into a new metric.
399        let new_metric_values = metric.values_mut().split_timestamped();
400        let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
401
402        (Some(new_metric), Some(metric))
403    } else {
404        // No timestamped values, so we need to aggregate this metric.
405        (None, Some(metric))
406    }
407}
408
409struct PassthroughBatcher {
410    active_buffer: EventsBuffer,
411    active_buffer_start: Instant,
412    last_processed_at: Instant,
413    idle_flush_timeout: Duration,
414    bucket_width: Duration,
415    telemetry: Telemetry,
416}
417
418impl PassthroughBatcher {
419    async fn new(idle_flush_timeout: Duration, bucket_width: Duration, telemetry: Telemetry) -> Self {
420        let active_buffer = EventsBuffer::default();
421
422        Self {
423            active_buffer,
424            active_buffer_start: Instant::now(),
425            last_processed_at: Instant::now(),
426            idle_flush_timeout,
427            bucket_width,
428            telemetry,
429        }
430    }
431
432    async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
433        // Convert counters to rates before we batch them up.
434        //
435        // This involves specifying the rate interval as the bucket width of the aggregate transform itself, which when
436        // you say it out loud is sort of confusing and nonsensical since the whole point is that these are
437        // _pre-aggregated_ metrics but we have to match the behavior of the Datadog Agent. ¯\_(ツ)_/¯
438        let (context, values, metadata) = metric.into_parts();
439        let adjusted_values = counter_values_to_rate(values, self.bucket_width);
440        let metric = Metric::from_parts(context, adjusted_values, metadata);
441
442        // Try pushing the metric into our active buffer.
443        //
444        // If our active buffer is full, then we'll flush the buffer, grab a new one, and push the metric into it.
445        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
446            debug!("Passthrough event buffer was full. Flushing...");
447            self.dispatch_events(dispatcher).await;
448
449            if self.active_buffer.try_push(event).is_some() {
450                error!("Event buffer is full even after dispatching events. Dropping event.");
451                self.telemetry.increment_events_dropped();
452                return;
453            }
454        }
455
456        // If this is the first metric in the buffer, we've started a new batch, so track when it started.
457        if self.active_buffer.len() == 1 {
458            self.active_buffer_start = Instant::now();
459        }
460
461        self.telemetry.increment_passthrough_metrics();
462    }
463
464    fn update_last_processed_at(&mut self) {
465        // We expose this as a standalone method, rather than just doing it automatically in `push_metric`, because
466        // otherwise we might be calling this 10-20K times per second, instead of simply doing it after the end of each
467        // input event buffer in the transform's main loop, which should be much less frequent.
468        self.last_processed_at = Instant::now();
469    }
470
471    async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
472        // If our active buffer isn't empty, and we've exceeded our idle flush timeout, then flush the buffer.
473        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
474            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");
475
476            self.dispatch_events(dispatcher).await;
477        }
478    }
479
480    async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
481        if !self.active_buffer.is_empty() {
482            let unaggregated_events = self.active_buffer.len();
483
484            // Track how long this batch was alive for.
485            let batch_duration = self.active_buffer_start.elapsed();
486            self.telemetry.record_passthrough_batch_duration(batch_duration);
487
488            self.telemetry.increment_passthrough_flushes();
489
490            // Swap our active buffer with a new, empty one, and then forward the old one.
491            let new_active_buffer = EventsBuffer::default();
492            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);
493
494            match dispatcher.dispatch(old_active_buffer).await {
495                Ok(()) => debug!(unaggregated_events, "Dispatched events."),
496                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
497            }
498        }
499    }
500}
501
502#[derive(Clone)]
503struct AggregatedMetric {
504    values: MetricValues,
505    metadata: MetricMetadata,
506    last_seen: u64,
507}
508
509struct AggregationState {
510    contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
511    contexts_remove_buf: Vec<Context>,
512    context_limit: usize,
513    bucket_width_secs: u64,
514    counter_expire_secs: Option<NonZeroU64>,
515    last_flush: u64,
516    hist_config: HistogramConfiguration,
517    telemetry: Telemetry,
518}
519
520impl AggregationState {
521    fn new(
522        bucket_width: Duration, context_limit: usize, counter_expiration: Option<Duration>,
523        hist_config: HistogramConfiguration, telemetry: Telemetry,
524    ) -> Self {
525        let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);
526
527        Self {
528            contexts: HashMap::default(),
529            contexts_remove_buf: Vec::new(),
530            context_limit,
531            bucket_width_secs: bucket_width.as_secs(),
532            counter_expire_secs,
533            last_flush: 0,
534            hist_config,
535            telemetry,
536        }
537    }
538
539    fn is_empty(&self) -> bool {
540        self.contexts.is_empty()
541    }
542
543    fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
544        // If we haven't seen this context yet, and it would put us over the limit to insert it, then return early.
545        if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
546            return false;
547        }
548
549        let (context, mut values, metadata) = metric.into_parts();
550
551        // Collapse all non-timestamped values into a single timestamped value.
552        //
553        // We do this pre-aggregation step because unless we're merging into an existing context, we'll end up with
554        // however many values were in the original metric instead of full aggregated values.
555        let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
556        values.collapse_non_timestamped(bucket_ts);
557
558        trace!(
559            bucket_ts,
560            kind = values.as_str(),
561            "Inserting metric into aggregation state."
562        );
563
564        // If we're already tracking this context, update the last seen time and merge the new values into the existing
565        // values. Otherwise, create a new entry.
566        match self.contexts.entry(context) {
567            Entry::Occupied(mut entry) => {
568                let aggregated = entry.get_mut();
569
570                // We ignore metadata changes within a flush interval to keep things simple.
571                aggregated.last_seen = timestamp;
572                aggregated.values.merge(values);
573            }
574            Entry::Vacant(entry) => {
575                self.telemetry.increment_contexts(entry.key(), &values);
576
577                entry.insert(AggregatedMetric {
578                    values,
579                    metadata,
580                    last_seen: timestamp,
581                });
582            }
583        }
584
585        true
586    }
587
588    async fn flush(
589        &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
590    ) -> Result<(), GenericError> {
591        let bucket_width_secs = self.bucket_width_secs;
592        let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);
593
594        // We want our split timestamp to be before the start of the current bucket, which ensures any timestamp that is
595        // less than or equal to `split_timestamp` resides in a closed bucket.
596        let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);
597
598        // Calculate the buckets we need to potentially generate zero-value counters for.
599        //
600        // We only need to do this if we've flushed before, since we won't have any knowledge of which counters are idle
601        // or not until that happens.
602        let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
603        if self.last_flush != 0 {
604            let start = align_to_bucket_start(self.last_flush, bucket_width_secs);
605
606            for bucket_start in (start..current_time).step_by(bucket_width_secs as usize) {
607                if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
608                    zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
609                }
610            }
611        }
612
613        // Iterate over each context we're tracking, and flush any values that are in buckets which are now closed.
614        debug!(timestamp = current_time, "Flushing buckets.");
615
616        for (context, am) in self.contexts.iter_mut() {
617            // Figure out if we should remove this metric or not if it has no values in open buckets.
618            //
619            // We have a special carve-out for counters here, which we have the ability to keep alive after they are
620            // flushed, based on a configured expiration period. This allows us to continue emitting a zero value for
621            // counters when they're idle, which can make them appear "live" in downstream systems, even when they're
622            // not.
623            //
624            // This is useful for sparsely-updated counters.
625            let should_expire_if_empty = match &am.values {
626                MetricValues::Counter(..) => {
627                    counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
628                }
629                _ => true,
630            };
631
632            // If we're dealing with a counter, we'll merge in our calculated set of zero values. We only merge in the
633            // values that represent now-closed buckets.
634            //
635            // This is also safe to do even when there are real values in those buckets since adding zero to anything is
636            // a no-op from the perspective of what we end up flushing, and it doesn't mess with the "last seen" time.
637            if let MetricValues::Counter(..) = &mut am.values {
638                let expires_at = am.last_seen + counter_expire_secs;
639                for (zv_bucket_start, zero_value) in &zero_value_buckets {
640                    if expires_at > *zv_bucket_start {
641                        am.values.merge(zero_value.clone());
642                    } else {
643                        // Since zero-value buckets are in order, we can break early if this bucket is past the
644                        // expiration cutoff of the counter. No other bucket will be within the expiration range.
645                        break;
646                    }
647                }
648            }
649
650            // Finally, figure out if the current metric can be removed.
651            //
652            // For any metric with values that are in open buckets, we split off the values that are in closed buckets
653            // and keep the metric alive. When all the values are in closed buckets, or there are no values, we'll
654            // remove the metric if `should_remove_if_empty` is `true`.
655            //
656            // This means we'll always remove all-closed/empty non-counter metrics, and we _may_ remove all-closed/empty
657            // counters.
658            if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
659                self.telemetry.increment_flushed(&closed_bucket_values);
660
661                // We got some closed bucket values, so flush those out.
662                transform_and_push_metric(
663                    context.clone(),
664                    closed_bucket_values,
665                    am.metadata.clone(),
666                    bucket_width_secs,
667                    &self.hist_config,
668                    dispatcher,
669                )
670                .await?;
671            }
672
673            if am.values.is_empty() && should_expire_if_empty {
674                self.telemetry.decrement_contexts(context, &am.values);
675                self.contexts_remove_buf.push(context.clone());
676            }
677        }
678
679        // Remove any contexts that were marked as needing to be removed.
680        let contexts_len_before = self.contexts.len();
681        for context in self.contexts_remove_buf.drain(..) {
682            self.contexts.remove(&context);
683        }
684        let contexts_len_after = self.contexts.len();
685
686        let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
687        let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
688        self.contexts.shrink_to(target_contexts_capacity);
689
690        self.last_flush = current_time;
691
692        Ok(())
693    }
694}
695
696async fn transform_and_push_metric(
697    context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: u64,
698    hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
699) -> Result<(), GenericError> {
700    let bucket_width = Duration::from_secs(bucket_width_secs);
701
702    match values {
703        // If we're dealing with a histogram, we calculate a configured set of aggregates/percentiles from it, and emit
704        // them as individual metrics.
705        MetricValues::Histogram(ref mut points) => {
706            // Convert histogram to distribution
707            if hist_config.copy_to_distribution() {
708                let sketch_points = points
709                    .into_iter()
710                    .map(|(ts, hist)| {
711                        let mut sketch = DDSketch::default();
712                        for sample in hist.samples() {
713                            sketch.insert_n(sample.value.into_inner(), sample.weight as u32);
714                        }
715                        (ts, sketch)
716                    })
717                    .collect::<SketchPoints>();
718                let distribution_values = MetricValues::distribution(sketch_points);
719                let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
720                    context.with_name(format!(
721                        "{}{}",
722                        hist_config.copy_to_distribution_prefix(),
723                        context.name()
724                    ))
725                } else {
726                    context.clone()
727                };
728                let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
729                dispatcher.push(Event::Metric(new_metric)).await?;
730            }
731            // We collect our histogram points in their "summary" view, which sorts the underlying samples allowing
732            // proper quantile queries to be answered, hence our "sorted" points. We do it this way because rather than
733            // sort every time we insert, or cloning the points, we only sort when a summary view is constructed, which
734            // requires mutable access to sort the samples in-place.
735            let mut sorted_points = Vec::new();
736            for (ts, h) in points {
737                sorted_points.push((ts, h.summary_view()));
738            }
739
740            for statistic in hist_config.statistics() {
741                let new_points = sorted_points
742                    .iter()
743                    .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
744                    .collect::<ScalarPoints>();
745
746                let new_values = if statistic.is_rate_statistic() {
747                    MetricValues::rate(new_points, bucket_width)
748                } else {
749                    MetricValues::gauge(new_points)
750                };
751
752                let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
753                let new_metric = Metric::from_parts(new_context, new_values, metadata.clone());
754                dispatcher.push(Event::Metric(new_metric)).await?;
755            }
756
757            Ok(())
758        }
759
760        // If we're not dealing with a histogram, then all we need to worry about is converting counters to rates before
761        // forwarding our single, aggregated metric.
762        values => {
763            let adjusted_values = counter_values_to_rate(values, bucket_width);
764
765            let metric = Metric::from_parts(context, adjusted_values, metadata);
766            dispatcher.push(Event::Metric(metric)).await
767        }
768    }
769}
770
771fn counter_values_to_rate(values: MetricValues, interval: Duration) -> MetricValues {
772    match values {
773        MetricValues::Counter(points) => MetricValues::rate(points, interval),
774        values => values,
775    }
776}
777
/// Rounds `timestamp` down to the start of the bucket that contains it.
///
/// Bucket starts are the multiples of `bucket_width_secs`.
///
/// # Panics
///
/// Panics if `bucket_width_secs` is zero.
const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: u64) -> u64 {
    let bucket_index = timestamp / bucket_width_secs;
    bucket_index * bucket_width_secs
}
781
/// Reports whether the bucket starting at `bucket_start` should be treated as closed at `current_time`.
///
/// A bucket is closed once `current_time` has reached the (exclusive) end of the bucket, or unconditionally when
/// `flush_open_buckets` is `true`.
const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: u64, flush_open_buckets: bool,
) -> bool {
    // Buckets represent a half-open interval, where the start is inclusive and the end is exclusive. This means that
    // for a bucket start of 10, and a width of 10, the bucket is 10 seconds "wide", and its start and end are 10 and
    // 20, with the 20 excluded, or [10, 20) in interval notation. Simply put, if we have a timestamp of 10, or anything
    // smaller than 20, we would consider it to fall within the bucket... but 20 or more would be outside of the bucket.
    //
    // We can also represent this visually:
    //
    // <--------- bucket 1 ----------> <--------- bucket 2 ----------> <--------- bucket 3 ---------->
    // [10 11 12 13 14 15 16 17 18 19] [20 21 22 23 24 25 26 27 28 29] [30 31 32 33 34 35 36 37 38 39]
    //
    // For any of these buckets to be considered "closed", the current time has to be at or past the exclusive end,
    // `start + width`. For example, if the current time is 19, then no buckets are closed, and if the current time is
    // 29, then bucket 1 is closed but buckets 2 and 3 are still open, and if the current time is 30, then both buckets
    // 1 and 2 are closed, but bucket 3 is still open.
    //
    // The forced-flush flag is checked first so that no arithmetic happens when it is set, and the end of the bucket
    // is computed with a saturating add so that extreme inputs cannot overflow.
    flush_open_buckets || current_time >= bucket_start.saturating_add(bucket_width_secs)
}
805
806// TODO: One thing we ought to consider is a property test, specifically a state machine property test, where we
807// generate a randomized offset to start time from, a bucket width, flush interval, and operations, and so on... and
808// then we run it to make sure that we are always generating sequential timestamps for data points, etc.
809#[cfg(test)]
810mod tests {
811    use float_cmp::ApproxEqRatio as _;
812    use saluki_core::{
813        components::ComponentContext,
814        topology::{interconnect::Dispatcher, ComponentId, OutputName},
815    };
816    use saluki_metrics::test::TestRecorder;
817    use tokio::sync::mpsc;
818
819    use super::config::HistogramStatistic;
820    use super::*;
821
    /// Width, in seconds, of the aggregation buckets used throughout these tests.
    const BUCKET_WIDTH_SECS: u64 = 10;
    /// Same bucket width as [`BUCKET_WIDTH_SECS`], expressed as a `Duration`.
    const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS);
    /// Counter expiration, in seconds, used by these tests (two bucket widths).
    const COUNTER_EXPIRE_SECS: u64 = 20;
    /// Same expiration as [`COUNTER_EXPIRE_SECS`], in the optional `Duration` form passed to `AggregationState::new`.
    const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));
826
827    /// Gets the bucket start timestamp for the given step.
828    const fn bucket_ts(step: u64) -> u64 {
829        align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
830    }
831
832    /// Gets the insert timestamp for the given step.
833    const fn insert_ts(step: u64) -> u64 {
834        (BUCKET_WIDTH_SECS * (step + 1)) - 2
835    }
836
837    /// Gets the flush timestamp for the given step.
838    const fn flush_ts(step: u64) -> u64 {
839        BUCKET_WIDTH_SECS * (step + 1)
840    }
841
842    struct DispatcherReceiver {
843        receiver: mpsc::Receiver<EventsBuffer>,
844    }
845
846    impl DispatcherReceiver {
847        fn collect_next(&mut self) -> Vec<Metric> {
848            match self.receiver.try_recv() {
849                Ok(event_buffer) => {
850                    let mut metrics = event_buffer
851                        .into_iter()
852                        .filter_map(|event| event.try_into_metric())
853                        .collect::<Vec<Metric>>();
854
855                    metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
856                    metrics
857                }
858                Err(_) => Vec::new(),
859            }
860        }
861    }
862
863    /// Constructs a basic `Dispatcher` with a fixed-size event buffer.
864    fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
865        let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
866        let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));
867
868        let (buffer_tx, buffer_rx) = mpsc::channel(1);
869        dispatcher.add_output(OutputName::Default).unwrap();
870        dispatcher
871            .attach_sender_to_output(&OutputName::Default, buffer_tx)
872            .unwrap();
873
874        (dispatcher, DispatcherReceiver { receiver: buffer_rx })
875    }
876
877    async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
878        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
879        let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");
880
881        // Flush the metrics to an event buffer.
882        state
883            .flush(timestamp, true, &mut buffered_dispatcher)
884            .await
885            .expect("should not fail to flush aggregation state");
886
887        // Flush our buffered dispatcher, which should ensure that the event buffer is sent out, and then read it from the
888        // receiver:
889        buffered_dispatcher
890            .flush()
891            .await
892            .expect("should not fail to flush buffered sender");
893
894        dispatcher_receiver.collect_next()
895    }
896
    // Compares two sequences of `(timestamp, value)` points pairwise, panicking on the first mismatch.
    //
    // - `scalar` arm: timestamps must be equal, and values must satisfy `approx_eq_ratio` within `$error_ratio`.
    // - `distribution` arm: timestamps and sketches must both be exactly equal.
    //
    // Points are paired up with `zip`, so trailing points in the longer sequence are never compared. The assertion
    // macros in this module check that both sequences have the same length before invoking this one.
    macro_rules! compare_points {
        (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_point) = expected_value;
                let (actual_ts, actual_point) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                // Scalar values are compared approximately, using the caller-provided ratio.
                assert!(
                    expected_point.approx_eq_ratio(&actual_point, $error_ratio),
                    "point for value #{} does not match: {} (expected) vs {} (actual)",
                    idx,
                    expected_point,
                    actual_point
                );
            }
        };
        (distribution, $expected:expr, $actual:expr) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_sketch) = expected_value;
                let (actual_ts, actual_sketch) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                // Sketches are compared for exact equality; no tolerance is applied.
                assert_eq!(
                    expected_sketch, actual_sketch,
                    "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_sketch, actual_sketch
                );
            }
        };
    }
935
    // Asserts that a flushed counter, gauge, or rate has the same context as `$original` and exactly the given
    // `timestamp => value` points.
    //
    // The first arm uses a default error ratio of 0.000001 for value comparisons; the second arm accepts an explicit
    // `error_ratio`. Any other kind of metric value causes a panic.
    //
    // The expansion is a sequence of statements rather than a block, so the `let` bindings it introduces land in the
    // caller's scope.
    macro_rules! assert_flushed_scalar_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());

            let expected_points = ScalarPoints::from([$(($ts, $value)),+]);

            match actual_metric.values() {
                MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
                    // Check the lengths first: the pairwise comparison below stops at the shorter sequence.
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
                    compare_points!(scalar, expected_points, actual_points, $error_ratio);
                },
                _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
            }
        };
    }
956
    // Asserts that a flushed distribution has the same context as `$original` and exactly the given
    // `timestamp => samples` points, where each expected sketch is built from the provided samples.
    //
    // Any other kind of metric value causes a panic.
    //
    // NOTE: the `error_ratio` argument is accepted for symmetry with `assert_flushed_scalar_metric`, but it is not
    // used by the expansion: sketches are compared for exact equality.
    macro_rules! assert_flushed_distribution_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context());

            match actual_metric.values() {
                MetricValues::Distribution(ref actual_points) => {
                    let expected_points = SketchPoints::from([$(($ts, $value)),+]);
                    // Check the lengths first: the pairwise comparison below stops at the shorter sequence.
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");

                    compare_points!(distribution, &expected_points, actual_points);
                },
                _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
            }
        };
    }
977
978    #[test]
979    fn bucket_is_closed() {
980        // Cases are defined as:
981        // (current time, bucket start, bucket width, flush open buckets, expected result)
982        let cases = [
983            // Bucket goes from [995, 1005), current time of 1000, so bucket is open.
984            (1000, 995, 10, false, false),
985            (1000, 995, 10, true, true),
986            // Bucket goes from [1000, 1010), current time of 1000, so bucket is open.
987            (1000, 1000, 10, false, false),
988            (1000, 1000, 10, true, true),
989            // Bucket goes from [1000, 1010), current time of 1010, so bucket is closed.
990            (1010, 1000, 10, false, true),
991            (1010, 1000, 10, true, true),
992        ];
993
994        for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
995            let expected_reason = if expected {
996                "closed, was open"
997            } else {
998                "open, was closed"
999            };
1000
1001            assert_eq!(
1002                is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
1003                expected,
1004                "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
1005                expected_reason,
1006                current_time,
1007                bucket_start,
1008                bucket_width_secs,
1009                flush_open_buckets
1010            );
1011        }
1012    }
1013
    /// Verifies that inserts of new contexts are rejected once the context limit is reached, and that flushing
    /// gauges frees up their contexts so that new ones can be inserted.
    #[tokio::test]
    async fn context_limit() {
        // Create our aggregation state with a context limit of 2.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create four unique gauges, and insert all of them. The third and fourth should fail because we've reached
        // the context limit.
        let input_metrics = vec![
            Metric::gauge("metric1", 1.0),
            Metric::gauge("metric2", 2.0),
            Metric::gauge("metric3", 3.0),
            Metric::gauge("metric4", 4.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));

        // We should only see the first two gauges after flushing. Flushed metrics come back sorted by name, so the
        // indices below line up with the input order.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());

        // We should be able to insert the third and fourth gauges now as the first two have been flushed, and along
        // with them, their contexts should no longer be tracked in the aggregation state:
        assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
        assert!(state.insert(insert_ts(2), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
    }
1055
    /// Verifies that counters which are only kept alive to emit zero values still count against the context limit,
    /// and that they stop doing so once they expire.
    #[tokio::test]
    async fn context_limit_with_zero_value_counters() {
        // We test here to ensure that zero-value counters contribute to the context limit.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create our input metrics.
        let input_metrics = vec![
            Metric::counter("metric1", 1.0),
            Metric::counter("metric2", 2.0),
            Metric::counter("metric3", 3.0),
        ];

        // Only the first two counters are inserted up front, which fills the context limit of 2.
        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // Flush the aggregation state, and observe they're both present.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Flush _again_ to ensure that we then emit zero-value variants for both counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // Now try to insert a third counter, which should fail because we've reached the context limit.
        assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));

        // Flush the aggregation state, and observe that we only see the two original counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);

        // With a fourth flush interval, the two counters should now have expired, and thus be dropped and no longer
        // contributing to the context limit.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 0);

        // Now we should be able to insert the third counter, and it should be the only one present after flushing.
        assert!(state.insert(insert_ts(5), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
    }
1110
    /// Verifies that idle counters are emitted with zero values for every bucket they miss, that they pick up
    /// real values again when updated, and that zero values stop once the counter expires.
    #[tokio::test]
    async fn zero_value_counters() {
        // We're testing that we properly emit and expire zero-value counters in all relevant scenarios.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create two unique counters, and insert both of them.
        let input_metrics = vec![Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // Flush the aggregation state, and observe they're both present.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Perform our second flush, which should have them as zero-value counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // Now, we'll pretend to skip a flush period and add updates to them again after that.
        assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(4), input_metrics[1].clone()));

        // Flush the aggregation state, and observe that we have two zero-value counters for the flush period we
        // skipped, but that we see them appear again in the fourth flush period.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);

        // Now we'll skip multiple flush periods and ensure that we emit zero-value counters up until the point they
        // expire. As our zero-value counter expiration is 20 seconds, this is two flush periods, so we skip by three
        // flush periods, and we should only see the counters emitted for the first two.
        let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
    }
1159
1160    #[tokio::test]
1161    async fn merge_identical_timestamped_values_on_flush() {
1162        // We're testing that we properly emit and expire zero-value counters in all relevant scenarios.
1163        let mut state = AggregationState::new(
1164            BUCKET_WIDTH,
1165            10,
1166            COUNTER_EXPIRE,
1167            HistogramConfiguration::default(),
1168            Telemetry::noop(),
1169        );
1170
1171        // Create one multi-value counter, and insert it.
1172        let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
1173
1174        assert!(state.insert(insert_ts(1), input_metric.clone()));
1175
1176        // Flush the aggregation state, and observe the metric is present _and_ that we've properly merged all of the
1177        // values within the same timestamp.
1178        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1179        assert_eq!(flushed_metrics.len(), 1);
1180        assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
1181    }
1182
    /// Verifies that a flushed histogram is expanded into one metric per configured statistic.
    #[tokio::test]
    async fn histogram_statistics() {
        // We're testing that we properly emit individual metrics (count, sum, and p50 here) for a histogram.
        // Copying the histogram to a distribution is disabled, so only the statistics should be emitted.
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            false,
            "".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        // Create one multi-value histogram and insert it.
        let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe that we've emitted all of the configured histogram statistics in
        // the form of three metrics: count, sum, and p50.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 3);

        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
        // matter here, but we do need a `Metric` for it to compare the context to.
        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);

        // Flushed metrics are sorted by name, hence the order of count, p50, and then sum.
        //
        // We use a less strict error ratio (how far the actual value may be from the expected one) for the percentile
        // check.
        // NOTE(review): the statistics are computed from the histogram's summary view rather than from a `DDSketch`,
        // so the relaxed ratio may not be required -- confirm before tightening it.
        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
    }
1221
1222    #[tokio::test]
1223    async fn distributions() {
1224        // We're testing that we pass through distributions untouched.
1225        let mut state = AggregationState::new(
1226            BUCKET_WIDTH,
1227            10,
1228            COUNTER_EXPIRE,
1229            HistogramConfiguration::default(),
1230            Telemetry::noop(),
1231        );
1232
1233        // Create one multi-value distribution, with server-side aggregation, and insert it.
1234        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
1235        let input_metric = Metric::distribution("metric1", &values[..]);
1236
1237        assert!(state.insert(insert_ts(1), input_metric.clone()));
1238
1239        // Flush the aggregation state, and observe that we've emitted the original distribution.
1240        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1241        assert_eq!(flushed_metrics.len(), 1);
1242
1243        assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
1244    }
1245
    /// Verifies that, when copying histograms to distributions is enabled, a flushed histogram yields a prefixed
    /// distribution in addition to one metric per configured statistic.
    #[tokio::test]
    async fn histogram_copy_to_distribution() {
        // Same statistics as `histogram_statistics`, but with the distribution copy enabled and a name prefix set.
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            true,
            "dist_prefix.".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        // Create one multi-value histogram and insert it.
        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let input_metric = Metric::histogram("metric1", values);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe that we've emitted all of the configured histogram statistics in
        // the form of three metrics: count, sum, and p50 as well as the additional metric from copying the histogram.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 4);

        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
        // matter here, but we do need a `Metric` for it to compare the context to.
        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);
        let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);

        // Flushed metrics are sorted by name, so the prefixed distribution comes first, followed by count, p50, and
        // then sum.
        //
        // We use a less strict error ratio (how far the actual value may be from the expected one) for the percentile
        // check.
        // NOTE(review): the statistics are computed from the histogram's summary view rather than from a `DDSketch`,
        // so the relaxed ratio may not be required -- confirm before tightening it.
        assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
    }
1286
1287    #[tokio::test]
1288    async fn nonaggregated_counters_to_rate() {
1289        let counter_value = 42.0;
1290        let bucket_width = BUCKET_WIDTH;
1291
1292        // Create a basic aggregation state.
1293        let mut state = AggregationState::new(
1294            bucket_width,
1295            10,
1296            COUNTER_EXPIRE,
1297            HistogramConfiguration::default(),
1298            Telemetry::noop(),
1299        );
1300
1301        // Create a simple non-aggregated counter, and insert it.
1302        let input_metric = Metric::counter("metric1", counter_value);
1303        assert!(state.insert(insert_ts(1), input_metric.clone()));
1304
1305        // Flush the aggregation state, and observe that we've emitted the expected counter and that it has the right
1306        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
1307        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
1308        assert_eq!(flushed_metrics.len(), 1);
1309        let flushed_metric = &flushed_metrics[0];
1310
1311        assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
1312        assert_eq!(flushed_metric.values().as_str(), "rate");
1313    }
1314
1315    #[tokio::test]
1316    async fn preaggregated_counters_to_rate() {
1317        let counter_value = 42.0;
1318        let timestamp = 123456;
1319        let bucket_width = BUCKET_WIDTH;
1320
1321        // Create a basic passthrough batcher and forwarder.
1322        let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await;
1323        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
1324
1325        // Create a simple pre-aggregated counter, and batch it.
1326        let input_metric = Metric::counter("metric1", (timestamp, counter_value));
1327        batcher.push_metric(input_metric.clone(), &dispatcher).await;
1328
1329        // Flush the batcher, and observe that we've emitted the expected counter and that it has the right
1330        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
1331        batcher.try_flush(&dispatcher).await;
1332
1333        let mut flushed_metrics = dispatcher_receiver.collect_next();
1334        assert_eq!(flushed_metrics.len(), 1);
1335        assert_eq!(
1336            Metric::rate("metric1", (timestamp, counter_value), bucket_width),
1337            flushed_metrics.remove(0)
1338        );
1339    }
1340
1341    #[tokio::test]
1342    async fn telemetry() {
1343        // TODO: We don't check `component_events_dropped_total` or `aggregate_passthrough_metrics_total` here as
1344        // they're set directly in the aggregate component future rather than `AggregationState`, which is harder to
1345        // drive overall and would have required even more boilerplate.
1346        //
1347        // Leaving that as a future improvement.
1348
1349        let recorder = TestRecorder::default();
1350        let _local = metrics::set_default_local_recorder(&recorder);
1351
1352        let builder = MetricsBuilder::default();
1353        let telemetry = Telemetry::new(&builder);
1354
1355        let mut state = AggregationState::new(
1356            BUCKET_WIDTH,
1357            2,
1358            COUNTER_EXPIRE,
1359            HistogramConfiguration::default(),
1360            telemetry,
1361        );
1362
1363        // Make sure our telemetry is registered at default values.
1364        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
1365        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1366        assert_eq!(
1367            recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
1368            Some(0)
1369        );
1370        for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
1371            assert_eq!(
1372                recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
1373                Some(0.0)
1374            );
1375        }
1376
1377        // Insert a counter with a non-timestamped value.
1378        assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
1379        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1380        assert_eq!(
1381            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1382            Some(1.0)
1383        );
1384        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1385
1386        // Insert a gauge with a timestamped value.
1387        assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
1388        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1389        assert_eq!(
1390            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1391            Some(1.0)
1392        );
1393
1394        // We've reached our context limit at this point, so the next metric should not be inserted.
1395        assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
1396        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1397        assert_eq!(
1398            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1399            Some(1.0)
1400        );
1401
1402        // Now let's flush the state which should flush the gauge entirely, reducing the context count, but not flush
1403        // the counter, since it'll be in zero-value mode.
1404        let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
1405        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1406        assert_eq!(
1407            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1408            Some(1.0)
1409        );
1410        assert_eq!(
1411            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1412            Some(0.0)
1413        );
1414    }
1415}