ddsketch/canonical/store/
dense.rs

1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
/// A dense store using contiguous array storage.
///
/// This store grows unbounded to accommodate any range of indices. It's memory-efficient when the indices are clustered
/// together, but can use significant memory if indices are widely scattered.
///
/// Use this store when:
/// - You have a bounded range of input values
/// - Memory usage is not a concern
/// - You need the fastest possible insertion performance
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DenseStore {
    /// The bin counts, stored contiguously: `bins[i]` holds the count for logical index `offset + i`.
    bins: Vec<u64>,

    /// The logical index that `bins[0]` corresponds to.
    offset: i32,

    /// Total count across all bins.
    count: u64,
}

impl DenseStore {
    /// Creates an empty `DenseStore`.
    pub fn new() -> Self {
        Self {
            bins: Vec::new(),
            offset: 0,
            count: 0,
        }
    }

    /// Ensures the store can accommodate the given index, growing if necessary.
    ///
    /// An empty store gets a single zeroed bin anchored at `index`. Otherwise zeroed bins are prepended or
    /// appended so that `index` falls inside the covered range; existing counts keep their logical indices.
    fn grow(&mut self, index: i32) {
        if self.bins.is_empty() {
            self.bins.push(0);
            self.offset = index;
            return;
        }

        // Do the range arithmetic in i64: sums and differences of two i32 values (for example `offset + len`
        // for a store whose last bin sits at `i32::MAX`) do not always fit in i32.
        let target = i64::from(index);
        let first = i64::from(self.offset);
        // One past the last logical index currently covered.
        let end = first + self.bins.len() as i64;

        if target < first {
            // Need to prepend bins: build a zeroed vector and copy the existing counts into its tail.
            let num_prepend = (first - target) as usize;
            let mut new_bins = vec![0u64; num_prepend + self.bins.len()];
            new_bins[num_prepend..].copy_from_slice(&self.bins);
            self.bins = new_bins;
            self.offset = index;
        } else if target >= end {
            // Need to append bins so that the last one corresponds to `index`.
            let new_len = (target - first + 1) as usize;
            self.bins.resize(new_len, 0);
        }
    }

