saluki_core/data_model/event/metric/value/
mod.rs

1mod iter;
2
3use std::{collections::HashSet, fmt, num::NonZeroU64, time::Duration};
4
5use ddsketch::DDSketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9mod sketch;
10pub use self::sketch::SketchPoints;
11
12mod histogram;
13pub use self::histogram::{Histogram, HistogramPoints, HistogramSummary};
14
15mod scalar;
16pub use self::scalar::ScalarPoints;
17
18mod set;
19pub use self::set::SetPoints;
20use super::SampleRate;
21
22#[derive(Clone, Debug, Eq, PartialEq)]
23struct TimestampedValue<T> {
24    timestamp: Option<NonZeroU64>,
25    value: T,
26}
27
28impl<T> From<T> for TimestampedValue<T> {
29    fn from(value: T) -> Self {
30        Self { timestamp: None, value }
31    }
32}
33
34impl<T> From<(u64, T)> for TimestampedValue<T> {
35    fn from((timestamp, value): (u64, T)) -> Self {
36        Self {
37            timestamp: NonZeroU64::new(timestamp),
38            value,
39        }
40    }
41}
42
43impl From<(Option<NonZeroU64>, f64)> for TimestampedValue<OrderedFloat<f64>> {
44    fn from((timestamp, value): (Option<NonZeroU64>, f64)) -> Self {
45        Self {
46            timestamp,
47            value: OrderedFloat(value),
48        }
49    }
50}
51
52impl<T> From<(Option<NonZeroU64>, T)> for TimestampedValue<T> {
53    fn from((timestamp, value): (Option<NonZeroU64>, T)) -> Self {
54        Self { timestamp, value }
55    }
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
59struct TimestampedValues<T, const N: usize> {
60    values: SmallVec<[TimestampedValue<T>; N]>,
61}
62
63impl<T, const N: usize> TimestampedValues<T, N> {
64    fn all_timestamped(&self) -> bool {
65        self.values.iter().all(|value| value.timestamp.is_some())
66    }
67
68    fn any_timestamped(&self) -> bool {
69        self.values.iter().any(|value| value.timestamp.is_some())
70    }
71
72    fn drain_timestamped(&mut self) -> Self {
73        Self {
74            values: self.values.drain_filter(|value| value.timestamp.is_some()).collect(),
75        }
76    }
77
78    fn sort_by_timestamp(&mut self) {
79        self.values.sort_by_key(|value| value.timestamp);
80    }
81
82    fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
83        if self.values.is_empty() {
84            return None;
85        }
86
87        // Fast path: since all values are sorted, we know that if the first value's timestamp is set, and greater than
88        // the split timestamp, nothing that comes after it could be split off either.
89        if let Some(first) = self.values.first() {
90            if let Some(first_ts) = first.timestamp {
91                if first_ts.get() > timestamp {
92                    return None;
93                }
94            }
95        }
96
97        let new_values = self
98            .values
99            .drain_filter(|value| value.timestamp.is_some_and(|ts| ts.get() <= timestamp));
100        Some(Self::from(new_values))
101    }
102
103    fn set_timestamp(&mut self, timestamp: u64) {
104        let timestamp = NonZeroU64::new(timestamp);
105        for value in &mut self.values {
106            value.timestamp = timestamp;
107        }
108    }
109
110    fn collapse_non_timestamped<F>(&mut self, timestamp: u64, merge: F)
111    where
112        F: Fn(&mut T, &mut T),
113    {
114        self.values.dedup_by(|a, b| {
115            if a.timestamp.is_none() && b.timestamp.is_none() {
116                merge(&mut b.value, &mut a.value);
117                true
118            } else {
119                false
120            }
121        });
122
123        // Since all values are ordered by timestamp, with non-timestamped values ordered first, we know that if there
124        // were any non-timestamped values that got collapsed, then the single remaining non-timestamped value will be
125        // the first value in the set.
126        if let Some(first) = self.values.first_mut() {
127            first.timestamp = first.timestamp.or(NonZeroU64::new(timestamp));
128        }
129    }
130}
131
132impl<T, const N: usize> Default for TimestampedValues<T, N> {
133    fn default() -> Self {
134        Self {
135            values: SmallVec::new(),
136        }
137    }
138}
139
140impl<T, const N: usize> From<TimestampedValue<T>> for TimestampedValues<T, N> {
141    fn from(value: TimestampedValue<T>) -> Self {
142        let mut values = SmallVec::new();
143        values.push(value);
144
145        Self { values }
146    }
147}
148
149impl<I, IT, T, const N: usize> From<I> for TimestampedValues<T, N>
150where
151    I: IntoIterator<Item = IT>,
152    IT: Into<TimestampedValue<T>>,
153{
154    fn from(value: I) -> Self {
155        let mut values = SmallVec::new();
156        values.extend(value.into_iter().map(Into::into));
157
158        let mut values = Self { values };
159        values.sort_by_timestamp();
160
161        values
162    }
163}
164
165/// The values of a metric.
166#[derive(Clone, Debug, Eq, PartialEq)]
167pub enum MetricValues {
168    /// A counter.
169    ///
170    /// Counters generally represent a monotonically increasing value, such as the number of requests received.
171    Counter(ScalarPoints),
172
173    /// A rate.
174    ///
175    /// Rates define the rate of change over a given interval.
176    ///
177    /// For example, a rate with a value of 15 and an interval of 10 seconds would indicate that the value increases by
178    /// 15 every 10 seconds, or 1.5 per second.
179    Rate(ScalarPoints, Duration),
180
181    /// A gauge.
182    ///
183    /// Gauges represent the latest value of a quantity, such as the current number of active connections. This value
184    /// can go up or down, but gauges do not track the individual changes to the value, only the latest value.
185    Gauge(ScalarPoints),
186
187    /// A set.
188    ///
189    /// Sets represent a collection of unique values, such as the unique IP addresses that have connected to a service.
190    Set(SetPoints),
191
192    /// A histogram.
193    ///
194    /// Histograms represent the distribution of a quantity, such as the response times for a service, with forced
195    /// client-side aggregation. Individual samples are stored locally, in full fidelity, and aggregate statistics
196    /// can be queried against the sample set, but the individual samples cannot be accessed.
197    Histogram(HistogramPoints),
198
199    /// A distribution.
200    ///
201    /// Distributions represent the distribution of a quantity, such as the response times for a service, in such a way
202    /// that server-side aggregation is possible. Individual samples are stored in a sketch, which supports being merged
203    /// with other sketches server-side to facilitate global aggregation.
204    ///
205    /// Like histograms, sketches also provide the ability to be queried for aggregate statistics but the individual
206    /// samples cannot be accessed.
207    Distribution(SketchPoints),
208}
209
210impl MetricValues {
211    /// Creates a set of counter values from the given value(s).
212    pub fn counter<V>(values: V) -> Self
213    where
214        V: Into<ScalarPoints>,
215    {
216        Self::Counter(values.into())
217    }
218
219    /// Creates a set of counter values from a fallible iterator of values, based on the given sample rate.
220    ///
221    /// If `sample_rate` is `None`, no values will be modified. Otherwise, each value will be scaled proportionally to
222    /// the quotient of `1 / sample_rate`.
223    pub fn counter_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
224    where
225        I: Iterator<Item = Result<f64, E>>,
226    {
227        let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
228
229        let mut points = ScalarPoints::new();
230        for value in iter {
231            let value = value?;
232            points.add_point(None, value * sample_rate.raw_weight());
233        }
234        Ok(Self::Counter(points))
235    }
236
237    /// Creates a set of gauge values from the given value(s).
238    pub fn gauge<V>(values: V) -> Self
239    where
240        V: Into<ScalarPoints>,
241    {
242        Self::Gauge(values.into())
243    }
244
245    /// Creates a set of gauge values from a fallible iterator of values.
246    pub fn gauge_fallible<I, E>(iter: I) -> Result<Self, E>
247    where
248        I: Iterator<Item = Result<f64, E>>,
249    {
250        let mut points = ScalarPoints::new();
251        for value in iter {
252            points.add_point(None, value?);
253        }
254        Ok(Self::Gauge(points))
255    }
256
257    /// Creates a set from the given values.
258    pub fn set<V>(values: V) -> Self
259    where
260        V: Into<SetPoints>,
261    {
262        Self::Set(values.into())
263    }
264
265    /// Creates a set of histogram values from the given value(s).
266    pub fn histogram<V>(values: V) -> Self
267    where
268        V: Into<HistogramPoints>,
269    {
270        Self::Histogram(values.into())
271    }
272
273    /// Creates a set of histogram values from a fallible iterator of values, based on the given sample rate.
274    ///
275    /// If `sample_rate` is `None`, only the values present in the iterator will be used. Otherwise, each value will be
276    /// inserted at a scaled count of `1 / sample_rate`.
277    pub fn histogram_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
278    where
279        I: Iterator<Item = Result<f64, E>>,
280    {
281        let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
282
283        let mut histogram = Histogram::default();
284        for value in iter {
285            let value = value?;
286            histogram.insert(value, sample_rate);
287        }
288        Ok(Self::Histogram(histogram.into()))
289    }
290
291    /// Creates a set of distribution values from the given value(s).
292    pub fn distribution<V>(values: V) -> Self
293    where
294        V: Into<SketchPoints>,
295    {
296        Self::Distribution(values.into())
297    }
298
299    /// Creates a set of distribution values from a fallible iterator of values, based on the given sample rate.
300    ///
301    /// If `sample_rate` is `None`, only the values present in the iterator will be used. Otherwise, each value will be
302    /// inserted at a scaled count of `1 / sample_rate`.
303    pub fn distribution_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
304    where
305        I: Iterator<Item = Result<f64, E>>,
306    {
307        let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
308
309        let mut sketch = DDSketch::default();
310        for value in iter {
311            let value = value?;
312            sketch.insert_n(value, sample_rate.weight());
313        }
314        Ok(Self::Distribution(sketch.into()))
315    }
316
317    /// Creates a set of rate values from the given value(s) and interval.
318    pub fn rate<V>(values: V, interval: Duration) -> Self
319    where
320        V: Into<ScalarPoints>,
321    {
322        Self::Rate(values.into(), interval)
323    }
324
325    /// Returns `true` if this metric has no values.
326    pub fn is_empty(&self) -> bool {
327        match self {
328            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.is_empty(),
329            Self::Set(points) => points.is_empty(),
330            Self::Histogram(points) => points.is_empty(),
331            Self::Distribution(points) => points.is_empty(),
332        }
333    }
334
335    /// Returns the number of values in this metric.
336    pub fn len(&self) -> usize {
337        match self {
338            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.len(),
339            Self::Set(points) => points.len(),
340            Self::Histogram(points) => points.len(),
341            Self::Distribution(points) => points.len(),
342        }
343    }
344
345    /// Returns `true` if all values in this metric have a timestamp.
346    pub fn all_timestamped(&self) -> bool {
347        match self {
348            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().all_timestamped(),
349            Self::Set(points) => points.inner().all_timestamped(),
350            Self::Histogram(points) => points.inner().all_timestamped(),
351            Self::Distribution(points) => points.inner().all_timestamped(),
352        }
353    }
354
355    /// Returns `true` if any values in this metric have a timestamp.
356    pub fn any_timestamped(&self) -> bool {
357        match self {
358            Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().any_timestamped(),
359            Self::Set(points) => points.inner().any_timestamped(),
360            Self::Histogram(points) => points.inner().any_timestamped(),
361            Self::Distribution(points) => points.inner().any_timestamped(),
362        }
363    }
364
365    /// Sets the timestamp for all values in this metric.
366    ///
367    /// This overrides all existing timestamps whether they are set or not. If `timestamp` is zero, all existing
368    /// timestamps will be cleared.
369    pub fn set_timestamp(&mut self, timestamp: u64) {
370        match self {
371            Self::Counter(points) | Self::Gauge(points) | Self::Rate(points, _) => {
372                points.inner_mut().set_timestamp(timestamp)
373            }
374            Self::Set(points) => points.inner_mut().set_timestamp(timestamp),
375            Self::Histogram(points) => points.inner_mut().set_timestamp(timestamp),
376            Self::Distribution(points) => points.inner_mut().set_timestamp(timestamp),
377        }
378    }
379
380    /// Splits all timestamped values into a new `MetricValues`, leaving the remaining values in `self`.
381    pub fn split_timestamped(&mut self) -> Self {
382        match self {
383            Self::Counter(points) => Self::Counter(points.drain_timestamped()),
384            Self::Rate(points, interval) => Self::Rate(points.drain_timestamped(), *interval),
385            Self::Gauge(points) => Self::Gauge(points.drain_timestamped()),
386            Self::Set(points) => Self::Set(points.drain_timestamped()),
387            Self::Histogram(points) => Self::Histogram(points.drain_timestamped()),
388            Self::Distribution(points) => Self::Distribution(points.drain_timestamped()),
389        }
390    }
391
392    /// Splits all values with a timestamp less than or equal to `timestamp` into a new `MetricValues`, leaving the
393    /// remaining values in `self`.
394    pub fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
395        match self {
396            Self::Counter(points) => points.split_at_timestamp(timestamp).map(Self::Counter),
397            Self::Rate(points, interval) => points
398                .split_at_timestamp(timestamp)
399                .map(|points| Self::Rate(points, *interval)),
400            Self::Gauge(points) => points.split_at_timestamp(timestamp).map(Self::Gauge),
401            Self::Set(points) => points.split_at_timestamp(timestamp).map(Self::Set),
402            Self::Histogram(points) => points.split_at_timestamp(timestamp).map(Self::Histogram),
403            Self::Distribution(points) => points.split_at_timestamp(timestamp).map(Self::Distribution),
404        }
405    }
406
407    /// Collapses all non-timestamped values into a single value with the given timestamp.
408    ///
409    pub fn collapse_non_timestamped(&mut self, timestamp: u64) {
410        match self {
411            // Collapse by summing.
412            Self::Counter(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
413            Self::Rate(points, _) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
414            // Collapse by keeping the last value.
415            Self::Gauge(points) => points
416                .inner_mut()
417                .collapse_non_timestamped(timestamp, merge_scalar_latest),
418            // Collapse by merging.
419            Self::Set(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_set),
420            Self::Histogram(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_histogram),
421            Self::Distribution(sketches) => sketches.inner_mut().collapse_non_timestamped(timestamp, merge_sketch),
422        }
423    }
424
425    /// Merges another set of metric values into this one.
426    ///
427    /// If both `self` and `other` are the same metric type, their values will be merged appropriately. If the metric
428    /// types are different, or a specific precondition for the metric type is not met, the incoming values will override
429    /// the existing values instead.
430    ///
431    /// For rates, the interval of both rates must match to be merged. For gauges, the incoming value will override the
432    /// existing value.
433    pub fn merge(&mut self, other: Self) {
434        match (self, other) {
435            (Self::Counter(a), Self::Counter(b)) => a.merge(b, merge_scalar_sum),
436            (Self::Rate(a_points, a_interval), Self::Rate(b_points, b_interval)) => {
437                if *a_interval != b_interval {
438                    *a_points = b_points;
439                    *a_interval = b_interval;
440                } else {
441                    a_points.merge(b_points, merge_scalar_sum);
442                }
443            }
444            (Self::Gauge(a), Self::Gauge(b)) => a.merge(b, merge_scalar_latest),
445            (Self::Set(a), Self::Set(b)) => a.merge(b),
446            (Self::Histogram(a), Self::Histogram(b)) => a.merge(b),
447            (Self::Distribution(a), Self::Distribution(b)) => a.merge(b),
448
449            // Just override with whatever the incoming value is.
450            (dest, src) => drop(std::mem::replace(dest, src)),
451        }
452    }
453
454    /// Returns the metric value type as a string.
455    pub fn as_str(&self) -> &'static str {
456        match self {
457            Self::Counter(_) => "counter",
458            Self::Rate(_, _) => "rate",
459            Self::Gauge(_) => "gauge",
460            Self::Set(_) => "set",
461            Self::Histogram(_) => "histogram",
462            Self::Distribution(_) => "distribution",
463        }
464    }
465
466    /// Returns `true` if this metric is a serie.
467    pub fn is_serie(&self) -> bool {
468        matches!(
469            self,
470            Self::Counter(_) | Self::Rate(_, _) | Self::Gauge(_) | Self::Set(_) | Self::Histogram(_)
471        )
472    }
473
474    /// Returns `true` if this metric is a sketch.
475    pub fn is_sketch(&self) -> bool {
476        matches!(self, Self::Distribution(_))
477    }
478}
479
480impl fmt::Display for MetricValues {
481    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482        match self {
483            Self::Counter(points) => write!(f, "{}", points),
484            Self::Rate(points, interval) => write!(f, "{} over {:?}", points, interval),
485            Self::Gauge(points) => write!(f, "{}", points),
486            Self::Set(points) => write!(f, "{}", points),
487            Self::Histogram(points) => write!(f, "{}", points),
488            Self::Distribution(points) => write!(f, "{}", points),
489        }
490    }
491}
492
493fn merge_scalar_sum(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
494    *dest += *src;
495}
496
497fn merge_scalar_latest(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
498    *dest = *src;
499}
500
501fn merge_set(dest: &mut HashSet<String>, src: &mut HashSet<String>) {
502    dest.extend(src.drain());
503}
504
505fn merge_histogram(dest: &mut Histogram, src: &mut Histogram) {
506    dest.merge(src);
507}
508
509fn merge_sketch(dest: &mut DDSketch, src: &mut DDSketch) {
510    dest.merge(src);
511}
512
513#[cfg(test)]
514mod tests {
515    use std::time::Duration;
516
517    use super::{HistogramPoints, MetricValues, ScalarPoints, SetPoints, SketchPoints};
518
519    #[test]
520    fn merge_counters() {
521        let cases = [
522            // Both A and B have single point with an identical timestamp, so the points should be merged.
523            (
524                ScalarPoints::from((1, 1.0)),
525                ScalarPoints::from((1, 2.0)),
526                ScalarPoints::from((1, 3.0)),
527            ),
528            // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept.
529            (
530                ScalarPoints::from((1, 1.0)),
531                ScalarPoints::from(2.0),
532                ScalarPoints::from([(0, 2.0), (1, 1.0)]),
533            ),
534            // Both A and B have single point without a timestamp, so the points should be merged.
535            (
536                ScalarPoints::from(5.0),
537                ScalarPoints::from(6.0),
538                ScalarPoints::from(11.0),
539            ),
540        ];
541
542        for (a, b, expected) in cases {
543            let mut merged = MetricValues::Counter(a.clone());
544            merged.merge(MetricValues::Counter(b.clone()));
545
546            assert_eq!(
547                merged,
548                MetricValues::Counter(expected.clone()),
549                "merged {} with {}, expected {} but got {}",
550                a,
551                b,
552                expected,
553                merged
554            );
555        }
556    }
557
558    #[test]
559    fn merge_gauges() {
560        let cases = [
561            // Both A and B have single point with an identical timestamp, so B's point value should override A's point value.
562            (
563                ScalarPoints::from((1, 1.0)),
564                ScalarPoints::from((1, 2.0)),
565                ScalarPoints::from((1, 2.0)),
566            ),
567            // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept.
568            (
569                ScalarPoints::from((1, 1.0)),
570                ScalarPoints::from(2.0),
571                ScalarPoints::from([(0, 2.0), (1, 1.0)]),
572            ),
573            // Both A and B have single point without a timestamp, so B's point value should override A's point value.
574            (
575                ScalarPoints::from(5.0),
576                ScalarPoints::from(6.0),
577                ScalarPoints::from(6.0),
578            ),
579        ];
580
581        for (a, b, expected) in cases {
582            let mut merged = MetricValues::Gauge(a.clone());
583            merged.merge(MetricValues::Gauge(b.clone()));
584
585            assert_eq!(
586                merged,
587                MetricValues::Gauge(expected.clone()),
588                "merged {} with {}, expected {} but got {}",
589                a,
590                b,
591                expected,
592                merged
593            );
594        }
595    }
596
597    #[test]
598    fn merge_rates() {
599        const FIVE_SECS: Duration = Duration::from_secs(5);
600        const TEN_SECS: Duration = Duration::from_secs(10);
601
602        let cases = [
603            // Both A and B have single point with an identical timestamp, and identical intervals, so the points should be merged.
604            (
605                ScalarPoints::from((1, 1.0)),
606                FIVE_SECS,
607                ScalarPoints::from((1, 2.0)),
608                FIVE_SECS,
609                ScalarPoints::from((1, 3.0)),
610                FIVE_SECS,
611            ),
612            // A has a single point with a timestamp, B has a single point without a timestamp, and identical intervals, so both points should be kept.
613            (
614                ScalarPoints::from((1, 1.0)),
615                FIVE_SECS,
616                ScalarPoints::from(2.0),
617                FIVE_SECS,
618                ScalarPoints::from([(0, 2.0), (1, 1.0)]),
619                FIVE_SECS,
620            ),
621            // Both A and B have single point without a timestamp, and identical intervals, so the points should be merged.
622            (
623                ScalarPoints::from(5.0),
624                FIVE_SECS,
625                ScalarPoints::from(6.0),
626                FIVE_SECS,
627                ScalarPoints::from(11.0),
628                FIVE_SECS,
629            ),
630            // We do three permutations here -- identical timestamped point, differing timestamped point,
631            // non-timestamped point -- but always with differing intervals, which should lead to B overriding A
632            // entirely.
633            (
634                ScalarPoints::from((1, 1.0)),
635                FIVE_SECS,
636                ScalarPoints::from((1, 2.0)),
637                TEN_SECS,
638                ScalarPoints::from((1, 2.0)),
639                TEN_SECS,
640            ),
641            (
642                ScalarPoints::from((1, 3.0)),
643                FIVE_SECS,
644                ScalarPoints::from(4.0),
645                TEN_SECS,
646                ScalarPoints::from((0, 4.0)),
647                TEN_SECS,
648            ),
649            (
650                ScalarPoints::from(7.0),
651                TEN_SECS,
652                ScalarPoints::from(9.0),
653                FIVE_SECS,
654                ScalarPoints::from(9.0),
655                FIVE_SECS,
656            ),
657        ];
658
659        for (a, a_interval, b, b_interval, expected, expected_interval) in cases {
660            let mut merged = MetricValues::Rate(a.clone(), a_interval);
661            merged.merge(MetricValues::Rate(b.clone(), b_interval));
662
663            assert_eq!(
664                merged,
665                MetricValues::Rate(expected.clone(), expected_interval),
666                "merged {}/{:?} with {}/{:?}, expected {} but got {}",
667                a,
668                a_interval,
669                b,
670                b_interval,
671                expected,
672                merged
673            );
674        }
675    }
676
677    #[test]
678    fn merge_sets() {
679        let cases = [
680            // Both A and B have single point with an identical timestamp, so the values should be merged.
681            (
682                SetPoints::from((1, "foo")),
683                SetPoints::from((1, "bar")),
684                SetPoints::from((1, ["foo", "bar"])),
685            ),
686            // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be
687            // kept.
688            (
689                SetPoints::from((1, "foo")),
690                SetPoints::from("bar"),
691                SetPoints::from([(0, "bar"), (1, "foo")]),
692            ),
693            // Both A and B have single point without a timestamp, so the values should be merged.
694            (
695                SetPoints::from("foo"),
696                SetPoints::from("bar"),
697                SetPoints::from(["foo", "bar"]),
698            ),
699        ];
700
701        for (a, b, expected) in cases {
702            let mut merged = MetricValues::Set(a.clone());
703            merged.merge(MetricValues::Set(b.clone()));
704
705            assert_eq!(
706                merged,
707                MetricValues::Set(expected.clone()),
708                "merged {} with {}, expected {} but got {}",
709                a,
710                b,
711                expected,
712                merged
713            );
714        }
715    }
716
717    #[test]
718    fn merge_histograms() {
719        let cases = [
720            // Both A and B have single point with an identical timestamp, so the samples should be merged.
721            (
722                HistogramPoints::from((1, 1.0)),
723                HistogramPoints::from((1, 2.0)),
724                HistogramPoints::from((1, [1.0, 2.0])),
725            ),
726            // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept.
727            (
728                HistogramPoints::from((1, 1.0)),
729                HistogramPoints::from(2.0),
730                HistogramPoints::from([(0, 2.0), (1, 1.0)]),
731            ),
732            // Both A and B have single point without a timestamp, so the samples should be merged.
733            (
734                HistogramPoints::from(5.0),
735                HistogramPoints::from(6.0),
736                HistogramPoints::from([5.0, 6.0]),
737            ),
738        ];
739
740        for (a, b, expected) in cases {
741            let mut merged = MetricValues::Histogram(a.clone());
742            merged.merge(MetricValues::Histogram(b.clone()));
743
744            assert_eq!(
745                merged,
746                MetricValues::Histogram(expected.clone()),
747                "merged {} with {}, expected {} but got {}",
748                a,
749                b,
750                expected,
751                merged
752            );
753        }
754    }
755
756    #[test]
757    fn merge_distributions() {
758        let cases = [
759            // Both A and B have single point with an identical timestamp, so the sketches should be merged.
760            (
761                SketchPoints::from((1, 1.0)),
762                SketchPoints::from((1, 2.0)),
763                SketchPoints::from((1, [1.0, 2.0])),
764            ),
765            // A has a single point with a timestamp, B has a single point without a timestamp, so both points should be kept.
766            (
767                SketchPoints::from((1, 1.0)),
768                SketchPoints::from(2.0),
769                SketchPoints::from([(0, 2.0), (1, 1.0)]),
770            ),
771            // Both A and B have single point without a timestamp, so the sketches should be merged.
772            (
773                SketchPoints::from(5.0),
774                SketchPoints::from(6.0),
775                SketchPoints::from([5.0, 6.0]),
776            ),
777        ];
778
779        for (a, b, expected) in cases {
780            let mut merged = MetricValues::Distribution(a.clone());
781            merged.merge(MetricValues::Distribution(b.clone()));
782
783            assert_eq!(
784                merged,
785                MetricValues::Distribution(expected.clone()),
786                "merged {} with {}, expected {} but got {}",
787                a,
788                b,
789                expected,
790                merged
791            );
792        }
793    }
794}