Skip to main content

ddsketch/agent/
sketch.rs

1//! Agent-specific DDSketch implementation.
2
3use std::cmp::Ordering;
4
5use datadog_protos::metrics::Dogsketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9use super::bin::Bin;
10use super::bucket::Bucket;
11use super::config::{
12    Config, DDSKETCH_CONF_BIN_LIMIT, DDSKETCH_CONF_GAMMA_LN, DDSKETCH_CONF_GAMMA_V, DDSKETCH_CONF_NORM_BIAS,
13    DDSKETCH_CONF_NORM_MIN,
14};
15use crate::canonical::mapping::LogarithmicMapping;
16use crate::common::float_eq;
17
// Shared, immutable sketch configuration: every agent sketch uses the exact same
// gamma/bin-limit parameters, so a single static `Config` suffices for the whole crate.
static SKETCH_CONFIG: Config = Config::new(
    DDSKETCH_CONF_BIN_LIMIT,
    DDSKETCH_CONF_GAMMA_V,
    DDSKETCH_CONF_GAMMA_LN,
    DDSKETCH_CONF_NORM_MIN,
    DDSKETCH_CONF_NORM_BIAS,
);
// Maximum number of observations a single bin can hold; counts beyond this are split into
// multiple bins with the same key (see `generate_bins`).
const MAX_BIN_WIDTH: u32 = u32::MAX;
26
27/// [DDSketch][ddsketch] implementation based on the [Datadog Agent][ddagent].
28///
29/// This implementation is subtly different from the open-source implementations of `DDSketch`, as Datadog made some
30/// slight tweaks to configuration values and in-memory layout to optimize it for insertion performance within the
31/// agent.
32///
33/// We've mimicked the agent version of `DDSketch` here in order to support a future where we can take sketches shipped
34/// by the agent, handle them internally, merge them, and so on, without any loss of accuracy, eventually forwarding
35/// them to Datadog ourselves.
36///
37/// As such, this implementation is constrained in the same ways: the configuration parameters cannot be changed, the
38/// collapsing strategy is fixed, and we support a limited number of methods for inserting into the sketch.
39///
40/// Importantly, we have a special function, again taken from the agent version, to allow us to interpolate histograms,
41/// specifically our own aggregated histograms, into a sketch so that we can emit useful default quantiles, rather than
42/// having to ship the buckets -- upper bound and count -- to a downstream system that might have no native way to do
43/// the same thing, basically providing no value as they have no way to render useful data from them.
44///
45/// # Features
46///
47/// This crate exposes a single feature, `serde`, which enables serialization and deserialization of `DDSketch` with
48/// `serde`. This feature is not enabled by default, as it can be slightly risky to use. This is primarily due to the
49/// fact that the format of `DDSketch` is not promised to be stable over time. If you enable this feature, you should
50/// take care to avoid storing serialized `DDSketch` data for long periods of time, as deserializing it in the future
51/// may work but could lead to incorrect/unexpected behavior or issues with correctness.
52///
53/// [ddsketch]: https://www.vldb.org/pvldb/vol12/p2195-masson.pdf
54/// [ddagent]: https://github.com/DataDog/datadog-agent
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct DDSketch {
    /// The bins within the sketch, kept sorted by key.
    ///
    /// Duplicate keys are possible when a bin's count overflows `MAX_BIN_WIDTH` (see
    /// `generate_bins`).
    bins: SmallVec<[Bin; 4]>,

    /// The number of observations within the sketch.
    count: u64,

    /// The minimum value of all observations within the sketch.
    ///
    /// Initialized to `f64::MAX` as a sentinel while the sketch is empty.
    min: f64,

    /// The maximum value of all observations within the sketch.
    ///
    /// Initialized to `f64::MIN` as a sentinel while the sketch is empty.
    max: f64,

    /// The sum of all observations within the sketch.
    sum: f64,

    /// The average value of all observations within the sketch.
    avg: f64,
}
76
77impl DDSketch {
    /// Returns the canonical DDSketch mapping that aligns with the agent sketch's key space.
    ///
    /// NOTE(review): the `+ 0.5` offset presumably centers canonical indices on agent bin
    /// boundaries -- confirm against `LogarithmicMapping`'s offset semantics.
    pub fn remap_mapping() -> LogarithmicMapping {
        LogarithmicMapping::new_with_gamma_and_offset(DDSKETCH_CONF_GAMMA_V, f64::from(DDSKETCH_CONF_NORM_BIAS) + 0.5)
            .expect("agent sketch gamma and offset are always valid")
    }

    /// Returns the representative value for the given agent sketch key.
    ///
    /// This is the lower bound of the bin identified by `key`.
    pub fn value_for_key(key: i16) -> f64 {
        SKETCH_CONFIG.bin_lower_bound(key)
    }

    /// Returns the number of bins in the sketch.
    pub fn bin_count(&self) -> usize {
        self.bins.len()
    }

    /// Whether or not this sketch is empty.
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Number of samples currently represented by this sketch.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Overrides the sample count tracked by this sketch.
    ///
    /// Does not modify the bins: callers are responsible for keeping the summary statistics
    /// consistent with the bin contents.
    pub fn set_count(&mut self, count: u64) {
        self.count = count;
    }

    /// Overrides the sum of all values tracked by this sketch.
    ///
    /// Does not modify the bins; see [`Self::set_count`].
    pub fn set_sum(&mut self, sum: f64) {
        self.sum = sum;
    }

    /// Overrides the average value tracked by this sketch.
    ///
    /// Does not modify the bins; see [`Self::set_count`].
    pub fn set_avg(&mut self, avg: f64) {
        self.avg = avg;
    }

    /// Overrides the minimum value tracked by this sketch.
    ///
    /// Does not modify the bins; see [`Self::set_count`].
    pub fn set_min(&mut self, min: f64) {
        self.min = min;
    }

    /// Overrides the maximum value tracked by this sketch.
    ///
    /// Does not modify the bins; see [`Self::set_count`].
    pub fn set_max(&mut self, max: f64) {
        self.max = max;
    }
128
129    /// Minimum value seen by this sketch.
130    ///
131    /// Returns `None` if the sketch is empty.
132    pub fn min(&self) -> Option<f64> {
133        if self.is_empty() {
134            None
135        } else {
136            Some(self.min)
137        }
138    }
139
140    /// Maximum value seen by this sketch.
141    ///
142    /// Returns `None` if the sketch is empty.
143    pub fn max(&self) -> Option<f64> {
144        if self.is_empty() {
145            None
146        } else {
147            Some(self.max)
148        }
149    }
150
151    /// Sum of all values seen by this sketch.
152    ///
153    /// Returns `None` if the sketch is empty.
154    pub fn sum(&self) -> Option<f64> {
155        if self.is_empty() {
156            None
157        } else {
158            Some(self.sum)
159        }
160    }
161
162    /// Average value seen by this sketch.
163    ///
164    /// Returns `None` if the sketch is empty.
165    pub fn avg(&self) -> Option<f64> {
166        if self.is_empty() {
167            None
168        } else {
169            Some(self.avg)
170        }
171    }
172
173    /// Returns the current bins of this sketch.
174    pub fn bins(&self) -> &[Bin] {
175        &self.bins
176    }
177
178    /// Clears the sketch, removing all bins and resetting all statistics.
179    pub fn clear(&mut self) {
180        self.count = 0;
181        self.min = f64::MAX;
182        self.max = f64::MIN;
183        self.avg = 0.0;
184        self.sum = 0.0;
185        self.bins.clear();
186    }
187
    /// Updates the summary statistics (count/min/max/sum/avg) for `n` observations of value `v`.
    ///
    /// Does not touch the bins; callers are responsible for inserting the corresponding keys.
    fn adjust_basic_stats(&mut self, v: f64, n: u64) {
        if v < self.min {
            self.min = v;
        }

        if v > self.max {
            self.max = v;
        }

        self.count += n;
        self.sum += v * n as f64;

        if n == 1 {
            // Single-sample update: standard incremental mean.
            self.avg += (v - self.avg) / self.count as f64;
        } else {
            // Weighted update for `n` samples at once.
            //
            // TODO: From the Agent source code, this method apparently loses precision when the
            // two averages -- v and self.avg -- are close.  Is there a better approach?
            self.avg = self.avg + (v - self.avg) * n as f64 / self.count as f64;
        }
    }
208
    /// Merges a set of `(key, count)` pairs into the sketch's bins.
    ///
    /// `counts` must be sorted ascending by key: the merge below is a single linear pass over
    /// both the existing bins and the incoming pairs, and relies on that ordering.
    fn insert_key_counts(&mut self, counts: &[(i16, u64)]) {
        let mut temp = SmallVec::<[Bin; 4]>::new();

        let mut bins_idx = 0;
        let mut key_idx = 0;
        let bins_len = self.bins.len();
        let counts_len = counts.len();

        // PERF TODO: there's probably a fast path to be had where could check if all if the counts have existing bins
        // that aren't yet full, and we just update them directly, although we'd still be doing a linear scan to find
        // them since keys aren't 1:1 with their position in `self.bins` but using this method just to update one or two
        // bins is clearly suboptimal and we wouldn't really want to scan them all just to have to back out and actually
        // do the non-fast path.. maybe a first pass could be checking if the first/last key falls within our known
        // min/max key, and if it doesn't, then we know we have to go through the non-fast path, and if it passes, we do
        // the scan to see if we can just update bins directly?
        while bins_idx < bins_len && key_idx < counts_len {
            let bin = self.bins[bins_idx];
            let vk = counts[key_idx].0;
            let kn = counts[key_idx].1;

            match bin.k.cmp(&vk) {
                Ordering::Greater => {
                    // Incoming key sorts before the current bin: emit new bin(s) for it.
                    generate_bins(&mut temp, vk, kn);
                    key_idx += 1;
                }
                Ordering::Less => {
                    // Existing bin sorts first: carry it over untouched.
                    temp.push(bin);
                    bins_idx += 1;
                }
                Ordering::Equal => {
                    // Same key: combine existing and incoming counts, splitting into multiple
                    // bins if the total overflows a single bin's width.
                    generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
                    bins_idx += 1;
                    key_idx += 1;
                }
            }
        }

        // Carry over whichever existing bins remain, then any leftover incoming pairs.
        temp.extend_from_slice(&self.bins[bins_idx..]);

        while key_idx < counts_len {
            let vk = counts[key_idx].0;
            let kn = counts[key_idx].1;
            generate_bins(&mut temp, vk, kn);
            key_idx += 1;
        }

        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);

        // PERF TODO: This is where we might do a mem::swap instead so that we could shove the bin vector into an object
        // pool but I'm not sure this actually matters at the moment.
        self.bins = temp;
    }
