Skip to main content

saluki_core/data_model/event/metric/value/
sketch.rs

1use std::{fmt, num::NonZeroU64};
2
3use ddsketch::DDSketch;
4
5use super::{TimestampedValue, TimestampedValues};
6
7/// A set of sketch points.
8///
9/// Used to represent the data points of sketch-based metrics, such as distributions. Each point is attached to an
10/// optional timestamp.
11#[derive(Clone, Debug, Eq, PartialEq)]
12pub struct SketchPoints(TimestampedValues<DDSketch, 1>);
13
14impl SketchPoints {
15    pub(super) fn inner(&self) -> &TimestampedValues<DDSketch, 1> {
16        &self.0
17    }
18
19    pub(super) fn inner_mut(&mut self) -> &mut TimestampedValues<DDSketch, 1> {
20        &mut self.0
21    }
22
23    pub(super) fn drain_timestamped(&mut self) -> Self {
24        Self(self.0.drain_timestamped())
25    }
26
27    pub(super) fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
28        self.0.split_at_timestamp(timestamp).map(Self)
29    }
30
31    /// Returns `true` if this set is empty.
32    pub fn is_empty(&self) -> bool {
33        self.0.values.is_empty()
34    }
35
36    /// Returns the number of points in this set.
37    pub fn len(&self) -> usize {
38        self.0.values.len()
39    }
40
41    /// Merges another set of points into this one.
42    ///
43    /// If a point with the same timestamp exists in both sets, the sketches will be merged together. Otherwise, the
44    /// points will appended to the end of the set.
45    pub fn merge(&mut self, other: Self) {
46        let mut needs_sort = false;
47        for other_value in other.0.values {
48            if let Some(existing_value) = self
49                .0
50                .values
51                .iter_mut()
52                .find(|value| value.timestamp == other_value.timestamp)
53            {
54                existing_value.value.merge(&other_value.value);
55            } else {
56                self.0.values.push(other_value);
57                needs_sort = true;
58            }
59        }
60
61        if needs_sort {
62            self.0.sort_by_timestamp();
63        }
64    }
65}
66
67impl From<f64> for SketchPoints {
68    fn from(value: f64) -> Self {
69        let mut sketch = DDSketch::default();
70        sketch.insert(value);
71
72        Self(TimestampedValue::from(sketch).into())
73    }
74}
75
76impl<'a> From<&'a [f64]> for SketchPoints {
77    fn from(values: &'a [f64]) -> Self {
78        let mut sketch = DDSketch::default();
79        sketch.insert_many(values);
80
81        Self(TimestampedValue::from(sketch).into())
82    }
83}
84
85impl<const N: usize> From<[f64; N]> for SketchPoints {
86    fn from(values: [f64; N]) -> Self {
87        let mut sketch = DDSketch::default();
88        sketch.insert_many(&values[..]);
89
90        Self(TimestampedValue::from(sketch).into())
91    }
92}
93
94impl<'a, const N: usize> From<&'a [f64; N]> for SketchPoints {
95    fn from(values: &'a [f64; N]) -> Self {
96        let mut sketch = DDSketch::default();
97        sketch.insert_many(values);
98
99        Self(TimestampedValue::from(sketch).into())
100    }
101}
102
103impl From<DDSketch> for SketchPoints {
104    fn from(value: DDSketch) -> Self {
105        Self(TimestampedValue::from(value).into())
106    }
107}
108
109impl From<(u64, f64)> for SketchPoints {
110    fn from((ts, value): (u64, f64)) -> Self {
111        let mut sketch = DDSketch::default();
112        sketch.insert(value);
113
114        Self(TimestampedValue::from((ts, sketch)).into())
115    }
116}
117
118impl<const N: usize> From<(u64, [f64; N])> for SketchPoints {
119    fn from((ts, values): (u64, [f64; N])) -> Self {
120        let mut sketch = DDSketch::default();
121        sketch.insert_many(&values[..]);
122
123        Self(TimestampedValue::from((ts, sketch)).into())
124    }
125}
126
127impl From<(u64, DDSketch)> for SketchPoints {
128    fn from((ts, sketch): (u64, DDSketch)) -> Self {
129        Self(TimestampedValue::from((ts, sketch)).into())
130    }
131}
132
133impl<'a> From<(u64, &'a [f64])> for SketchPoints {
134    fn from((ts, values): (u64, &'a [f64])) -> Self {
135        let mut sketch = DDSketch::default();
136        sketch.insert_many(values);
137
138        Self(TimestampedValue::from((ts, sketch)).into())
139    }
140}
141
142impl<'a> From<&'a [(u64, &'a [f64])]> for SketchPoints {
143    fn from(values: &'a [(u64, &'a [f64])]) -> Self {
144        Self(TimestampedValues::from(values.iter().map(|(ts, values)| {
145            let mut sketch = DDSketch::default();
146            sketch.insert_many(values);
147
148            (*ts, sketch)
149        })))
150    }
151}
152
153impl<const N: usize> From<[(u64, f64); N]> for SketchPoints {
154    fn from(values: [(u64, f64); N]) -> Self {
155        Self(TimestampedValues::from(values.iter().map(|(ts, value)| {
156            let mut sketch = DDSketch::default();
157            sketch.insert(*value);
158
159            (*ts, sketch)
160        })))
161    }
162}
163
164impl<'a, const N: usize> From<[(u64, &'a [f64]); N]> for SketchPoints {
165    fn from(values: [(u64, &'a [f64]); N]) -> Self {
166        Self(TimestampedValues::from(values.iter().map(|(ts, values)| {
167            let mut sketch = DDSketch::default();
168            sketch.insert_many(values);
169
170            (*ts, sketch)
171        })))
172    }
173}
174
175impl IntoIterator for SketchPoints {
176    type Item = (Option<NonZeroU64>, DDSketch);
177    type IntoIter = SketchesIter;
178
179    fn into_iter(self) -> Self::IntoIter {
180        SketchesIter {
181            inner: self.0.values.into_iter(),
182        }
183    }
184}
185
186impl<'a> IntoIterator for &'a SketchPoints {
187    type Item = (Option<NonZeroU64>, &'a DDSketch);
188    type IntoIter = SketchesIterRef<'a>;
189
190    fn into_iter(self) -> Self::IntoIter {
191        SketchesIterRef {
192            inner: self.0.values.iter(),
193        }
194    }
195}
196
197impl FromIterator<(Option<NonZeroU64>, DDSketch)> for SketchPoints {
198    fn from_iter<T: IntoIterator<Item = (Option<NonZeroU64>, DDSketch)>>(iter: T) -> Self {
199        let mut sketch_points = SketchPoints(TimestampedValues::default());
200        for (ts, sketch) in iter {
201            sketch_points.0.values.push(TimestampedValue::from((ts, sketch)));
202        }
203        sketch_points
204    }
205}
206
207impl fmt::Display for SketchPoints {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        write!(f, "[")?;
210        for (i, point) in self.0.values.iter().enumerate() {
211            if i > 0 {
212                write!(f, ",")?;
213            }
214
215            let ts = point.timestamp.map(|ts| ts.get()).unwrap_or_default();
216            let sketch = &point.value;
217            write!(
218                f,
219                "({}, {{cnt={} min={} max={} avg={} sum={} bin_count={}}})",
220                ts,
221                sketch.count(),
222                sketch.min().unwrap_or(0.0),
223                sketch.max().unwrap_or(0.0),
224                sketch.avg().unwrap_or(0.0),
225                sketch.sum().unwrap_or(0.0),
226                sketch.bin_count(),
227            )?;
228        }
229        write!(f, "]")
230    }
231}
232
233pub struct SketchesIter {
234    inner: smallvec::IntoIter<[TimestampedValue<DDSketch>; 1]>,
235}
236
237impl Iterator for SketchesIter {
238    type Item = (Option<NonZeroU64>, DDSketch);
239
240    fn next(&mut self) -> Option<Self::Item> {
241        self.inner.next().map(|value| (value.timestamp, value.value))
242    }
243}
244
245pub struct SketchesIterRef<'a> {
246    inner: std::slice::Iter<'a, TimestampedValue<DDSketch>>,
247}
248
249impl<'a> Iterator for SketchesIterRef<'a> {
250    type Item = (Option<NonZeroU64>, &'a DDSketch);
251
252    fn next(&mut self) -> Option<Self::Item> {
253        self.inner.next().map(|value| (value.timestamp, &value.value))
254    }
255}