ddsketch/canonical/store/
dense.rs1use datadog_protos::sketches::Store as ProtoStore;
2
3use super::{validate_proto_count, Store};
4use crate::canonical::error::ProtoConversionError;
5
/// A store that keeps bin counts in one contiguous vector.
///
/// Position `i` of the vector holds the count for bin index `offset + i`. The vector is extended
/// on demand, at either end, so that it covers every index that has been grown to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DenseStore {
    /// Per-bin counts; `bins[i]` is the count for bin index `offset + i`.
    bins: Vec<u64>,

    /// Bin index represented by `bins[0]`. Only meaningful while `bins` is non-empty.
    offset: i32,

    /// Running total of the counts held by the store.
    count: u64,
}

impl DenseStore {
    /// Creates an empty store: no bins, zero offset, zero total.
    pub fn new() -> Self {
        Self {
            bins: Vec::new(),
            offset: 0,
            count: 0,
        }
    }

    /// Extends `bins` so that it contains a slot for `index`.
    ///
    /// An empty store gets a single zeroed slot anchored at `index`. Otherwise zeroed slots are
    /// prepended (moving `offset` down to `index`) or appended as needed; nothing happens when
    /// `index` is already covered. One slot is allocated for every index between the current
    /// range and `index`, so far-apart indices produce a correspondingly large vector.
    fn grow(&mut self, index: i32) {
        if self.bins.is_empty() {
            self.bins.push(0);
            self.offset = index;
            return;
        }

        // Work in i64: `index - offset` does not fit in i32 when the two are far apart, and
        // `offset + len` overflows i32 when the store sits next to `i32::MAX`.
        let relative = i64::from(index) - i64::from(self.offset);

        if relative < 0 {
            let num_prepend = (-relative) as usize;
            let mut new_bins = vec![0u64; num_prepend + self.bins.len()];
            new_bins[num_prepend..].copy_from_slice(&self.bins);
            self.bins = new_bins;
            self.offset = index;
        } else if relative >= self.bins.len() as i64 {
            self.bins.resize(relative as usize + 1, 0);
        }
    }

