saluki_components/transforms/aggregate/
mod.rs

1use std::{
2    num::NonZeroU64,
3    time::{Duration, Instant},
4};
5
6use async_trait::async_trait;
7use ddsketch_agent::DDSketch;
8use hashbrown::{hash_map::Entry, HashMap};
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::time::get_unix_timestamp;
11use saluki_config::GenericConfiguration;
12use saluki_context::Context;
13use saluki_core::{
14    components::{transforms::*, ComponentContext},
15    data_model::event::{metric::*, Event, EventType},
16    observability::ComponentMetricsExt as _,
17    topology::{interconnect::BufferedDispatcher, OutputDefinition},
18    topology::{EventsBuffer, EventsDispatcher},
19};
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use smallvec::SmallVec;
24use tokio::{
25    select,
26    time::{interval, interval_at},
27};
28use tracing::{debug, error, trace};
29
30mod telemetry;
31use self::telemetry::Telemetry;
32
33mod config;
34use self::config::HistogramConfiguration;
35
/// How often to check whether buffered passthrough metrics have sat idle long enough to flush.
const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);

/// Default size of the aggregation window: 10 seconds.
const fn default_window_duration() -> Duration {
    Duration::new(10, 0)
}

/// Default interval between primary flushes: 15 seconds.
const fn default_primary_flush_interval() -> Duration {
    Duration::new(15, 0)
}

/// Default maximum number of contexts aggregated per window.
const fn default_context_limit() -> usize {
    5_000
}

/// Default idle counter expiry: 300 seconds (5 minutes).
const fn default_counter_expiry_seconds() -> Option<u64> {
    Some(5 * 60)
}

/// Passthrough of timestamped metrics is enabled by default.
const fn default_passthrough_timestamped_metrics() -> bool {
    true
}

/// Default idle flush timeout for buffered passthrough metrics: 1 second.
const fn default_passthrough_idle_flush_timeout() -> Duration {
    Duration::new(1, 0)
}
61
62/// Aggregate transform.
63///
64/// Aggregates metrics into fixed-size windows, flushing them at a regular interval.
65///
66/// ## Zero-value counters
67///
68/// When metrics are aggregated and then flushed, they are typically removed entirely from the aggregation state. Unless
69/// they are updated again, they will not be emitted again. However, for counters, a slightly different approach is
70/// taken by tracking "zero-value" counters.
71///
72/// Counters are aggregated and flushed normally. However, when flushed, counters are added to a list of "zero-value"
73/// counters, and if those counters are not updated again, the transform emits a copy of the counter with a value of
74/// zero. It does this until the counter is updated again, or the zero-value counter expires (no updates), whichever
75/// comes first.
76///
77/// This provides a continuity in the output of a counter, from the perspective of a downstream system, when counters
78/// are otherwise sparse. The expiration period is configurable, and allows a trade-off in how sparse/infrequent the
79/// updates to counters can be versus how long it takes for counters that don't exist anymore to actually cease to be
80/// emitted.
#[derive(Deserialize)]
pub struct AggregateConfiguration {
    /// Size of the aggregation window.
    ///
    /// Metrics are aggregated into fixed-size windows, such that all updates to the same metric within a window are
    /// aggregated into a single metric. The window size controls how efficiently metrics are aggregated, and in turn,
    /// how many data points are emitted downstream.
    ///
    /// Defaults to 10 seconds.
    #[serde(rename = "aggregate_window_duration", default = "default_window_duration")]
    window_duration: Duration,

    /// How often to flush buckets.
    ///
    /// This represents a trade-off between the savings in network bandwidth (sending fewer requests to downstream
    /// systems, etc) and the frequency of updates (how often updates to a metric are emitted).
    ///
    /// Defaults to 15 seconds.
    #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")]
    primary_flush_interval: Duration,

    /// Maximum number of contexts to aggregate per window.
    ///
    /// A context is the unique combination of a metric name and its set of tags. For example,
    /// `metric.name.here{tag1=A,tag2=B}` represents a single context, and would be different than
    /// `metric.name.here{tag1=A,tag2=C}`.
    ///
    /// When the maximum number of contexts is reached in the current aggregation window, additional metrics are dropped
    /// until the next window starts.
    ///
    /// Defaults to 5000.
    #[serde(rename = "aggregate_context_limit", default = "default_context_limit")]
    context_limit: usize,

    /// Whether to flush open buckets when stopping the transform.
    ///
    /// Normally, open buckets (a bucket whose end has not yet occurred) are not flushed when the transform is stopped.
    /// This is done to avoid the chance of flushing a partial window, restarting the process, and then flushing the
    /// same window again. Downstream systems sometimes cannot cope with this gracefully, as there is no way to
    /// determine that it is an incremental update, and so they treat it as an absolute update, overwriting the
    /// previously flushed value.
    ///
    /// In cases where flushing all outstanding data is paramount, this can be enabled.
    ///
    /// Defaults to `false`.
    #[serde(rename = "aggregate_flush_open_windows", default)]
    flush_open_windows: bool,

    /// How long to keep idle counters alive after they've been flushed, in seconds.
    ///
    /// When metrics are flushed, they are removed from the aggregation state. However, if a counter expiration is set,
    /// counters will be kept alive in an "idle" state. For as long as a counter is idle, but not yet expired, a zero
    /// value will be emitted for it during each flush. This allows more gracefully handling sparse counters, where
    /// updates are infrequent but leaving gaps in the time series would be undesirable from a user experience
    /// perspective.
    ///
    /// After a counter has been idle (no updates) for longer than the expiry period, it will be completely removed and
    /// no further zero values will be emitted.
    ///
    /// Defaults to 300 seconds (5 minutes). Setting a value of `0` disables idle counter keep-alive.
    #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")]
    counter_expiry_seconds: Option<u64>,

