saluki_core/data_model/event/metric/value/
mod.rs

1mod iter;
2
3use std::{collections::HashSet, fmt, num::NonZeroU64, time::Duration};
4
5use ddsketch_agent::DDSketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9mod sketch;
10pub use self::sketch::SketchPoints;
11
12mod histogram;
13pub use self::histogram::{Histogram, HistogramPoints, HistogramSummary};
14
15mod scalar;
16pub use self::scalar::ScalarPoints;
17
18mod set;
19pub use self::set::SetPoints;
20use super::SampleRate;
21
/// A single value paired with an optional timestamp.
///
/// A `timestamp` of `None` marks the value as non-timestamped.
#[derive(Clone, Debug, PartialEq, Eq)]
struct TimestampedValue<T> {
    /// Timestamp attached to the value, if any.
    timestamp: Option<NonZeroU64>,

    /// The wrapped value.
    value: T,
}
27
28impl<T> From<T> for TimestampedValue<T> {
29    fn from(value: T) -> Self {
30        Self { timestamp: None, value }
31    }
32}
33
34impl<T> From<(u64, T)> for TimestampedValue<T> {
35    fn from((timestamp, value): (u64, T)) -> Self {
36        Self {
37            timestamp: NonZeroU64::new(timestamp),
38            value,
39        }
40    }
41}
42
43impl From<(Option<NonZeroU64>, f64)> for TimestampedValue<OrderedFloat<f64>> {
44    fn from((timestamp, value): (Option<NonZeroU64>, f64)) -> Self {
45        Self {
46            timestamp,
47            value: OrderedFloat(value),
48        }
49    }
50}
51
52impl<T> From<(Option<NonZeroU64>, T)> for TimestampedValue<T> {
53    fn from((timestamp, value): (Option<NonZeroU64>, T)) -> Self {
54        Self { timestamp, value }
55    }
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
59struct TimestampedValues<T, const N: usize> {
60    values: SmallVec<[TimestampedValue<T>; N]>,
61}
62
63impl<T, const N: usize> TimestampedValues<T, N> {
64    fn all_timestamped(&self) -> bool {
65        self.values.iter().all(|value| value.timestamp.is_some())
66    }
67
68    fn any_timestamped(&self) -> bool {
69        self.values.iter().any(|value| value.timestamp.is_some())
70    }
71
72    fn drain_timestamped(&mut self) -> Self {
73        Self {
74            values: self.values.drain_filter(|value| value.timestamp.is_some()).collect(),
75        }
76    }
77
78    fn sort_by_timestamp(&mut self) {
79        self.values.sort_by_key(|value| value.timestamp);
80    }
81
82    fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
83        if self.values.is_empty() {
84            return None;
85        }
86
87        // Fast path: since all values are sorted, we know that if the first value's timestamp is set, and greater than
88        // the split timestamp, nothing that comes after it could be split off either.
89        if let Some(first) = self.values.first() {
90            if let Some(first_ts) = first.timestamp {
91                if first_ts.get() > timestamp {
92                    return None;
93                }
94            }
95        }
96
97        let new_values = self
98            .values
99            .drain_filter(|value| value.timestamp.is_some_and(|ts| ts.get() <= timestamp));
100        Some(Self::from(new_values))
101    }
102
103    fn set_timestamp(&mut self, timestamp: u64) {
104        let timestamp = NonZeroU64::new(timestamp);
105        for value in &mut self.values {
106            value.timestamp = timestamp;
107        }
108    }
109
110    fn collapse_non_timestamped<F>(&mut self, timestamp: u64, merge: F)
111    where
112        F: Fn(&mut T, &mut T),
113    {
114        self.values.dedup_by(|a, b| {
115            if a.timestamp.is_none() && b.timestamp.is_none() {
116                merge(&mut b.value, &mut a.value);
117                true
118            } else {
119                false
120            }
121        });
122
123        // Since all values are ordered by timestamp, with non-timestamped values ordered first, we know that if there
124        // were any non-timestamped values that got collapsed, then the single remaining non-timestamped value will be
125        // the first value in the set.
126        if let Some(first) = self.values.first_mut() {
127            first.timestamp = first.timestamp.or(NonZeroU64::new(timestamp));
128        }
129    }
130}
131
132impl<T, const N: usize> Default for TimestampedValues<T, N> {
133    fn default() -> Self {
134        Self {
135            values: SmallVec::new(),
136        }
137    }
138}
139
140impl<T, const N: usize> From<TimestampedValue<T>> for TimestampedValues<T, N> {
141    fn from(value: TimestampedValue<T>) -> Self {
142        let mut values = SmallVec::new();
143        values.push(value);
144
145        Self { values }
146    }
147}
148
149impl<I, IT, T, const N: usize> From<I> for TimestampedValues<T, N>
150where
151    I: IntoIterator<Item = IT>,
152    IT: Into<TimestampedValue<T>>,
153{
154    fn from(value: I) -> Self {
155        let mut values = SmallVec::new();
156        values.extend(value.into_iter().map(Into::into));
157
158        let mut values = Self { values };
159        values.sort_by_timestamp();
160
161        values
162    }
163}
164
165/// The values of a metric.
166#[derive(Clone, Debug, Eq, PartialEq)]
167pub enum MetricValues {
168    /// A counter.
169    ///
170    /// Counters generally represent a monotonically increasing value, such as the number of requests received.
171    Counter(ScalarPoints),
172
173    /// A rate.
174    ///
175    /// Rates define the rate of change over a given interval.
176    ///
177    /// For example, a rate with a value of 15 and an interval of 10 seconds would indicate that the value increases by
178    /// 15 every 10 seconds, or 1.5 per second.
179    Rate(ScalarPoints, Duration),
180
181    /// A gauge.
182    ///
183    /// Gauges represent the latest value of a quantity, such as the current number of active connections. This value
184    /// can go up or down, but gauges do not track the individual changes to the value, only the latest value.
185    Gauge(ScalarPoints),
186
187    /// A set.
188    ///
189    /// Sets represent a collection of unique values, such as the unique IP addresses that have connected to a service.
190    Set(SetPoints),
191
192    /// A histogram.
193    ///
194    /// Histograms represent the distribution of a quantity, such as the response times for a service, with forced
195    /// client-side aggregation. Individual samples are stored locally, in full fidelity, and aggregate statistics
196    /// can be queried against the sample set, but the individual samples cannot be accessed.
197    Histogram(HistogramPoints),
198
199    /// A distribution.
200    ///
201    /// Distributions represent the distribution of a quantity, such as the response times for a service, in such a way
202    /// that server-side aggregation is possible. Individual samples are stored in a sketch, which supports being merged
203    /// with other sketches server-side to facilitate global aggregation.
204    ///
205    /// Like histograms, sketches also provide the ability to be queried for aggregate statistics but the individual
206    /// samples cannot be accessed.
207    Distribution(SketchPoints),
208}
209
210impl MetricValues {
211    /// Creates a set of counter values from the given value(s).
212    pub fn counter<V>(values: V) -> Self
213    where
214        V: Into<ScalarPoints>,
215    {
216        Self::Counter(values.into())
217    }
218
219    /// Creates a set of counter values from a fallible iterator of values, based on the given sample rate.
220    ///
221    /// If `sample_rate` is `None`, no values will be modified. Otherwise, each value will be scaled proportionally to
222    /// the quotient of `1 / sample_rate`.
223    pub fn counter_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
224    where
225        I: Iterator<Item = Result<f64, E>>,
226    {
227        let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
228
229        let mut points = ScalarPoints::new();
230        for value in iter {
231            let value = value?;
232            points.add_point(None, value * sample_rate.raw_weight());
233        }
234        Ok(Self::Counter(points))
235    }
236
237    /// Creates a set of gauge values from the given value(s).
238    pub fn gauge<V>(values: V) -> Self
239    where
240        V: Into<ScalarPoints>,
241    {
242        Self::Gauge(values.into())
243    }
244
245    /// Creates a set of gauge values from a fallible iterator of values.
246    pub fn gauge_fallible<I, E>(iter: I) -> Result<Self, E>
247    where
248        I: Iterator<Item = Result<f64, E>>,
249    {
250        let mut points = ScalarPoints::new();
251        for value in iter {
252            points.add_point(None, value?);
253        }
254        Ok(Self::Gauge(points))
255    }
256
257    /// Creates a set from the given values.
258    pub fn set<V>(values: V) -> Self
259    where
260        V: Into<SetPoints>,
261    {
262        Self::Set(values.into())
263    }
264
265    /// Creates a set of histogram values from the given value(s).
266    pub fn histogram<V>(values: V) -> Self
267    where
268        V: Into<HistogramPoints>,
269    {
270        Self::Histogram(values.into())
271    }
272
273    /// Creates a set of histogram values from a fallible iterator of values, based on the given sample rate.
274    ///
275    /// If `sample_rate` is `None`, only the values present in the iterator will be used. Otherwise, each value will be
276    /// inserted at a scaled count of `1 / sample_rate`.
277    pub fn histogram_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
278    where
279        I: Iterator<Item = Result<f64, E>>,
280    {
281        let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
282
283        let mut histogram = Histogram::default();
284        for value in iter {
285            let value = value?;
286            histogram.insert(value, sample_rate);
287        }
288        Ok(Self::Histogram(histogram.into()))
289    }
290
291    /// Creates a set of distribution values from the given value(s).
292    pub fn distribution<V>(values: V) -> Self
293    where
294        V: Into<SketchPoints>,
295    {
296        Self::Distribution(values.into())
297    }
298
299    /// Creates a set of distribution values from a fallible iterator of values, based on the given sample rate.
300    ///
301    /// If `sample_rate` is `None`, only the values present in the iterator will be used. Otherwise, each value will be
302    /// inserted at a scaled count of `1 / sample_rate`.
303    pub fn distribution_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
304    where
305        I: Iterator<Item = Result<f64, E>>,
306    {
307        let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
308        let capped_sample_rate = u32::try_from(sample_rate.weight()).unwrap_or(u32::MAX);
309
310        let mut sketch = DDSketch::default();
311        for value in iter {
312            let value = value?;
313            if capped_sample_rate == 1 {
314                sketch.insert(value);
315            } else {
316                sketch.insert_n(value, capped_sample_rate);
317            }
318        }
319        Ok(Self::Distribution(sketch.into()))
320    }
321
322    /// Creates a set of rate values from the given value(s) and interval.
323    pub fn rate<V>(values: V, interval: Duration) -> Self
324    where
325        V: Into<ScalarPoints>,
326    {
327        Self::Rate(values.into(), interval)
328    }
329
330    /// Returns `true` if this metric has no values.
331    pub fn is_empty(&self) -> bool {
332        match self {
333            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.is_empty(),
334            Self::Set(points) => points.is_empty(),
335            Self::Histogram(points) => points.is_empty(),
336            Self::Distribution(points) => points.is_empty(),
337        }
338    }
339
340    /// Returns the number of values in this metric.
341    pub fn len(&self) -> usize {
342        match self {
343            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.len(),
344            Self::Set(points) => points.len(),
345            Self::Histogram(points) => points.len(),
346            Self::Distribution(points) => points.len(),
347        }
348    }
349
350    /// Returns `true` if all values in this metric have a timestamp.
351    pub fn all_timestamped(&self) -> bool {
352        match self {
353            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().all_timestamped(),
354            Self::Set(points) => points.inner().all_timestamped(),
355            Self::Histogram(points) => points.inner().all_timestamped(),
356            Self::Distribution(points) => points.inner().all_timestamped(),
357        }
358    }
359
360    /// Returns `true` if any values in this metric have a timestamp.
361    pub fn any_timestamped(&self) -> bool {
362        match self {
363            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().any_timestamped(),
364            Self::Set(points) => points.inner().any_timestamped(),
365            Self::Histogram(points) => points.inner().any_timestamped(),
366            Self::Distribution(points) => points.inner().any_timestamped(),
367        }
368    }
369
370    /// Sets the timestamp for all values in this metric.
371    ///
372    /// This overrides all existing timestamps whether they are set or not. If `timestamp` is zero, all existing
373    /// timestamps will be cleared.
374    pub fn set_timestamp(&mut self, timestamp: u64) {
375        match self {
376            Self::Counter(points) | Self::Gauge(points) | Self::Rate(points, _) => {
377                points.inner_mut().set_timestamp(timestamp)
378            }
379            Self::Set(points) => points.inner_mut().set_timestamp(timestamp),
380            Self::Histogram(points) => points.inner_mut().set_timestamp(timestamp),
381            Self::Distribution(points) => points.inner_mut().set_timestamp(timestamp),
382        }
383    }
384
385    /// Splits all timestamped values into a new `MetricValues`, leaving the remaining values in `self`.
386    pub fn split_timestamped(&mut self) -> Self {
387        match self {
388            Self::Counter(points) => Self::Counter(points.drain_timestamped()),
389            Self::Rate(points, interval) => Self::Rate(points.drain_timestamped(), *interval),
390            Self::Gauge(points) => Self::Gauge(points.drain_timestamped()),
391            Self::Set(points) => Self::Set(points.drain_timestamped()),
392            Self::Histogram(points) => Self::Histogram(points.drain_timestamped()),
393            Self::Distribution(points) => Self::Distribution(points.drain_timestamped()),
394        }
395    }
396
397    /// Splits all values with a timestamp less than or equal to `timestamp` into a new `MetricValues`, leaving the
398    /// remaining values in `self`.
399    pub fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
400        match self {
401            Self::Counter(points) => points.split_at_timestamp(timestamp).map(Self::Counter),
402            Self::Rate(points, interval) => points
403                .split_at_timestamp(timestamp)
404                .map(|points| Self::Rate(points, *interval)),
405            Self::Gauge(points) => points.split_at_timestamp(timestamp).map(Self::Gauge),
406            Self::Set(points) => points.split_at_timestamp(timestamp).map(Self::Set),
407            Self::Histogram(points) => points.split_at_timestamp(timestamp).map(Self::Histogram),
408            Self::Distribution(points) => points.split_at_timestamp(timestamp).map(Self::Distribution),
409        }
410    }
411
412    /// Collapses all non-timestamped values into a single value with the given timestamp.
413    ///
414    pub fn collapse_non_timestamped(&mut self, timestamp: u64) {
415        match self {
416            // Collapse by summing.
417            Self::Counter(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
418            Self::Rate(points, _) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
419            // Collapse by keeping the last value.
420            Self::Gauge(points) => points
421                .inner_mut()
422                .collapse_non_timestamped(timestamp, merge_scalar_latest),
423            // Collapse by merging.
424            Self::Set(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_set),
425            Self::Histogram(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_histogram),
426            Self::Distribution(sketches) => sketches.inner_mut().collapse_non_timestamped(timestamp, merge_sketch),
427        }
428    }
429
430    /// Merges another set of metric values into this one.
431    ///
432    /// If both `self` and `other` are the same metric type, their values will be merged appropriately. If the metric
433    /// types are different, or a specific precondition for the metric type is not met, the incoming values will override
434    /// the existing values instead.
435    ///
436    /// For rates, the interval of both rates must match to be merged. For gauges, the incoming value will override the
437    /// existing value.
438    pub fn merge(&mut self, other: Self) {
439        match (self, other) {
440            (Self::Counter(a), Self::Counter(b)) => a.merge(b, merge_scalar_sum),
441            (Self::Rate(a_points, a_interval), Self::Rate(b_points, b_interval)) => {
442                if *a_interval != b_interval {
443                    *a_points = b_points;
444                    *a_interval = b_interval;
445                } else {
446                    a_points.merge(b_points, merge_scalar_sum);
447                }
448            }
449            (Self::Gauge(a), Self::Gauge(b)) => a.merge(b, merge_scalar_latest),
450            (Self::Set(a), Self::Set(b)) => a.merge(b),
451            (Self::Histogram(a), Self::Histogram(b)) => a.merge(b),
452            (Self::Distribution(a), Self::Distribution(b)) => a.merge(b),
453
454            // Just override with whatever the incoming value is.
455            (dest, src) => drop(std::mem::replace(dest, src)),
456        }
457    }
458
459    /// Returns the metric value type as a string.
460    pub fn as_str(&self) -> &'static str {
461        match self {
462            Self::Counter(_) => "counter",
463            Self::Rate(_, _) => "rate",
464            Self::Gauge(_) => "gauge",
465            Self::Set(_) => "set",
466            Self::Histogram(_) => "histogram",
467            Self::Distribution(_) => "distribution",
468        }
469    }
470
471    /// Returns `true` if this metric is a serie.
472    pub fn is_serie(&self) -> bool {
473        matches!(
474            self,
475            Self::Counter(_) | Self::Rate(_, _) | Self::Gauge(_) | Self::Set(_) | Self::Histogram(_)
476        )
477    }
478
479    /// Returns `true` if this metric is a sketch.
480    pub fn is_sketch(&self) -> bool {
481        matches!(self, Self::Distribution(_))
482    }
483}
484
485impl fmt::Display for MetricValues {
486    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487        match self {
488            Self::Counter(points) => write!(f, "{}", points),
489            Self::Rate(points, interval) => write!(f, "{} over {:?}", points, interval),
490            Self::Gauge(points) => write!(f, "{}", points),
491            Self::Set(points) => write!(f, "{}", points),
492            Self::Histogram(points) => write!(f, "{}", points),
493            Self::Distribution(points) => write!(f, "{}", points),
494        }
495    }
496}
497
498fn merge_scalar_sum(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
499    *dest += *src;
500}
501
502fn merge_scalar_latest(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
503    *dest = *src;
504}
505
/// Merges two sets by moving every member of `src` into `dest`, leaving `src` empty.
fn merge_set(dest: &mut HashSet<String>, src: &mut HashSet<String>) {
    for member in src.drain() {
        dest.insert(member);
    }
}
509
510fn merge_histogram(dest: &mut Histogram, src: &mut Histogram) {
511    dest.merge(src);
512}
513
514fn merge_sketch(dest: &mut DDSketch, src: &mut DDSketch) {
515    dest.merge(src);
516}
517
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{HistogramPoints, MetricValues, ScalarPoints, SetPoints, SketchPoints};

    /// Counters merge by summing points that share a timestamp, or that both lack one.
    #[test]
    fn merge_counters() {
        let cases = [
            // Same timestamp on both sides: the two points are summed into one.
            (
                ScalarPoints::from((1, 1.0)),
                ScalarPoints::from((1, 2.0)),
                ScalarPoints::from((1, 3.0)),
            ),
            // Existing point is timestamped, incoming point is not: both points are kept.
            (
                ScalarPoints::from((1, 1.0)),
                ScalarPoints::from(2.0),
                ScalarPoints::from([(0, 2.0), (1, 1.0)]),
            ),
            // Neither side is timestamped: the two points are summed into one.
            (
                ScalarPoints::from(5.0),
                ScalarPoints::from(6.0),
                ScalarPoints::from(11.0),
            ),
        ];

        for (existing, incoming, expected) in cases {
            let mut actual = MetricValues::Counter(existing.clone());
            actual.merge(MetricValues::Counter(incoming.clone()));

            let wanted = MetricValues::Counter(expected.clone());
            assert_eq!(
                actual, wanted,
                "merged {} with {}, expected {} but got {}",
                existing, incoming, expected, actual
            );
        }
    }

    /// Gauges merge by letting the incoming point override the existing one.
    #[test]
    fn merge_gauges() {
        let cases = [
            // Same timestamp on both sides: the incoming value replaces the existing value.
            (
                ScalarPoints::from((1, 1.0)),
                ScalarPoints::from((1, 2.0)),
                ScalarPoints::from((1, 2.0)),
            ),
            // Existing point is timestamped, incoming point is not: both points are kept.
            (
                ScalarPoints::from((1, 1.0)),
                ScalarPoints::from(2.0),
                ScalarPoints::from([(0, 2.0), (1, 1.0)]),
            ),
            // Neither side is timestamped: the incoming value replaces the existing value.
            (
                ScalarPoints::from(5.0),
                ScalarPoints::from(6.0),
                ScalarPoints::from(6.0),
            ),
        ];

        for (existing, incoming, expected) in cases {
            let mut actual = MetricValues::Gauge(existing.clone());
            actual.merge(MetricValues::Gauge(incoming.clone()));

            let wanted = MetricValues::Gauge(expected.clone());
            assert_eq!(
                actual, wanted,
                "merged {} with {}, expected {} but got {}",
                existing, incoming, expected, actual
            );
        }
    }

    /// Rates merge like counters when the intervals match, and are overridden wholesale when they do not.
    #[test]
    fn merge_rates() {
        const FIVE_SECS: Duration = Duration::from_secs(5);
        const TEN_SECS: Duration = Duration::from_secs(10);

        let cases = [
            // Same timestamp and same interval: the two points are summed into one.
            (
                ScalarPoints::from((1, 1.0)),
                FIVE_SECS,
                ScalarPoints::from((1, 2.0)),
                FIVE_SECS,
                ScalarPoints::from((1, 3.0)),
                FIVE_SECS,
            ),
            // Existing point is timestamped, incoming point is not, same interval: both points are kept.
            (
                ScalarPoints::from((1, 1.0)),
                FIVE_SECS,
                ScalarPoints::from(2.0),
                FIVE_SECS,
                ScalarPoints::from([(0, 2.0), (1, 1.0)]),
                FIVE_SECS,
            ),
            // Neither side is timestamped, same interval: the two points are summed into one.
            (
                ScalarPoints::from(5.0),
                FIVE_SECS,
                ScalarPoints::from(6.0),
                FIVE_SECS,
                ScalarPoints::from(11.0),
                FIVE_SECS,
            ),
            // The remaining three cases all use differing intervals, which must make the incoming rate override the
            // existing one entirely. They cover: both sides timestamped identically, only the existing side
            // timestamped, and neither side timestamped.
            (
                ScalarPoints::from((1, 1.0)),
                FIVE_SECS,
                ScalarPoints::from((1, 2.0)),
                TEN_SECS,
                ScalarPoints::from((1, 2.0)),
                TEN_SECS,
            ),
            (
                ScalarPoints::from((1, 3.0)),
                FIVE_SECS,
                ScalarPoints::from(4.0),
                TEN_SECS,
                ScalarPoints::from((0, 4.0)),
                TEN_SECS,
            ),
            (
                ScalarPoints::from(7.0),
                TEN_SECS,
                ScalarPoints::from(9.0),
                FIVE_SECS,
                ScalarPoints::from(9.0),
                FIVE_SECS,
            ),
        ];

        for (existing, existing_interval, incoming, incoming_interval, expected, expected_interval) in cases {
            let mut actual = MetricValues::Rate(existing.clone(), existing_interval);
            actual.merge(MetricValues::Rate(incoming.clone(), incoming_interval));

            let wanted = MetricValues::Rate(expected.clone(), expected_interval);
            assert_eq!(
                actual, wanted,
                "merged {}/{:?} with {}/{:?}, expected {} but got {}",
                existing, existing_interval, incoming, incoming_interval, expected, actual
            );
        }
    }

    /// Sets merge by combining the members of points that share a timestamp, or that both lack one.
    #[test]
    fn merge_sets() {
        let cases = [
            // Same timestamp on both sides: the members are combined into one point.
            (
                SetPoints::from((1, "foo")),
                SetPoints::from((1, "bar")),
                SetPoints::from((1, ["foo", "bar"])),
            ),
            // Existing point is timestamped, incoming point is not: both points are kept.
            (
                SetPoints::from((1, "foo")),
                SetPoints::from("bar"),
                SetPoints::from([(0, "bar"), (1, "foo")]),
            ),
            // Neither side is timestamped: the members are combined into one point.
            (
                SetPoints::from("foo"),
                SetPoints::from("bar"),
                SetPoints::from(["foo", "bar"]),
            ),
        ];

        for (existing, incoming, expected) in cases {
            let mut actual = MetricValues::Set(existing.clone());
            actual.merge(MetricValues::Set(incoming.clone()));

            let wanted = MetricValues::Set(expected.clone());
            assert_eq!(
                actual, wanted,
                "merged {} with {}, expected {} but got {}",
                existing, incoming, expected, actual
            );
        }
    }

    /// Histograms merge by combining the samples of points that share a timestamp, or that both lack one.
    #[test]
    fn merge_histograms() {
        let cases = [
            // Same timestamp on both sides: the samples are combined into one point.
            (
                HistogramPoints::from((1, 1.0)),
                HistogramPoints::from((1, 2.0)),
                HistogramPoints::from((1, [1.0, 2.0])),
            ),
            // Existing point is timestamped, incoming point is not: both points are kept.
            (
                HistogramPoints::from((1, 1.0)),
                HistogramPoints::from(2.0),
                HistogramPoints::from([(0, 2.0), (1, 1.0)]),
            ),
            // Neither side is timestamped: the samples are combined into one point.
            (
                HistogramPoints::from(5.0),
                HistogramPoints::from(6.0),
                HistogramPoints::from([5.0, 6.0]),
            ),
        ];

        for (existing, incoming, expected) in cases {
            let mut actual = MetricValues::Histogram(existing.clone());
            actual.merge(MetricValues::Histogram(incoming.clone()));

            let wanted = MetricValues::Histogram(expected.clone());
            assert_eq!(
                actual, wanted,
                "merged {} with {}, expected {} but got {}",
                existing, incoming, expected, actual
            );
        }
    }

    /// Distributions merge by combining the sketches of points that share a timestamp, or that both lack one.
    #[test]
    fn merge_distributions() {
        let cases = [
            // Same timestamp on both sides: the sketches are combined into one point.
            (
                SketchPoints::from((1, 1.0)),
                SketchPoints::from((1, 2.0)),
                SketchPoints::from((1, [1.0, 2.0])),
            ),
            // Existing point is timestamped, incoming point is not: both points are kept.
            (
                SketchPoints::from((1, 1.0)),
                SketchPoints::from(2.0),
                SketchPoints::from([(0, 2.0), (1, 1.0)]),
            ),
            // Neither side is timestamped: the sketches are combined into one point.
            (
                SketchPoints::from(5.0),
                SketchPoints::from(6.0),
                SketchPoints::from([5.0, 6.0]),
            ),
        ];

        for (existing, incoming, expected) in cases {
            let mut actual = MetricValues::Distribution(existing.clone());
            actual.merge(MetricValues::Distribution(incoming.clone()));

            let wanted = MetricValues::Distribution(expected.clone());
            assert_eq!(
                actual, wanted,
                "merged {} with {}, expected {} but got {}",
                existing, incoming, expected, actual
            );
        }
    }
}