    /// Converts a bin index into a position within `bins`.
    ///
    /// The caller must have made sure `index` is covered (see `grow`); an index below `offset`
    /// maps to an out-of-range position.
    #[inline]
    fn bin_index(&self, index: i32) -> usize {
        // Same widening as in `grow`, so the subtraction cannot overflow.
        (i64::from(index) - i64::from(self.offset)) as usize
    }
}
65
66impl Store for DenseStore {
67 fn add(&mut self, index: i32, count: u64) {
68 if count == 0 {
69 return;
70 }
71
72 self.grow(index);
73 let bin_idx = self.bin_index(index);
74 self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
75 self.count = self.count.saturating_add(count);
76 }
77
78 fn total_count(&self) -> u64 {
79 self.count
80 }
81
82 fn min_index(&self) -> Option<i32> {
83 if self.bins.is_empty() {
84 return None;
85 }
86
87 for (i, &count) in self.bins.iter().enumerate() {
88 if count > 0 {
89 return Some(self.offset + i as i32);
90 }
91 }
92 None
93 }
94
95 fn max_index(&self) -> Option<i32> {
96 if self.bins.is_empty() {
97 return None;
98 }
99
100 for (i, &count) in self.bins.iter().enumerate().rev() {
101 if count > 0 {
102 return Some(self.offset + i as i32);
103 }
104 }
105 None
106 }
107
108 fn key_at_rank(&self, rank: u64) -> Option<i32> {
109 if rank >= self.count {
110 return None;
111 }
112
113 let mut cumulative = 0u64;
114 for (i, &count) in self.bins.iter().enumerate() {
115 cumulative += count;
116 if cumulative > rank {
117 return Some(self.offset + i as i32);
118 }
119 }
120 None
121 }
122
123 fn merge(&mut self, other: &Self) {
124 if other.bins.is_empty() {
125 return;
126 }
127
128 if let (Some(other_min), Some(other_max)) = (other.min_index(), other.max_index()) {
130 self.grow(other_min);
131 self.grow(other_max);
132
133 for (i, &count) in other.bins.iter().enumerate() {
135 if count > 0 {
136 let index = other.offset + i as i32;
137 let bin_idx = self.bin_index(index);
138 self.bins[bin_idx] = self.bins[bin_idx].saturating_add(count);
139 }
140 }
141 }
142
143 self.count = self.count.saturating_add(other.count);
144 }
145
146 fn is_empty(&self) -> bool {
147 self.count == 0
148 }
149
150 fn clear(&mut self) {
151 self.bins.clear();
152 self.offset = 0;
153 self.count = 0;
154 }
155
156 fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
157 for (&index, &count) in &proto.binCounts {
159 let count = validate_proto_count(index, count)?;
160 if count > 0 {
161 self.add(index, count);
162 }
163 }
164
165 let offset = proto.contiguousBinIndexOffset;
167 for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
168 let index = offset + i as i32;
169 let count = validate_proto_count(index, count)?;
170 if count > 0 {
171 self.add(index, count);
172 }
173 }
174
175 Ok(())
176 }
177
178 fn to_proto(&self) -> ProtoStore {
179 let mut proto = ProtoStore::new();
180
181 if self.bins.is_empty() {
182 return proto;
183 }
184
185 proto.contiguousBinIndexOffset = self.offset;
187 proto.contiguousBinCounts = self.bins.iter().map(|&c| c as f64).collect();
188
189 proto
190 }
191}
192
193impl Default for DenseStore {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store by adding each `(index, count)` pair in the given order.
    fn store_of(entries: &[(i32, u64)]) -> DenseStore {
        let mut built = DenseStore::new();
        for &(index, count) in entries {
            built.add(index, count);
        }
        built
    }

    #[test]
    fn test_add_single() {
        let single = store_of(&[(5, 1)]);

        assert_eq!(single.total_count(), 1);
        assert_eq!(single.min_index(), Some(5));
        assert_eq!(single.max_index(), Some(5));
    }

    #[test]
    fn test_add_multiple_same_index() {
        // Two adds to the same bin accumulate into one index.
        let repeated = store_of(&[(5, 3), (5, 2)]);

        assert_eq!(repeated.total_count(), 5);
        assert_eq!(repeated.min_index(), Some(5));
        assert_eq!(repeated.max_index(), Some(5));
    }

    #[test]
    fn test_add_multiple_indices() {
        // The last add lands below the current offset, exercising growth at the front.
        let spread = store_of(&[(5, 1), (10, 2), (3, 3)]);

        assert_eq!(spread.total_count(), 6);
        assert_eq!(spread.min_index(), Some(3));
        assert_eq!(spread.max_index(), Some(10));
    }

    #[test]
    fn test_key_at_rank() {
        let ranked = store_of(&[(5, 3), (10, 2)]);

        // Ranks 0..=2 fall in bin 5, ranks 3..=4 in bin 10, and rank 5 is out of range.
        let expectations: &[(u64, Option<i32>)] = &[
            (0, Some(5)),
            (2, Some(5)),
            (3, Some(10)),
            (4, Some(10)),
            (5, None),
        ];
        for &(rank, expected) in expectations {
            assert_eq!(ranked.key_at_rank(rank), expected);
        }
    }

    #[test]
    fn test_merge() {
        let mut target = store_of(&[(5, 2), (10, 1)]);
        let source = store_of(&[(5, 1), (15, 3)]);

        target.merge(&source);

        assert_eq!(target.total_count(), 7);
        assert_eq!(target.min_index(), Some(5));
        assert_eq!(target.max_index(), Some(15));
    }

    #[test]
    fn test_clear() {
        let mut cleared = store_of(&[(5, 2), (10, 1)]);

        cleared.clear();

        assert!(cleared.is_empty());
        assert_eq!(cleared.total_count(), 0);
        assert_eq!(cleared.min_index(), None);
    }

    #[test]
    fn test_negative_indices() {
        let signed = store_of(&[(-5, 1), (5, 1)]);

        assert_eq!(signed.total_count(), 2);
        assert_eq!(signed.min_index(), Some(-5));
        assert_eq!(signed.max_index(), Some(5));
    }
}