    /// Whether or not to immediately forward (passthrough) metrics with pre-defined timestamps.
    ///
    /// When enabled, this causes the aggregator to immediately forward metrics that already have a timestamp present.
    /// Only metrics without a timestamp will be aggregated. This can be useful when metrics are already pre-aggregated
    /// client-side and both timeliness and memory efficiency are paramount, as it avoids the overhead of aggregating
    /// within the pipeline.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "dogstatsd_no_aggregation_pipeline",
        default = "default_passthrough_timestamped_metrics"
    )]
    passthrough_timestamped_metrics: bool,

    /// How often to flush buffered passthrough metrics.
    ///
    /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in
    /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum
    /// amount of time that passthrough metrics will be buffered before being forwarded.
    ///
    /// Defaults to 1 second.
    #[serde(
        rename = "aggregate_passthrough_idle_flush_timeout",
        default = "default_passthrough_idle_flush_timeout"
    )]
    passthrough_idle_flush_timeout: Duration,

    /// Histogram aggregation configuration.
    ///
    /// Controls the aggregates/percentiles that are generated for distributions in "histogram" mode (client-side
    /// distribution aggregation).
    #[serde(flatten)]
    hist_config: HistogramConfiguration,
}
178
179impl AggregateConfiguration {
180    /// Creates a new `AggregateConfiguration` from the given configuration.
181    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
182        Ok(config.as_typed()?)
183    }
184
185    /// Creates a new `AggregateConfiguration` with default values.
186    pub fn with_defaults() -> Self {
187        Self {
188            window_duration: default_window_duration(),
189            primary_flush_interval: default_primary_flush_interval(),
190            context_limit: default_context_limit(),
191            flush_open_windows: false,
192            counter_expiry_seconds: default_counter_expiry_seconds(),
193            passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
194            passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
195            hist_config: HistogramConfiguration::default(),
196        }
197    }
198}
199
200#[async_trait]
201impl TransformBuilder for AggregateConfiguration {
202    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
203        let metrics_builder = MetricsBuilder::from_component_context(&context);
204        let telemetry = Telemetry::new(&metrics_builder);
205
206        let state = AggregationState::new(
207            self.window_duration,
208            self.context_limit,
209            self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs),
210            self.hist_config.clone(),
211            telemetry.clone(),
212        );
213
214        let passthrough_batcher = PassthroughBatcher::new(
215            self.passthrough_idle_flush_timeout,
216            self.window_duration,
217            telemetry.clone(),
218        )
219        .await;
220
221        Ok(Box::new(Aggregate {
222            state,
223            telemetry,
224            primary_flush_interval: self.primary_flush_interval,
225            flush_open_windows: self.flush_open_windows,
226            passthrough_batcher,
227            passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
228        }))
229    }
230
231    fn input_event_type(&self) -> EventType {
232        EventType::Metric
233    }
234
235    fn outputs(&self) -> &[OutputDefinition<EventType>] {
236        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
237        OUTPUTS
238    }
239}
240
impl MemoryBounds for AggregateConfiguration {
    // Declares the minimum and firm memory bounds for this component, derived from the configured context limit.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: While we account for the aggregation state map accurately, what we don't currently account for is the
        // fact that a metric could have multiple distinct values. For the common pipeline of metrics in via DogStatsD,
        // this generally shouldn't be a problem because the values don't have a timestamp, so they get aggregated into
        // the same bucket, leading to two values per `MetricValues` at most, which is already baked into the size of
        // `MetricValues` due to using `SmallVec`.
        //
        // However, there could be many more values in a single metric, and we don't account for that.

        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<Aggregate>("component struct");
        builder
            .firm()
            // Account for the aggregation state map, where we map contexts to the merged metric.
            //
            // Firm bound is (size of `Context` + size of `AggregatedMetric`) * `aggregate_context_limit`.
            .with_expr(UsageExpr::product(
                "aggregation state map",
                UsageExpr::sum(
                    "context map entry",
                    UsageExpr::struct_size::<Context>("context"),
                    UsageExpr::struct_size::<AggregatedMetric>("aggregated metric"),
                ),
                UsageExpr::config("aggregate_context_limit", self.context_limit),
            ));
    }
}
269
/// Runtime state for the aggregate transform, built from [`AggregateConfiguration`].
pub struct Aggregate {
    /// Windowed aggregation state for non-timestamped metrics.
    state: AggregationState,
    /// Component telemetry (flush counts, drops, etc).
    telemetry: Telemetry,
    /// How often the aggregation state is flushed downstream.
    primary_flush_interval: Duration,
    /// Whether open (not-yet-complete) buckets are flushed on shutdown.
    flush_open_windows: bool,
    /// Buffers metrics that already carry timestamps so they can be forwarded without re-aggregation.
    passthrough_batcher: PassthroughBatcher,
    /// Whether timestamped metrics bypass aggregation entirely.
    passthrough_timestamped_metrics: bool,
}
278
#[async_trait]
impl Transform for Aggregate {
    // Main event loop: services health checks, periodic primary/passthrough flushes, and incoming events until the
    // input stream ends, then performs one final flush pass before returning.
    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
        let mut health = context.take_health_handle();

        // Delay the first primary flush by one full interval so we don't flush immediately on startup.
        let mut primary_flush = interval_at(
            tokio::time::Instant::now() + self.primary_flush_interval,
            self.primary_flush_interval,
        );
        let mut final_primary_flush = false;

