// ddsketch/agent/sketch.rs
1//! Agent-specific DDSketch implementation.
2
3use std::cmp::Ordering;
4
5use datadog_protos::metrics::Dogsketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9use super::bin::Bin;
10use super::bucket::Bucket;
11use super::config::{
12    Config, DDSKETCH_CONF_BIN_LIMIT, DDSKETCH_CONF_GAMMA_LN, DDSKETCH_CONF_GAMMA_V, DDSKETCH_CONF_NORM_BIAS,
13    DDSKETCH_CONF_NORM_MIN,
14};
15use crate::common::float_eq;
16
/// Shared sketch configuration, locked to the constants used by the Datadog Agent so that
/// sketches produced here stay bin-compatible with agent-generated sketches.
static SKETCH_CONFIG: Config = Config::new(
    DDSKETCH_CONF_BIN_LIMIT,
    DDSKETCH_CONF_GAMMA_V,
    DDSKETCH_CONF_GAMMA_LN,
    DDSKETCH_CONF_NORM_MIN,
    DDSKETCH_CONF_NORM_BIAS,
);
/// Maximum count a single bin can hold; counts beyond this are split across multiple
/// bins with the same key (see `generate_bins`) or saturated during collapse (`trim_left`).
const MAX_BIN_WIDTH: u32 = u32::MAX;
25
/// [DDSketch][ddsketch] implementation based on the [Datadog Agent][ddagent].
///
/// This implementation is subtly different from the open-source implementations of `DDSketch`, as Datadog made some
/// slight tweaks to configuration values and in-memory layout to optimize it for insertion performance within the
/// agent.
///
/// We've mimicked the agent version of `DDSketch` here in order to support a future where we can take sketches shipped
/// by the agent, handle them internally, merge them, and so on, without any loss of accuracy, eventually forwarding
/// them to Datadog ourselves.
///
/// As such, this implementation is constrained in the same ways: the configuration parameters cannot be changed, the
/// collapsing strategy is fixed, and we support a limited number of methods for inserting into the sketch.
///
/// Importantly, we have a special function, again taken from the agent version, to allow us to interpolate histograms,
/// specifically our own aggregated histograms, into a sketch so that we can emit useful default quantiles, rather than
/// having to ship the buckets -- upper bound and count -- to a downstream system that might have no native way to do
/// the same thing, basically providing no value as they have no way to render useful data from them.
///
/// # Features
///
/// This crate exposes a single feature, `serde`, which enables serialization and deserialization of `DDSketch` with
/// `serde`. This feature is not enabled by default, as it can be slightly risky to use. This is primarily due to the
/// fact that the format of `DDSketch` is not promised to be stable over time. If you enable this feature, you should
/// take care to avoid storing serialized `DDSketch` data for long periods of time, as deserializing it in the future
/// may work but could lead to incorrect/unexpected behavior or issues with correctness.
///
/// [ddsketch]: https://www.vldb.org/pvldb/vol12/p2195-masson.pdf
/// [ddagent]: https://github.com/DataDog/datadog-agent
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct DDSketch {
    /// The bins within the sketch, kept in ascending key order by the insertion paths.
    bins: SmallVec<[Bin; 4]>,

    /// The number of observations within the sketch (total sample weight, not bin count).
    count: u64,

    /// The minimum value of all observations within the sketch.
    ///
    /// Initialized to `f64::MAX` so the first observation always replaces it.
    min: f64,

    /// The maximum value of all observations within the sketch.
    ///
    /// Initialized to `f64::MIN` so the first observation always replaces it.
    max: f64,

    /// The sum of all observations within the sketch.
    sum: f64,

    /// The average value of all observations within the sketch, maintained incrementally.
    avg: f64,
}
75
76impl DDSketch {
77    /// Returns the number of bins in the sketch.
78    pub fn bin_count(&self) -> usize {
79        self.bins.len()
80    }
81
82    /// Whether or not this sketch is empty.
83    pub fn is_empty(&self) -> bool {
84        self.count == 0
85    }
86
    /// Number of samples currently represented by this sketch.
    ///
    /// This is the total sample weight, not the number of bins; see `bin_count` for the latter.
    pub fn count(&self) -> u64 {
        self.count
    }
91
92    /// Minimum value seen by this sketch.
93    ///
94    /// Returns `None` if the sketch is empty.
95    pub fn min(&self) -> Option<f64> {
96        if self.is_empty() {
97            None
98        } else {
99            Some(self.min)
100        }
101    }
102
103    /// Maximum value seen by this sketch.
104    ///
105    /// Returns `None` if the sketch is empty.
106    pub fn max(&self) -> Option<f64> {
107        if self.is_empty() {
108            None
109        } else {
110            Some(self.max)
111        }
112    }
113
114    /// Sum of all values seen by this sketch.
115    ///
116    /// Returns `None` if the sketch is empty.
117    pub fn sum(&self) -> Option<f64> {
118        if self.is_empty() {
119            None
120        } else {
121            Some(self.sum)
122        }
123    }
124
125    /// Average value seen by this sketch.
126    ///
127    /// Returns `None` if the sketch is empty.
128    pub fn avg(&self) -> Option<f64> {
129        if self.is_empty() {
130            None
131        } else {
132            Some(self.avg)
133        }
134    }
135
    /// Returns the current bins of this sketch.
    ///
    /// The normal insertion and merge paths keep these in ascending key order; the
    /// test-only `insert_raw_bin` pushes blindly and can bypass that invariant.
    pub fn bins(&self) -> &[Bin] {
        &self.bins
    }
140
141    /// Clears the sketch, removing all bins and resetting all statistics.
142    pub fn clear(&mut self) {
143        self.count = 0;
144        self.min = f64::MAX;
145        self.max = f64::MIN;
146        self.avg = 0.0;
147        self.sum = 0.0;
148        self.bins.clear();
149    }
150
151    fn adjust_basic_stats(&mut self, v: f64, n: u64) {
152        if v < self.min {
153            self.min = v;
154        }
155
156        if v > self.max {
157            self.max = v;
158        }
159
160        self.count += n;
161        self.sum += v * n as f64;
162
163        if n == 1 {
164            self.avg += (v - self.avg) / self.count as f64;
165        } else {
166            // TODO: From the Agent source code, this method apparently loses precision when the
167            // two averages -- v and self.avg -- are close.  Is there a better approach?
168            self.avg = self.avg + (v - self.avg) * n as f64 / self.count as f64;
169        }
170    }
171
    /// Merges a slice of `(key, count)` pairs into the sketch's bins.
    ///
    /// `counts` must be sorted by key in ascending order (callers such as
    /// `insert_interpolate_bucket` sort before calling). Existing bins and incoming pairs
    /// are merged in a single linear pass; any count too large for one `u32` bin is split
    /// across multiple bins with the same key by `generate_bins`, and the result is then
    /// collapsed back down to the configured bin limit via `trim_left`.
    fn insert_key_counts(&mut self, counts: &[(i16, u64)]) {
        let mut temp = SmallVec::<[Bin; 4]>::new();

        let mut bins_idx = 0;
        let mut key_idx = 0;
        let bins_len = self.bins.len();
        let counts_len = counts.len();

        // PERF TODO: there's probably a fast path to be had where could check if all if the counts have existing bins
        // that aren't yet full, and we just update them directly, although we'd still be doing a linear scan to find
        // them since keys aren't 1:1 with their position in `self.bins` but using this method just to update one or two
        // bins is clearly suboptimal and we wouldn't really want to scan them all just to have to back out and actually
        // do the non-fast path.. maybe a first pass could be checking if the first/last key falls within our known
        // min/max key, and if it doesn't, then we know we have to go through the non-fast path, and if it passes, we do
        // the scan to see if we can just update bins directly?
        while bins_idx < bins_len && key_idx < counts_len {
            let bin = self.bins[bins_idx];
            let vk = counts[key_idx].0;
            let kn = counts[key_idx].1;

            match bin.k.cmp(&vk) {
                Ordering::Greater => {
                    // Incoming key sorts before the current bin: emit new bin(s) for it.
                    generate_bins(&mut temp, vk, kn);
                    key_idx += 1;
                }
                Ordering::Less => {
                    // Current bin sorts before the incoming key: keep it unchanged.
                    temp.push(bin);
                    bins_idx += 1;
                }
                Ordering::Equal => {
                    // Same key: combine the existing and incoming counts.
                    generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
                    bins_idx += 1;
                    key_idx += 1;
                }
            }
        }

        // One side ran out; flush the untouched remainder of the other side.
        temp.extend_from_slice(&self.bins[bins_idx..]);

        while key_idx < counts_len {
            let vk = counts[key_idx].0;
            let kn = counts[key_idx].1;
            generate_bins(&mut temp, vk, kn);
            key_idx += 1;
        }

        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);

        // PERF TODO: This is where we might do a mem::swap instead so that we could shove the bin vector into an object
        // pool but I'm not sure this actually matters at the moment.
        self.bins = temp;
    }
