ddsketch/canonical/store/
collapsing_lowest.rs1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
/// Dense store of bin counts with a hard cap on the number of bins.
///
/// Counts live in a contiguous window of indices that starts at `offset`:
/// `bins[i]` holds the count for index `offset + i`. When covering a new index
/// would make the window wider than `max_num_bins`, the lowest indices lose
/// their individual bins and their counts are folded into the lowest bin that
/// is kept.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollapsingLowestDenseStore {
    /// Per-bin counts; `bins[i]` corresponds to index `offset + i`.
    bins: Vec<u64>,

    /// Index represented by `bins[0]`.
    offset: i32,

    /// Upper bound on `bins.len()`; at least 1.
    max_num_bins: usize,

    /// Total of all counts held by the store.
    count: u64,

    /// `true` once any index has been denied its own bin or any bins have
    /// been folded together.
    is_collapsed: bool,
}

impl CollapsingLowestDenseStore {
    /// Creates an empty store that will never hold more than `max_num_bins` bins.
    ///
    /// # Panics
    ///
    /// Panics if `max_num_bins` is zero.
    pub fn new(max_num_bins: usize) -> Self {
        assert!(max_num_bins >= 1, "max_num_bins must be at least 1");
        Self {
            bins: Vec::new(),
            offset: 0,
            max_num_bins,
            count: 0,
            is_collapsed: false,
        }
    }

    /// Returns `true` if the store has had to give up per-index resolution,
    /// i.e. some count is stored in a bin other than the one for its own index.
    pub fn is_collapsed(&self) -> bool {
        self.is_collapsed
    }

    /// Makes sure `index` can be mapped to a bin by [`Self::bin_index`].
    ///
    /// * Empty store: a single bin is created at `index`.
    /// * `index` below the window: the window is extended downwards if that
    ///   fits within `max_num_bins`; otherwise the store is only marked as
    ///   collapsed and `index` will map to the lowest existing bin.
    /// * `index` above the window: the window is extended upwards. If that
    ///   would exceed `max_num_bins`, the window is moved so that it ends at
    ///   `index`, and every bin that falls below its new start is folded into
    ///   the new lowest bin.
    fn grow(&mut self, index: i32) {
        if self.bins.is_empty() {
            self.bins.push(0);
            self.offset = index;
            return;
        }

        // Work in i64: the distance between two i32 indices does not always
        // fit in an i32.
        let target = i64::from(index);
        let lowest = i64::from(self.offset);
        let len = i64::try_from(self.bins.len()).unwrap_or(i64::MAX);
        let highest = lowest.saturating_add(len - 1);
        let capacity = i64::try_from(self.max_num_bins).unwrap_or(i64::MAX);

        if target < lowest {
            let num_prepend = lowest - target;
            if len.saturating_add(num_prepend) > capacity {
                // No room to extend downwards; the caller stores the value in
                // the lowest existing bin instead.
                self.is_collapsed = true;
                return;
            }

            // `num_prepend` is below `capacity`, which itself came from a usize.
            let num_prepend = num_prepend as usize;
            let mut widened = vec![0u64; num_prepend + self.bins.len()];
            widened[num_prepend..].copy_from_slice(&self.bins);
            self.bins = widened;
            self.offset = index;
        } else if target > highest {
            // Lowest index the window can keep while still reaching `target`.
            let window_start = target.saturating_sub(capacity - 1).max(lowest);

            // Non-negative because `window_start >= lowest`; zero means the
            // window can simply be extended and nothing is folded.
            let to_fold = usize::try_from(window_start - lowest).unwrap_or(usize::MAX);
            self.collapse_lowest(to_fold);

            // In 1..=capacity by construction of `window_start`.
            let target_len = (target - window_start + 1) as usize;
            if target_len > self.bins.len() {
                self.bins.resize(target_len, 0);
            }
        }
    }

    /// Moves the start of the window up by `n` indices.
    ///
    /// The counts of the `n` lowest bins (or of all bins, if there are fewer
    /// than `n`) are summed with saturation and added to the bin that becomes
    /// the new lowest one. If every existing bin is dropped, a single bin
    /// holding the folded total is kept at the new start. Does nothing when
    /// `n` is zero or the store has no bins.
    fn collapse_lowest(&mut self, n: usize) {
        if n == 0 || self.bins.is_empty() {
            return;
        }

        self.is_collapsed = true;

        let folded = self
            .bins
            .iter()
            .take(n)
            .fold(0u64, |acc, &c| acc.saturating_add(c));

        if n < self.bins.len() {
            self.bins.drain(..n);
            self.bins[0] = self.bins[0].saturating_add(folded);
        } else {
            // The whole window is below the new start.
            self.bins.clear();
            self.bins.push(folded);
        }

        let advanced =
            i64::from(self.offset).saturating_add(i64::try_from(n).unwrap_or(i64::MAX));
        self.offset = i32::try_from(advanced).unwrap_or(i32::MAX);
    }