        let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);

        health.mark_ready();
        debug!("Aggregation transform started.");

        tokio::pin!(passthrough_flush);

        loop {
            select! {
                _ = health.live() => continue,
                _ = primary_flush.tick() => {
                    // We've reached the end of the current window. Flush our aggregation state and forward the metrics
                    // onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
                    // state to track the start time of the current aggregation window.
                    //
                    // NOTE(review): `state.flush()` (which records the flush time) only runs when the state is
                    // non-empty, which doesn't quite match the comment above — confirm the empty-state path is
                    // intentional.
                    if !self.state.is_empty() {
                        debug!("Flushing aggregated metrics...");

                        // Open windows are only flushed on the final pass, and only when configured to do so.
                        let should_flush_open_windows = final_primary_flush && self.flush_open_windows;

                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
                        if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut dispatcher).await {
                            error!(error = %e, "Failed to flush aggregation state.");
                        }

                        self.telemetry.increment_flushes();

                        match dispatcher.flush().await {
                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched events."),
                            Err(e) => error!(error = %e, "Failed to flush aggregated events."),
                        }
                    }

                    // If this is the final flush, we break out of the loop.
                    if final_primary_flush {
                        debug!("All aggregation complete.");
                        break
                    }
                },
                _ = passthrough_flush.tick() => self.passthrough_batcher.try_flush(context.dispatcher()).await,
                // Stop accepting input once the final flush has been scheduled.
                maybe_events = context.events().next(), if !final_primary_flush => match maybe_events {
                    Some(events) => {
                        trace!(events_len = events.len(), "Received events.");

                        let current_time = get_unix_timestamp();
                        let mut processed_passthrough_metrics = false;

                        for event in events {
                            if let Some(metric) = event.try_into_metric() {
                                let metric = if self.passthrough_timestamped_metrics {
                                    // Try splitting out any timestamped values, and if we have any, we'll buffer them
                                    // separately and process the remaining nontimestamped metric (if any) by
                                    // aggregating it like normal.
                                    let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric);

                                    // If we have a timestamped metric, then batch it up out-of-band.
                                    if let Some(timestamped_metric) = maybe_timestamped_metric {
                                        self.passthrough_batcher.push_metric(timestamped_metric, context.dispatcher()).await;
                                        processed_passthrough_metrics = true;
                                    }

                                    // If we have an nontimestamped metric, we'll process it like normal.
                                    //
                                    // Otherwise, continue to the next event.
                                    match maybe_nontimestamped_metric {
                                        Some(metric) => metric,
                                        None => continue,
                                    }
                                } else {
                                    metric
                                };

                                if !self.state.insert(current_time, metric) {
                                    trace!("Dropping metric due to context limit.");
                                    self.telemetry.increment_events_dropped();
                                }
                            }
                        }

                        // Updated once per batch (rather than per metric) to keep the hot path cheap.
                        if processed_passthrough_metrics {
                            self.passthrough_batcher.update_last_processed_at();
                        }
                    },
                    None => {
                        // We've reached the end of our input stream, so mark ourselves for a final flush and reset the
                        // interval so it ticks immediately on the next loop iteration.
                        final_primary_flush = true;
                        primary_flush.reset_immediately();

                        debug!("Aggregation transform stopping...");
                    }
                },
            }
        }

        // Do a final flush of any timestamped metrics that we've buffered up.
        self.passthrough_batcher.try_flush(context.dispatcher()).await;

        debug!("Aggregation transform stopped.");

        Ok(())
    }
}
392
393fn try_split_timestamped_values(mut metric: Metric) -> (Option<Metric>, Option<Metric>) {
394    if metric.values().all_timestamped() {
395        (Some(metric), None)
396    } else if metric.values().any_timestamped() {
397        // Only _some_ of the values are timestamped, so we'll split the timestamped values into a new metric.
398        let new_metric_values = metric.values_mut().split_timestamped();
399        let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone());
400
401        (Some(new_metric), Some(metric))
402    } else {
403        // No timestamped values, so we need to aggregate this metric.
404        (None, Some(metric))
405    }
406}
407
/// Buffers passthrough (already-timestamped) metrics so they can be dispatched in batches rather
/// than one event at a time.
struct PassthroughBatcher {
    /// Buffer currently accumulating passthrough events.
    active_buffer: EventsBuffer,
    /// When the first event of the current batch was pushed; used for batch-duration telemetry.
    active_buffer_start: Instant,
    /// Last time passthrough metrics were processed; compared against `idle_flush_timeout`.
    last_processed_at: Instant,
    /// Maximum time the buffer may sit idle before `try_flush` dispatches it.
    idle_flush_timeout: Duration,
    /// Width of the aggregation window; used when converting counter values to rates.
    bucket_width: Duration,
    /// Component telemetry (passthrough metric/flush counts, drops, batch durations).
    telemetry: Telemetry,
}
416
417impl PassthroughBatcher {
418    async fn new(idle_flush_timeout: Duration, bucket_width: Duration, telemetry: Telemetry) -> Self {
419        let active_buffer = EventsBuffer::default();
420
421        Self {
422            active_buffer,
423            active_buffer_start: Instant::now(),
424            last_processed_at: Instant::now(),
425            idle_flush_timeout,
426            bucket_width,
427            telemetry,
428        }
429    }
430
431    async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
432        // Convert counters to rates before we batch them up.
433        //
434        // This involves specifying the rate interval as the bucket width of the aggregate transform itself, which when
435        // you say it out loud is sort of confusing and nonsensical since the whole point is that these are
436        // _pre-aggregated_ metrics but we have to match the behavior of the Datadog Agent. ¯\_(ツ)_/¯
437        let (context, values, metadata) = metric.into_parts();
438        let adjusted_values = counter_values_to_rate(values, self.bucket_width);
439        let metric = Metric::from_parts(context, adjusted_values, metadata);
440
441        // Try pushing the metric into our active buffer.
442        //
443        // If our active buffer is full, then we'll flush the buffer, grab a new one, and push the metric into it.
444        if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) {
445            debug!("Passthrough event buffer was full. Flushing...");
446            self.dispatch_events(dispatcher).await;
447
448            if self.active_buffer.try_push(event).is_some() {
449                error!("Event buffer is full even after dispatching events. Dropping event.");
450                self.telemetry.increment_events_dropped();
451                return;
452            }
453        }
454
455        // If this is the first metric in the buffer, we've started a new batch, so track when it started.
456        if self.active_buffer.len() == 1 {
457            self.active_buffer_start = Instant::now();
458        }
459
460        self.telemetry.increment_passthrough_metrics();
461    }
462
463    fn update_last_processed_at(&mut self) {
464        // We expose this as a standalone method, rather than just doing it automatically in `push_metric`, because
465        // otherwise we might be calling this 10-20K times per second, instead of simply doing it after the end of each
466        // input event buffer in the transform's main loop, which should be much less frequent.
467        self.last_processed_at = Instant::now();
468    }
469
470    async fn try_flush(&mut self, dispatcher: &EventsDispatcher) {
471        // If our active buffer isn't empty, and we've exceeded our idle flush timeout, then flush the buffer.
472        if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout {
473            debug!("Passthrough processing exceeded idle flush timeout. Flushing...");
474
475            self.dispatch_events(dispatcher).await;
476        }
477    }
478
479    async fn dispatch_events(&mut self, dispatcher: &EventsDispatcher) {
480        if !self.active_buffer.is_empty() {
481            let unaggregated_events = self.active_buffer.len();
482
483            // Track how long this batch was alive for.
484            let batch_duration = self.active_buffer_start.elapsed();
485            self.telemetry.record_passthrough_batch_duration(batch_duration);
486
487            self.telemetry.increment_passthrough_flushes();
488
489            // Swap our active buffer with a new, empty one, and then forward the old one.
490            let new_active_buffer = EventsBuffer::default();
491            let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer);
492
493            match dispatcher.dispatch(old_active_buffer).await {
494                Ok(()) => debug!(unaggregated_events, "Dispatched events."),
495                Err(e) => error!(error = %e, "Failed to flush unaggregated events."),
496            }
497        }
498    }
499}
500
/// A single context's metric being aggregated within `AggregationState`.
#[derive(Clone)]
struct AggregatedMetric {
    /// Merged values for this context across the current window(s).
    values: MetricValues,
    /// Metadata captured for the context; metadata changes within a flush interval are ignored.
    metadata: MetricMetadata,
    /// Unix timestamp (seconds) of the most recent update; drives counter expiry.
    last_seen: u64,
}
507
/// Windowed aggregation state: maps contexts to their merged metric values between flushes.
struct AggregationState {
    /// Context -> aggregated metric map (foldhash-seeded for speed).
    contexts: HashMap<Context, AggregatedMetric, foldhash::quality::RandomState>,
    /// Scratch buffer of contexts to remove after a flush pass; reused across flushes.
    contexts_remove_buf: Vec<Context>,
    /// Maximum number of contexts tracked; inserts beyond this are rejected.
    context_limit: usize,
    /// Width of an aggregation bucket, in seconds.
    bucket_width_secs: u64,
    /// Idle counter expiry period in seconds; `None` when keep-alive is disabled.
    counter_expire_secs: Option<NonZeroU64>,
    /// Unix timestamp of the last flush; zero until the first flush occurs.
    last_flush: u64,
    /// Histogram aggregation configuration, applied when flushing histogram values.
    hist_config: HistogramConfiguration,
    /// Component telemetry (context counts, flushed values).
    telemetry: Telemetry,
}
518
impl AggregationState {
    /// Creates a new `AggregationState`.
    ///
    /// A `counter_expiration` of zero seconds is normalized to `None` (keep-alive disabled).
    fn new(
        bucket_width: Duration, context_limit: usize, counter_expiration: Option<Duration>,
        hist_config: HistogramConfiguration, telemetry: Telemetry,
    ) -> Self {
        let counter_expire_secs = counter_expiration.map(|d| d.as_secs()).and_then(NonZeroU64::new);

        Self {
            contexts: HashMap::default(),
            contexts_remove_buf: Vec::new(),
            context_limit,
            bucket_width_secs: bucket_width.as_secs(),
            counter_expire_secs,
            last_flush: 0,
            hist_config,
            telemetry,
        }
    }