224
    /// Merges a batch of keys -- one entry per observation -- into the sketch's bins.
    ///
    /// Keys are sorted in place, runs of equal keys are counted via
    /// `buf_count_leading_equal`, and the runs are merged against the existing bins in a
    /// single linear pass before collapsing back down to the configured bin limit.
    fn insert_keys(&mut self, mut keys: Vec<i16>) {
        // Updating more than 4 billion keys would be very very weird and likely indicative of something horribly
        // broken.
        //
        // TODO: I don't actually understand why I wrote this assertion in this way. Either the code can handle
        // collapsing values in order to maintain the relative error bounds, or we have to cap it to the maximum allowed
        // number of bins. Gotta think about this some more.
        //
        // NOTE: `buf_count_leading_equal` relies on this <= 2^32 bound to cast its run
        // length to u64 safely.
        assert!(keys.len() <= u32::MAX.try_into().expect("we don't support 16-bit systems"));

        keys.sort_unstable();

        let mut temp = SmallVec::<[Bin; 4]>::new();

        let mut bins_idx = 0;
        let mut key_idx = 0;
        let bins_len = self.bins.len();
        let keys_len = keys.len();

        // PERF TODO: there's probably a fast path to be had where could check if all if the counts have existing bins
        // that aren't yet full, and we just update them directly, although we'd still be doing a linear scan to find
        // them since keys aren't 1:1 with their position in `self.bins` but using this method just to update one or two
        // bins is clearly suboptimal and we wouldn't really want to scan them all just to have to back out and actually
        // do the non-fast path.. maybe a first pass could be checking if the first/last key falls within our known
        // min/max key, and if it doesn't, then we know we have to go through the non-fast path, and if it passes, we do
        // the scan to see if we can just update bins directly?
        while bins_idx < bins_len && key_idx < keys_len {
            let bin = self.bins[bins_idx];
            let vk = keys[key_idx];

            match bin.k.cmp(&vk) {
                Ordering::Greater => {
                    // Incoming key sorts before the current bin: emit new bin(s) for the
                    // whole run of equal keys.
                    let kn = buf_count_leading_equal(&keys, key_idx);
                    generate_bins(&mut temp, vk, kn);
                    key_idx += kn as usize;
                }
                Ordering::Less => {
                    // Current bin sorts before the incoming key: keep it unchanged.
                    temp.push(bin);
                    bins_idx += 1;
                }
                Ordering::Equal => {
                    // Same key: fold the run of equal keys into the existing bin's count.
                    let kn = buf_count_leading_equal(&keys, key_idx);
                    generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
                    bins_idx += 1;
                    key_idx += kn as usize;
                }
            }
        }

        // One side ran out; flush the untouched remainder of the other side.
        temp.extend_from_slice(&self.bins[bins_idx..]);

        while key_idx < keys_len {
            let vk = keys[key_idx];
            let kn = buf_count_leading_equal(&keys, key_idx);
            generate_bins(&mut temp, vk, kn);
            key_idx += kn as usize;
        }

        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);

        // PERF TODO: This is where we might do a mem::swap instead so that we could shove the bin vector into an object
        // pool but I'm not sure this actually matters at the moment.
        self.bins = temp;
    }
