ddsketch/canonical/store/
collapsing_highest.rs

1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
/// A dense store that collapses its highest-indexed bins when capacity is exceeded.
///
/// This store keeps at most `max_num_bins` contiguous bins. When adding an index would require more
/// bins than that, every index at or above the top of the `max_num_bins`-wide window is merged into
/// the highest bin that still fits. This sacrifices accuracy for higher quantiles to preserve
/// accuracy for lower quantiles.
///
/// Use this store when:
/// - You need bounded memory usage
/// - Lower quantiles (e.g., p1, p5) are more important than higher quantiles
/// - You're tracking metrics where the minimum values matter most
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollapsingHighestDenseStore {
    /// The bin counts, stored contiguously.
    bins: Vec<u64>,

    /// The count stored in `bins[0]` corresponds to this index.
    offset: i32,

    /// Maximum number of bins to maintain.
    max_num_bins: usize,

    /// Total count across all bins.
    count: u64,

    /// Whether collapsing has occurred (accuracy may be compromised for high quantiles).
    is_collapsed: bool,
}

impl CollapsingHighestDenseStore {
    /// Creates an empty `CollapsingHighestDenseStore` with the given maximum number of bins.
    ///
    /// # Panics
    ///
    /// Panics if `max_num_bins` is zero.
    pub fn new(max_num_bins: usize) -> Self {
        assert!(max_num_bins >= 1, "max_num_bins must be at least 1");
        Self {
            bins: Vec::new(),
            offset: 0,
            max_num_bins,
            count: 0,
            is_collapsed: false,
        }
    }

    /// Returns `true` if this store has collapsed bins.
    ///
    /// If true, accuracy guarantees may not hold for higher quantiles.
    pub fn is_collapsed(&self) -> bool {
        self.is_collapsed
    }

    /// Ensures the store can accommodate `index`, growing and collapsing if necessary.
    ///
    /// Afterwards the store is non-empty. Either `index` lies inside `[offset, offset + len)`, or it
    /// lies above that window, in which case `bin_index` maps it to the highest bin. The window
    /// never exceeds `max_num_bins` bins:
    /// - An index above the window grows the store, at most up to `max_num_bins` bins. If that is
    ///   still not enough, the last bin absorbs the index and the store is marked collapsed.
    /// - An index below the window always becomes the new `offset`. Existing bins that would fall at
    ///   or past the top of the new `max_num_bins`-wide window are merged into its top bin, and the
    ///   store is marked collapsed.
    fn grow(&mut self, index: i32) {
        if self.bins.is_empty() {
            self.bins.push(0);
            self.offset = index;
            return;
        }

        // Work in i64: `index - offset` spans up to 2^32 - 1 and would overflow i32 for extreme
        // (e.g. proto-supplied) indices. A Vec length never exceeds isize::MAX, so `as i64` is
        // lossless here.
        let index_wide = i64::from(index);
        let offset = i64::from(self.offset);
        let len = self.bins.len() as i64;
        let max = (self.max_num_bins as u64).min(i64::MAX as u64) as i64;

        if index_wide >= offset + len {
            let required = index_wide - offset + 1;
            if required > max {
                // Use every bin we are allowed before folding the overflow into the top bin;
                // the top bin then represents `offset + max_num_bins - 1` and everything above it.
                self.is_collapsed = true;
                self.bins.resize(self.max_num_bins, 0);
            } else {
                self.bins.resize(required as usize, 0);
            }
        } else if index_wide < offset {
            let shift = offset - index_wide;
            let leading = if len + shift > max {
                self.is_collapsed = true;
                // The new window is [index, index + max - 1]. Old bin `j` would land at slot
                // `shift + j`; any slot >= max - 1 must fold into slot max - 1.
                let leading = shift.min(max - 1);
                let kept = (max - leading) as usize; // 1 <= kept < len here
                self.collapse_highest(self.bins.len() - kept);
                leading as usize
            } else {
                shift as usize
            };

            let mut new_bins = vec![0u64; leading + self.bins.len()];
            new_bins[leading..].copy_from_slice(&self.bins);
            self.bins = new_bins;
            // Always rebase. When no leading bins fit (max_num_bins == 1), the single bin now
            // represents `index` and everything above it.
            self.offset = index;
        }
    }

    /// Merges the highest `n` bins into the bin directly below them and drops them.
    ///
    /// At least one bin is always kept, so `n` is clamped to `len - 1`. The store is marked
    /// collapsed whenever it is non-empty and `n > 0`, even if the clamp leaves nothing to merge.
    fn collapse_highest(&mut self, n: usize) {
        if n == 0 || self.bins.is_empty() {
            return;
        }

        self.is_collapsed = true;

        let n = n.min(self.bins.len() - 1);
        if n == 0 {
            return;
        }

        let keep = self.bins.len() - n;
        // Saturate, like every other count update in this store, instead of panicking on overflow.
        let folded = self
            .bins
            .drain(keep..)
            .fold(0u64, |acc, c| acc.saturating_add(c));
        if let Some(top) = self.bins.last_mut() {
            *top = top.saturating_add(folded);
        }
    }