261
    /// Merges a batch of keys into the sketch's bins.
    ///
    /// Keys are sorted in place and then merged with a single linear pass over both the existing
    /// bins and the incoming keys; runs of equal keys are coalesced via `buf_count_leading_equal`.
    fn insert_keys(&mut self, mut keys: Vec<i16>) {
        // Updating more than 4 billion keys would be very very weird and likely indicative of something horribly
        // broken.
        //
        // TODO: I don't actually understand why I wrote this assertion in this way. Either the code can handle
        // collapsing values in order to maintain the relative error bounds, or we have to cap it to the maximum allowed
        // number of bins. Gotta think about this some more.
        assert!(keys.len() <= u32::MAX.try_into().expect("we don't support 16-bit systems"));

        keys.sort_unstable();

        let mut temp = SmallVec::<[Bin; 4]>::new();

        let mut bins_idx = 0;
        let mut key_idx = 0;
        let bins_len = self.bins.len();
        let keys_len = keys.len();

        // PERF TODO: there's probably a fast path to be had where could check if all if the counts have existing bins
        // that aren't yet full, and we just update them directly, although we'd still be doing a linear scan to find
        // them since keys aren't 1:1 with their position in `self.bins` but using this method just to update one or two
        // bins is clearly suboptimal and we wouldn't really want to scan them all just to have to back out and actually
        // do the non-fast path.. maybe a first pass could be checking if the first/last key falls within our known
        // min/max key, and if it doesn't, then we know we have to go through the non-fast path, and if it passes, we do
        // the scan to see if we can just update bins directly?
        while bins_idx < bins_len && key_idx < keys_len {
            let bin = self.bins[bins_idx];
            let vk = keys[key_idx];

            match bin.k.cmp(&vk) {
                Ordering::Greater => {
                    // Incoming key sorts before the current bin: emit new bin(s) for the whole
                    // run of equal keys at once.
                    let kn = buf_count_leading_equal(&keys, key_idx);
                    generate_bins(&mut temp, vk, kn);
                    key_idx += kn as usize;
                }
                Ordering::Less => {
                    // Existing bin sorts first: carry it over untouched.
                    temp.push(bin);
                    bins_idx += 1;
                }
                Ordering::Equal => {
                    // Same key: combine the bin's count with the run of equal keys, splitting
                    // into multiple bins if the total overflows a single bin's width.
                    let kn = buf_count_leading_equal(&keys, key_idx);
                    generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
                    bins_idx += 1;
                    key_idx += kn as usize;
                }
            }
        }

        // Carry over whichever existing bins remain, then any leftover incoming keys.
        temp.extend_from_slice(&self.bins[bins_idx..]);

        while key_idx < keys_len {
            let vk = keys[key_idx];
            let kn = buf_count_leading_equal(&keys, key_idx);
            generate_bins(&mut temp, vk, kn);
            key_idx += kn as usize;
        }

        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);

        // PERF TODO: This is where we might do a mem::swap instead so that we could shove the bin vector into an object
        // pool but I'm not sure this actually matters at the moment.
        self.bins = temp;
    }