288
289    /// Inserts a single value into the sketch.
290    pub fn insert(&mut self, v: f64) {
291        // TODO: This should return a result that makes sure we have enough room to actually add 1 more sample without
292        // hitting `self.config.max_count()`
293        self.adjust_basic_stats(v, 1);
294
295        let key = SKETCH_CONFIG.key(v);
296
297        let mut insert_at = None;
298
299        for (bin_idx, b) in self.bins.iter_mut().enumerate() {
300            if b.k == key {
301                if b.n < MAX_BIN_WIDTH {
302                    // Fast path for adding to an existing bin without overflow.
303                    b.n += 1;
304                    return;
305                } else {
306                    insert_at = Some(bin_idx);
307                    break;
308                }
309            }
310            if b.k > key {
311                insert_at = Some(bin_idx);
312                break;
313            }
314        }
315
316        if let Some(bin_idx) = insert_at {
317            self.bins.insert(bin_idx, Bin { k: key, n: 1 });
318        } else {
319            self.bins.push(Bin { k: key, n: 1 });
320        }
321        trim_left(&mut self.bins, SKETCH_CONFIG.bin_limit);
322    }
323
324    /// Inserts many values into the sketch.
325    pub fn insert_many(&mut self, vs: &[f64]) {
326        // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
327        // hitting `self.config.bin_limit`.
328        let mut keys = Vec::with_capacity(vs.len());
329        for v in vs {
330            self.adjust_basic_stats(*v, 1);
331            keys.push(SKETCH_CONFIG.key(*v));
332        }
333        self.insert_keys(keys);
334    }
335
336    /// Inserts a single value into the sketch `n` times.
337    pub fn insert_n(&mut self, v: f64, n: u64) {
338        // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
339        // hitting `self.config.max_count()`.
340        if n == 1 {
341            self.insert(v);
342        } else {
343            self.adjust_basic_stats(v, n);
344
345            let key = SKETCH_CONFIG.key(v);
346            self.insert_key_counts(&[(key, n)]);
347        }
348    }
349
    /// Distributes `count` observations of a histogram bucket spanning `[lower, upper]`
    /// across the sketch bins whose ranges overlap the bucket, via linear interpolation.
    ///
    /// Each overlapping sketch bin receives a share of `count` proportional to the
    /// fraction of the bucket's width it covers; fractional remainders are carried
    /// forward and spilled into later bins, and any leftover is assigned to the final
    /// bin, so the total count assigned always equals `count` exactly.
    fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u64) {
        // Find the keys for the bins where the lower bound and upper bound would end up, and collect all of the keys in
        // between, inclusive.
        let lower_key = SKETCH_CONFIG.key(lower);
        let upper_key = SKETCH_CONFIG.key(upper);
        let keys = (lower_key..=upper_key).collect::<Vec<_>>();

        let mut key_counts = Vec::new();
        let mut remaining_count = count;
        let distance = upper - lower;
        let mut start_idx = 0;
        let mut end_idx = 1;
        let mut lower_bound = SKETCH_CONFIG.bin_lower_bound(keys[start_idx]);
        let mut remainder = 0.0;

        while end_idx < keys.len() && remaining_count > 0 {
            // For each key, map the total distance between the input lower/upper bound against the sketch lower/upper
            // bound for the current sketch bin, which tells us how much of the input count to apply to the current
            // sketch bin.
            let upper_bound = SKETCH_CONFIG.bin_lower_bound(keys[end_idx]);
            let fkn = ((upper_bound - lower_bound) / distance) * count as f64;
            // Carry the fractional part forward so it isn't lost to truncation below.
            if fkn > 1.0 {
                remainder += fkn - fkn.trunc();
            }

            // SAFETY: This integer cast is intentional: we want to get the non-fractional part, as we've captured the
            // fractional part in the above conditional.
            #[allow(clippy::cast_possible_truncation)]
            let mut kn = fkn as u64;
            // Once a whole observation's worth of remainder has accumulated, spend it here.
            if remainder > 1.0 {
                kn += 1;
                remainder -= 1.0;
            }

            if kn > 0 {
                // Never assign more than is left to distribute.
                if kn > remaining_count {
                    kn = remaining_count;
                }

                self.adjust_basic_stats(lower_bound, kn);
                key_counts.push((keys[start_idx], kn));

                remaining_count -= kn;
                start_idx = end_idx;
                lower_bound = upper_bound;
            }

            end_idx += 1;
        }

        // Whatever wasn't distributed above lands in the last bin we stopped at.
        if remaining_count > 0 {
            let last_key = keys[start_idx];
            lower_bound = SKETCH_CONFIG.bin_lower_bound(last_key);
            self.adjust_basic_stats(lower_bound, remaining_count);
            key_counts.push((last_key, remaining_count));
        }

        // Sort the key counts first, as that's required by `insert_key_counts`.
        key_counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));

        self.insert_key_counts(&key_counts);
    }