    /// Returns `true` if no contexts are currently being tracked.
    fn is_empty(&self) -> bool {
        self.contexts.is_empty()
    }

    /// Merges a metric into the aggregation state.
    ///
    /// Returns `false` (dropping the metric) when it would introduce a new context beyond the
    /// configured context limit; returns `true` otherwise.
    fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
        // If we haven't seen this context yet, and it would put us over the limit to insert it, then return early.
        if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
            return false;
        }

        let (context, mut values, metadata) = metric.into_parts();

        // Collapse all non-timestamped values into a single timestamped value.
        //
        // We do this pre-aggregation step because unless we're merging into an existing context, we'll end up with
        // however many values were in the original metric instead of full aggregated values.
        let bucket_ts = align_to_bucket_start(timestamp, self.bucket_width_secs);
        values.collapse_non_timestamped(bucket_ts);

        trace!(
            bucket_ts,
            kind = values.as_str(),
            "Inserting metric into aggregation state."
        );

        // If we're already tracking this context, update the last seen time and merge the new values into the existing
        // values. Otherwise, create a new entry.
        match self.contexts.entry(context) {
            Entry::Occupied(mut entry) => {
                let aggregated = entry.get_mut();

                // We ignore metadata changes within a flush interval to keep things simple.
                aggregated.last_seen = timestamp;
                aggregated.values.merge(values);
            }
            Entry::Vacant(entry) => {
                self.telemetry.increment_contexts(entry.key(), &values);

                entry.insert(AggregatedMetric {
                    values,
                    metadata,
                    last_seen: timestamp,
                });
            }
        }

        true
    }

    /// Flushes all values residing in closed buckets to the dispatcher, emitting zero values for
    /// idle (not-yet-expired) counters and pruning contexts that are empty and expired.
    async fn flush(
        &mut self, current_time: u64, flush_open_buckets: bool, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
    ) -> Result<(), GenericError> {
        let bucket_width_secs = self.bucket_width_secs;
        // Zero means counter keep-alive is disabled.
        let counter_expire_secs = self.counter_expire_secs.map(|d| d.get()).unwrap_or(0);

        // We want our split timestamp to be before the start of the current bucket, which ensures any timestamp that is
        // less than or equal to `split_timestamp` resides in a closed bucket.
        let split_timestamp = align_to_bucket_start(current_time, bucket_width_secs).saturating_sub(1);

        // Calculate the buckets we need to potentially generate zero-value counters for.
        //
        // We only need to do this if we've flushed before, since we won't have any knowledge of which counters are idle
        // or not until that happens.
        let mut zero_value_buckets = SmallVec::<[(u64, MetricValues); 4]>::new();
        if self.last_flush != 0 {
            let start = align_to_bucket_start(self.last_flush, bucket_width_secs);

            for bucket_start in (start..current_time).step_by(bucket_width_secs as usize) {
                if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
                    zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
                }
            }
        }

        // Iterate over each context we're tracking, and flush any values that are in buckets which are now closed.
        debug!(timestamp = current_time, "Flushing buckets.");

        for (context, am) in self.contexts.iter_mut() {
            // Figure out if we should remove this metric or not if it has no values in open buckets.
            //
            // We have a special carve-out for counters here, which we have the ability to keep alive after they are
            // flushed, based on a configured expiration period. This allows us to continue emitting a zero value for
            // counters when they're idle, which can make them appear "live" in downstream systems, even when they're
            // not.
            //
            // This is useful for sparsely-updated counters.
            //
            // NOTE(review): when `counter_expire_secs` is 0 (keep-alive disabled), this evaluates to `false`, so empty
            // counter contexts are never removed here — confirm that retention is intended rather than immediate
            // removal.
            let should_expire_if_empty = match &am.values {
                MetricValues::Counter(..) => {
                    counter_expire_secs != 0 && am.last_seen + counter_expire_secs < current_time
                }
                _ => true,
            };

            // If we're dealing with a counter, we'll merge in our calculated set of zero values. We only merge in the
            // values that represent now-closed buckets.
            //
            // This is also safe to do even when there are real values in those buckets since adding zero to anything is
            // a no-op from the perspective of what we end up flushing, and it doesn't mess with the "last seen" time.
            if let MetricValues::Counter(..) = &mut am.values {
                let expires_at = am.last_seen + counter_expire_secs;
                for (zv_bucket_start, zero_value) in &zero_value_buckets {
                    if expires_at > *zv_bucket_start {
                        am.values.merge(zero_value.clone());
                    } else {
                        // Since zero-value buckets are in order, we can break early if this bucket is past the
                        // expiration cutoff of the counter. No other bucket will be within the expiration range.
                        break;
                    }
                }
            }

            // Finally, figure out if the current metric can be removed.
            //
            // For any metric with values that are in open buckets, we split off the values that are in closed buckets
            // and keep the metric alive. When all the values are in closed buckets, or there are no values, we'll
            // remove the metric if `should_expire_if_empty` is `true`.
            //
            // This means we'll always remove all-closed/empty non-counter metrics, and we _may_ remove all-closed/empty
            // counters.
            if let Some(closed_bucket_values) = am.values.split_at_timestamp(split_timestamp) {
                self.telemetry.increment_flushed(&closed_bucket_values);

                // We got some closed bucket values, so flush those out.
                transform_and_push_metric(
                    context.clone(),
                    closed_bucket_values,
                    am.metadata.clone(),
                    bucket_width_secs,
                    &self.hist_config,
                    dispatcher,
                )
                .await?;
            }

            if am.values.is_empty() && should_expire_if_empty {
                self.telemetry.decrement_contexts(context, &am.values);
                self.contexts_remove_buf.push(context.clone());
            }
        }

        // Remove any contexts that were marked as needing to be removed.
        let contexts_len_before = self.contexts.len();
        for context in self.contexts_remove_buf.drain(..) {
            self.contexts.remove(&context);
        }
        let contexts_len_after = self.contexts.len();

        // Shrink the map toward the new size, but only reclaim about half of the freed capacity so that a burst of
        // re-inserts doesn't immediately force a regrow.
        let contexts_delta = contexts_len_before.saturating_sub(contexts_len_after);
        let target_contexts_capacity = contexts_len_after.saturating_add(contexts_delta / 2);
        self.contexts.shrink_to(target_contexts_capacity);

        self.last_flush = current_time;

        Ok(())
    }
}
694
/// Transforms aggregated metric values into their final, emittable form and pushes them to the
/// dispatcher.
///
/// Histograms are expanded into one metric per configured statistic (named
/// `<metric>.<statistic suffix>`), and optionally copied to a distribution metric (with an
/// optional name prefix) when enabled in the histogram configuration. All other value types are
/// forwarded as a single metric, with counters first converted to rates over the bucket width.
///
/// # Errors
///
/// Returns an error if pushing any resulting metric to the dispatcher fails.
async fn transform_and_push_metric(
    context: Context, mut values: MetricValues, metadata: MetricMetadata, bucket_width_secs: u64,
    hist_config: &HistogramConfiguration, dispatcher: &mut BufferedDispatcher<'_, EventsBuffer>,
) -> Result<(), GenericError> {
    let bucket_width = Duration::from_secs(bucket_width_secs);

    match values {
        // If we're dealing with a histogram, we calculate a configured set of aggregates/percentiles from it, and emit
        // them as individual metrics.
        MetricValues::Histogram(ref mut points) => {
            // Convert histogram to distribution
            if hist_config.copy_to_distribution() {
                let sketch_points = points
                    .into_iter()
                    .map(|(ts, hist)| {
                        let mut sketch = DDSketch::default();
                        for sample in hist.samples() {
                            // NOTE(review): `weight as u32` silently truncates if a sample weight
                            // ever exceeds `u32::MAX` -- presumably weights stay small; confirm
                            // against the histogram sample type.
                            sketch.insert_n(sample.value.into_inner(), sample.weight as u32);
                        }
                        (ts, sketch)
                    })
                    .collect::<SketchPoints>();
                let distribution_values = MetricValues::distribution(sketch_points);
                // Apply the configured name prefix to the copied distribution, if one is set.
                let metric_context = if !hist_config.copy_to_distribution_prefix().is_empty() {
                    context.with_name(format!(
                        "{}{}",
                        hist_config.copy_to_distribution_prefix(),
                        context.name()
                    ))
                } else {
                    context.clone()
                };
                let new_metric = Metric::from_parts(metric_context, distribution_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }
            // We collect our histogram points in their "summary" view, which sorts the underlying samples allowing
            // proper quantile queries to be answered, hence our "sorted" points. We do it this way because rather than
            // sort every time we insert, or cloning the points, we only sort when a summary view is constructed, which
            // requires mutable access to sort the samples in-place.
            let mut sorted_points = Vec::new();
            for (ts, h) in points {
                sorted_points.push((ts, h.summary_view()));
            }

            // Emit one metric per configured statistic, derived from the sorted summary views.
            for statistic in hist_config.statistics() {
                let new_points = sorted_points
                    .iter()
                    .map(|(ts, hs)| (*ts, statistic.value_from_histogram(hs)))
                    .collect::<ScalarPoints>();

                // Rate-like statistics are emitted as rates over the bucket width; everything else
                // is emitted as a gauge.
                let new_values = if statistic.is_rate_statistic() {
                    MetricValues::rate(new_points, bucket_width)
                } else {
                    MetricValues::gauge(new_points)
                };

                let new_context = context.with_name(format!("{}.{}", context.name(), statistic.suffix()));
                let new_metric = Metric::from_parts(new_context, new_values, metadata.clone());
                dispatcher.push(Event::Metric(new_metric)).await?;
            }

            Ok(())
        }

        // If we're not dealing with a histogram, then all we need to worry about is converting counters to rates before
        // forwarding our single, aggregated metric.
        values => {
            let adjusted_values = counter_values_to_rate(values, bucket_width);

            let metric = Metric::from_parts(context, adjusted_values, metadata);
            dispatcher.push(Event::Metric(metric)).await
        }
    }
}
769
770fn counter_values_to_rate(values: MetricValues, interval: Duration) -> MetricValues {
771    match values {
772        MetricValues::Counter(points) => MetricValues::rate(points, interval),
773        values => values,
774    }
775}
776
/// Aligns `timestamp` down to the start of its bucket, where buckets are `bucket_width_secs` wide
/// and begin at multiples of the width (e.g. with a width of 10, 1009 aligns to 1000).
const fn align_to_bucket_start(timestamp: u64, bucket_width_secs: u64) -> u64 {
    // Integer division truncates, so dividing and re-multiplying rounds the timestamp down to the
    // nearest multiple of the bucket width.
    (timestamp / bucket_width_secs) * bucket_width_secs
}
780
/// Determines whether the bucket starting at `bucket_start` (inclusive) and spanning
/// `bucket_width_secs` seconds is considered closed as of `current_time`.
///
/// When `flush_open_buckets` is `true`, every bucket is reported as closed.
const fn is_bucket_closed(
    current_time: u64, bucket_start: u64, bucket_width_secs: u64, flush_open_buckets: bool,
) -> bool {
    // A bucket is considered "closed" if the current time is greater than the end of the bucket, or if
    // `flush_open_buckets` is `true`.
    //
    // Buckets represent a half-open interval, where the start is inclusive and the end is exclusive. This means that
    // for a bucket start of 10, and a width of 10, the bucket is 10 seconds "wide", and its start and end are 10 and
    // 20, with the 20 excluded, or [10, 20) in interval notation. Simply put, if we have a timestamp of 10, or anything
    // smaller than 20, we would consider it to fall within the bucket... but 20 or more would be outside of the bucket.
    //
    // We can also represent this visually:
    //
    // <--------- bucket 1 ----------> <--------- bucket 2 ----------> <--------- bucket 3 ---------->
    // [10 11 12 13 14 15 16 17 18 19] [20 21 22 23 24 25 26 27 28 29] [30 31 32 33 34 35 36 37 38 39]
    //
    // We can see that each bucket is 10 seconds wide (10 elements, one for each second), and that their ends are
    // effectively `start + width - 1`. This means that for any of these buckets to be considered "closed", the current
    // time has to be _greater_ than `start + width - 1` -- equivalently, at least `start + width`. For example, if the
    // current time is 19, then no buckets are closed, and if the current time is 29, then bucket 1 is closed but
    // buckets 2 and 3 are still open, and if the current time is 30, then both buckets 1 and 2 are closed, but bucket 3
    // is still open.
    //
    // We compute this as `current_time - bucket_start >= bucket_width_secs`, saturating the subtraction at zero. This
    // is equivalent to the `(bucket_start + bucket_width_secs - 1) < current_time` formulation for all in-range inputs,
    // but cannot overflow for extreme bucket starts/widths, and cannot underflow when the bucket width is zero.
    flush_open_buckets || current_time.saturating_sub(bucket_start) >= bucket_width_secs
}
804
805// TODO: One thing we ought to consider is a property test, specifically a state machine property test, where we
806// generate a randomized offset to start time from, a bucket width, flush interval, and operations, and so on... and
807// then we run it to make sure that we are always generating sequential timestamps for data points, etc.
808#[cfg(test)]
809mod tests {
810    use float_cmp::ApproxEqRatio as _;
811    use saluki_core::{
812        components::ComponentContext,
813        topology::{interconnect::Dispatcher, ComponentId, OutputName},
814    };
815    use saluki_metrics::test::TestRecorder;
816    use tokio::sync::mpsc;
817
818    use super::config::HistogramStatistic;
819    use super::*;
820
    // Common aggregation parameters for the tests below: 10-second buckets, and counters that
    // expire after two bucket widths (20 seconds) of inactivity.
    const BUCKET_WIDTH_SECS: u64 = 10;
    const BUCKET_WIDTH: Duration = Duration::from_secs(BUCKET_WIDTH_SECS);
    const COUNTER_EXPIRE_SECS: u64 = 20;
    const COUNTER_EXPIRE: Option<Duration> = Some(Duration::from_secs(COUNTER_EXPIRE_SECS));
825
    /// Gets the bucket start timestamp for the given step.
    ///
    /// This is the timestamp that flushed data points are expected to carry when a metric is
    /// inserted at `insert_ts(step)`.
    const fn bucket_ts(step: u64) -> u64 {
        align_to_bucket_start(insert_ts(step), BUCKET_WIDTH_SECS)
    }
830
    /// Gets the insert timestamp for the given step.
    ///
    /// The timestamp lands two seconds before the end of the step's bucket, so inserts for a step
    /// always fall strictly inside the bucket that `flush_ts(step)` closes.
    const fn insert_ts(step: u64) -> u64 {
        (BUCKET_WIDTH_SECS * (step + 1)) - 2
    }
835
    /// Gets the flush timestamp for the given step.
    ///
    /// This is the exclusive end of the step's bucket, i.e. the first moment at which the bucket
    /// is considered closed and eligible for flushing.
    const fn flush_ts(step: u64) -> u64 {
        BUCKET_WIDTH_SECS * (step + 1)
    }
840
    /// Test helper wrapping the receiving end of a dispatcher output channel, used to collect the
    /// metrics flushed by the aggregation state.
    struct DispatcherReceiver {
        receiver: mpsc::Receiver<EventsBuffer>,
    }
844
845    impl DispatcherReceiver {
846        fn collect_next(&mut self) -> Vec<Metric> {
847            match self.receiver.try_recv() {
848                Ok(event_buffer) => {
849                    let mut metrics = event_buffer
850                        .into_iter()
851                        .filter_map(|event| event.try_into_metric())
852                        .collect::<Vec<Metric>>();
853
854                    metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
855                    metrics
856                }
857                Err(_) => Vec::new(),
858            }
859        }
860    }
861
    /// Constructs a basic `Dispatcher` with a fixed-size event buffer.
    ///
    /// The dispatcher gets a single default output wired to a bounded (capacity 1) channel, whose
    /// receiving side is returned as a `DispatcherReceiver` for collecting flushed metrics.
    fn build_basic_dispatcher() -> (EventsDispatcher, DispatcherReceiver) {
        let component_id = ComponentId::try_from("test").expect("should not fail to create component ID");
        let mut dispatcher = Dispatcher::new(ComponentContext::transform(component_id));

        let (buffer_tx, buffer_rx) = mpsc::channel(1);
        // Outputs must be declared before senders can be attached to them.
        dispatcher.add_output(OutputName::Default).unwrap();
        dispatcher
            .attach_sender_to_output(&OutputName::Default, buffer_tx)
            .unwrap();

        (dispatcher, DispatcherReceiver { receiver: buffer_rx })
    }
875
    /// Flushes the given aggregation state as of `timestamp` and returns the flushed metrics,
    /// sorted by metric name.
    ///
    /// NOTE(review): the `true` argument to `flush` presumably forces open buckets to be flushed
    /// as well -- confirm against the `AggregationState::flush` signature.
    async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
        let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");

        // Flush the metrics to an event buffer.
        state
            .flush(timestamp, true, &mut buffered_dispatcher)
            .await
            .expect("should not fail to flush aggregation state");

        // Flush our buffered dispatcher, which should ensure that the event buffer is sent out, and then read it from the
        // receiver:
        buffered_dispatcher
            .flush()
            .await
            .expect("should not fail to flush buffered sender");

        dispatcher_receiver.collect_next()
    }
895
    // Compares expected vs. actual data points pairwise, asserting that timestamps match exactly.
    //
    // The `scalar` arm compares point values approximately, within the given error ratio, while the
    // `distribution` arm compares sketches for exact equality.
    macro_rules! compare_points {
        (scalar, $expected:expr, $actual:expr, $error_ratio:literal) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_point) = expected_value;
                let (actual_ts, actual_point) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert!(
                    expected_point.approx_eq_ratio(&actual_point, $error_ratio),
                    "point for value #{} does not match: {} (expected) vs {} (actual)",
                    idx,
                    expected_point,
                    actual_point
                );
            }
        };
        (distribution, $expected:expr, $actual:expr) => {
            for (idx, (expected_value, actual_value)) in $expected.into_iter().zip($actual.into_iter()).enumerate() {
                let (expected_ts, expected_sketch) = expected_value;
                let (actual_ts, actual_sketch) = actual_value;

                assert_eq!(
                    expected_ts, actual_ts,
                    "timestamp for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_ts, actual_ts
                );
                assert_eq!(
                    expected_sketch, actual_sketch,
                    "sketch for value #{} does not match: {:?} (expected) vs {:?} (actual)",
                    idx, expected_sketch, actual_sketch
                );
            }
        };
    }