325
326    /// Inserts a single value into the sketch.
327    pub fn insert(&mut self, v: f64) {
328        // TODO: This should return a result that makes sure we have enough room to actually add 1 more sample without
329        // hitting `self.config.max_count()`
330        self.adjust_basic_stats(v, 1);
331
332        let key = SKETCH_CONFIG.key(v);
333
334        let mut insert_at = None;
335
336        for (bin_idx, b) in self.bins.iter_mut().enumerate() {
337            if b.k == key {
338                if b.n < MAX_BIN_WIDTH {
339                    // Fast path for adding to an existing bin without overflow.
340                    b.n += 1;
341                    return;
342                } else {
343                    insert_at = Some(bin_idx);
344                    break;
345                }
346            }
347            if b.k > key {
348                insert_at = Some(bin_idx);
349                break;
350            }
351        }
352
353        if let Some(bin_idx) = insert_at {
354            self.bins.insert(bin_idx, Bin { k: key, n: 1 });
355        } else {
356            self.bins.push(Bin { k: key, n: 1 });
357        }
358        trim_left(&mut self.bins, SKETCH_CONFIG.bin_limit);
359    }
360
361    /// Inserts many values into the sketch.
362    pub fn insert_many(&mut self, vs: &[f64]) {
363        // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
364        // hitting `self.config.bin_limit`.
365        let mut keys = Vec::with_capacity(vs.len());
366        for v in vs {
367            self.adjust_basic_stats(*v, 1);
368            keys.push(SKETCH_CONFIG.key(*v));
369        }
370        self.insert_keys(keys);
371    }
372
373    /// Inserts a single value into the sketch `n` times.
374    pub fn insert_n(&mut self, v: f64, n: u64) {
375        // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
376        // hitting `self.config.max_count()`.
377        if n == 1 {
378            self.insert(v);
379        } else {
380            self.adjust_basic_stats(v, n);
381
382            let key = SKETCH_CONFIG.key(v);
383            self.insert_key_counts(&[(key, n)]);
384        }
385    }
386
    /// Interpolates a single histogram bucket (`lower`..`upper` holding `count` samples) into the
    /// sketch by spreading the count linearly across every sketch bin overlapping the range.
    fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u64) {
        // Find the keys for the bins where the lower bound and upper bound would end up, and collect all of the keys in
        // between, inclusive.
        let lower_key = SKETCH_CONFIG.key(lower);
        let upper_key = SKETCH_CONFIG.key(upper);
        let keys = (lower_key..=upper_key).collect::<Vec<_>>();

        let mut key_counts = Vec::new();
        let mut remaining_count = count;
        let distance = upper - lower;
        let mut start_idx = 0;
        let mut end_idx = 1;
        let mut lower_bound = SKETCH_CONFIG.bin_lower_bound(keys[start_idx]);
        let mut remainder = 0.0;

        while end_idx < keys.len() && remaining_count > 0 {
            // For each key, map the total distance between the input lower/upper bound against the sketch lower/upper
            // bound for the current sketch bin, which tells us how much of the input count to apply to the current
            // sketch bin.
            let upper_bound = SKETCH_CONFIG.bin_lower_bound(keys[end_idx]);
            let fkn = ((upper_bound - lower_bound) / distance) * count as f64;
            // Accumulate the fractional part so the total isn't systematically under-counted; a
            // whole unit of carry is paid back below.
            if fkn > 1.0 {
                remainder += fkn - fkn.trunc();
            }

            // SAFETY: This integer cast is intentional: we want to get the non-fractional part, as we've captured the
            // fractional part in the above conditional.
            #[allow(clippy::cast_possible_truncation)]
            let mut kn = fkn as u64;
            if remainder > 1.0 {
                kn += 1;
                remainder -= 1.0;
            }

            if kn > 0 {
                // Never attribute more samples than the bucket has left.
                if kn > remaining_count {
                    kn = remaining_count;
                }

                self.adjust_basic_stats(lower_bound, kn);
                key_counts.push((keys[start_idx], kn));

                remaining_count -= kn;
                start_idx = end_idx;
                lower_bound = upper_bound;
            }

            end_idx += 1;
        }

        // Whatever wasn't attributed by the interpolation loop lands in the last bin touched.
        if remaining_count > 0 {
            let last_key = keys[start_idx];
            lower_bound = SKETCH_CONFIG.bin_lower_bound(last_key);
            self.adjust_basic_stats(lower_bound, remaining_count);
            key_counts.push((last_key, remaining_count));
        }

        // Sort the key counts first, as that's required by `insert_key_counts`.
        key_counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));

        self.insert_key_counts(&key_counts);
    }