412
413    /// Inserts histogram buckets into the sketch via linear interpolation.
414    ///
415    /// ## Errors
416    ///
417    /// Returns an error if a bucket size is greater that `u32::MAX`.
418    pub fn insert_interpolate_buckets(&mut self, mut buckets: Vec<Bucket>) -> Result<(), &'static str> {
419        // Buckets need to be sorted from lowest to highest so that we can properly calculate the rolling lower/upper
420        // bounds.
421        buckets.sort_by(|a, b| {
422            let oa = OrderedFloat(a.upper_limit);
423            let ob = OrderedFloat(b.upper_limit);
424
425            oa.cmp(&ob)
426        });
427
428        let mut lower = f64::NEG_INFINITY;
429
430        if buckets.iter().any(|bucket| bucket.count > u64::from(u32::MAX)) {
431            return Err("bucket size greater than u32::MAX");
432        }
433
434        for bucket in buckets {
435            let mut upper = bucket.upper_limit;
436            if upper.is_sign_positive() && upper.is_infinite() {
437                upper = lower;
438            } else if lower.is_sign_negative() && lower.is_infinite() {
439                lower = upper;
440            }
441
442            self.insert_interpolate_bucket(lower, upper, bucket.count);
443            lower = bucket.upper_limit;
444        }
445
446        Ok(())
447    }
448
449    /// Adds a bin directly into the sketch.
450    ///
451    /// Used only for unit testing so that we can create a sketch with an exact layout, which allows testing around the
452    /// resulting bins when feeding in specific values, as well as generating explicitly bad layouts for testing.
453    #[allow(dead_code)]
454    pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u32) {
455        let v = SKETCH_CONFIG.bin_lower_bound(k);
456        self.adjust_basic_stats(v, u64::from(n));
457        self.bins.push(Bin { k, n });
458    }
459
460    /// Gets the value at a given quantile.
461    pub fn quantile(&self, q: f64) -> Option<f64> {
462        if self.count == 0 {
463            return None;
464        }
465
466        if q <= 0.0 {
467            return Some(self.min);
468        }
469
470        if q >= 1.0 {
471            return Some(self.max);
472        }
473
474        let mut n = 0.0;
475        let mut estimated = None;
476        let wanted_rank = rank(self.count, q);
477
478        for (i, bin) in self.bins.iter().enumerate() {
479            n += f64::from(bin.n);
480            if n <= wanted_rank {
481                continue;
482            }
483
484            let weight = (n - wanted_rank) / f64::from(bin.n);
485            let mut v_low = SKETCH_CONFIG.bin_lower_bound(bin.k);
486            let mut v_high = v_low * SKETCH_CONFIG.gamma_v;
487
488            if i == self.bins.len() {
489                v_high = self.max;
490            } else if i == 0 {
491                v_low = self.min;
492            }
493
494            estimated = Some(v_low * weight + v_high * (1.0 - weight));
495            break;
496        }
497
498        estimated.map(|v| v.clamp(self.min, self.max)).or(Some(f64::NAN))
499    }
500
501    /// Merges another sketch into this sketch, without a loss of accuracy.
502    ///
503    /// All samples present in the other sketch will be correctly represented in this sketch, and summary statistics
504    /// such as the sum, average, count, min, and max, will represent the sum of samples from both sketches.
505    pub fn merge(&mut self, other: &DDSketch) {
506        // Merge the basic statistics together.
507        self.count += other.count;
508        if other.max > self.max {
509            self.max = other.max;
510        }
511        if other.min < self.min {
512            self.min = other.min;
513        }
514        self.sum += other.sum;
515        self.avg = self.avg + (other.avg - self.avg) * other.count as f64 / self.count as f64;
516
517        // Now merge the bins.
518        let mut temp = SmallVec::<[Bin; 4]>::new();
519
520        let mut bins_idx = 0;
521        for other_bin in &other.bins {
522            let start = bins_idx;
523            while bins_idx < self.bins.len() && self.bins[bins_idx].k < other_bin.k {
524                bins_idx += 1;
525            }
526
527            temp.extend_from_slice(&self.bins[start..bins_idx]);
528
529            if bins_idx >= self.bins.len() || self.bins[bins_idx].k > other_bin.k {
530                temp.push(*other_bin);
531            } else if self.bins[bins_idx].k == other_bin.k {
532                generate_bins(
533                    &mut temp,
534                    other_bin.k,
535                    u64::from(other_bin.n) + u64::from(self.bins[bins_idx].n),
536                );
537                bins_idx += 1;
538            }
539        }
540
541        temp.extend_from_slice(&self.bins[bins_idx..]);
542        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
543
544        self.bins = temp;
545    }
546
547    /// Merges this sketch into the `Dogsketch` Protocol Buffers representation.
548    pub fn merge_to_dogsketch(&self, dogsketch: &mut Dogsketch) {
549        dogsketch.set_cnt(i64::try_from(self.count).unwrap_or(i64::MAX));
550        dogsketch.set_min(self.min);
551        dogsketch.set_max(self.max);
552        dogsketch.set_avg(self.avg);
553        dogsketch.set_sum(self.sum);
554
555        let mut k = Vec::new();
556        let mut n = Vec::new();
557
558        for bin in &self.bins {
559            k.push(i32::from(bin.k));
560            n.push(bin.n);
561        }
562
563        dogsketch.set_k(k);
564        dogsketch.set_n(n);
565    }
566}
567
impl PartialEq for DDSketch {
    fn eq(&self, other: &Self) -> bool {
        // We skip checking the configuration because we don't allow creating configurations by hand, and it's always
        // locked to the constants used by the Datadog Agent.  We only check the configuration equality manually in
        // `DDSketch::merge`, to protect ourselves in the future if different configurations become allowed.
        //
        // Additionally, we also use floating-point-specific relative comparisons for sum/avg because they can be
        // minimally different between sketches purely due to floating-point behavior, despite being fed the same exact
        // data in terms of recorded samples.
        //
        // NOTE(review): `float_eq` is assumed to be a relative/approximate comparison from
        // `crate::common` -- confirm its tolerance there; it is applied to min/max as well
        // as sum/avg.
        self.count == other.count
            && float_eq(self.min, other.min)
            && float_eq(self.max, other.max)
            && float_eq(self.sum, other.sum)
            && float_eq(self.avg, other.avg)
            && self.bins == other.bins
    }
}
585
586impl Default for DDSketch {
587    fn default() -> Self {
588        Self {
589            bins: SmallVec::new(),
590            count: 0,
591            min: f64::MAX,
592            max: f64::MIN,
593            sum: 0.0,
594            avg: 0.0,
595        }
596    }
597}
598
// Marker impl: equality as defined by `PartialEq` above is treated as total.
// NOTE(review): `Eq` requires reflexivity, which depends on `float_eq(x, x)` holding for
// every stored value -- confirm NaN can never reach the summary fields.
impl Eq for DDSketch {}
600
impl TryFrom<Dogsketch> for DDSketch {
    type Error = &'static str;

