ddsketch/canonical/store/
collapsing_highest.rs1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
/// A dense bin store with a bounded number of bins that collapses its *highest* bins.
///
/// Bin `i` holds the count for index `offset + i`. The window's low end is always the lowest
/// index added so far; when the span of indices would need more than `max_num_bins` bins, every
/// index above `offset + max_num_bins - 1` is counted in the highest bin instead. Low indices
/// therefore stay exact while high indices lose resolution.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollapsingHighestDenseStore {
    /// Per-bin counts; `bins[i]` is the count for index `offset + i`.
    bins: Vec<u64>,

    /// Index represented by `bins[0]`.
    offset: i32,

    /// Upper bound on `bins.len()`.
    max_num_bins: usize,

    /// Total of all counts added (saturating).
    count: u64,

    /// Set once counts have been folded into the highest bin (or a collapsed store was merged in).
    is_collapsed: bool,
}

impl CollapsingHighestDenseStore {
    /// Creates an empty store that will hold at most `max_num_bins` bins.
    ///
    /// # Panics
    ///
    /// Panics if `max_num_bins` is zero.
    pub fn new(max_num_bins: usize) -> Self {
        assert!(max_num_bins >= 1, "max_num_bins must be at least 1");
        Self {
            bins: Vec::new(),
            offset: 0,
            max_num_bins,
            count: 0,
            is_collapsed: false,
        }
    }

    /// Returns `true` if any counts have been folded into the highest bin.
    pub fn is_collapsed(&self) -> bool {
        self.is_collapsed
    }

    /// Adjusts the bin window so that `index` maps to a bin via [`Self::bin_index`].
    ///
    /// * Empty store: a single bin is created at `index`.
    /// * `index` above the window: the window grows upward, but never past `max_num_bins` bins.
    ///   If it would, the window is filled to capacity and the store is marked collapsed, so
    ///   `index` lands in the highest bin.
    /// * `index` below the window: the window's low end moves to `index`, and existing bins that
    ///   no longer fit are folded into the new highest bin.
    ///
    /// Arithmetic is done in `i64` so extreme `i32` indices cannot overflow.
    fn grow(&mut self, index: i32) {
        if self.bins.is_empty() {
            self.bins.push(0);
            self.offset = index;
            return;
        }

        let idx = i64::from(index);
        let offset = i64::from(self.offset);
        let high = offset + self.bins.len() as i64 - 1;
        // Largest allowed distance between the lowest and highest tracked index. Any span of
        // i32 indices fits in u32, so clamping there is exact and makes the i64 cast lossless.
        let reach = (self.max_num_bins - 1).min(u32::MAX as usize) as i64;

        if idx > high {
            if idx - offset > reach {
                // Fill to capacity so the highest bin is `offset + max_num_bins - 1`. Leaving the
                // window short would let a later, smaller index be placed above this one.
                if self.bins.len() < self.max_num_bins {
                    self.bins.resize(self.max_num_bins, 0);
                }
                self.is_collapsed = true;
            } else {
                self.bins.resize((idx - offset + 1) as usize, 0);
            }
        } else if idx < offset {
            // New window is [idx, new_high]; anything above new_high collapses into it.
            let new_high = high.min(idx + reach);
            let mut rebased = vec![0u64; (new_high - idx + 1) as usize];
            for (i, &bin_count) in self.bins.iter().enumerate() {
                let key = offset + i as i64;
                let slot = (key.min(new_high) - idx) as usize;
                rebased[slot] = rebased[slot].saturating_add(bin_count);
            }
            if new_high < high {
                self.is_collapsed = true;
            }
            self.bins = rebased;
            self.offset = index;
        }
    }

    /// Maps `index` to a position in `bins`.
    ///
    /// Returns `None` if the store is empty or `index` is below the window. Indices above the
    /// window map to the highest bin.
    #[inline]
    fn bin_index(&self, index: i32) -> Option<usize> {
        let last = self.bins.len().checked_sub(1)?;
        let rel = i64::from(index) - i64::from(self.offset);
        if rel < 0 {
            None
        } else {
            Some((rel as u64).min(last as u64) as usize)
        }
    }
}

