Skip to main content

ddsketch/canonical/store/
dense.rs

1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
/// A dense store using contiguous array storage.
///
/// This store grows unbounded to accommodate any range of indices. It's memory-efficient when the indices are clustered
/// together, but can use significant memory if indices are widely scattered.
///
/// Use this store when:
/// - You have a bounded range of input values
/// - Memory usage is not a concern
/// - You need the fastest possible insertion performance
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DenseStore {
    /// The bin counts, stored contiguously.
    bins: Vec<u64>,

    /// The logical index represented by `bins[0]`. It is reset whenever the first bin is allocated, so it only carries
    /// meaning while `bins` is non-empty.
    offset: i32,

    /// Total count across all bins.
    count: u64,
}

impl DenseStore {
    /// Creates an empty `DenseStore`.
    ///
    /// No bins are allocated until the store is first grown.
    pub fn new() -> Self {
        Self {
            bins: Vec::new(),
            offset: 0,
            count: 0,
        }
    }

    /// Ensures the store can accommodate the given index, growing if necessary.
    ///
    /// - An empty store allocates a single zeroed bin anchored at `index`.
    /// - An index below the current range prepends zeroed bins and moves `offset` down to `index`.
    /// - An index above the current range appends zeroed bins.
    /// - An index already inside the range leaves the store untouched.
    ///
    /// # Panics
    ///
    /// Panics (or aborts) if the required number of bins cannot be allocated.
    fn grow(&mut self, index: i32) {
        if self.bins.is_empty() {
            self.bins.push(0);
            self.offset = index;
            return;
        }

        // All range arithmetic is done in i64: the distance between two i32 indices can be as large as 2^32 - 1, and
        // `offset + len` is exactly 2^31 once the store covers `i32::MAX`. Neither value fits in an i32.
        let wanted = i64::from(index);
        let first = i64::from(self.offset);
        let past_last = first + self.bins.len() as i64;

        if wanted < first {
            // Need to prepend bins. The difference is strictly positive in this branch.
            let num_prepend = (first - wanted) as usize;
            let mut new_bins = vec![0u64; num_prepend + self.bins.len()];
            new_bins[num_prepend..].copy_from_slice(&self.bins);
            self.bins = new_bins;
            self.offset = index;
        } else if wanted >= past_last {
            // Need to append bins. `wanted >= past_last > first`, so the new length is strictly positive.
            let new_len = (wanted - first + 1) as usize;
            self.bins.resize(new_len, 0);
        }
    }

    /// Returns the index into the bins array for the given logical index.
    ///
    /// The result is only a valid position when `index` lies inside the range currently covered by the store; callers
    /// are expected to call `grow(index)` first.
    #[inline]
    fn bin_index(&self, index: i32) -> usize {
        // Subtract in i64 so a wide gap between `index` and `offset` cannot overflow i32.
        (i64::from(index) - i64::from(self.offset)) as usize
    }