    /// Converts the Protocol Buffers `Dogsketch` representation into a `DDSketch`.
    ///
    /// Fails if the wire count (an `i64`) is negative, if the key/count vectors have
    /// differing lengths, or if any bin key falls outside the `i16` range.
    ///
    /// NOTE(review): bins are taken as-is -- there is no validation that keys are sorted
    /// or that the bin counts sum to `cnt`; confirm the producer guarantees this.
    fn try_from(value: Dogsketch) -> Result<Self, Self::Error> {
        let mut sketch = DDSketch {
            count: u64::try_from(value.cnt).map_err(|_| "sketch count overflows u64 or is negative")?,
            min: value.min,
            max: value.max,
            avg: value.avg,
            sum: value.sum,
            ..Default::default()
        };

        let k = value.k;
        let n = value.n;

        if k.len() != n.len() {
            return Err("k and n bin vectors have differing lengths");
        }

        for (k, n) in k.into_iter().zip(n.into_iter()) {
            let k = i16::try_from(k).map_err(|_| "bin key overflows i16")?;

            sketch.bins.push(Bin { k, n });
        }

        Ok(sketch)
    }
}
630
/// Rank (fractional index) of the sample at quantile `q` among `count` samples, using
/// banker's rounding to break ties. `count` must be non-zero (callers guard this).
fn rank(count: u64, q: f64) -> f64 {
    (q * (count - 1) as f64).round_ties_even()
}
635
/// Counts the length of the run of identical keys beginning at `start_idx`.
///
/// `keys` must be non-empty and `start_idx` in bounds; callers pass sorted key buffers so
/// each run is contiguous.
#[allow(clippy::cast_possible_truncation)]
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u64 {
    // Last element: the run is trivially one long.
    if start_idx == keys.len() - 1 {
        return 1;
    }

    let target = keys[start_idx];
    let run_len = keys[start_idx..].iter().take_while(|&&k| k == target).count();

    // SAFETY: We limit the size of the vector (used to provide the slice given to us here) to be no larger than 2^32,
    // so we can't exceed u64 here.
    run_len as u64
}
651
/// Collapses `bins` down to at most `bin_limit` entries by folding the mass of the
/// lowest-keyed bins into the first surviving bin.
///
/// A `bin_limit` of 0 disables trimming entirely. Mass that would overflow the surviving
/// bin's `u32` counter is discarded -- see the comment below on the intentional
/// divergence from the Datadog Agent.
fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
    // We won't ever support Vector running on anything other than a 32-bit platform and above, I imagine, so this
    // should always be safe.
    let bin_limit = bin_limit as usize;
    if bin_limit == 0 || bins.len() <= bin_limit {
        return;
    }

    let num_to_remove = bins.len() - bin_limit;
    let mut missing: u64 = 0;

    // Sum all mass from the bins being removed. Per CollapsingLowestDenseStore in sketches-go,
    // all removed mass collapses into the first kept bin (the new minimum index).
    for bin in bins.iter().take(num_to_remove) {
        missing += u64::from(bin.n);
    }

    // Fold the accumulated mass into the first kept bin, matching Go's `bins[newMinIndex] += n`.
    // Any remainder that overflows u32::MAX is discarded — this requires >4B observations in a
    // single collapsed bin and is an intentional divergence from the Datadog Agent (which uses
    // float64 counts and never loses mass).
    bins[num_to_remove].increment(missing);

    // Drop the removed prefix, leaving exactly bin_limit bins.
    bins.drain(0..num_to_remove);
}
678
679#[allow(clippy::cast_possible_truncation)]
680fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
681    if n < u64::from(MAX_BIN_WIDTH) {
682        // SAFETY: `n < MAX_BIN_WIDTH = u32::MAX`, so it fits in u32.
683        bins.push(Bin { k, n: n as u32 });
684    } else {
685        let overflow = n % u64::from(MAX_BIN_WIDTH);
686        if overflow != 0 {
687            bins.push(Bin {
688                k,
689                // SAFETY: `overflow = n % u32::MAX`, so overflow <= u32::MAX - 1, which fits in u32.
690                n: overflow as u32,
691            });
692        }
693
694        for _ in 0..(n / u64::from(MAX_BIN_WIDTH)) {
695            bins.push(Bin { k, n: MAX_BIN_WIDTH });
696        }
697    }
698}
699
700#[cfg(test)]
701mod tests {
702    use super::*;
703
704    // Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k.
705    fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> {
706        pairs.iter().map(|&(k, n)| Bin { k, n }).collect()
707    }
708
709    // Helper: extract (k, n) pairs from a SmallVec for easy assertion.
710    fn to_pairs(bins: &SmallVec<[Bin; 4]>) -> Vec<(i16, u32)> {
711        bins.iter().map(|b| (b.k, b.n)).collect()
712    }
713
714    /// Basic collapse: when bins exceed the limit, the mass from removed bins is merged
715    /// into the first kept bin (lowest surviving key). This mirrors the CollapsingLowestDenseStore
716    /// semantics from sketches-go: all bins with index < (maxIndex - limit + 1) collapse into
717    /// the bin at (maxIndex - limit + 1).
718    ///
719    /// Input:  [(0,2), (1,3), (2,4), (3,5)]  limit=2  →  remove 2 bins
720    /// missing = n[0] + n[1] = 2 + 3 = 5
721    /// first kept bin: k=2, n=4 → increment by 5 → n=9
722    /// Result: [(2,9), (3,5)]
723    #[test]
724    fn trim_left_collapses_removed_mass_into_first_kept_bin() {
725        let mut bins = make_bins(&[(0, 2), (1, 3), (2, 4), (3, 5)]);
726        trim_left(&mut bins, 2);
727        assert_eq!(to_pairs(&bins), vec![(2, 9), (3, 5)]);
728    }
729
730    /// Total count is preserved exactly when the collapse fits within a single u32 bin.
731    ///
732    /// Input:  [(10,5), (20,3), (30,7)]  limit=2  →  remove 1 bin
733    /// missing = 5; first kept bin k=20, n=3 → n=8
734    /// Result: [(20,8), (30,7)], total=15 == 5+3+7
735    #[test]
736    fn trim_left_preserves_total_count_when_no_overflow() {
737        let mut bins = make_bins(&[(10, 5), (20, 3), (30, 7)]);
738        let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
739        trim_left(&mut bins, 2);
740        let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
741        assert_eq!(to_pairs(&bins), vec![(20, 8), (30, 7)]);
742        assert_eq!(total_before, total_after);
743    }
744
745    /// With u32 bin counts, collapsed mass from multiple removed bins fits in a single bin
746    /// without saturation for typical weights, fully preserving the total count.
747    ///
748    /// Input:  [(0,50000), (1,50000), (2,1)]  limit=1  →  remove 2 bins
749    /// missing = 100000; bins[2].increment(100000): 100001 < u32::MAX → n=100001
750    /// bins.drain(0..2). Final: [(2,100001)], all mass preserved.
751    ///
752    /// With the old u16 layout, this same input would have saturated at 65535 and discarded
753    /// 34466 observations. u32 eliminates that loss for any per-bin count below ~4.3 billion.
754    #[test]
755    fn trim_left_preserves_exact_count_with_u32_bins() {
756        let mut bins = make_bins(&[(0, 50000), (1, 50000), (2, 1)]);
757        let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
758        trim_left(&mut bins, 1);
759        let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
760        assert_eq!(bins.len(), 1);
761        assert_eq!(bins[0].k, 2);
762        assert_eq!(bins[0].n, 100001);
763        assert_eq!(total_before, total_after, "all mass must be preserved with u32 bins");
764    }
765
766    /// When already at or under the limit, trim_left is a no-op.
767    #[test]
768    fn trim_left_no_op_when_within_limit() {
769        let original = make_bins(&[(5, 10), (6, 20)]);
770        let mut bins = original.clone();
771        trim_left(&mut bins, 2);
772        assert_eq!(to_pairs(&bins), to_pairs(&original));
773        trim_left(&mut bins, 3);
774        assert_eq!(to_pairs(&bins), to_pairs(&original));
775    }
776
777    /// Regression test for trim_left bin count with large per-sample weights.
778    ///
779    /// With the old u16 layout, a sample weight of ~260M would generate ceil(260M / 65535) = 3969
780    /// bins per key, causing bin count explosion and an encoder panic. With u32, the same weight
781    /// fits in a single bin (260M < u32::MAX ≈ 4.3B), so one insert_n call produces exactly one
782    /// bin per key and the bin limit is trivially respected.
783    ///
784    /// This test inserts several values with a weight representative of what ADP receives when
785    /// clamping an incoming sample rate of 3e-9 to its minimum of 3.845e-9 (~260M per sample),
786    /// then asserts the bin count never exceeds DDSKETCH_CONF_BIN_LIMIT.
787    #[test]
788    fn trim_left_respects_bin_limit_with_large_weights() {
789        // Weight corresponding to ADP's minimum safe sample rate (1 / 3.845e-9 ≈ 260_078_024).
790        // With u32 bins, this fits in a single bin per key (260_078_024 < u32::MAX).
791        let weight: u64 = 260_078_024;
792        let bin_limit = usize::from(DDSKETCH_CONF_BIN_LIMIT);
793
794        let mut sketch = DDSketch::default();
795
796        // Insert enough distinct values to repeatedly trigger trim_left.  Ten values is more
797        // than sufficient; two already exceed the bin limit without the fix.
798        for i in 1..=10_i32 {
799            sketch.insert_n(f64::from(i), weight);
800            assert!(
801                sketch.bins().len() <= bin_limit,
802                "bin count {} exceeded limit {} after inserting {} value(s) at weight {}",
803                sketch.bins().len(),
804                bin_limit,
805                i,
806                weight,
807            );
808        }
809    }
810
811    /// When the accumulated missing mass plus the first kept bin's existing count exceeds
812    /// u32::MAX, increment saturates at u32::MAX and the remainder is discarded.
813    ///
814    /// Input:  [(0, u32::MAX), (1, 1)]  limit=1  →  remove 1 bin
815    /// missing = u32::MAX; bins[1].increment(u32::MAX): next = u32::MAX+1 > u32::MAX
816    /// → n = u32::MAX, remainder = 1 (discarded)
817    /// Final: [(1, u32::MAX)] — 1 observation lost
818    #[test]
819    fn trim_left_saturates_first_kept_bin_and_discards_remainder() {
820        let mut bins = make_bins(&[(0, u32::MAX), (1, 1)]);
821        trim_left(&mut bins, 1);
822        assert_eq!(bins.len(), 1);
823        assert_eq!(bins[0].k, 1);
824        assert_eq!(bins[0].n, u32::MAX);
825    }
826
827    /// Collapsing works correctly with negative bin keys. The highest key always survives;
828    /// lower (more negative) keys collapse into the first kept key.
829    /// Adapted from TestAddIntDatasets in sketches-go which covers negative indices.
830    ///
831    /// Input:  [(-3,5), (-2,3), (-1,2), (0,1)]  limit=2  →  remove 2 bins
832    /// missing = 5+3=8; bins[-1].increment(8): 2+8=10 < u32::MAX → n=10
833    /// Final: [(-1,10), (0,1)]
834    #[test]
835    fn trim_left_collapses_negative_keys_correctly() {
836        let mut bins = make_bins(&[(-3, 5), (-2, 3), (-1, 2), (0, 1)]);
837        trim_left(&mut bins, 2);
838        assert_eq!(to_pairs(&bins), vec![(-1, 10), (0, 1)]);
839    }
840
841    /// Monotonic ascending sequence: inserting keys 0..N where N > limit collapses the
842    /// lowest keys into the first kept key, with their total mass summed there.
843    /// Adapted from TestAddMonotonous in sketches-go.
844    ///
845    /// Input:  [(0,1),(1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)]  limit=4
846    /// num_to_remove=6; missing=6; bins[6].increment(6): 1+6=7 → n=7
847    /// Final: [(6,7),(7,1),(8,1),(9,1)]  — only top 4 keys kept, collapsed mass in first
848    #[test]
849    fn trim_left_monotonic_ascending_keeps_top_keys() {
850        let pairs: Vec<(i16, u32)> = (0..10).map(|i| (i, 1)).collect();
851        let mut bins = make_bins(&pairs);
852        trim_left(&mut bins, 4);
853        assert_eq!(to_pairs(&bins), vec![(6, 7), (7, 1), (8, 1), (9, 1)]);
854    }
855}
856
/// Property-based tests for trim_left, adapted from TestAddFuzzy / TestAddIntFuzzy /
/// TestMergeFuzzy in sketches-go/ddsketch/store/store_test.go.
///
/// sketches-go feeds random (index, count) inputs through CollapsingLowestDenseStore and
/// asserts structural invariants against the `collapsingLowest` oracle transform. We mirror
/// that approach here: build random sorted bins with distinct keys, run trim_left, and
/// check the same invariants.
#[cfg(test)]
mod property_tests {
    use proptest::prelude::*;

