ddsketch/canonical/store/
sparse.rs

1use std::collections::BTreeMap;
2
3use datadog_protos::sketches::Store as ProtoStore;
4
5use super::{validate_proto_count, Store};
6use crate::canonical::error::ProtoConversionError;
7
/// Bin store backed by an ordered map, holding an entry only for indices that were actually observed.
///
/// Because empty bins take no space, the store stays compact when indices are spread far apart. The flip side is
/// that bins are never collapsed: every distinct index that is added gets its own map entry, so memory grows with
/// the number of distinct indices.
///
/// Reach for this store when:
/// - the observed indices cover a wide range with large gaps between them,
/// - a hard bound on memory usage is not required,
/// - allocating a dense array over the whole index range would be wasteful.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SparseStore {
    /// Per-bin counts, ordered by bin index.
    bins: BTreeMap<i32, u64>,

    /// Running total of the counts held in all bins.
    count: u64,
}
25
26impl SparseStore {
27    /// Creates an empty `SparseStore`.
28    pub fn new() -> Self {
29        Self {
30            bins: BTreeMap::new(),
31            count: 0,
32        }
33    }
34}
35
36impl Store for SparseStore {
37    fn add(&mut self, index: i32, count: u64) {
38        if count == 0 {
39            return;
40        }
41
42        *self.bins.entry(index).or_insert(0) += count;
43        self.count = self.count.saturating_add(count);
44    }
45
46    fn total_count(&self) -> u64 {
47        self.count
48    }
49
50    fn min_index(&self) -> Option<i32> {
51        self.bins.iter().find(|(_, &c)| c > 0).map(|(&k, _)| k)
52    }
53
54    fn max_index(&self) -> Option<i32> {
55        self.bins.iter().rev().find(|(_, &c)| c > 0).map(|(&k, _)| k)
56    }
57
58    fn key_at_rank(&self, rank: u64) -> Option<i32> {
59        if rank >= self.count {
60            return None;
61        }
62
63        let mut cumulative = 0u64;
64        for (&index, &count) in &self.bins {
65            cumulative += count;
66            if cumulative > rank {
67                return Some(index);
68            }
69        }
70        None
71    }
72
73    fn merge(&mut self, other: &Self) {
74        for (&index, &count) in &other.bins {
75            if count > 0 {
76                *self.bins.entry(index).or_insert(0) += count;
77            }
78        }
79        self.count = self.count.saturating_add(other.count);
80    }
81
82    fn is_empty(&self) -> bool {
83        self.count == 0
84    }
85
86    fn clear(&mut self) {
87        self.bins.clear();
88        self.count = 0;
89    }
90
91    fn merge_from_proto(&mut self, proto: &ProtoStore) -> Result<(), ProtoConversionError> {
92        // Process sparse binCounts
93        for (&index, &count) in &proto.binCounts {
94            let count = validate_proto_count(index, count)?;
95            if count > 0 {
96                self.add(index, count);
97            }
98        }
99
100        // Process contiguous bins
101        let offset = proto.contiguousBinIndexOffset;
102        for (i, &count) in proto.contiguousBinCounts.iter().enumerate() {
103            let index = offset + i as i32;
104            let count = validate_proto_count(index, count)?;
105            if count > 0 {
106                self.add(index, count);
107            }
108        }
109
110        Ok(())
111    }
112
113    fn to_proto(&self) -> ProtoStore {
114        let mut proto = ProtoStore::new();
115
116        // Use sparse encoding for sparse store
117        for (&index, &count) in &self.bins {
118            if count > 0 {
119                proto.binCounts.insert(index, count as f64);
120            }
121        }
122
123        proto
124    }
125}
126
127impl Default for SparseStore {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_add_single() {
        let mut store = SparseStore::new();
        store.add(5, 1);

        assert_eq!(store.total_count(), 1);
        assert_eq!(store.min_index(), Some(5));
        assert_eq!(store.max_index(), Some(5));
    }

    #[test]
    fn test_add_zero_count_is_ignored() {
        let mut store = SparseStore::new();
        store.add(7, 0);

        // A zero count must not materialise an empty bin.
        assert!(store.is_empty());
        assert_eq!(store.total_count(), 0);
        assert!(store.bins.is_empty());
        assert_eq!(store.min_index(), None);
        assert_eq!(store.max_index(), None);
    }

    #[test]
    fn test_add_widely_scattered() {
        let mut store = SparseStore::new();
        store.add(-1000, 1);
        store.add(0, 2);
        store.add(1000, 3);

        assert_eq!(store.total_count(), 6);
        assert_eq!(store.min_index(), Some(-1000));
        assert_eq!(store.max_index(), Some(1000));
        // Only 3 entries in the map, not 2001
        assert_eq!(store.bins.len(), 3);
    }

    #[test]
    fn test_empty_store() {
        let store = SparseStore::new();

        assert!(store.is_empty());
        assert_eq!(store.total_count(), 0);
        assert_eq!(store.min_index(), None);
        assert_eq!(store.max_index(), None);
        assert_eq!(store.key_at_rank(0), None);
    }

    #[test]
    fn test_default_matches_new() {
        assert_eq!(SparseStore::default(), SparseStore::new());
    }

    #[test]
    fn test_key_at_rank() {
        let mut store = SparseStore::new();
        store.add(-10, 2);
        store.add(10, 3);

        assert_eq!(store.key_at_rank(0), Some(-10));
        assert_eq!(store.key_at_rank(1), Some(-10));
        assert_eq!(store.key_at_rank(2), Some(10));
        assert_eq!(store.key_at_rank(4), Some(10));
        assert_eq!(store.key_at_rank(5), None);
    }

    #[test]
    fn test_merge() {
        let mut store1 = SparseStore::new();
        store1.add(5, 2);

        let mut store2 = SparseStore::new();
        store2.add(5, 3);
        store2.add(100, 1);

        store1.merge(&store2);

        assert_eq!(store1.total_count(), 6);
        assert_eq!(store1.bins.get(&5), Some(&5));
        assert_eq!(store1.bins.get(&100), Some(&1));
    }

    #[test]
    fn test_merge_with_empty() {
        let mut populated = SparseStore::new();
        populated.add(-4, 2);
        populated.add(9, 1);
        let snapshot = populated.clone();

        // Merging an empty store changes nothing.
        populated.merge(&SparseStore::new());
        assert_eq!(populated, snapshot);

        // Merging into an empty store reproduces the source.
        let mut target = SparseStore::new();
        target.merge(&snapshot);
        assert_eq!(target, snapshot);
    }

    #[test]
    fn test_clear() {
        let mut store = SparseStore::new();
        store.add(1, 4);
        store.add(2, 6);

        store.clear();

        assert!(store.is_empty());
        assert_eq!(store.total_count(), 0);
        assert!(store.bins.is_empty());
        assert_eq!(store.min_index(), None);
        assert_eq!(store.key_at_rank(0), None);
    }
}