449
450    /// Inserts histogram buckets into the sketch via linear interpolation.
451    ///
452    /// ## Errors
453    ///
454    /// Returns an error if a bucket size is greater that `u32::MAX`.
455    pub fn insert_interpolate_buckets(&mut self, mut buckets: Vec<Bucket>) -> Result<(), &'static str> {
456        // Buckets need to be sorted from lowest to highest so that we can properly calculate the rolling lower/upper
457        // bounds.
458        buckets.sort_by(|a, b| {
459            let oa = OrderedFloat(a.upper_limit);
460            let ob = OrderedFloat(b.upper_limit);
461
462            oa.cmp(&ob)
463        });
464
465        let mut lower = f64::NEG_INFINITY;
466
467        if buckets.iter().any(|bucket| bucket.count > u64::from(u32::MAX)) {
468            return Err("bucket size greater than u32::MAX");
469        }
470
471        for bucket in buckets {
472            let mut upper = bucket.upper_limit;
473            if upper.is_sign_positive() && upper.is_infinite() {
474                upper = lower;
475            } else if lower.is_sign_negative() && lower.is_infinite() {
476                lower = upper;
477            }
478
479            self.insert_interpolate_bucket(lower, upper, bucket.count);
480            lower = bucket.upper_limit;
481        }
482
483        Ok(())
484    }
485
    /// Adds a bin directly into the sketch.
    ///
    /// Used only for unit testing so that we can create a sketch with an exact layout, which allows testing around the
    /// resulting bins when feeding in specific values, as well as generating explicitly bad layouts for testing.
    ///
    /// Note: the bin is appended as-is; callers must keep `bins` sorted by key themselves.
    #[allow(dead_code)]
    pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u32) {
        // Keep the summary statistics in step with the raw bin, using the bin's lower bound as
        // the representative value.
        let v = SKETCH_CONFIG.bin_lower_bound(k);
        self.adjust_basic_stats(v, u64::from(n));
        self.bins.push(Bin { k, n });
    }