    use super::*;

    /// Strategy: a non-empty, key-sorted vec of distinct (k: i16, n: u32) bins.
    /// A BTreeMap guarantees distinct keys in ascending order; counts span 1..=u32::MAX.
    fn arb_bins(max_len: usize) -> impl Strategy<Value = SmallVec<[Bin; 4]>> {
        proptest::collection::btree_map(any::<i16>(), 1u32..=u32::MAX, 1..=max_len)
            .prop_map(|entries| {
                entries.into_iter().map(|(key, count)| Bin { k: key, n: count }).collect()
            })
    }

    /// Strategy: a bin_limit in 1..=32, kept small so collapsing triggers frequently.
    fn arb_limit() -> impl Strategy<Value = u16> {
        1u16..=32
    }

    proptest! {
        /// After trim_left, the bin count must never exceed bin_limit.
        ///
        /// This is the core capacity invariant checked throughout the sketches-go suite:
        /// no store operation may leave the store above its configured size.
        #[test]
        fn prop_bin_count_never_exceeds_limit(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            trim_left(&mut bins, limit);
            let remaining = bins.len();
            prop_assert!(
                remaining <= usize::from(limit),
                "bin count {} exceeded limit {}",
                remaining,
                limit,
            );
        }

        /// After trim_left, bins must remain sorted by key with no duplicate keys.
        ///
        /// trim_left only drains a prefix and modifies bins[num_to_remove].n in place;
        /// the relative order of the surviving bins must be undisturbed.
        #[test]
        fn prop_output_bins_are_sorted_and_distinct(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            trim_left(&mut bins, limit);
            // Strict `<` over adjacent pairs covers both sortedness and distinctness.
            for (left, right) in bins.iter().zip(bins.iter().skip(1)) {
                prop_assert!(
                    left.k < right.k,
                    "bins not strictly sorted after trim_left: {:?} >= {:?}",
                    left.k,
                    right.k,
                );
            }
        }

        /// When bins.len() <= limit, trim_left is a no-op: bins is unchanged.
        #[test]
        fn prop_no_op_when_within_limit(
            bins in arb_bins(16),
            extra in 0u16..=16,
        ) {
            let limit = bins.len() as u16 + extra;
            let snapshot = bins.clone();
            let mut bins = bins;
            trim_left(&mut bins, limit);
            prop_assert_eq!(bins, snapshot);
        }

        /// Total count is preserved exactly when no u32 overflow occurs.
        ///
        /// This is the key invariant from sketches-go's assertEncodeBins:
        /// store.TotalCount() must equal the sum of all inserted counts.
        /// We restrict counts to keep the collapsed sum safely below u32::MAX
        /// (sketches-go never loses mass because it uses float64; we match that
        /// guarantee for all inputs where the collapsed bin doesn't overflow u32).
        ///
        /// Strategy: counts capped at 1000 so that even if all 64 bins collapse
        /// into one the sum (≤ 64_000) is far below u32::MAX.
        #[test]
        fn prop_total_count_preserved_when_no_overflow(
            mut bins in proptest::collection::btree_map(any::<i16>(), 1u32..=1000, 1..=64usize)
                .prop_map(|entries| -> SmallVec<[Bin; 4]> {
                    entries.into_iter().map(|(key, count)| Bin { k: key, n: count }).collect()
                }),
            limit in arb_limit(),
        ) {
            let before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
            trim_left(&mut bins, limit);
            let after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
            prop_assert_eq!(
                before, after,
                "total count changed: before={}, after={}",
                before, after,
            );
        }

        /// The surviving bins are always the bin_limit highest-key bins from the input.
        ///
        /// This is the direct encoding of the collapsingLowest oracle from sketches-go:
        /// minCollapsedIndex = maxIndex - limit + 1; all bins with key < minCollapsedIndex
        /// are removed (their mass folds into minCollapsedIndex). The output keys must
        /// exactly match the top min(len, limit) keys of the input.
        #[test]
        fn prop_output_keys_are_highest_from_input(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            let keys_before: Vec<i16> = bins.iter().map(|b| b.k).collect();
            let keep = keys_before.len().min(usize::from(limit));
            let expected: Vec<i16> = keys_before[keys_before.len() - keep..].to_vec();

            trim_left(&mut bins, limit);

            let actual: Vec<i16> = bins.iter().map(|b| b.k).collect();
            prop_assert_eq!(
                actual, expected,
                "output keys don't match top {} keys of input",
                limit,
            );
        }
    }
}