934
    // Asserts that a flushed metric has the expected context and scalar (counter/gauge/rate) data
    // points. Point values are compared approximately; the single-arm form defaults the error ratio
    // to 0.000001.
    macro_rules! assert_flushed_scalar_metric {
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
            assert_flushed_scalar_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
        };
        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
            let actual_metric = $actual;

            assert_eq!($original.context(), actual_metric.context(), "expected context ({}) and actual context ({}) do not match", $original.context(), actual_metric.context());

            let expected_points = ScalarPoints::from([$(($ts, $value)),+]);

            match actual_metric.values() {
                MetricValues::Counter(ref actual_points) | MetricValues::Gauge(ref actual_points) | MetricValues::Rate(ref actual_points, _) => {
                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
                    compare_points!(scalar, expected_points, actual_points, $error_ratio);
                },
                _ => panic!("only counters, rates, and gauges are supported in assert_flushed_scalar_metric"),
            }
        };
    }
955
956    macro_rules! assert_flushed_distribution_metric {
957        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+]) => {
958            assert_flushed_distribution_metric!($original, $actual, [$($ts => $value),+], error_ratio => 0.000001);
959        };
960        ($original:expr, $actual:expr, [$($ts:expr => $value:expr),+], error_ratio => $error_ratio:literal) => {
961            let actual_metric = $actual;
962
963            assert_eq!($original.context(), actual_metric.context());
964
965            match actual_metric.values() {
966                MetricValues::Distribution(ref actual_points) => {
967                    let expected_points = SketchPoints::from([$(($ts, $value)),+]);
968                    assert_eq!(expected_points.len(), actual_points.len(), "expected and actual values have different number of points");
969
970                    compare_points!(distribution, &expected_points, actual_points);
971                },
972                _ => panic!("only distributions are supported in assert_flushed_distribution_metric"),
973            }
974        };
975    }
976
    /// Exercises `is_bucket_closed` across open/closed boundary cases, with and without forced
    /// flushing of open buckets.
    #[test]
    fn bucket_is_closed() {
        // Cases are defined as:
        // (current time, bucket start, bucket width, flush open buckets, expected result)
        let cases = [
            // Bucket goes from [995, 1005), current time of 1000, so bucket is open.
            (1000, 995, 10, false, false),
            (1000, 995, 10, true, true),
            // Bucket goes from [1000, 1010), current time of 1000, so bucket is open.
            (1000, 1000, 10, false, false),
            (1000, 1000, 10, true, true),
            // Bucket goes from [1000, 1010), current time of 1010, so bucket is closed.
            (1010, 1000, 10, false, true),
            (1010, 1000, 10, true, true),
        ];

        for (current_time, bucket_start, bucket_width_secs, flush_open_buckets, expected) in cases {
            // Describes the expected state (and, implicitly, the observed one) in the failure message.
            let expected_reason = if expected {
                "closed, was open"
            } else {
                "open, was closed"
            };

            assert_eq!(
                is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets),
                expected,
                "expected bucket to be {} (current_time={}, bucket_start={}, bucket_width={}, flush_open_buckets={})",
                expected_reason,
                current_time,
                bucket_start,
                bucket_width_secs,
                flush_open_buckets
            );
        }
    }