142impl Store for CollapsingHighestDenseStore {
143 fn add(&mut self, index: i32, count: u64) {
144 if count == 0 {
145 return;
146 }
147
148 self.grow(index);
149
150 if let Some(bin_idx) = self.bin_index(index) {
151 self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
152 }
153 self.count = self.count.saturating_add(count);
154 }
155
156 fn total_count(&self) -> u64 {
157 self.count
158 }
159
160 fn min_index(&self) -> Option<i32> {
161 if self.bins.is_empty() {
162 return None;
163 }
164
165 for (i, &count) in self.bins.iter().enumerate() {
166 if count > 0 {
167 return Some(self.offset + i as i32);
168 }
169 }
170 None
171 }
172
173 fn max_index(&self) -> Option<i32> {
174 if self.bins.is_empty() {
175 return None;
176 }
177
178 for (i, &count) in self.bins.iter().enumerate().rev() {
179 if count > 0 {
180 return Some(self.offset + i as i32);
181 }
182 }
183 None
184 }
185
186 fn key_at_rank(&self, rank: u64) -> Option<i32> {
187 if rank >= self.count {
188 return None;
189 }
190
191 let mut cumulative = 0u64;
192 for (i, &count) in self.bins.iter().enumerate() {
193 cumulative += count;
194 if cumulative > rank {
195 return Some(self.offset + i as i32);
196 }
197 }
198 None
199 }
200
201 fn merge(&mut self, other: &Self) {
202 if other.bins.is_empty() {
203 return;
204 }
205
206 if other.is_collapsed {
207 self.is_collapsed = true;
208 }
209
210 for (i, &count) in other.bins.iter().enumerate() {
212 if count > 0 {
213 let index = other.offset + i as i32;
214 self.add(index, count);
215 }
216 }
217 }
218
219 fn is_empty(&self) -> bool {
220 self.count == 0
221 }
222
223 fn clear(&mut self) {
224 self.bins.clear();
225 self.offset = 0;
226 self.count = 0;
227 self.is_collapsed = false;
228 }
229
230 fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
231 for (&index, &count) in &proto.binCounts {
232 let count = validate_proto_count(index, count)?;
233 if count > 0 {
234 self.add(index, count);
235 }
236 }
237
238 let offset = proto.contiguousBinIndexOffset;
239 for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
240 let index = offset + i as i32;
241 let count = validate_proto_count(index, count)?;
242 if count > 0 {
243 self.add(index, count);
244 }
245 }
246
247 Ok(())
248 }
249
250 fn to_proto(&self) -> ProtoStore {
251 let mut proto = ProtoStore::new();
252
253 if self.bins.is_empty() {
254 return proto;
255 }
256
257 proto.contiguousBinIndexOffset = self.offset;
259 proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
260
261 proto
262 }
263}
264
265impl Default for CollapsingHighestDenseStore {
266 fn default() -> Self {
268 Self::new(2048)
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a store with room for `max_num_bins` bins and adds one count per index.
    fn store_with(
        max_num_bins: usize,
        indices: impl IntoIterator<Item = i32>,
    ) -> CollapsingHighestDenseStore {
        let mut store = CollapsingHighestDenseStore::new(max_num_bins);
        for index in indices {
            store.add(index, 1);
        }
        store
    }

    #[test]
    fn test_within_limit() {
        let store = store_with(10, 0..10);

        assert_eq!(store.total_count(), 10);
        assert!(!store.is_collapsed());
        assert_eq!(store.bins.len(), 10);
    }

    #[test]
    fn test_collapse_on_low_index() {
        // Indices 5..=9 use every available bin; 0 then forces the window downward.
        let mut store = store_with(5, 5..10);
        assert!(!store.is_collapsed());

        store.add(0, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
        assert!(store.bins.len() <= 5);
    }

    #[test]
    fn test_collapse_on_high_index() {
        // Indices 0..=4 use every available bin; 10 lies beyond capacity.
        let mut store = store_with(5, 0..5);
        assert!(!store.is_collapsed());

        store.add(10, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
    }

    #[test]
    fn test_key_at_rank_after_collapse() {
        let mut store = store_with(3, 0..3);
        store.add(-1, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 4);

        assert!(store.key_at_rank(0).is_some());
        assert!(store.key_at_rank(3).is_some());
        assert!(store.key_at_rank(4).is_none());
    }

    #[test]
    fn test_merge_respects_collapse() {
        let mut target = store_with(5, 0..1);
        let source = store_with(5, 0..10);
        assert!(source.is_collapsed());

        target.merge(&source);

        assert!(target.is_collapsed());
    }
}