ddsketch/canonical/store/
collapsing_lowest.rs

1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
/// A dense, bounded-size store that gives up low-index accuracy to stay within `max_num_bins`.
///
/// Counts live in a contiguous run of bins that starts at `offset`. When recording an index would
/// require more than `max_num_bins` bins, the lowest-indexed bins are collapsed: their counts are
/// merged into the lowest bin that remains. Lower quantiles lose accuracy so that higher quantiles
/// keep theirs.
///
/// Reach for this store when:
/// - memory usage must stay bounded,
/// - high quantiles (e.g. p95, p99) matter more than low ones, or
/// - you are tracking latencies or similar metrics where the tail matters most.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollapsingLowestDenseStore {
    /// Per-bin counts; `bins[i]` holds the count for logical index `offset + i`.
    bins: Vec<u64>,

    /// The logical index whose count is held in `bins[0]`.
    offset: i32,

    /// Upper bound on how many bins the store keeps.
    max_num_bins: usize,

    /// Total count across all bins.
    count: u64,

    /// Set once any collapsing has happened; low-quantile accuracy may be degraded from then on.
    is_collapsed: bool,
}
33
34impl CollapsingLowestDenseStore {
35    /// Creates an empty `CollapsingLowestDenseStore` with the given maximum number of bins.
36    pub fn new(max_num_bins: usize) -> Self {
37        assert!(max_num_bins >= 1, "max_num_bins must be at least 1");
38        Self {
39            bins: Vec::new(),
40            offset: 0,
41            max_num_bins,
42            count: 0,
43            is_collapsed: false,
44        }
45    }
46
47    /// Returns `true` if this store has collapsed bins.
48    ///
49    /// If true, accuracy guarantees may not hold for lower quantiles.
50    pub fn is_collapsed(&self) -> bool {
51        self.is_collapsed
52    }
53
54    /// Ensures the store can accommodate the given index, growing and collapsing if necessary.
55    fn grow(&mut self, index: i32) {
56        if self.bins.is_empty() {
57            self.bins.push(0);
58            self.offset = index;
59            return;
60        }
61
62        if index < self.offset {
63            // Need to prepend bins - but first check if we need to collapse
64            let num_prepend = (self.offset - index) as usize;
65            let new_len = self.bins.len() + num_prepend;
66
67            if new_len > self.max_num_bins {
68                // We need to collapse the new low indices into the current lowest
69                // Don't actually add the bins, just record that we collapsed
70                self.is_collapsed = true;
71                // The index is below our range, so when we add the count,
72                // we'll add it to the lowest bin (bins[0])
73                return;
74            }
75
76            let mut new_bins = vec![0u64; new_len];
77            new_bins[num_prepend..].copy_from_slice(&self.bins);
78            self.bins = new_bins;
79            self.offset = index;
80        } else if index >= self.offset + self.bins.len() as i32 {
81            // Need to append bins
82            let new_len = (index - self.offset + 1) as usize;
83
84            if new_len > self.max_num_bins {
85                // Need to collapse lowest bins to make room for higher indices
86                let bins_to_collapse = new_len - self.max_num_bins;
87                self.collapse_lowest(bins_to_collapse);
88            }
89
90            // Now append
91            let target_len = ((index - self.offset + 1) as usize).min(self.max_num_bins);
92            if target_len > self.bins.len() {
93                self.bins.resize(target_len, 0);
94            }
95        }
96    }
97
98    /// Collapses the lowest `n` bins into the bin at index `n`.
99    fn collapse_lowest(&mut self, n: usize) {
100        if n == 0 || self.bins.is_empty() {
101            return;
102        }
103
104        self.is_collapsed = true;
105
106        let n = n.min(self.bins.len() - 1);
107        if n == 0 {
108            return;
109        }
110
111        // Sum up the bins to collapse
112        let collapsed_count: u64 = self.bins[..n].iter().sum();
113
114        // Add to the bin that will become the new lowest
115        self.bins[n] = self.bins[n].saturating_add(collapsed_count);
116
117        // Remove the collapsed bins
118        self.bins.drain(..n);
119        self.offset += n as i32;
120    }
121
122    /// Returns the index into the bins array for the given logical index.
123    ///
124    /// If the index is below our range, it is mapped to the lowest bin. If the index is above our range, `None` is
125    /// returned.
126    #[inline]
127    fn bin_index(&self, index: i32) -> Option<usize> {
128        if index < self.offset {
129            // Index is below our range, map to lowest bin
130            Some(0)
131        } else {
132            let idx = (index - self.offset) as usize;
133            if idx < self.bins.len() {
134                Some(idx)
135            } else {
136                None
137            }
138        }
139    }
140}
141
142impl Store for CollapsingLowestDenseStore {
143    fn add(&mut self, index: i32, count: u64) {
144        if count == 0 {
145            return;
146        }
147
148        self.grow(index);
149
150        if let Some(bin_idx) = self.bin_index(index) {
151            self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
152        }
153        self.count = self.count.saturating_add(count);
154    }
155
156    fn total_count(&self) -> u64 {
157        self.count
158    }
159
160    fn min_index(&self) -> Option<i32> {
161        if self.bins.is_empty() {
162            return None;
163        }
164
165        for (i, &count) in self.bins.iter().enumerate() {
166            if count > 0 {
167                return Some(self.offset + i as i32);
168            }
169        }
170        None
171    }
172
173    fn max_index(&self) -> Option<i32> {
174        if self.bins.is_empty() {
175            return None;
176        }
177
178        for (i, &count) in self.bins.iter().enumerate().rev() {
179            if count > 0 {
180                return Some(self.offset + i as i32);
181            }
182        }
183        None
184    }
185
186    fn key_at_rank(&self, rank: u64) -> Option<i32> {
187        if rank >= self.count {
188            return None;
189        }
190
191        let mut cumulative = 0u64;
192        for (i, &count) in self.bins.iter().enumerate() {
193            cumulative += count;
194            if cumulative > rank {
195                return Some(self.offset + i as i32);
196            }
197        }
198        None
199    }
200
201    fn merge(&mut self, other: &Self) {
202        if other.bins.is_empty() {
203            return;
204        }
205
206        if other.is_collapsed {
207            self.is_collapsed = true;
208        }
209
210        // Process each bin from the other store
211        for (i, &count) in other.bins.iter().enumerate() {
212            if count > 0 {
213                let index = other.offset + i as i32;
214                self.add(index, count);
215            }
216        }
217
218        // Adjust count since add() already incremented it
219        // We need to subtract the other.count we added and use the saturating add
220        // Actually, the adds already handle this correctly
221    }
222
223    fn is_empty(&self) -> bool {
224        self.count == 0
225    }
226
227    fn clear(&mut self) {
228        self.bins.clear();
229        self.offset = 0;
230        self.count = 0;
231        self.is_collapsed = false;
232    }
233
234    fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
235        // Process sparse binCounts
236        for (&index, &count) in &proto.binCounts {
237            let count = validate_proto_count(index, count)?;
238            if count > 0 {
239                self.add(index, count);
240            }
241        }
242
243        // Process contiguous bins
244        let offset = proto.contiguousBinIndexOffset;
245        for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
246            let index = offset + i as i32;
247            let count = validate_proto_count(index, count)?;
248            if count > 0 {
249                self.add(index, count);
250            }
251        }
252
253        Ok(())
254    }
255
256    fn to_proto(&self) -> ProtoStore {
257        let mut proto = ProtoStore::new();
258
259        if self.bins.is_empty() {
260            return proto;
261        }
262
263        // Use contiguous encoding for dense store
264        proto.contiguousBinIndexOffset = self.offset;
265        proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
266
267        proto
268    }
269}
270
271impl Default for CollapsingLowestDenseStore {
272    /// Creates a collapsing lowest dense store with a default of 2048 bins.
273    fn default() -> Self {
274        Self::new(2048)
275    }
276}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a store capped at `max_num_bins` bins and adds a count of one at every index in
    /// `indices`, in iteration order.
    fn unit_store(
        max_num_bins: usize,
        indices: impl IntoIterator<Item = i32>,
    ) -> CollapsingLowestDenseStore {
        let mut store = CollapsingLowestDenseStore::new(max_num_bins);
        indices.into_iter().for_each(|index| store.add(index, 1));
        store
    }

    #[test]
    fn test_within_limit() {
        let store = unit_store(10, 0..10);

        assert_eq!(store.total_count(), 10);
        assert!(!store.is_collapsed());
        assert_eq!(store.bins.len(), 10);
    }

    #[test]
    fn test_collapse_on_high_index() {
        // Fill exactly to capacity: indices 0 through 4.
        let mut store = unit_store(5, 0..5);
        assert!(!store.is_collapsed());

        // Index 5 needs a sixth bin, so the lowest bin has to be collapsed.
        store.add(5, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
        assert!(store.bins.len() <= 5);
    }

    #[test]
    fn test_collapse_on_low_index() {
        // Fill exactly to capacity: indices 5 through 9.
        let mut store = unit_store(5, 5..10);
        assert!(!store.is_collapsed());

        // Covering index 0 as well would take ten bins.
        store.add(0, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
    }

    #[test]
    fn test_key_at_rank_after_collapse() {
        // The fourth index exceeds the three-bin capacity.
        let store = unit_store(3, 0..4);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 4);

        // Every recorded count must still be reachable by rank.
        assert!(store.key_at_rank(0).is_some());
        assert!(store.key_at_rank(3).is_some());
        assert!(store.key_at_rank(4).is_none());
    }

    #[test]
    fn test_merge_respects_collapse() {
        let mut target = unit_store(5, 0..1);
        let source = unit_store(5, 0..10);
        assert!(source.is_collapsed());

        target.merge(&source);

        assert!(target.is_collapsed());
    }
}