ddsketch/canonical/store/
dense.rs1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
6#[derive(Clone, Debug, Eq, PartialEq)]
16pub struct DenseStore {
17 bins: Vec<u64>,
19
20 offset: i32,
22
23 count: u64,
25}
26
27impl DenseStore {
28 pub fn new() -> Self {
30 Self {
31 bins: Vec::new(),
32 offset: 0,
33 count: 0,
34 }
35 }
36
37 fn grow(&mut self, index: i32) {
39 if self.bins.is_empty() {
40 self.bins.push(0);
41 self.offset = index;
42 return;
43 }
44
45 if index < self.offset {
46 let num_prepend = (self.offset - index) as usize;
48 let mut new_bins = vec![0u64; num_prepend + self.bins.len()];
49 new_bins[num_prepend..].copy_from_slice(&self.bins);
50 self.bins = new_bins;
51 self.offset = index;
52 } else if index >= self.offset + self.bins.len() as i32 {
53 let new_len = (index - self.offset + 1) as usize;
55 self.bins.resize(new_len, 0);
56 }
57 }
58
59 #[inline]
61 fn bin_index(&self, index: i32) -> usize {
62 (index - self.offset) as usize
63 }
64
65 pub fn iter_non_zero_bins(&self) -> impl Iterator<Item = (i32, u64)> + '_ {
67 self.bins.iter().enumerate().filter_map(|(i, &count)| {
68 if count == 0 {
69 None
70 } else {
71 Some((self.offset + i as i32, count))
72 }
73 })
74 }
75}
76
77impl Store for DenseStore {
78 fn add(&mut self, index: i32, count: u64) {
79 if count == 0 {
80 return;
81 }
82
83 self.grow(index);
84 let bin_idx = self.bin_index(index);
85 self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
86 self.count = self.count.saturating_add(count);
87 }
88
89 fn total_count(&self) -> u64 {
90 self.count
91 }
92
93 fn min_index(&self) -> Option<i32> {
94 if self.bins.is_empty() {
95 return None;
96 }
97
98 for (i, &count) in self.bins.iter().enumerate() {
99 if count > 0 {
100 return Some(self.offset + i as i32);
101 }
102 }
103 None
104 }
105
106 fn max_index(&self) -> Option<i32> {
107 if self.bins.is_empty() {
108 return None;
109 }
110
111 for (i, &count) in self.bins.iter().enumerate().rev() {
112 if count > 0 {
113 return Some(self.offset + i as i32);
114 }
115 }
116 None
117 }
118
119 fn key_at_rank(&self, rank: u64) -> Option<i32> {
120 if rank >= self.count {
121 return None;
122 }
123
124 let mut cumulative = 0u64;
125 for (i, &count) in self.bins.iter().enumerate() {
126 cumulative += count;
127 if cumulative > rank {
128 return Some(self.offset + i as i32);
129 }
130 }
131 None
132 }
133
134 fn merge(&mut self, other: &Self) {
135 if other.bins.is_empty() {
136 return;
137 }
138
139 if let (Some(other_min), Some(other_max)) = (other.min_index(), other.max_index()) {
141 self.grow(other_min);
142 self.grow(other_max);
143
144 for (i, &count) in other.bins.iter().enumerate() {
146 if count > 0 {
147 let index = other.offset + i as i32;
148 let bin_idx = self.bin_index(index);
149 self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
150 }
151 }
152 }
153
154 self.count = self.count.saturating_add(other.count);
155 }
156
157 fn is_empty(&self) -> bool {
158 self.count == 0
159 }
160
161 fn clear(&mut self) {
162 self.bins.clear();
163 self.offset = 0;
164 self.count = 0;
165 }
166
167 fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
168 for (&index, &count) in &proto.binCounts {
170 let count = validate_proto_count(index, count)?;
171 if count > 0 {
172 self.add(index, count);
173 }
174 }
175
176 let offset = proto.contiguousBinIndexOffset;
178 for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
179 let index = offset + i as i32;
180 let count = validate_proto_count(index, count)?;
181 if count > 0 {
182 self.add(index, count);
183 }
184 }
185
186 Ok(())
187 }
188
189 fn to_proto(&self) -> ProtoStore {
190 let mut proto = ProtoStore::new();
191
192 if self.bins.is_empty() {
193 return proto;
194 }
195
196 proto.contiguousBinIndexOffset = self.offset;
198 proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
199
200 proto
201 }
202}
203
204impl Default for DenseStore {
205 fn default() -> Self {
206 Self::new()
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213
214 #[test]
215 fn test_add_single() {
216 let mut store = DenseStore::new();
217 store.add(5, 1);
218
219 assert_eq!(store.total_count(), 1);
220 assert_eq!(store.min_index(), Some(5));
221 assert_eq!(store.max_index(), Some(5));
222 }
223
224 #[test]
225 fn test_add_multiple_same_index() {
226 let mut store = DenseStore::new();
227 store.add(5, 3);
228 store.add(5, 2);
229
230 assert_eq!(store.total_count(), 5);
231 assert_eq!(store.min_index(), Some(5));
232 assert_eq!(store.max_index(), Some(5));
233 }
234
235 #[test]
236 fn test_add_multiple_indices() {
237 let mut store = DenseStore::new();
238 store.add(5, 1);
239 store.add(10, 2);
240 store.add(3, 3);
241
242 assert_eq!(store.total_count(), 6);
243 assert_eq!(store.min_index(), Some(3));
244 assert_eq!(store.max_index(), Some(10));
245 }
246
247 #[test]
248 fn test_iter_non_zero_bins_empty() {
249 let store = DenseStore::new();
250
251 assert_eq!(store.iter_non_zero_bins().collect::<Vec<_>>(), Vec::new());
252 }
253
254 #[test]
255 fn test_iter_non_zero_bins_skips_zero_counts_and_applies_offset() {
256 let mut store = DenseStore::new();
257 store.add(10, 3);
258 store.add(12, 5);
259
260 assert_eq!(store.iter_non_zero_bins().collect::<Vec<_>>(), vec![(10, 3), (12, 5)]);
261 }
262
263 #[test]
264 fn test_key_at_rank() {
265 let mut store = DenseStore::new();
266 store.add(5, 3);
267 store.add(10, 2);
268
269 assert_eq!(store.key_at_rank(0), Some(5));
270 assert_eq!(store.key_at_rank(2), Some(5));
271 assert_eq!(store.key_at_rank(3), Some(10));
272 assert_eq!(store.key_at_rank(4), Some(10));
273 assert_eq!(store.key_at_rank(5), None);
274 }
275
276 #[test]
277 fn test_merge() {
278 let mut store1 = DenseStore::new();
279 store1.add(5, 2);
280 store1.add(10, 1);
281
282 let mut store2 = DenseStore::new();
283 store2.add(5, 1);
284 store2.add(15, 3);
285
286 store1.merge(&store2);
287
288 assert_eq!(store1.total_count(), 7);
289 assert_eq!(store1.min_index(), Some(5));
290 assert_eq!(store1.max_index(), Some(15));
291 }
292
293 #[test]
294 fn test_clear() {
295 let mut store = DenseStore::new();
296 store.add(5, 2);
297 store.add(10, 1);
298
299 store.clear();
300
301 assert!(store.is_empty());
302 assert_eq!(store.total_count(), 0);
303 assert_eq!(store.min_index(), None);
304 }
305
306 #[test]
307 fn test_negative_indices() {
308 let mut store = DenseStore::new();
309 store.add(-5, 1);
310 store.add(5, 1);
311
312 assert_eq!(store.total_count(), 2);
313 assert_eq!(store.min_index(), Some(-5));
314 assert_eq!(store.max_index(), Some(5));
315 }
316}