    /// Returns the position in `bins` for the given logical index.
    ///
    /// Returns `None` when the store is empty or `index` is below `offset`. Indices above the
    /// window map to the highest bin, which is how collapsed values are routed.
    #[inline]
    fn bin_index(&self, index: i32) -> Option<usize> {
        let last = self.bins.len().checked_sub(1)?;
        let relative = i64::from(index) - i64::from(self.offset);
        if relative < 0 {
            return None;
        }
        // relative < 2^32, so the u64 comparison is exact; the result is <= last and fits usize.
        Some((relative as u64).min(last as u64) as usize)
    }
}
141
142impl Store for CollapsingHighestDenseStore {
143    fn add(&mut self, index: i32, count: u64) {
144        if count == 0 {
145            return;
146        }
147
148        self.grow(index);
149
150        if let Some(bin_idx) = self.bin_index(index) {
151            self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
152        }
153        self.count = self.count.saturating_add(count);
154    }
155
156    fn total_count(&self) -> u64 {
157        self.count
158    }
159
160    fn min_index(&self) -> Option<i32> {
161        if self.bins.is_empty() {
162            return None;
163        }
164
165        for (i, &count) in self.bins.iter().enumerate() {
166            if count > 0 {
167                return Some(self.offset + i as i32);
168            }
169        }
170        None
171    }
172
173    fn max_index(&self) -> Option<i32> {
174        if self.bins.is_empty() {
175            return None;
176        }
177
178        for (i, &count) in self.bins.iter().enumerate().rev() {
179            if count > 0 {
180                return Some(self.offset + i as i32);
181            }
182        }
183        None
184    }
185
186    fn key_at_rank(&self, rank: u64) -> Option<i32> {
187        if rank >= self.count {
188            return None;
189        }
190
191        let mut cumulative = 0u64;
192        for (i, &count) in self.bins.iter().enumerate() {
193            cumulative += count;
194            if cumulative > rank {
195                return Some(self.offset + i as i32);
196            }
197        }
198        None
199    }
200
201    fn merge(&mut self, other: &Self) {
202        if other.bins.is_empty() {
203            return;
204        }
205
206        if other.is_collapsed {
207            self.is_collapsed = true;
208        }
209
210        // Process each bin from the other store
211        for (i, &count) in other.bins.iter().enumerate() {
212            if count > 0 {
213                let index = other.offset + i as i32;
214                self.add(index, count);
215            }
216        }
217    }
218
219    fn is_empty(&self) -> bool {
220        self.count == 0
221    }
222
223    fn clear(&mut self) {
224        self.bins.clear();
225        self.offset = 0;
226        self.count = 0;
227        self.is_collapsed = false;
228    }
229
230    fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
231        for (&index, &count) in &proto.binCounts {
232            let count = validate_proto_count(index, count)?;
233            if count > 0 {
234                self.add(index, count);
235            }
236        }
237
238        let offset = proto.contiguousBinIndexOffset;
239        for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
240            let index = offset + i as i32;
241            let count = validate_proto_count(index, count)?;
242            if count > 0 {
243                self.add(index, count);
244            }
245        }
246
247        Ok(())
248    }
249
250    fn to_proto(&self) -> ProtoStore {
251        let mut proto = ProtoStore::new();
252
253        if self.bins.is_empty() {
254            return proto;
255        }
256
257        // Use contiguous encoding for dense store
258        proto.contiguousBinIndexOffset = self.offset;
259        proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
260
261        proto
262    }
263}
264
265impl Default for CollapsingHighestDenseStore {
266    /// Creates a collapsing highest dense store with a default of 2048 bins.
267    fn default() -> Self {
268        Self::new(2048)
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store capped at `max_num_bins` and adds one count at every index in `indices`.
    fn store_with(max_num_bins: usize, indices: std::ops::Range<i32>) -> CollapsingHighestDenseStore {
        let mut store = CollapsingHighestDenseStore::new(max_num_bins);
        indices.for_each(|i| store.add(i, 1));
        store
    }

    #[test]
    fn test_within_limit() {
        let store = store_with(10, 0..10);

        assert_eq!(store.total_count(), 10);
        assert!(!store.is_collapsed());
        assert_eq!(store.bins.len(), 10);
    }

    #[test]
    fn test_collapse_on_low_index() {
        let mut store = store_with(5, 5..10);
        assert!(!store.is_collapsed());

        // Index 0 sits five slots below the current window, so the top bins must fold.
        store.add(0, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
        assert!(store.bins.len() <= 5);
    }

    #[test]
    fn test_collapse_on_high_index() {
        let mut store = store_with(5, 0..5);
        assert!(!store.is_collapsed());

        // Index 10 needs eleven bins, far more than the five allowed.
        store.add(10, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
    }

    #[test]
    fn test_key_at_rank_after_collapse() {
        let mut store = store_with(3, 0..3);
        // A lower index on a full store forces a collapse.
        store.add(-1, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 4);

        // Every count must remain reachable by rank.
        assert!(store.key_at_rank(0).is_some());
        assert!(store.key_at_rank(3).is_some());
        assert!(store.key_at_rank(4).is_none());
    }

    #[test]
    fn test_merge_respects_collapse() {
        let mut target = store_with(5, 0..1);
        let source = store_with(5, 0..10);
        assert!(source.is_collapsed());

        target.merge(&source);

        assert!(target.is_collapsed());
    }
}
358}