496
497    /// Gets the value at a given quantile.
498    pub fn quantile(&self, q: f64) -> Option<f64> {
499        if self.count == 0 {
500            return None;
501        }
502
503        if q <= 0.0 {
504            return Some(self.min);
505        }
506
507        if q >= 1.0 {
508            return Some(self.max);
509        }
510
511        let mut n = 0.0;
512        let mut estimated = None;
513        let wanted_rank = rank(self.count, q);
514
515        for (i, bin) in self.bins.iter().enumerate() {
516            n += f64::from(bin.n);
517            if n <= wanted_rank {
518                continue;
519            }
520
521            let weight = (n - wanted_rank) / f64::from(bin.n);
522            let mut v_low = SKETCH_CONFIG.bin_lower_bound(bin.k);
523            let mut v_high = v_low * SKETCH_CONFIG.gamma_v;
524
525            if i == self.bins.len() {
526                v_high = self.max;
527            } else if i == 0 {
528                v_low = self.min;
529            }
530
531            estimated = Some(v_low * weight + v_high * (1.0 - weight));
532            break;
533        }
534
535        estimated.map(|v| v.clamp(self.min, self.max)).or(Some(f64::NAN))
536    }
537
538    /// Merges another sketch into this sketch, without a loss of accuracy.
539    ///
540    /// All samples present in the other sketch will be correctly represented in this sketch, and summary statistics
541    /// such as the sum, average, count, min, and max, will represent the sum of samples from both sketches.
542    pub fn merge(&mut self, other: &DDSketch) {
543        // Merge the basic statistics together.
544        self.count += other.count;
545        if other.max > self.max {
546            self.max = other.max;
547        }
548        if other.min < self.min {
549            self.min = other.min;
550        }
551        self.sum += other.sum;
552        self.avg = self.avg + (other.avg - self.avg) * other.count as f64 / self.count as f64;
553
554        // Now merge the bins.
555        let mut temp = SmallVec::<[Bin; 4]>::new();
556
557        let mut bins_idx = 0;
558        for other_bin in &other.bins {
559            let start = bins_idx;
560            while bins_idx < self.bins.len() && self.bins[bins_idx].k < other_bin.k {
561                bins_idx += 1;
562            }
563
564            temp.extend_from_slice(&self.bins[start..bins_idx]);
565
566            if bins_idx >= self.bins.len() || self.bins[bins_idx].k > other_bin.k {
567                temp.push(*other_bin);
568            } else if self.bins[bins_idx].k == other_bin.k {
569                generate_bins(
570                    &mut temp,
571                    other_bin.k,
572                    u64::from(other_bin.n) + u64::from(self.bins[bins_idx].n),
573                );
574                bins_idx += 1;
575            }
576        }
577
578        temp.extend_from_slice(&self.bins[bins_idx..]);
579        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
580
581        self.bins = temp;
582    }
583
    /// Merges this sketch into the `Dogsketch` Protocol Buffers representation.
    ///
    /// The sample count is saturated to `i64::MAX` if it does not fit in the signed protobuf
    /// field. Bin keys are widened to `i32` and bin counts are copied as-is.
    pub fn merge_to_dogsketch(&self, dogsketch: &mut Dogsketch) {
        dogsketch.set_cnt(i64::try_from(self.count).unwrap_or(i64::MAX));
        dogsketch.set_min(self.min);
        dogsketch.set_max(self.max);
        dogsketch.set_avg(self.avg);
        dogsketch.set_sum(self.sum);

        // The protobuf representation carries keys and counts as two parallel vectors.
        let mut k = Vec::new();
        let mut n = Vec::new();

        for bin in &self.bins {
            k.push(i32::from(bin.k));
            n.push(bin.n);
        }

        dogsketch.set_k(k);
        dogsketch.set_n(n);
    }
603}
604
impl PartialEq for DDSketch {
    fn eq(&self, other: &Self) -> bool {
        // We skip checking the configuration because we don't allow creating configurations by hand, and it's always
        // locked to the constants used by the Datadog Agent.  We only check the configuration equality manually in
        // `DDSketch::merge`, to protect ourselves in the future if different configurations become allowed.
        //
        // Additionally, we also use floating-point-specific relative comparisons for sum/avg because they can be
        // minimally different between sketches purely due to floating-point behavior, despite being fed the same exact
        // data in terms of recorded samples.
        //
        // Count and bins (keys plus per-bin counts) are integral and must match exactly.
        self.count == other.count
            && float_eq(self.min, other.min)
            && float_eq(self.max, other.max)
            && float_eq(self.sum, other.sum)
            && float_eq(self.avg, other.avg)
            && self.bins == other.bins
    }
}
622
623impl Default for DDSketch {
624    fn default() -> Self {
625        Self {
626            bins: SmallVec::new(),
627            count: 0,
628            min: f64::MAX,
629            max: f64::MIN,
630            sum: 0.0,
631            avg: 0.0,
632        }
633    }
634}
635
// NOTE(review): `Eq` requires `eq` to be reflexive; the float fields are compared via
// `float_eq`, which is presumably NaN-aware -- confirm in `crate::common::float_eq`.
impl Eq for DDSketch {}
637
impl TryFrom<Dogsketch> for DDSketch {
    type Error = &'static str;