    /// Iterates through all non-zero bins in the store, yielding `(index, count)` pairs.
    ///
    /// Pairs are produced in ascending index order; bins whose count is zero are skipped.
    pub fn iter_non_zero_bins(&self) -> impl Iterator<Item = (i32, u64)> + '_ {
        self.bins
            .iter()
            .enumerate()
            .filter(|&(_, &count)| count != 0)
            .map(move |(i, &count)| (self.offset + i as i32, count))
    }
}
76
77impl Store for DenseStore {
78    fn add(&mut self, index: i32, count: u64) {
79        if count == 0 {
80            return;
81        }
82
83        self.grow(index);
84        let bin_idx = self.bin_index(index);
85        self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
86        self.count = self.count.saturating_add(count);
87    }
88
89    fn total_count(&self) -> u64 {
90        self.count
91    }
92
93    fn min_index(&self) -> Option<i32> {
94        if self.bins.is_empty() {
95            return None;
96        }
97
98        for (i, &count) in self.bins.iter().enumerate() {
99            if count > 0 {
100                return Some(self.offset + i as i32);
101            }
102        }
103        None
104    }
105
106    fn max_index(&self) -> Option<i32> {
107        if self.bins.is_empty() {
108            return None;
109        }
110
111        for (i, &count) in self.bins.iter().enumerate().rev() {
112            if count > 0 {
113                return Some(self.offset + i as i32);
114            }
115        }
116        None
117    }
118
119    fn key_at_rank(&self, rank: u64) -> Option<i32> {
120        if rank >= self.count {
121            return None;
122        }
123
124        let mut cumulative = 0u64;
125        for (i, &count) in self.bins.iter().enumerate() {
126            cumulative += count;
127            if cumulative > rank {
128                return Some(self.offset + i as i32);
129            }
130        }
131        None
132    }
133
134    fn merge(&mut self, other: &Self) {
135        if other.bins.is_empty() {
136            return;
137        }
138
139        // Grow to accommodate the other store's range
140        if let (Some(other_min), Some(other_max)) = (other.min_index(), other.max_index()) {
141            self.grow(other_min);
142            self.grow(other_max);
143
144            // Add all non-zero bins from the other store
145            for (i, &count) in other.bins.iter().enumerate() {
146                if count > 0 {
147                    let index = other.offset + i as i32;
148                    let bin_idx = self.bin_index(index);
149                    self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
150                }
151            }
152        }
153
154        self.count = self.count.saturating_add(other.count);
155    }
156
157    fn is_empty(&self) -> bool {
158        self.count == 0
159    }
160
161    fn clear(&mut self) {
162        self.bins.clear();
163        self.offset = 0;
164        self.count = 0;
165    }
166
167    fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
168        // Process sparse binCounts
169        for (&index, &count) in &proto.binCounts {
170            let count = validate_proto_count(index, count)?;
171            if count > 0 {
172                self.add(index, count);
173            }
174        }
175
176        // Process contiguous bins
177        let offset = proto.contiguousBinIndexOffset;
178        for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
179            let index = offset + i as i32;
180            let count = validate_proto_count(index, count)?;
181            if count > 0 {
182                self.add(index, count);
183            }
184        }
185
186        Ok(())
187    }
188
189    fn to_proto(&self) -> ProtoStore {
190        let mut proto = ProtoStore::new();
191
192        if self.bins.is_empty() {
193            return proto;
194        }
195
196        // Use contiguous encoding for dense store
197        proto.contiguousBinIndexOffset = self.offset;
198        proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
199
200        proto
201    }
202}
203
204impl Default for DenseStore {
205    fn default() -> Self {
206        Self::new()
207    }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store by adding each `(index, count)` pair in the order given.
    fn store_with(entries: &[(i32, u64)]) -> DenseStore {
        let mut built = DenseStore::new();
        for &(index, count) in entries {
            built.add(index, count);
        }
        built
    }

    /// A single insertion sets the total and makes that index both the minimum and the maximum.
    #[test]
    fn test_add_single() {
        let single = store_with(&[(5, 1)]);

        assert_eq!(single.total_count(), 1);
        assert_eq!(single.min_index(), Some(5));
        assert_eq!(single.max_index(), Some(5));
    }

    /// Repeated insertions at one index accumulate into the same bin.
    #[test]
    fn test_add_multiple_same_index() {
        let repeated = store_with(&[(5, 3), (5, 2)]);

        assert_eq!(repeated.total_count(), 5);
        assert_eq!(repeated.min_index(), Some(5));
        assert_eq!(repeated.max_index(), Some(5));
    }

    /// Insertions above and below the initial index extend the range in both directions.
    #[test]
    fn test_add_multiple_indices() {
        let spread = store_with(&[(5, 1), (10, 2), (3, 3)]);

        assert_eq!(spread.total_count(), 6);
        assert_eq!(spread.min_index(), Some(3));
        assert_eq!(spread.max_index(), Some(10));
    }

    /// An empty store yields no bins.
    #[test]
    fn test_iter_non_zero_bins_empty() {
        let empty = DenseStore::new();
        let seen: Vec<(i32, u64)> = empty.iter_non_zero_bins().collect();

        assert_eq!(seen, Vec::new());
    }

    /// The zero-count gap at index 11 is skipped and yielded indices include the store offset.
    #[test]
    fn test_iter_non_zero_bins_skips_zero_counts_and_applies_offset() {
        let gapped = store_with(&[(10, 3), (12, 5)]);
        let seen: Vec<(i32, u64)> = gapped.iter_non_zero_bins().collect();

        assert_eq!(seen, vec![(10, 3), (12, 5)]);
    }

    /// Ranks 0..=2 fall in bin 5, ranks 3..=4 in bin 10, and rank 5 is out of range.
    #[test]
    fn test_key_at_rank() {
        let ranked = store_with(&[(5, 3), (10, 2)]);

        let expectations = [(0, Some(5)), (2, Some(5)), (3, Some(10)), (4, Some(10)), (5, None)];
        for &(rank, expected) in expectations.iter() {
            assert_eq!(ranked.key_at_rank(rank), expected);
        }
    }

    /// Merging combines totals and widens the range to cover both stores.
    #[test]
    fn test_merge() {
        let mut target = store_with(&[(5, 2), (10, 1)]);
        let source = store_with(&[(5, 1), (15, 3)]);

        target.merge(&source);

        assert_eq!(target.total_count(), 7);
        assert_eq!(target.min_index(), Some(5));
        assert_eq!(target.max_index(), Some(15));
    }

    /// Clearing returns a populated store to the empty state.
    #[test]
    fn test_clear() {
        let mut populated = store_with(&[(5, 2), (10, 1)]);

        populated.clear();

        assert!(populated.is_empty());
        assert_eq!(populated.total_count(), 0);
        assert_eq!(populated.min_index(), None);
    }

    /// Negative and positive indices can coexist in one store.
    #[test]
    fn test_negative_indices() {
        let mixed = store_with(&[(-5, 1), (5, 1)]);

        assert_eq!(mixed.total_count(), 2);
        assert_eq!(mixed.min_index(), Some(-5));
        assert_eq!(mixed.max_index(), Some(5));
    }
}