    /// Maps `index` to a position in `bins`.
    ///
    /// Indices below the window map to the lowest bin (position 0). Returns
    /// `None` when the store has no bins or when `index` lies above the window.
    #[inline]
    fn bin_index(&self, index: i32) -> Option<usize> {
        if self.bins.is_empty() {
            return None;
        }
        if index < self.offset {
            return Some(0);
        }

        // i64 subtraction cannot overflow for any pair of i32 values.
        let distance = usize::try_from(i64::from(index) - i64::from(self.offset)).ok()?;
        if distance < self.bins.len() {
            Some(distance)
        } else {
            None
        }
    }
}
141
142impl Store for CollapsingLowestDenseStore {
143 fn add(&mut self, index: i32, count: u64) {
144 if count == 0 {
145 return;
146 }
147
148 self.grow(index);
149
150 if let Some(bin_idx) = self.bin_index(index) {
151 self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
152 }
153 self.count = self.count.saturating_add(count);
154 }
155
156 fn total_count(&self) -> u64 {
157 self.count
158 }
159
160 fn min_index(&self) -> Option<i32> {
161 if self.bins.is_empty() {
162 return None;
163 }
164
165 for (i, &count) in self.bins.iter().enumerate() {
166 if count > 0 {
167 return Some(self.offset + i as i32);
168 }
169 }
170 None
171 }
172
173 fn max_index(&self) -> Option<i32> {
174 if self.bins.is_empty() {
175 return None;
176 }
177
178 for (i, &count) in self.bins.iter().enumerate().rev() {
179 if count > 0 {
180 return Some(self.offset + i as i32);
181 }
182 }
183 None
184 }
185
186 fn key_at_rank(&self, rank: u64) -> Option<i32> {
187 if rank >= self.count {
188 return None;
189 }
190
191 let mut cumulative = 0u64;
192 for (i, &count) in self.bins.iter().enumerate() {
193 cumulative += count;
194 if cumulative > rank {
195 return Some(self.offset + i as i32);
196 }
197 }
198 None
199 }
200
201 fn merge(&mut self, other: &Self) {
202 if other.bins.is_empty() {
203 return;
204 }
205
206 if other.is_collapsed {
207 self.is_collapsed = true;
208 }
209
210 for (i, &count) in other.bins.iter().enumerate() {
212 if count > 0 {
213 let index = other.offset + i as i32;
214 self.add(index, count);
215 }
216 }
217
218 }
222
223 fn is_empty(&self) -> bool {
224 self.count == 0
225 }
226
227 fn clear(&mut self) {
228 self.bins.clear();
229 self.offset = 0;
230 self.count = 0;
231 self.is_collapsed = false;
232 }
233
234 fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
235 for (&index, &count) in &proto.binCounts {
237 let count = validate_proto_count(index, count)?;
238 if count > 0 {
239 self.add(index, count);
240 }
241 }
242
243 let offset = proto.contiguousBinIndexOffset;
245 for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
246 let index = offset + i as i32;
247 let count = validate_proto_count(index, count)?;
248 if count > 0 {
249 self.add(index, count);
250 }
251 }
252
253 Ok(())
254 }
255
256 fn to_proto(&self) -> ProtoStore {
257 let mut proto = ProtoStore::new();
258
259 if self.bins.is_empty() {
260 return proto;
261 }
262
263 proto.contiguousBinIndexOffset = self.offset;
265 proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
266
267 proto
268 }
269}
270
271impl Default for CollapsingLowestDenseStore {
272 fn default() -> Self {
274 Self::new(2048)
275 }
276}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store capped at `max_num_bins` and adds one observation at
    /// each of `indices`, in iteration order.
    fn store_with(
        max_num_bins: usize,
        indices: impl IntoIterator<Item = i32>,
    ) -> CollapsingLowestDenseStore {
        let mut store = CollapsingLowestDenseStore::new(max_num_bins);
        for index in indices {
            store.add(index, 1);
        }
        store
    }

    /// Filling exactly `max_num_bins` distinct indices must not collapse.
    #[test]
    fn test_within_limit() {
        let store = store_with(10, 0..10);

        assert_eq!(store.total_count(), 10);
        assert!(!store.is_collapsed());
        assert_eq!(store.bins.len(), 10);
    }

    /// One index past a full window on the high side collapses the store.
    #[test]
    fn test_collapse_on_high_index() {
        let mut store = store_with(5, 0..5);
        assert!(!store.is_collapsed());

        store.add(5, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
        assert!(store.bins.len() <= 5);
    }

    /// An index below a full window collapses the store and is still counted.
    #[test]
    fn test_collapse_on_low_index() {
        let mut store = store_with(5, 5..10);
        assert!(!store.is_collapsed());

        store.add(0, 1);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 6);
    }

    /// Every rank below the total resolves to a key after a collapse.
    #[test]
    fn test_key_at_rank_after_collapse() {
        let store = store_with(3, 0..4);

        assert!(store.is_collapsed());
        assert_eq!(store.total_count(), 4);

        assert!(store.key_at_rank(0).is_some());
        assert!(store.key_at_rank(3).is_some());
        assert!(store.key_at_rank(4).is_none());
    }

    /// Merging a collapsed store marks the receiving store as collapsed.
    #[test]
    fn test_merge_respects_collapse() {
        let mut target = store_with(5, 0..1);
        let source = store_with(5, 0..10);

        assert!(source.is_collapsed());

        target.merge(&source);

        assert!(target.is_collapsed());
    }
}