    // Builds a sketch from its protobuf representation. Fails if the count is negative, if the
    // key/count vectors disagree in length, or if any key does not fit in `i16`.
    fn try_from(value: Dogsketch) -> Result<Self, Self::Error> {
        let mut sketch = DDSketch {
            count: u64::try_from(value.cnt).map_err(|_| "sketch count overflows u64 or is negative")?,
            min: value.min,
            max: value.max,
            avg: value.avg,
            sum: value.sum,
            ..Default::default()
        };

        // Keys and counts arrive as two parallel vectors; they must pair up one-to-one.
        let k = value.k;
        let n = value.n;

        if k.len() != n.len() {
            return Err("k and n bin vectors have differing lengths");
        }

        // NOTE(review): bins are copied as-is, with no re-sorting; this assumes the incoming
        // Dogsketch keys are already sorted ascending -- confirm upstream guarantees this.
        for (k, n) in k.into_iter().zip(n.into_iter()) {
            let k = i16::try_from(k).map_err(|_| "bin key overflows i16")?;

            sketch.bins.push(Bin { k, n });
        }

        Ok(sketch)
    }
}
667
/// Computes the zero-based rank targeted by quantile `q` over `count` samples, rounding halfway
/// cases to the nearest even rank.
fn rank(count: u64, q: f64) -> f64 {
    (q * (count - 1) as f64).round_ties_even()
}
672
/// Counts how many consecutive entries of `keys`, starting at `start_idx`, share the same key.
///
/// `keys` must be non-empty and `start_idx` must be in bounds; callers guarantee both.
#[allow(clippy::cast_possible_truncation)]
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u64 {
    let target = keys[start_idx];

    // SAFETY: callers cap the vector length at 2^32 entries, so the usize -> u64 cast of the
    // run length can never truncate.
    keys[start_idx..].iter().take_while(|&&k| k == target).count() as u64
}
688
/// Collapses the lowest-keyed bins so that `bins` holds at most `bin_limit` entries.
///
/// All mass from the removed prefix is folded into the first surviving bin; a `bin_limit` of
/// zero disables trimming entirely.
fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
    // We won't ever support Vector running on anything other than a 32-bit platform and above, I imagine, so this
    // should always be safe.
    let bin_limit = bin_limit as usize;
    if bin_limit == 0 || bins.len() <= bin_limit {
        return;
    }

    let num_to_remove = bins.len() - bin_limit;
    let mut missing: u64 = 0;

    // Sum all mass from the bins being removed. Per CollapsingLowestDenseStore in sketches-go,
    // all removed mass collapses into the first kept bin (the new minimum index).
    for bin in bins.iter().take(num_to_remove) {
        missing += u64::from(bin.n);
    }

    // Fold the accumulated mass into the first kept bin, matching Go's `bins[newMinIndex] += n`.
    // Any remainder that overflows u32::MAX is discarded — this requires >4B observations in a
    // single collapsed bin and is an intentional divergence from the Datadog Agent (which uses
    // float64 counts and never loses mass).
    bins[num_to_remove].increment(missing);

    // Drop the removed prefix, leaving exactly bin_limit bins.
    bins.drain(0..num_to_remove);
}
715
716#[allow(clippy::cast_possible_truncation)]
717fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
718    if n < u64::from(MAX_BIN_WIDTH) {
719        // SAFETY: `n < MAX_BIN_WIDTH = u32::MAX`, so it fits in u32.
720        bins.push(Bin { k, n: n as u32 });
721    } else {
722        let overflow = n % u64::from(MAX_BIN_WIDTH);
723        if overflow != 0 {
724            bins.push(Bin {
725                k,
726                // SAFETY: `overflow = n % u32::MAX`, so overflow <= u32::MAX - 1, which fits in u32.
727                n: overflow as u32,
728            });
729        }
730
731        for _ in 0..(n / u64::from(MAX_BIN_WIDTH)) {
732            bins.push(Bin { k, n: MAX_BIN_WIDTH });
733        }
734    }
735}
736
737#[cfg(test)]
738mod tests {
739    use super::*;
740
    // Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k;
    // no sorting or validation is performed here.
    fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> {
        pairs.iter().map(|&(k, n)| Bin { k, n }).collect()
    }

    // Helper: extract (k, n) pairs from a SmallVec for easy assertion against expected layouts.
    fn to_pairs(bins: &SmallVec<[Bin; 4]>) -> Vec<(i16, u32)> {
        bins.iter().map(|b| (b.k, b.n)).collect()
    }
750
    /// Basic collapse: when bins exceed the limit, the mass from removed bins is merged
    /// into the first kept bin (lowest surviving key). This mirrors the CollapsingLowestDenseStore
    /// semantics from sketches-go: all bins with index < (maxIndex - limit + 1) collapse into
    /// the bin at (maxIndex - limit + 1).
    ///
    /// Input:  [(0,2), (1,3), (2,4), (3,5)]  limit=2  →  remove 2 bins
    /// missing = n[0] + n[1] = 2 + 3 = 5
    /// first kept bin: k=2, n=4 → increment by 5 → n=9
    /// Result: [(2,9), (3,5)]
    #[test]
    fn trim_left_collapses_removed_mass_into_first_kept_bin() {
        let mut bins = make_bins(&[(0, 2), (1, 3), (2, 4), (3, 5)]);
        trim_left(&mut bins, 2);
        assert_eq!(to_pairs(&bins), vec![(2, 9), (3, 5)]);
    }

    /// Total count is preserved exactly when the collapse fits within a single u32 bin.
    ///
    /// Input:  [(10,5), (20,3), (30,7)]  limit=2  →  remove 1 bin
    /// missing = 5; first kept bin k=20, n=3 → n=8
    /// Result: [(20,8), (30,7)], total=15 == 5+3+7
    #[test]
    fn trim_left_preserves_total_count_when_no_overflow() {
        let mut bins = make_bins(&[(10, 5), (20, 3), (30, 7)]);
        let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        trim_left(&mut bins, 2);
        let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        assert_eq!(to_pairs(&bins), vec![(20, 8), (30, 7)]);
        assert_eq!(total_before, total_after);
    }
781
    /// With u32 bin counts, collapsed mass from multiple removed bins fits in a single bin
    /// without saturation for typical weights, fully preserving the total count.
    ///
    /// Input:  [(0,50000), (1,50000), (2,1)]  limit=1  →  remove 2 bins
    /// missing = 100000; bins[2].increment(100000): 100001 < u32::MAX → n=100001
    /// bins.drain(0..2). Final: [(2,100001)], all mass preserved.
    ///
    /// With the old u16 layout, this same input would have saturated at 65535 and discarded
    /// 34466 observations. u32 eliminates that loss for any per-bin count below ~4.3 billion.
    #[test]
    fn trim_left_preserves_exact_count_with_u32_bins() {
        let mut bins = make_bins(&[(0, 50000), (1, 50000), (2, 1)]);
        let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        trim_left(&mut bins, 1);
        let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        assert_eq!(bins.len(), 1);
        assert_eq!(bins[0].k, 2);
        assert_eq!(bins[0].n, 100001);
        assert_eq!(total_before, total_after, "all mass must be preserved with u32 bins");
    }

    /// When already at or under the limit, trim_left is a no-op — checked at the limit exactly
    /// (len == limit) and under it (len < limit).
    #[test]
    fn trim_left_no_op_when_within_limit() {
        let original = make_bins(&[(5, 10), (6, 20)]);
        let mut bins = original.clone();
        trim_left(&mut bins, 2);
        assert_eq!(to_pairs(&bins), to_pairs(&original));
        trim_left(&mut bins, 3);
        assert_eq!(to_pairs(&bins), to_pairs(&original));
    }
813
814    /// Regression test for trim_left bin count with large per-sample weights.
815    ///
816    /// With the old u16 layout, a sample weight of ~260M would generate ceil(260M / 65535) = 3969
817    /// bins per key, causing bin count explosion and an encoder panic. With u32, the same weight
818    /// fits in a single bin (260M < u32::MAX ≈ 4.3B), so one insert_n call produces exactly one
819    /// bin per key and the bin limit is trivially respected.
820    ///
821    /// This test inserts several values with a weight representative of what ADP receives when
822    /// clamping an incoming sample rate of 3e-9 to its minimum of 3.845e-9 (~260M per sample),
823    /// then asserts the bin count never exceeds DDSKETCH_CONF_BIN_LIMIT.
824    #[test]
825    fn trim_left_respects_bin_limit_with_large_weights() {
826        // Weight corresponding to ADP's minimum safe sample rate (1 / 3.845e-9 ≈ 260_078_024).
827        // With u32 bins, this fits in a single bin per key (260_078_024 < u32::MAX).
828        let weight: u64 = 260_078_024;
829        let bin_limit = usize::from(DDSKETCH_CONF_BIN_LIMIT);
830
831        let mut sketch = DDSketch::default();
832
833        // Insert enough distinct values to repeatedly trigger trim_left.  Ten values is more
834        // than sufficient; two already exceed the bin limit without the fix.
835        for i in 1..=10_i32 {
836            sketch.insert_n(f64::from(i), weight);
837            assert!(
838                sketch.bins().len() <= bin_limit,
839                "bin count {} exceeded limit {} after inserting {} value(s) at weight {}",
840                sketch.bins().len(),
841                bin_limit,
842                i,
843                weight,
844            );
845        }
846    }
847
848    /// When the accumulated missing mass plus the first kept bin's existing count exceeds
849    /// u32::MAX, increment saturates at u32::MAX and the remainder is discarded.
850    ///
851    /// Input:  [(0, u32::MAX), (1, 1)]  limit=1  →  remove 1 bin
852    /// missing = u32::MAX; bins[1].increment(u32::MAX): next = u32::MAX+1 > u32::MAX
853    /// → n = u32::MAX, remainder = 1 (discarded)
854    /// Final: [(1, u32::MAX)] — 1 observation lost
855    #[test]
856    fn trim_left_saturates_first_kept_bin_and_discards_remainder() {
857        let mut bins = make_bins(&[(0, u32::MAX), (1, 1)]);
858        trim_left(&mut bins, 1);
859        assert_eq!(bins.len(), 1);
860        assert_eq!(bins[0].k, 1);
861        assert_eq!(bins[0].n, u32::MAX);
862    }
863
864    /// Collapsing works correctly with negative bin keys. The highest key always survives;
865    /// lower (more negative) keys collapse into the first kept key.
866    /// Adapted from TestAddIntDatasets in sketches-go which covers negative indices.
867    ///
868    /// Input:  [(-3,5), (-2,3), (-1,2), (0,1)]  limit=2  →  remove 2 bins
869    /// missing = 5+3=8; bins[-1].increment(8): 2+8=10 < u32::MAX → n=10
870    /// Final: [(-1,10), (0,1)]
871    #[test]
872    fn trim_left_collapses_negative_keys_correctly() {
873        let mut bins = make_bins(&[(-3, 5), (-2, 3), (-1, 2), (0, 1)]);
874        trim_left(&mut bins, 2);
875        assert_eq!(to_pairs(&bins), vec![(-1, 10), (0, 1)]);
876    }
877
878    /// Monotonic ascending sequence: inserting keys 0..N where N > limit collapses the
879    /// lowest keys into the first kept key, with their total mass summed there.
880    /// Adapted from TestAddMonotonous in sketches-go.
881    ///
882    /// Input:  [(0,1),(1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)]  limit=4
883    /// num_to_remove=6; missing=6; bins[6].increment(6): 1+6=7 → n=7
884    /// Final: [(6,7),(7,1),(8,1),(9,1)]  — only top 4 keys kept, collapsed mass in first
885    #[test]
886    fn trim_left_monotonic_ascending_keeps_top_keys() {
887        let pairs: Vec<(i16, u32)> = (0..10).map(|i| (i, 1)).collect();
888        let mut bins = make_bins(&pairs);
889        trim_left(&mut bins, 4);
890        assert_eq!(to_pairs(&bins), vec![(6, 7), (7, 1), (8, 1), (9, 1)]);
891    }
892}
893
/// Property-based tests for trim_left, adapted from TestAddFuzzy / TestAddIntFuzzy /
/// TestMergeFuzzy in sketches-go/ddsketch/store/store_test.go.
///
/// sketches-go fuzzes random (index, count) pairs through CollapsingLowestDenseStore and
/// validates the result against its `collapsingLowest` oracle transform. We mirror that
/// approach here: generate random sorted distinct-key bins, run trim_left, and assert the
/// same structural invariants.
#[cfg(test)]
mod property_tests {
    use proptest::prelude::*;