1012
    /// Tests that inserts beyond the configured context limit are rejected, and that flushing
    /// frees up capacity for new contexts.
    #[tokio::test]
    async fn context_limit() {
        // Create our aggregation state with a context limit of 2.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create four unique gauges, and insert all of them. The third and fourth should fail because we've reached
        // the context limit.
        let input_metrics = vec![
            Metric::gauge("metric1", 1.0),
            Metric::gauge("metric2", 2.0),
            Metric::gauge("metric3", 3.0),
            Metric::gauge("metric4", 4.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[2].clone()));
        assert!(!state.insert(insert_ts(1), input_metrics[3].clone()));

        // We should only see the first two gauges after flushing.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[0].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[1].context(), flushed_metrics[1].context());

        // We should be able to insert the third and fourth gauges now as the first two have been flushed, and along
        // with them, their contexts should no longer be tracked in the aggregation state:
        assert!(state.insert(insert_ts(2), input_metrics[2].clone()));
        assert!(state.insert(insert_ts(2), input_metrics[3].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_eq!(input_metrics[2].context(), flushed_metrics[0].context());
        assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
    }
1054
    /// Tests that zero-value counters continue to count against the context limit until they
    /// expire, after which their capacity is reclaimed.
    #[tokio::test]
    async fn context_limit_with_zero_value_counters() {
        // We test here to ensure that zero-value counters contribute to the context limit.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            2,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create our input metrics.
        let input_metrics = vec![
            Metric::counter("metric1", 1.0),
            Metric::counter("metric2", 2.0),
            Metric::counter("metric3", 3.0),
        ];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // Flush the aggregation state, and observe they're both present.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Flush _again_ to ensure that we then emit zero-value variants for both counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // Now try to insert a third counter, which should fail because we've reached the context limit.
        assert!(!state.insert(insert_ts(3), input_metrics[2].clone()));

        // Flush the aggregation state, and observe that we only see the two original counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(3), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0]);

        // With a fourth flush interval, the two counters should now have expired, and thus be dropped and no longer
        // contributing to the context limit.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 0);

        // Now we should be able to insert the third counter, and it should be the only one present after flushing.
        assert!(state.insert(insert_ts(5), input_metrics[2].clone()));

        let flushed_metrics = get_flushed_metrics(flush_ts(5), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metrics[2], &flushed_metrics[0], [bucket_ts(5) => 3.0]);
    }
1109
    /// Tests that zero-value counters are emitted for idle flush periods, backfilled for skipped
    /// periods, and stop being emitted once the counter expires.
    #[tokio::test]
    async fn zero_value_counters() {
        // We're testing that we properly emit and expire zero-value counters in all relevant scenarios.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create two unique counters, and insert both of them.
        let input_metrics = vec![Metric::counter("metric1", 1.0), Metric::counter("metric2", 2.0)];

        assert!(state.insert(insert_ts(1), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(1), input_metrics[1].clone()));

        // Flush the aggregation state, and observe they're both present.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(1) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(1) => 2.0]);

        // Perform our second flush, which should have them as zero-value counters.
        let flushed_metrics = get_flushed_metrics(flush_ts(2), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(2) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(2) => 0.0]);

        // Now, we'll pretend to skip a flush period and add updates to them again after that.
        assert!(state.insert(insert_ts(4), input_metrics[0].clone()));
        assert!(state.insert(insert_ts(4), input_metrics[1].clone()));

        // Flush the aggregation state, and observe that we have two zero-value counters for the flush period we
        // skipped, but that we see them appear again in the fourth flush period.
        let flushed_metrics = get_flushed_metrics(flush_ts(4), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(3) => 0.0, bucket_ts(4) => 1.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(3) => 0.0, bucket_ts(4) => 2.0]);

        // Now we'll skip multiple flush periods and ensure that we emit zero-value counters up until the point they
        // expire. As our zero-value counter expiration is 20 seconds, this is two flush periods, so we skip by three
        // flush periods, and we should only see the counters emitted for the first two.
        let flushed_metrics = get_flushed_metrics(flush_ts(7), &mut state).await;
        assert_eq!(flushed_metrics.len(), 2);
        assert_flushed_scalar_metric!(&input_metrics[0], &flushed_metrics[0], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
        assert_flushed_scalar_metric!(&input_metrics[1], &flushed_metrics[1], [bucket_ts(5) => 0.0, bucket_ts(6) => 0.0]);
    }
1158
    /// Tests that multiple values sharing the same timestamp are merged into a single data point
    /// when flushed.
    #[tokio::test]
    async fn merge_identical_timestamped_values_on_flush() {
        // We're testing that multiple values inserted with the same timestamp are merged into a
        // single aggregated data point when flushed. (Previous comment here was a stale copy-paste
        // about zero-value counters.)
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create one multi-value counter, and insert it.
        let input_metric = Metric::counter("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);

        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe the metric is present _and_ that we've properly merged all of the
        // values within the same timestamp.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        assert_flushed_scalar_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => 15.0]);
    }
1181
    /// Tests that a histogram is expanded into one metric per configured statistic on flush.
    #[tokio::test]
    async fn histogram_statistics() {
        // We're testing that we properly emit individual metrics (min, max, sum, etc) for a histogram.
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            false,
            "".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        // Create one multi-value histogram and insert it.
        let input_metric = Metric::histogram("metric1", [1.0, 2.0, 3.0, 4.0, 5.0]);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe that we've emitted all of the configured distribution statistics in
        // the form of three metrics: count, sum, and p50.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 3);

        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
        // matter here, but we do need a `Metric` for it to compare the context to.
        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);

        // We use a less strict error ratio (how much the expected vs actual) for the percentile check, as we generally
        // expect the value to be somewhat off the exact value due to the lossy nature of `DDSketch`.
        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[0], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[1], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[2], [bucket_ts(1) => 15.0]);
    }
