saluki_core/data_model/event/metric/value/
scalar.rs

1use std::{fmt, num::NonZeroU64};
2
3use ordered_float::OrderedFloat;
4
5use super::{
6    iter::{PointsIter, PointsIterRef},
7    TimestampedValue, TimestampedValues,
8};
9
10/// A set of scalar points.
11///
12/// Used to represent the data points of "scalar" metric types, such as counters, gauges, and rates. Each data point is
13/// attached to an optional timestamp.
14#[derive(Clone, Debug, Eq, PartialEq)]
15pub struct ScalarPoints(TimestampedValues<OrderedFloat<f64>, 4>);
16
17impl ScalarPoints {
18    pub(super) fn new() -> Self {
19        Self(TimestampedValues::default())
20    }
21
22    pub(super) fn inner(&self) -> &TimestampedValues<OrderedFloat<f64>, 4> {
23        &self.0
24    }
25
26    pub(super) fn inner_mut(&mut self) -> &mut TimestampedValues<OrderedFloat<f64>, 4> {
27        &mut self.0
28    }
29
30    pub(super) fn add_point(&mut self, timestamp: Option<NonZeroU64>, value: f64) {
31        self.0.values.push(TimestampedValue {
32            timestamp,
33            value: OrderedFloat(value),
34        });
35        self.0.sort_by_timestamp();
36    }
37
38    pub(super) fn drain_timestamped(&mut self) -> Self {
39        Self(self.0.drain_timestamped())
40    }
41
42    pub(super) fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
43        self.0.split_at_timestamp(timestamp).map(Self)
44    }
45
46    /// Returns `true` if this set is empty.
47    pub fn is_empty(&self) -> bool {
48        self.0.values.is_empty()
49    }
50
51    /// Returns the number of points in this set.
52    pub fn len(&self) -> usize {
53        self.0.values.len()
54    }
55
56    /// Merges another set of points into this one.
57    ///
58    /// If a point with the same timestamp exists in both sets, the values will be added together. Otherwise, the points
59    /// will appended to the end of the set.
60    pub fn merge<F>(&mut self, other: Self, merge: F)
61    where
62        F: Fn(&mut OrderedFloat<f64>, &mut OrderedFloat<f64>),
63    {
64        let mut needs_sort = false;
65        for other_value in other.0.values {
66            if let Some(existing_value) = self
67                .0
68                .values
69                .iter_mut()
70                .find(|value| value.timestamp == other_value.timestamp)
71            {
72                let mut other = other_value.value;
73                merge(&mut existing_value.value, &mut other);
74            } else {
75                self.0.values.push(other_value);
76                needs_sort = true;
77            }
78        }
79
80        if needs_sort {
81            self.0.sort_by_timestamp();
82        }
83    }
84}
85
86impl From<f64> for ScalarPoints {
87    fn from(value: f64) -> Self {
88        Self(TimestampedValue::from(OrderedFloat(value)).into())
89    }
90}
91
92impl<'a> From<&'a [f64]> for ScalarPoints {
93    fn from(values: &'a [f64]) -> Self {
94        Self(TimestampedValues::from(values.iter().map(|value| OrderedFloat(*value))))
95    }
96}
97
98impl<const N: usize> From<[f64; N]> for ScalarPoints {
99    fn from(values: [f64; N]) -> Self {
100        Self(TimestampedValues::from(values.iter().map(|value| OrderedFloat(*value))))
101    }
102}
103
104impl From<(u64, f64)> for ScalarPoints {
105    fn from((ts, value): (u64, f64)) -> Self {
106        Self(TimestampedValue::from((ts, OrderedFloat(value))).into())
107    }
108}
109
110impl<'a> From<&'a [(u64, f64)]> for ScalarPoints {
111    fn from(values: &'a [(u64, f64)]) -> Self {
112        Self(TimestampedValues::from(
113            values.iter().map(|(ts, value)| (*ts, OrderedFloat(*value))),
114        ))
115    }
116}
117
118impl<const N: usize> From<[(u64, f64); N]> for ScalarPoints {
119    fn from(values: [(u64, f64); N]) -> Self {
120        Self(TimestampedValues::from(
121            values.iter().map(|(ts, value)| (*ts, OrderedFloat(*value))),
122        ))
123    }
124}
125
126impl<T> FromIterator<T> for ScalarPoints
127where
128    T: Into<TimestampedValue<OrderedFloat<f64>>>,
129{
130    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
131        Self(TimestampedValues::from(iter))
132    }
133}
134
135impl IntoIterator for ScalarPoints {
136    type Item = (Option<NonZeroU64>, f64);
137    type IntoIter = PointsIter;
138
139    fn into_iter(self) -> Self::IntoIter {
140        PointsIter::scalar(self.0.values.into_iter())
141    }
142}
143
144impl<'a> IntoIterator for &'a ScalarPoints {
145    type Item = (Option<NonZeroU64>, f64);
146    type IntoIter = PointsIterRef<'a>;
147
148    fn into_iter(self) -> Self::IntoIter {
149        PointsIterRef::scalar(self.0.values.iter())
150    }
151}
152
153impl fmt::Display for ScalarPoints {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        write!(f, "[")?;
156        for (i, (timestamp, value)) in self.into_iter().enumerate() {
157            if i > 0 {
158                write!(f, ",")?;
159            }
160
161            let ts = timestamp.map(|ts| ts.get()).unwrap_or_default();
162            write!(f, "({}, {})", ts, value)?;
163        }
164        write!(f, "]")
165    }
166}