    use super::*;

    /// Strategy: a non-empty, key-sorted run of distinct (k: i16, n: u32) bins.
    ///
    /// A BTreeMap keyed on i16 yields distinctness and ascending order for free;
    /// counts cover the full non-zero u32 range.
    fn arb_bins(max_len: usize) -> impl Strategy<Value = SmallVec<[Bin; 4]>> {
        let entries = proptest::collection::btree_map(any::<i16>(), 1u32..=u32::MAX, 1..=max_len);
        entries.prop_map(|sorted| sorted.into_iter().map(|(k, n)| Bin { k, n }).collect())
    }

    /// Strategy: a bin_limit in 1..=32, kept small so collapsing triggers frequently.
    fn arb_limit() -> impl Strategy<Value = u16> {
        1..=32u16
    }

    proptest! {
        /// After trim_left, the bin count must never exceed bin_limit.
        ///
        /// This is the core capacity invariant checked throughout the sketches-go suite:
        /// every store operation leaves the store within its configured capacity.
        #[test]
        fn prop_bin_count_never_exceeds_limit(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            trim_left(&mut bins, limit);
            let count = bins.len();
            prop_assert!(
                count <= usize::from(limit),
                "bin count {} exceeded limit {}",
                count,
                limit,
            );
        }

        /// After trim_left, bins must remain strictly sorted by key (hence distinct).
        ///
        /// trim_left only drains a prefix and bumps one surviving bin's count in place,
        /// so the relative order of the remaining bins must be undisturbed.
        #[test]
        fn prop_output_bins_are_sorted_and_distinct(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            trim_left(&mut bins, limit);
            // Compare each adjacent pair via a zip of the sequence with itself shifted by one.
            for (lo, hi) in bins.iter().zip(bins.iter().skip(1)) {
                prop_assert!(
                    lo.k < hi.k,
                    "bins not strictly sorted after trim_left: {:?} >= {:?}",
                    lo.k,
                    hi.k,
                );
            }
        }

        /// When bins.len() <= limit, trim_left is a no-op: bins is unchanged.
        #[test]
        fn prop_no_op_when_within_limit(
            bins in arb_bins(16),
            extra in 0u16..=16,
        ) {
            // Any limit at or above the current length must leave the bins untouched.
            let slack_limit = bins.len() as u16 + extra;
            let snapshot = bins.clone();
            let mut bins = bins;
            trim_left(&mut bins, slack_limit);
            prop_assert_eq!(bins, snapshot);
        }

        /// Total count is preserved exactly when no u32 overflow occurs.
        ///
        /// Mirrors sketches-go's assertEncodeBins invariant: store.TotalCount() equals
        /// the sum of all inserted counts. sketches-go never loses mass because it uses
        /// float64; we match that guarantee for every input where the collapsed bin
        /// stays below u32::MAX.
        ///
        /// Strategy: counts capped at 1000, so even if all 64 bins collapse into one
        /// the sum (≤ 64_000) is far below u32::MAX.
        #[test]
        fn prop_total_count_preserved_when_no_overflow(
            mut bins in proptest::collection::btree_map(any::<i16>(), 1u32..=1000, 1..=64usize)
                .prop_map(|sorted| -> SmallVec<[Bin; 4]> {
                    sorted.into_iter().map(|(k, n)| Bin { k, n }).collect()
                }),
            limit in arb_limit(),
        ) {
            let mass = |bins: &SmallVec<[Bin; 4]>| -> u64 {
                bins.iter().fold(0u64, |acc, b| acc + u64::from(b.n))
            };
            let total_before = mass(&bins);
            trim_left(&mut bins, limit);
            let total_after = mass(&bins);
            prop_assert_eq!(
                total_before, total_after,
                "total count changed: before={}, after={}",
                total_before, total_after,
            );
        }

        /// The surviving bins are always the bin_limit highest-key bins from the input.
        ///
        /// Direct encoding of the collapsingLowest oracle from sketches-go:
        /// minCollapsedIndex = maxIndex - limit + 1; every bin keyed below it is removed
        /// (its mass folding into minCollapsedIndex), so the output keys must exactly
        /// equal the top min(len, limit) keys of the input.
        #[test]
        fn prop_output_keys_are_highest_from_input(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            let input_keys: Vec<i16> = bins.iter().map(|b| b.k).collect();
            let keep = input_keys.len().min(usize::from(limit));
            let expected_keys: Vec<i16> = input_keys[input_keys.len() - keep..].to_vec();

            trim_left(&mut bins, limit);

            let surviving_keys: Vec<i16> = bins.iter().map(|b| b.k).collect();
            prop_assert_eq!(
                surviving_keys, expected_keys,
                "output keys don't match top {} keys of input",
                limit,
            );
        }
    }
}