1220
    /// Tests that distribution metrics pass through aggregation untouched.
    #[tokio::test]
    async fn distributions() {
        // We're testing that we pass through distributions untouched.
        let mut state = AggregationState::new(
            BUCKET_WIDTH,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create one multi-value distribution, with server-side aggregation, and insert it.
        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let input_metric = Metric::distribution("metric1", &values[..]);

        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe that we've emitted the original distribution.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);

        assert_flushed_distribution_metric!(&input_metric, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
    }
1244
    /// Tests that, when copy-to-distribution is enabled, a histogram additionally emits a prefixed
    /// distribution metric alongside its configured statistics.
    #[tokio::test]
    async fn histogram_copy_to_distribution() {
        let hist_config = HistogramConfiguration::from_statistics(
            &[
                HistogramStatistic::Count,
                HistogramStatistic::Sum,
                HistogramStatistic::Percentile {
                    q: 0.5,
                    suffix: "p50".into(),
                },
            ],
            true,
            "dist_prefix.".into(),
        );
        let mut state = AggregationState::new(BUCKET_WIDTH, 10, COUNTER_EXPIRE, hist_config, Telemetry::noop());

        // Create one multi-value histogram and insert it.
        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let input_metric = Metric::histogram("metric1", values);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe that we've emitted all of the configured distribution statistics in
        // the form of three metrics: count, sum, and p50 as well as the additional metric from copying the histogram.
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 4);

        // Create versions of the metric for each of the statistics we're expecting to emit. The values themselves don't
        // matter here, but we do need a `Metric` for it to compare the context to.
        let count_metric = Metric::rate("metric1.count", 0.0, Duration::from_secs(BUCKET_WIDTH_SECS));
        let sum_metric = Metric::gauge("metric1.sum", 0.0);
        let p50_metric = Metric::gauge("metric1.p50", 0.0);
        let expected_distribution = Metric::distribution("dist_prefix.metric1", &values[..]);

        // We use a less strict error ratio (how much the expected vs actual) for the percentile check, as we generally
        // expect the value to be somewhat off the exact value due to the lossy nature of `DDSketch`.
        assert_flushed_distribution_metric!(expected_distribution, &flushed_metrics[0], [bucket_ts(1) => &values[..]]);
        assert_flushed_scalar_metric!(count_metric, &flushed_metrics[1], [bucket_ts(1) => 5.0]);
        assert_flushed_scalar_metric!(p50_metric, &flushed_metrics[2], [bucket_ts(1) => 3.0], error_ratio => 0.0025);
        assert_flushed_scalar_metric!(sum_metric, &flushed_metrics[3], [bucket_ts(1) => 15.0]);
    }
1285
    /// Tests that a counter without pre-existing timestamps is emitted as a rate whose interval
    /// matches the configured bucket width.
    #[tokio::test]
    async fn nonaggregated_counters_to_rate() {
        let counter_value = 42.0;
        let bucket_width = BUCKET_WIDTH;

        // Create a basic aggregation state.
        let mut state = AggregationState::new(
            bucket_width,
            10,
            COUNTER_EXPIRE,
            HistogramConfiguration::default(),
            Telemetry::noop(),
        );

        // Create a simple non-aggregated counter, and insert it.
        let input_metric = Metric::counter("metric1", counter_value);
        assert!(state.insert(insert_ts(1), input_metric.clone()));

        // Flush the aggregation state, and observe that we've emitted the expected counter and that it has the right
        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
        let flushed_metrics = get_flushed_metrics(flush_ts(1), &mut state).await;
        assert_eq!(flushed_metrics.len(), 1);
        let flushed_metric = &flushed_metrics[0];

        assert_flushed_scalar_metric!(&input_metric, flushed_metric, [bucket_ts(1) => counter_value]);
        assert_eq!(flushed_metric.values().as_str(), "rate");
    }
1313
1314    #[tokio::test]
1315    async fn preaggregated_counters_to_rate() {
1316        let counter_value = 42.0;
1317        let timestamp = 123456;
1318        let bucket_width = BUCKET_WIDTH;
1319
1320        // Create a basic passthrough batcher and forwarder.
1321        let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await;
1322        let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
1323
1324        // Create a simple pre-aggregated counter, and batch it.
1325        let input_metric = Metric::counter("metric1", (timestamp, counter_value));
1326        batcher.push_metric(input_metric.clone(), &dispatcher).await;
1327
1328        // Flush the batcher, and observe that we've emitted the expected counter and that it has the right
1329        // value, but specifically that it's a rate with an interval that matches our configured bucket width:
1330        batcher.try_flush(&dispatcher).await;
1331
1332        let mut flushed_metrics = dispatcher_receiver.collect_next();
1333        assert_eq!(flushed_metrics.len(), 1);
1334        assert_eq!(
1335            Metric::rate("metric1", (timestamp, counter_value), bucket_width),
1336            flushed_metrics.remove(0)
1337        );
1338    }
1339
1340    #[tokio::test]
1341    async fn telemetry() {
1342        // TODO: We don't check `component_events_dropped_total` or `aggregate_passthrough_metrics_total` here as
1343        // they're set directly in the aggregate component future rather than `AggregationState`, which is harder to
1344        // drive overall and would have required even more boilerplate.
1345        //
1346        // Leaving that as a future improvement.
1347
1348        let recorder = TestRecorder::default();
1349        let _local = metrics::set_default_local_recorder(&recorder);
1350
1351        let builder = MetricsBuilder::default();
1352        let telemetry = Telemetry::new(&builder);
1353
1354        let mut state = AggregationState::new(
1355            BUCKET_WIDTH,
1356            2,
1357            COUNTER_EXPIRE,
1358            HistogramConfiguration::default(),
1359            telemetry,
1360        );
1361
1362        // Make sure our telemetry is registered at default values.
1363        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(0.0));
1364        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1365        assert_eq!(
1366            recorder.counter(("component_events_dropped_total", &[("intentional", "true")])),
1367            Some(0)
1368        );
1369        for metric_type in &["counter", "gauge", "rate", "set", "histogram", "distribution"] {
1370            assert_eq!(
1371                recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", *metric_type)])),
1372                Some(0.0)
1373            );
1374        }
1375
1376        // Insert a counter with a non-timestamped value.
1377        assert!(state.insert(insert_ts(1), Metric::counter("metric1", 42.0)));
1378        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1379        assert_eq!(
1380            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1381            Some(1.0)
1382        );
1383        assert_eq!(recorder.counter("aggregate_passthrough_metrics_total"), Some(0));
1384
1385        // Insert a gauge with a timestamped value.
1386        assert!(state.insert(insert_ts(1), Metric::gauge("metric2", (insert_ts(1), 42.0))));
1387        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1388        assert_eq!(
1389            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1390            Some(1.0)
1391        );
1392
1393        // We've reached our context limit at this point, so the next metric should not be inserted.
1394        assert!(!state.insert(insert_ts(1), Metric::counter("metric3", 42.0)));
1395        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(2.0));
1396        assert_eq!(
1397            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1398            Some(1.0)
1399        );
1400
1401        // Now let's flush the state which should flush the gauge entirely, reducing the context count, but not flush
1402        // the counter, since it'll be in zero-value mode.
1403        let _ = get_flushed_metrics(flush_ts(1), &mut state).await;
1404        assert_eq!(recorder.gauge("aggregate_active_contexts"), Some(1.0));
1405        assert_eq!(
1406            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "counter")])),
1407            Some(1.0)
1408        );
1409        assert_eq!(
1410            recorder.gauge(("aggregate_active_contexts_by_type", &[("metric_type", "gauge")])),
1411            Some(0.0)
1412        );
1413    }
1414}