saluki_core/data_model/event/metric/value/
sketch.rs

1use std::{fmt, num::NonZeroU64};
2
3use ddsketch_agent::DDSketch;
4
5use super::{TimestampedValue, TimestampedValues};
6
/// A set of sketch points.
///
/// Used to represent the data points of sketch-based metrics, such as distributions. Each point is attached to an
/// optional timestamp.
///
/// This is a thin newtype over `TimestampedValues<DDSketch, 1>`: every point pairs one `DDSketch` with the
/// timestamp it was recorded under, if any.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SketchPoints(TimestampedValues<DDSketch, 1>);
13
14impl SketchPoints {
15    pub(super) fn inner(&self) -> &TimestampedValues<DDSketch, 1> {
16        &self.0
17    }
18
19    pub(super) fn inner_mut(&mut self) -> &mut TimestampedValues<DDSketch, 1> {
20        &mut self.0
21    }
22
23    pub(super) fn drain_timestamped(&mut self) -> Self {
24        Self(self.0.drain_timestamped())
25    }
26
27    pub(super) fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
28        self.0.split_at_timestamp(timestamp).map(Self)
29    }
30
31    /// Returns `true` if this set is empty.
32    pub fn is_empty(&self) -> bool {
33        self.0.values.is_empty()
34    }
35
36    /// Returns the number of points in this set.
37    pub fn len(&self) -> usize {
38        self.0.values.len()
39    }
40
41    /// Merges another set of points into this one.
42    ///
43    /// If a point with the same timestamp exists in both sets, the sketches will be merged together. Otherwise, the
44    /// points will appended to the end of the set.
45    pub fn merge(&mut self, other: Self) {
46        let mut needs_sort = false;
47        for other_value in other.0.values {
48            if let Some(existing_value) = self
49                .0
50                .values
51                .iter_mut()
52                .find(|value| value.timestamp == other_value.timestamp)
53            {
54                existing_value.value.merge(&other_value.value);
55            } else {
56                self.0.values.push(other_value);
57                needs_sort = true;
58            }
59        }
60
61        if needs_sort {
62            self.0.sort_by_timestamp();
63        }
64    }
65}
66
67impl From<f64> for SketchPoints {
68    fn from(value: f64) -> Self {
69        let mut sketch = DDSketch::default();
70        sketch.insert(value);
71
72        Self(TimestampedValue::from(sketch).into())
73    }
74}
75
76impl<'a> From<&'a [f64]> for SketchPoints {
77    fn from(values: &'a [f64]) -> Self {
78        let mut sketch = DDSketch::default();
79        sketch.insert_many(values);
80
81        Self(TimestampedValue::from(sketch).into())
82    }
83}
84
85impl<const N: usize> From<[f64; N]> for SketchPoints {
86    fn from(values: [f64; N]) -> Self {
87        let mut sketch = DDSketch::default();
88        sketch.insert_many(&values[..]);
89
90        Self(TimestampedValue::from(sketch).into())
91    }
92}
93
94impl<'a, const N: usize> From<&'a [f64; N]> for SketchPoints {
95    fn from(values: &'a [f64; N]) -> Self {
96        let mut sketch = DDSketch::default();
97        sketch.insert_many(values);
98
99        Self(TimestampedValue::from(sketch).into())
100    }
101}
102
103impl From<DDSketch> for SketchPoints {
104    fn from(value: DDSketch) -> Self {
105        Self(TimestampedValue::from(value).into())
106    }
107}
108
109impl From<(u64, f64)> for SketchPoints {
110    fn from((ts, value): (u64, f64)) -> Self {
111        let mut sketch = DDSketch::default();
112        sketch.insert(value);
113
114        Self(TimestampedValue::from((ts, sketch)).into())
115    }
116}
117
118impl<const N: usize> From<(u64, [f64; N])> for SketchPoints {
119    fn from((ts, values): (u64, [f64; N])) -> Self {
120        let mut sketch = DDSketch::default();
121        sketch.insert_many(&values[..]);
122
123        Self(TimestampedValue::from((ts, sketch)).into())
124    }
125}
126
127impl<'a> From<(u64, &'a [f64])> for SketchPoints {
128    fn from((ts, values): (u64, &'a [f64])) -> Self {
129        let mut sketch = DDSketch::default();
130        sketch.insert_many(values);
131
132        Self(TimestampedValue::from((ts, sketch)).into())
133    }
134}
135
136impl<'a> From<&'a [(u64, &'a [f64])]> for SketchPoints {
137    fn from(values: &'a [(u64, &'a [f64])]) -> Self {
138        Self(TimestampedValues::from(values.iter().map(|(ts, values)| {
139            let mut sketch = DDSketch::default();
140            sketch.insert_many(values);
141
142            (*ts, sketch)
143        })))
144    }
145}
146
147impl<const N: usize> From<[(u64, f64); N]> for SketchPoints {
148    fn from(values: [(u64, f64); N]) -> Self {
149        Self(TimestampedValues::from(values.iter().map(|(ts, value)| {
150            let mut sketch = DDSketch::default();
151            sketch.insert(*value);
152
153            (*ts, sketch)
154        })))
155    }
156}
157
158impl<'a, const N: usize> From<[(u64, &'a [f64]); N]> for SketchPoints {
159    fn from(values: [(u64, &'a [f64]); N]) -> Self {
160        Self(TimestampedValues::from(values.iter().map(|(ts, values)| {
161            let mut sketch = DDSketch::default();
162            sketch.insert_many(values);
163
164            (*ts, sketch)
165        })))
166    }
167}
168
169impl IntoIterator for SketchPoints {
170    type Item = (Option<NonZeroU64>, DDSketch);
171    type IntoIter = SketchesIter;
172
173    fn into_iter(self) -> Self::IntoIter {
174        SketchesIter {
175            inner: self.0.values.into_iter(),
176        }
177    }
178}
179
180impl<'a> IntoIterator for &'a SketchPoints {
181    type Item = (Option<NonZeroU64>, &'a DDSketch);
182    type IntoIter = SketchesIterRef<'a>;
183
184    fn into_iter(self) -> Self::IntoIter {
185        SketchesIterRef {
186            inner: self.0.values.iter(),
187        }
188    }
189}
190
191impl FromIterator<(Option<NonZeroU64>, DDSketch)> for SketchPoints {
192    fn from_iter<T: IntoIterator<Item = (Option<NonZeroU64>, DDSketch)>>(iter: T) -> Self {
193        let mut sketch_points = SketchPoints(TimestampedValues::default());
194        for (ts, sketch) in iter {
195            sketch_points.0.values.push(TimestampedValue::from((ts, sketch)));
196        }
197        sketch_points
198    }
199}
200
201impl fmt::Display for SketchPoints {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        write!(f, "[")?;
204        for (i, point) in self.0.values.iter().enumerate() {
205            if i > 0 {
206                write!(f, ",")?;
207            }
208
209            let ts = point.timestamp.map(|ts| ts.get()).unwrap_or_default();
210            let sketch = &point.value;
211            write!(
212                f,
213                "({}, {{cnt={} min={} max={} avg={} sum={} bin_count={}}})",
214                ts,
215                sketch.count(),
216                sketch.min().unwrap_or(0.0),
217                sketch.max().unwrap_or(0.0),
218                sketch.avg().unwrap_or(0.0),
219                sketch.sum().unwrap_or(0.0),
220                sketch.bin_count(),
221            )?;
222        }
223        write!(f, "]")
224    }
225}
226
/// Owning iterator over the points of a `SketchPoints`.
///
/// Produced by calling `into_iter` on a `SketchPoints` value; yields each point as an
/// `(Option<NonZeroU64>, DDSketch)` pair of optional timestamp and sketch.
pub struct SketchesIter {
    // Owning iterator over the small-vector storage that held the points.
    inner: smallvec::IntoIter<[TimestampedValue<DDSketch>; 1]>,
}
230
231impl Iterator for SketchesIter {
232    type Item = (Option<NonZeroU64>, DDSketch);
233
234    fn next(&mut self) -> Option<Self::Item> {
235        self.inner.next().map(|value| (value.timestamp, value.value))
236    }
237}
238
/// Borrowing iterator over the points of a `SketchPoints`.
///
/// Produced by calling `into_iter` on a `&SketchPoints`; yields each point as an
/// `(Option<NonZeroU64>, &DDSketch)` pair of optional timestamp and borrowed sketch.
pub struct SketchesIterRef<'a> {
    // Slice iterator over the stored points, borrowed for `'a`.
    inner: std::slice::Iter<'a, TimestampedValue<DDSketch>>,
}
242
243impl<'a> Iterator for SketchesIterRef<'a> {
244    type Item = (Option<NonZeroU64>, &'a DDSketch);
245
246    fn next(&mut self) -> Option<Self::Item> {
247        self.inner.next().map(|value| (value.timestamp, &value.value))
248    }
249}