    /// Returns the index into the bins array for the given logical index.
    ///
    /// The caller must have made sure (via `grow`) that `index` is covered by the store.
    #[inline]
    fn bin_index(&self, index: i32) -> usize {
        // Subtract in i64 so a wide span between `offset` and `index` cannot overflow i32.
        (i64::from(index) - i64::from(self.offset)) as usize
    }
}
65
66impl Store for DenseStore {
67    fn add(&mut self, index: i32, count: u64) {
68        if count == 0 {
69            return;
70        }
71
72        self.grow(index);
73        let bin_idx = self.bin_index(index);
74        self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
75        self.count = self.count.saturating_add(count);
76    }
77
78    fn total_count(&self) -> u64 {
79        self.count
80    }
81
82    fn min_index(&self) -> Option<i32> {
83        if self.bins.is_empty() {
84            return None;
85        }
86
87        for (i, &count) in self.bins.iter().enumerate() {
88            if count > 0 {
89                return Some(self.offset + i as i32);
90            }
91        }
92        None
93    }
94
95    fn max_index(&self) -> Option<i32> {
96        if self.bins.is_empty() {
97            return None;
98        }
99
100        for (i, &count) in self.bins.iter().enumerate().rev() {
101            if count > 0 {
102                return Some(self.offset + i as i32);
103            }
104        }
105        None
106    }
107
108    fn key_at_rank(&self, rank: u64) -> Option<i32> {
109        if rank >= self.count {
110            return None;
111        }
112
113        let mut cumulative = 0u64;
114        for (i, &count) in self.bins.iter().enumerate() {
115            cumulative += count;
116            if cumulative > rank {
117                return Some(self.offset + i as i32);
118            }
119        }
120        None
121    }
122
123    fn merge(&mut self, other: &Self) {
124        if other.bins.is_empty() {
125            return;
126        }
127
128        // Grow to accommodate the other store's range
129        if let (Some(other_min), Some(other_max)) = (other.min_index(), other.max_index()) {
130            self.grow(other_min);
131            self.grow(other_max);
132
133            // Add all non-zero bins from the other store
134            for (i, &count) in other.bins.iter().enumerate() {
135                if count > 0 {
136                    let index = other.offset + i as i32;
137                    let bin_idx = self.bin_index(index);
138                    self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
139                }
140            }
141        }
142
143        self.count = self.count.saturating_add(other.count);
144    }
145
146    fn is_empty(&self) -> bool {
147        self.count == 0
148    }
149
150    fn clear(&mut self) {
151        self.bins.clear();
152        self.offset = 0;
153        self.count = 0;
154    }
155
156    fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
157        // Process sparse binCounts
158        for (&index, &count) in &proto.binCounts {
159            let count = validate_proto_count(index, count)?;
160            if count > 0 {
161                self.add(index, count);
162            }
163        }
164
165        // Process contiguous bins
166        let offset = proto.contiguousBinIndexOffset;
167        for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
168            let index = offset + i as i32;
169            let count = validate_proto_count(index, count)?;
170            if count > 0 {
171                self.add(index, count);
172            }
173        }
174
175        Ok(())
176    }
177
178    fn to_proto(&self) -> ProtoStore {
179        let mut proto = ProtoStore::new();
180
181        if self.bins.is_empty() {
182            return proto;
183        }
184
185        // Use contiguous encoding for dense store
186        proto.contiguousBinIndexOffset = self.offset;
187        proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
188
189        proto
190    }
191}
192
193impl Default for DenseStore {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store by adding each `(index, count)` pair in order.
    fn store_with(entries: &[(i32, u64)]) -> DenseStore {
        let mut store = DenseStore::new();
        for &(index, count) in entries {
            store.add(index, count);
        }
        store
    }

    /// Checks the total count together with the lowest and highest populated indices.
    fn assert_summary(store: &DenseStore, total: u64, min: i32, max: i32) {
        assert_eq!(store.total_count(), total);
        assert_eq!(store.min_index(), Some(min));
        assert_eq!(store.max_index(), Some(max));
    }

    #[test]
    fn test_add_single() {
        let store = store_with(&[(5, 1)]);

        assert_summary(&store, 1, 5, 5);
    }

    #[test]
    fn test_add_multiple_same_index() {
        // Two adds to the same index accumulate in a single bin.
        let store = store_with(&[(5, 3), (5, 2)]);

        assert_summary(&store, 5, 5, 5);
    }

    #[test]
    fn test_add_multiple_indices() {
        // The last add lands below the current offset, exercising the prepend path.
        let store = store_with(&[(5, 1), (10, 2), (3, 3)]);

        assert_summary(&store, 6, 3, 10);
    }

    #[test]
    fn test_key_at_rank() {
        let store = store_with(&[(5, 3), (10, 2)]);

        // Ranks 0..=2 fall in bin 5, ranks 3..=4 in bin 10, and rank 5 is past the end.
        let expectations: &[(u64, Option<i32>)] = &[
            (0, Some(5)),
            (2, Some(5)),
            (3, Some(10)),
            (4, Some(10)),
            (5, None),
        ];
        for &(rank, expected) in expectations {
            assert_eq!(store.key_at_rank(rank), expected, "rank {}", rank);
        }
    }

    #[test]
    fn test_merge() {
        let mut target = store_with(&[(5, 2), (10, 1)]);
        let source = store_with(&[(5, 1), (15, 3)]);

        target.merge(&source);

        assert_summary(&target, 7, 5, 15);
    }

    #[test]
    fn test_clear() {
        let mut store = store_with(&[(5, 2), (10, 1)]);

        store.clear();

        assert!(store.is_empty());
        assert_eq!(store.total_count(), 0);
        assert_eq!(store.min_index(), None);
    }

    #[test]
    fn test_negative_indices() {
        let store = store_with(&[(-5, 1), (5, 1)]);

        assert_summary(&store, 2, -5, 5);
    }
}