ddsketch/canonical/store/
sparse.rs1use std::collections::BTreeMap;
2
3use datadog_protos::sketches::Store as ProtoStore;
4
5use super::{validate_proto_count, Store};
6use crate::canonical::error::ProtoConversionError;
7
8#[derive(Clone, Debug, Eq, PartialEq)]
18pub struct SparseStore {
19 bins: BTreeMap<i32, u64>,
21
22 count: u64,
24}
25
26impl SparseStore {
27 pub fn new() -> Self {
29 Self {
30 bins: BTreeMap::new(),
31 count: 0,
32 }
33 }
34}
35
36impl Store for SparseStore {
37 fn add(&mut self, index: i32, count: u64) {
38 if count == 0 {
39 return;
40 }
41
42 *self.bins.entry(index).or_insert(0) += count;
43 self.count = self.count.saturating_add(count);
44 }
45
46 fn total_count(&self) -> u64 {
47 self.count
48 }
49
50 fn min_index(&self) -> Option<i32> {
51 self.bins.iter().find(|(_, &c)| c > 0).map(|(&k, _)| k)
52 }
53
54 fn max_index(&self) -> Option<i32> {
55 self.bins.iter().rev().find(|(_, &c)| c > 0).map(|(&k, _)| k)
56 }
57
58 fn key_at_rank(&self, rank: u64) -> Option<i32> {
59 if rank >= self.count {
60 return None;
61 }
62
63 let mut cumulative = 0u64;
64 for (&index, &count) in &self.bins {
65 cumulative += count;
66 if cumulative > rank {
67 return Some(index);
68 }
69 }
70 None
71 }
72
73 fn merge(&mut self, other: &Self) {
74 for (&index, &count) in &other.bins {
75 if count > 0 {
76 *self.bins.entry(index).or_insert(0) += count;
77 }
78 }
79 self.count = self.count.saturating_add(other.count);
80 }
81
82 fn is_empty(&self) -> bool {
83 self.count == 0
84 }
85
86 fn clear(&mut self) {
87 self.bins.clear();
88 self.count = 0;
89 }
90
91 fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
92 for (&index, &count) in &proto.binCounts {
94 let count = validate_proto_count(index, count)?;
95 if count > 0 {
96 self.add(index, count);
97 }
98 }
99
100 let offset = proto.contiguousBinIndexOffset;
102 for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
103 let index = offset + i as i32;
104 let count = validate_proto_count(index, count)?;
105 if count > 0 {
106 self.add(index, count);
107 }
108 }
109
110 Ok(())
111 }
112
113 fn to_proto(&self) -> ProtoStore {
114 let mut proto = ProtoStore::new();
115
116 for (&index, &count) in &self.bins {
118 if count > 0 {
119 proto.binCounts.insert(index, count as f64);
120 }
121 }
122
123 proto
124 }
125}
126
127impl Default for SparseStore {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136
137 #[test]
138 fn test_add_single() {
139 let mut store = SparseStore::new();
140 store.add(5, 1);
141
142 assert_eq!(store.total_count(), 1);
143 assert_eq!(store.min_index(), Some(5));
144 assert_eq!(store.max_index(), Some(5));
145 }
146
147 #[test]
148 fn test_add_widely_scattered() {
149 let mut store = SparseStore::new();
150 store.add(-1000, 1);
151 store.add(0, 2);
152 store.add(1000, 3);
153
154 assert_eq!(store.total_count(), 6);
155 assert_eq!(store.min_index(), Some(-1000));
156 assert_eq!(store.max_index(), Some(1000));
157 assert_eq!(store.bins.len(), 3);
159 }
160
161 #[test]
162 fn test_key_at_rank() {
163 let mut store = SparseStore::new();
164 store.add(-10, 2);
165 store.add(10, 3);
166
167 assert_eq!(store.key_at_rank(0), Some(-10));
168 assert_eq!(store.key_at_rank(1), Some(-10));
169 assert_eq!(store.key_at_rank(2), Some(10));
170 assert_eq!(store.key_at_rank(4), Some(10));
171 assert_eq!(store.key_at_rank(5), None);
172 }
173
174 #[test]
175 fn test_merge() {
176 let mut store1 = SparseStore::new();
177 store1.add(5, 2);
178
179 let mut store2 = SparseStore::new();
180 store2.add(5, 3);
181 store2.add(100, 1);
182
183 store1.merge(&store2);
184
185 assert_eq!(store1.total_count(), 6);
186 assert_eq!(store1.bins.get(&5), Some(&5));
187 assert_eq!(store1.bins.get(&100), Some(&1));
188 }
189}