// ddsketch/agent/sketch.rs
1//! Agent-specific DDSketch implementation.
2
3use std::cmp::Ordering;
4
5use datadog_protos::metrics::Dogsketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9use super::bin::Bin;
10use super::bucket::Bucket;
11use super::config::{
12 Config, DDSKETCH_CONF_BIN_LIMIT, DDSKETCH_CONF_GAMMA_LN, DDSKETCH_CONF_GAMMA_V, DDSKETCH_CONF_NORM_BIAS,
13 DDSKETCH_CONF_NORM_MIN,
14};
15use crate::common::float_eq;
16
/// Sketch configuration shared by every `DDSketch` instance, locked to the exact constants used by
/// the Datadog Agent so that sketches remain mergeable with agent-produced sketches.
static SKETCH_CONFIG: Config = Config::new(
    DDSKETCH_CONF_BIN_LIMIT,
    DDSKETCH_CONF_GAMMA_V,
    DDSKETCH_CONF_GAMMA_LN,
    DDSKETCH_CONF_NORM_MIN,
    DDSKETCH_CONF_NORM_BIAS,
);
/// Maximum number of observations a single bin can hold (bin counts are stored as `u32`).
const MAX_BIN_WIDTH: u32 = u32::MAX;
25
/// [DDSketch][ddsketch] implementation based on the [Datadog Agent][ddagent].
///
/// This implementation is subtly different from the open-source implementations of `DDSketch`, as Datadog made some
/// slight tweaks to configuration values and in-memory layout to optimize it for insertion performance within the
/// agent.
///
/// We've mimicked the agent version of `DDSketch` here in order to support a future where we can take sketches shipped
/// by the agent, handle them internally, merge them, and so on, without any loss of accuracy, eventually forwarding
/// them to Datadog ourselves.
///
/// As such, this implementation is constrained in the same ways: the configuration parameters cannot be changed, the
/// collapsing strategy is fixed, and we support a limited number of methods for inserting into the sketch.
///
/// Importantly, we have a special function, again taken from the agent version, to allow us to interpolate histograms,
/// specifically our own aggregated histograms, into a sketch so that we can emit useful default quantiles, rather than
/// having to ship the buckets -- upper bound and count -- to a downstream system that might have no native way to do
/// the same thing, basically providing no value as they have no way to render useful data from them.
///
/// # Features
///
/// This crate exposes a single feature, `serde`, which enables serialization and deserialization of `DDSketch` with
/// `serde`. This feature is not enabled by default, as it can be slightly risky to use. This is primarily due to the
/// fact that the format of `DDSketch` is not promised to be stable over time. If you enable this feature, you should
/// take care to avoid storing serialized `DDSketch` data for long periods of time, as deserializing it in the future
/// may work but could lead to incorrect/unexpected behavior or issues with correctness.
///
/// [ddsketch]: https://www.vldb.org/pvldb/vol12/p2195-masson.pdf
/// [ddagent]: https://github.com/DataDog/datadog-agent
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct DDSketch {
    /// The bins within the sketch.
    ///
    /// Kept sorted by key, ascending. Inline storage for up to four bins avoids heap allocation
    /// for small sketches.
    bins: SmallVec<[Bin; 4]>,

    /// The number of observations within the sketch.
    count: u64,

    /// The minimum value of all observations within the sketch.
    ///
    /// Holds the sentinel `f64::MAX` while the sketch is empty (see `Default`); use the checked
    /// `min()` accessor to avoid reading the sentinel.
    min: f64,

    /// The maximum value of all observations within the sketch.
    ///
    /// Holds the sentinel `f64::MIN` while the sketch is empty (see `Default`); use the checked
    /// `max()` accessor to avoid reading the sentinel.
    max: f64,

    /// The sum of all observations within the sketch.
    sum: f64,

    /// The average value of all observations within the sketch.
    avg: f64,
}
75
76impl DDSketch {
77 /// Returns the number of bins in the sketch.
78 pub fn bin_count(&self) -> usize {
79 self.bins.len()
80 }
81
82 /// Whether or not this sketch is empty.
83 pub fn is_empty(&self) -> bool {
84 self.count == 0
85 }
86
87 /// Number of samples currently represented by this sketch.
88 pub fn count(&self) -> u64 {
89 self.count
90 }
91
92 /// Minimum value seen by this sketch.
93 ///
94 /// Returns `None` if the sketch is empty.
95 pub fn min(&self) -> Option<f64> {
96 if self.is_empty() {
97 None
98 } else {
99 Some(self.min)
100 }
101 }
102
103 /// Maximum value seen by this sketch.
104 ///
105 /// Returns `None` if the sketch is empty.
106 pub fn max(&self) -> Option<f64> {
107 if self.is_empty() {
108 None
109 } else {
110 Some(self.max)
111 }
112 }
113
114 /// Sum of all values seen by this sketch.
115 ///
116 /// Returns `None` if the sketch is empty.
117 pub fn sum(&self) -> Option<f64> {
118 if self.is_empty() {
119 None
120 } else {
121 Some(self.sum)
122 }
123 }
124
125 /// Average value seen by this sketch.
126 ///
127 /// Returns `None` if the sketch is empty.
128 pub fn avg(&self) -> Option<f64> {
129 if self.is_empty() {
130 None
131 } else {
132 Some(self.avg)
133 }
134 }
135
136 /// Returns the current bins of this sketch.
137 pub fn bins(&self) -> &[Bin] {
138 &self.bins
139 }
140
141 /// Clears the sketch, removing all bins and resetting all statistics.
142 pub fn clear(&mut self) {
143 self.count = 0;
144 self.min = f64::MAX;
145 self.max = f64::MIN;
146 self.avg = 0.0;
147 self.sum = 0.0;
148 self.bins.clear();
149 }
150
151 fn adjust_basic_stats(&mut self, v: f64, n: u64) {
152 if v < self.min {
153 self.min = v;
154 }
155
156 if v > self.max {
157 self.max = v;
158 }
159
160 self.count += n;
161 self.sum += v * n as f64;
162
163 if n == 1 {
164 self.avg += (v - self.avg) / self.count as f64;
165 } else {
166 // TODO: From the Agent source code, this method apparently loses precision when the
167 // two averages -- v and self.avg -- are close. Is there a better approach?
168 self.avg = self.avg + (v - self.avg) * n as f64 / self.count as f64;
169 }
170 }
171
172 fn insert_key_counts(&mut self, counts: &[(i16, u64)]) {
173 let mut temp = SmallVec::<[Bin; 4]>::new();
174
175 let mut bins_idx = 0;
176 let mut key_idx = 0;
177 let bins_len = self.bins.len();
178 let counts_len = counts.len();
179
180 // PERF TODO: there's probably a fast path to be had where could check if all if the counts have existing bins
181 // that aren't yet full, and we just update them directly, although we'd still be doing a linear scan to find
182 // them since keys aren't 1:1 with their position in `self.bins` but using this method just to update one or two
183 // bins is clearly suboptimal and we wouldn't really want to scan them all just to have to back out and actually
184 // do the non-fast path.. maybe a first pass could be checking if the first/last key falls within our known
185 // min/max key, and if it doesn't, then we know we have to go through the non-fast path, and if it passes, we do
186 // the scan to see if we can just update bins directly?
187 while bins_idx < bins_len && key_idx < counts_len {
188 let bin = self.bins[bins_idx];
189 let vk = counts[key_idx].0;
190 let kn = counts[key_idx].1;
191
192 match bin.k.cmp(&vk) {
193 Ordering::Greater => {
194 generate_bins(&mut temp, vk, kn);
195 key_idx += 1;
196 }
197 Ordering::Less => {
198 temp.push(bin);
199 bins_idx += 1;
200 }
201 Ordering::Equal => {
202 generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
203 bins_idx += 1;
204 key_idx += 1;
205 }
206 }
207 }
208
209 temp.extend_from_slice(&self.bins[bins_idx..]);
210
211 while key_idx < counts_len {
212 let vk = counts[key_idx].0;
213 let kn = counts[key_idx].1;
214 generate_bins(&mut temp, vk, kn);
215 key_idx += 1;
216 }
217
218 trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
219
220 // PERF TODO: This is where we might do a mem::swap instead so that we could shove the bin vector into an object
221 // pool but I'm not sure this actually matters at the moment.
222 self.bins = temp;
223 }
224
225 fn insert_keys(&mut self, mut keys: Vec<i16>) {
226 // Updating more than 4 billion keys would be very very weird and likely indicative of something horribly
227 // broken.
228 //
229 // TODO: I don't actually understand why I wrote this assertion in this way. Either the code can handle
230 // collapsing values in order to maintain the relative error bounds, or we have to cap it to the maximum allowed
231 // number of bins. Gotta think about this some more.
232 assert!(keys.len() <= u32::MAX.try_into().expect("we don't support 16-bit systems"));
233
234 keys.sort_unstable();
235
236 let mut temp = SmallVec::<[Bin; 4]>::new();
237
238 let mut bins_idx = 0;
239 let mut key_idx = 0;
240 let bins_len = self.bins.len();
241 let keys_len = keys.len();
242
243 // PERF TODO: there's probably a fast path to be had where could check if all if the counts have existing bins
244 // that aren't yet full, and we just update them directly, although we'd still be doing a linear scan to find
245 // them since keys aren't 1:1 with their position in `self.bins` but using this method just to update one or two
246 // bins is clearly suboptimal and we wouldn't really want to scan them all just to have to back out and actually
247 // do the non-fast path.. maybe a first pass could be checking if the first/last key falls within our known
248 // min/max key, and if it doesn't, then we know we have to go through the non-fast path, and if it passes, we do
249 // the scan to see if we can just update bins directly?
250 while bins_idx < bins_len && key_idx < keys_len {
251 let bin = self.bins[bins_idx];
252 let vk = keys[key_idx];
253
254 match bin.k.cmp(&vk) {
255 Ordering::Greater => {
256 let kn = buf_count_leading_equal(&keys, key_idx);
257 generate_bins(&mut temp, vk, kn);
258 key_idx += kn as usize;
259 }
260 Ordering::Less => {
261 temp.push(bin);
262 bins_idx += 1;
263 }
264 Ordering::Equal => {
265 let kn = buf_count_leading_equal(&keys, key_idx);
266 generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
267 bins_idx += 1;
268 key_idx += kn as usize;
269 }
270 }
271 }
272
273 temp.extend_from_slice(&self.bins[bins_idx..]);
274
275 while key_idx < keys_len {
276 let vk = keys[key_idx];
277 let kn = buf_count_leading_equal(&keys, key_idx);
278 generate_bins(&mut temp, vk, kn);
279 key_idx += kn as usize;
280 }
281
282 trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
283
284 // PERF TODO: This is where we might do a mem::swap instead so that we could shove the bin vector into an object
285 // pool but I'm not sure this actually matters at the moment.
286 self.bins = temp;
287 }
288
289 /// Inserts a single value into the sketch.
290 pub fn insert(&mut self, v: f64) {
291 // TODO: This should return a result that makes sure we have enough room to actually add 1 more sample without
292 // hitting `self.config.max_count()`
293 self.adjust_basic_stats(v, 1);
294
295 let key = SKETCH_CONFIG.key(v);
296
297 let mut insert_at = None;
298
299 for (bin_idx, b) in self.bins.iter_mut().enumerate() {
300 if b.k == key {
301 if b.n < MAX_BIN_WIDTH {
302 // Fast path for adding to an existing bin without overflow.
303 b.n += 1;
304 return;
305 } else {
306 insert_at = Some(bin_idx);
307 break;
308 }
309 }
310 if b.k > key {
311 insert_at = Some(bin_idx);
312 break;
313 }
314 }
315
316 if let Some(bin_idx) = insert_at {
317 self.bins.insert(bin_idx, Bin { k: key, n: 1 });
318 } else {
319 self.bins.push(Bin { k: key, n: 1 });
320 }
321 trim_left(&mut self.bins, SKETCH_CONFIG.bin_limit);
322 }
323
324 /// Inserts many values into the sketch.
325 pub fn insert_many(&mut self, vs: &[f64]) {
326 // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
327 // hitting `self.config.bin_limit`.
328 let mut keys = Vec::with_capacity(vs.len());
329 for v in vs {
330 self.adjust_basic_stats(*v, 1);
331 keys.push(SKETCH_CONFIG.key(*v));
332 }
333 self.insert_keys(keys);
334 }
335
336 /// Inserts a single value into the sketch `n` times.
337 pub fn insert_n(&mut self, v: f64, n: u64) {
338 // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
339 // hitting `self.config.max_count()`.
340 if n == 1 {
341 self.insert(v);
342 } else {
343 self.adjust_basic_stats(v, n);
344
345 let key = SKETCH_CONFIG.key(v);
346 self.insert_key_counts(&[(key, n)]);
347 }
348 }
349
350 fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u64) {
351 // Find the keys for the bins where the lower bound and upper bound would end up, and collect all of the keys in
352 // between, inclusive.
353 let lower_key = SKETCH_CONFIG.key(lower);
354 let upper_key = SKETCH_CONFIG.key(upper);
355 let keys = (lower_key..=upper_key).collect::<Vec<_>>();
356
357 let mut key_counts = Vec::new();
358 let mut remaining_count = count;
359 let distance = upper - lower;
360 let mut start_idx = 0;
361 let mut end_idx = 1;
362 let mut lower_bound = SKETCH_CONFIG.bin_lower_bound(keys[start_idx]);
363 let mut remainder = 0.0;
364
365 while end_idx < keys.len() && remaining_count > 0 {
366 // For each key, map the total distance between the input lower/upper bound against the sketch lower/upper
367 // bound for the current sketch bin, which tells us how much of the input count to apply to the current
368 // sketch bin.
369 let upper_bound = SKETCH_CONFIG.bin_lower_bound(keys[end_idx]);
370 let fkn = ((upper_bound - lower_bound) / distance) * count as f64;
371 if fkn > 1.0 {
372 remainder += fkn - fkn.trunc();
373 }
374
375 // SAFETY: This integer cast is intentional: we want to get the non-fractional part, as we've captured the
376 // fractional part in the above conditional.
377 #[allow(clippy::cast_possible_truncation)]
378 let mut kn = fkn as u64;
379 if remainder > 1.0 {
380 kn += 1;
381 remainder -= 1.0;
382 }
383
384 if kn > 0 {
385 if kn > remaining_count {
386 kn = remaining_count;
387 }
388
389 self.adjust_basic_stats(lower_bound, kn);
390 key_counts.push((keys[start_idx], kn));
391
392 remaining_count -= kn;
393 start_idx = end_idx;
394 lower_bound = upper_bound;
395 }
396
397 end_idx += 1;
398 }
399
400 if remaining_count > 0 {
401 let last_key = keys[start_idx];
402 lower_bound = SKETCH_CONFIG.bin_lower_bound(last_key);
403 self.adjust_basic_stats(lower_bound, remaining_count);
404 key_counts.push((last_key, remaining_count));
405 }
406
407 // Sort the key counts first, as that's required by `insert_key_counts`.
408 key_counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));
409
410 self.insert_key_counts(&key_counts);
411 }
412
413 /// Inserts histogram buckets into the sketch via linear interpolation.
414 ///
415 /// ## Errors
416 ///
417 /// Returns an error if a bucket size is greater that `u32::MAX`.
418 pub fn insert_interpolate_buckets(&mut self, mut buckets: Vec<Bucket>) -> Result<(), &'static str> {
419 // Buckets need to be sorted from lowest to highest so that we can properly calculate the rolling lower/upper
420 // bounds.
421 buckets.sort_by(|a, b| {
422 let oa = OrderedFloat(a.upper_limit);
423 let ob = OrderedFloat(b.upper_limit);
424
425 oa.cmp(&ob)
426 });
427
428 let mut lower = f64::NEG_INFINITY;
429
430 if buckets.iter().any(|bucket| bucket.count > u64::from(u32::MAX)) {
431 return Err("bucket size greater than u32::MAX");
432 }
433
434 for bucket in buckets {
435 let mut upper = bucket.upper_limit;
436 if upper.is_sign_positive() && upper.is_infinite() {
437 upper = lower;
438 } else if lower.is_sign_negative() && lower.is_infinite() {
439 lower = upper;
440 }
441
442 self.insert_interpolate_bucket(lower, upper, bucket.count);
443 lower = bucket.upper_limit;
444 }
445
446 Ok(())
447 }
448
449 /// Adds a bin directly into the sketch.
450 ///
451 /// Used only for unit testing so that we can create a sketch with an exact layout, which allows testing around the
452 /// resulting bins when feeding in specific values, as well as generating explicitly bad layouts for testing.
453 #[allow(dead_code)]
454 pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u32) {
455 let v = SKETCH_CONFIG.bin_lower_bound(k);
456 self.adjust_basic_stats(v, u64::from(n));
457 self.bins.push(Bin { k, n });
458 }
459
460 /// Gets the value at a given quantile.
461 pub fn quantile(&self, q: f64) -> Option<f64> {
462 if self.count == 0 {
463 return None;
464 }
465
466 if q <= 0.0 {
467 return Some(self.min);
468 }
469
470 if q >= 1.0 {
471 return Some(self.max);
472 }
473
474 let mut n = 0.0;
475 let mut estimated = None;
476 let wanted_rank = rank(self.count, q);
477
478 for (i, bin) in self.bins.iter().enumerate() {
479 n += f64::from(bin.n);
480 if n <= wanted_rank {
481 continue;
482 }
483
484 let weight = (n - wanted_rank) / f64::from(bin.n);
485 let mut v_low = SKETCH_CONFIG.bin_lower_bound(bin.k);
486 let mut v_high = v_low * SKETCH_CONFIG.gamma_v;
487
488 if i == self.bins.len() {
489 v_high = self.max;
490 } else if i == 0 {
491 v_low = self.min;
492 }
493
494 estimated = Some(v_low * weight + v_high * (1.0 - weight));
495 break;
496 }
497
498 estimated.map(|v| v.clamp(self.min, self.max)).or(Some(f64::NAN))
499 }
500
501 /// Merges another sketch into this sketch, without a loss of accuracy.
502 ///
503 /// All samples present in the other sketch will be correctly represented in this sketch, and summary statistics
504 /// such as the sum, average, count, min, and max, will represent the sum of samples from both sketches.
505 pub fn merge(&mut self, other: &DDSketch) {
506 // Merge the basic statistics together.
507 self.count += other.count;
508 if other.max > self.max {
509 self.max = other.max;
510 }
511 if other.min < self.min {
512 self.min = other.min;
513 }
514 self.sum += other.sum;
515 self.avg = self.avg + (other.avg - self.avg) * other.count as f64 / self.count as f64;
516
517 // Now merge the bins.
518 let mut temp = SmallVec::<[Bin; 4]>::new();
519
520 let mut bins_idx = 0;
521 for other_bin in &other.bins {
522 let start = bins_idx;
523 while bins_idx < self.bins.len() && self.bins[bins_idx].k < other_bin.k {
524 bins_idx += 1;
525 }
526
527 temp.extend_from_slice(&self.bins[start..bins_idx]);
528
529 if bins_idx >= self.bins.len() || self.bins[bins_idx].k > other_bin.k {
530 temp.push(*other_bin);
531 } else if self.bins[bins_idx].k == other_bin.k {
532 generate_bins(
533 &mut temp,
534 other_bin.k,
535 u64::from(other_bin.n) + u64::from(self.bins[bins_idx].n),
536 );
537 bins_idx += 1;
538 }
539 }
540
541 temp.extend_from_slice(&self.bins[bins_idx..]);
542 trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
543
544 self.bins = temp;
545 }
546
547 /// Merges this sketch into the `Dogsketch` Protocol Buffers representation.
548 pub fn merge_to_dogsketch(&self, dogsketch: &mut Dogsketch) {
549 dogsketch.set_cnt(i64::try_from(self.count).unwrap_or(i64::MAX));
550 dogsketch.set_min(self.min);
551 dogsketch.set_max(self.max);
552 dogsketch.set_avg(self.avg);
553 dogsketch.set_sum(self.sum);
554
555 let mut k = Vec::new();
556 let mut n = Vec::new();
557
558 for bin in &self.bins {
559 k.push(i32::from(bin.k));
560 n.push(bin.n);
561 }
562
563 dogsketch.set_k(k);
564 dogsketch.set_n(n);
565 }
566}
567
568impl PartialEq for DDSketch {
569 fn eq(&self, other: &Self) -> bool {
570 // We skip checking the configuration because we don't allow creating configurations by hand, and it's always
571 // locked to the constants used by the Datadog Agent. We only check the configuration equality manually in
572 // `DDSketch::merge`, to protect ourselves in the future if different configurations become allowed.
573 //
574 // Additionally, we also use floating-point-specific relative comparisons for sum/avg because they can be
575 // minimally different between sketches purely due to floating-point behavior, despite being fed the same exact
576 // data in terms of recorded samples.
577 self.count == other.count
578 && float_eq(self.min, other.min)
579 && float_eq(self.max, other.max)
580 && float_eq(self.sum, other.sum)
581 && float_eq(self.avg, other.avg)
582 && self.bins == other.bins
583 }
584}
585
586impl Default for DDSketch {
587 fn default() -> Self {
588 Self {
589 bins: SmallVec::new(),
590 count: 0,
591 min: f64::MAX,
592 max: f64::MIN,
593 sum: 0.0,
594 avg: 0.0,
595 }
596 }
597}
598
599impl Eq for DDSketch {}
600
601impl TryFrom<Dogsketch> for DDSketch {
602 type Error = &'static str;
603
604 fn try_from(value: Dogsketch) -> Result<Self, Self::Error> {
605 let mut sketch = DDSketch {
606 count: u64::try_from(value.cnt).map_err(|_| "sketch count overflows u64 or is negative")?,
607 min: value.min,
608 max: value.max,
609 avg: value.avg,
610 sum: value.sum,
611 ..Default::default()
612 };
613
614 let k = value.k;
615 let n = value.n;
616
617 if k.len() != n.len() {
618 return Err("k and n bin vectors have differing lengths");
619 }
620
621 for (k, n) in k.into_iter().zip(n.into_iter()) {
622 let k = i16::try_from(k).map_err(|_| "bin key overflows i16")?;
623
624 sketch.bins.push(Bin { k, n });
625 }
626
627 Ok(sketch)
628 }
629}
630
/// Computes the fractional rank targeted by quantile `q` over `count` samples.
///
/// Ranks are zero-based (the highest rank is `count - 1`) and rounded with banker's rounding
/// (ties to even). Callers must ensure `count > 0`.
fn rank(count: u64, q: f64) -> f64 {
    let max_rank = (count - 1) as f64;
    (q * max_rank).round_ties_even()
}
635
/// Counts the length of the run of identical keys beginning at `start_idx`.
///
/// `keys` must be sorted and `start_idx` must be a valid index into it.
#[allow(clippy::cast_possible_truncation)]
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u64 {
    // Fast path: the final element trivially forms a run of one.
    if start_idx == keys.len() - 1 {
        return 1;
    }

    let target = keys[start_idx];
    let run_len = keys[start_idx..].iter().take_while(|&&key| key == target).count();

    // SAFETY: We limit the size of the vector (used to provide the slice given to us here) to be no larger than 2^32,
    // so we can't exceed u64 here.
    run_len as u64
}
651
652fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
653 // We won't ever support Vector running on anything other than a 32-bit platform and above, I imagine, so this
654 // should always be safe.
655 let bin_limit = bin_limit as usize;
656 if bin_limit == 0 || bins.len() <= bin_limit {
657 return;
658 }
659
660 let num_to_remove = bins.len() - bin_limit;
661 let mut missing: u64 = 0;
662
663 // Sum all mass from the bins being removed. Per CollapsingLowestDenseStore in sketches-go,
664 // all removed mass collapses into the first kept bin (the new minimum index).
665 for bin in bins.iter().take(num_to_remove) {
666 missing += u64::from(bin.n);
667 }
668
669 // Fold the accumulated mass into the first kept bin, matching Go's `bins[newMinIndex] += n`.
670 // Any remainder that overflows u32::MAX is discarded — this requires >4B observations in a
671 // single collapsed bin and is an intentional divergence from the Datadog Agent (which uses
672 // float64 counts and never loses mass).
673 bins[num_to_remove].increment(missing);
674
675 // Drop the removed prefix, leaving exactly bin_limit bins.
676 bins.drain(0..num_to_remove);
677}
678
679#[allow(clippy::cast_possible_truncation)]
680fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
681 if n < u64::from(MAX_BIN_WIDTH) {
682 // SAFETY: `n < MAX_BIN_WIDTH = u32::MAX`, so it fits in u32.
683 bins.push(Bin { k, n: n as u32 });
684 } else {
685 let overflow = n % u64::from(MAX_BIN_WIDTH);
686 if overflow != 0 {
687 bins.push(Bin {
688 k,
689 // SAFETY: `overflow = n % u32::MAX`, so overflow <= u32::MAX - 1, which fits in u32.
690 n: overflow as u32,
691 });
692 }
693
694 for _ in 0..(n / u64::from(MAX_BIN_WIDTH)) {
695 bins.push(Bin { k, n: MAX_BIN_WIDTH });
696 }
697 }
698}
699
#[cfg(test)]
mod tests {
    //! Unit tests for `trim_left` collapsing semantics, with worked examples in each test's doc
    //! comment and several cases adapted from the sketches-go store test suite.
    use super::*;

    // Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k.
    fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> {
        pairs.iter().map(|&(k, n)| Bin { k, n }).collect()
    }

    // Helper: extract (k, n) pairs from a SmallVec for easy assertion.
    fn to_pairs(bins: &SmallVec<[Bin; 4]>) -> Vec<(i16, u32)> {
        bins.iter().map(|b| (b.k, b.n)).collect()
    }

    /// Basic collapse: when bins exceed the limit, the mass from removed bins is merged
    /// into the first kept bin (lowest surviving key). This mirrors the CollapsingLowestDenseStore
    /// semantics from sketches-go: all bins with index < (maxIndex - limit + 1) collapse into
    /// the bin at (maxIndex - limit + 1).
    ///
    /// Input: [(0,2), (1,3), (2,4), (3,5)] limit=2 → remove 2 bins
    /// missing = n[0] + n[1] = 2 + 3 = 5
    /// first kept bin: k=2, n=4 → increment by 5 → n=9
    /// Result: [(2,9), (3,5)]
    #[test]
    fn trim_left_collapses_removed_mass_into_first_kept_bin() {
        let mut bins = make_bins(&[(0, 2), (1, 3), (2, 4), (3, 5)]);
        trim_left(&mut bins, 2);
        assert_eq!(to_pairs(&bins), vec![(2, 9), (3, 5)]);
    }

    /// Total count is preserved exactly when the collapse fits within a single u32 bin.
    ///
    /// Input: [(10,5), (20,3), (30,7)] limit=2 → remove 1 bin
    /// missing = 5; first kept bin k=20, n=3 → n=8
    /// Result: [(20,8), (30,7)], total=15 == 5+3+7
    #[test]
    fn trim_left_preserves_total_count_when_no_overflow() {
        let mut bins = make_bins(&[(10, 5), (20, 3), (30, 7)]);
        let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        trim_left(&mut bins, 2);
        let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        assert_eq!(to_pairs(&bins), vec![(20, 8), (30, 7)]);
        assert_eq!(total_before, total_after);
    }

    /// With u32 bin counts, collapsed mass from multiple removed bins fits in a single bin
    /// without saturation for typical weights, fully preserving the total count.
    ///
    /// Input: [(0,50000), (1,50000), (2,1)] limit=1 → remove 2 bins
    /// missing = 100000; bins[2].increment(100000): 100001 < u32::MAX → n=100001
    /// bins.drain(0..2). Final: [(2,100001)], all mass preserved.
    ///
    /// With the old u16 layout, this same input would have saturated at 65535 and discarded
    /// 34466 observations. u32 eliminates that loss for any per-bin count below ~4.3 billion.
    #[test]
    fn trim_left_preserves_exact_count_with_u32_bins() {
        let mut bins = make_bins(&[(0, 50000), (1, 50000), (2, 1)]);
        let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        trim_left(&mut bins, 1);
        let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
        assert_eq!(bins.len(), 1);
        assert_eq!(bins[0].k, 2);
        assert_eq!(bins[0].n, 100001);
        assert_eq!(total_before, total_after, "all mass must be preserved with u32 bins");
    }

    /// When already at or under the limit, trim_left is a no-op.
    #[test]
    fn trim_left_no_op_when_within_limit() {
        let original = make_bins(&[(5, 10), (6, 20)]);
        let mut bins = original.clone();
        trim_left(&mut bins, 2);
        assert_eq!(to_pairs(&bins), to_pairs(&original));
        trim_left(&mut bins, 3);
        assert_eq!(to_pairs(&bins), to_pairs(&original));
    }

    /// Regression test for trim_left bin count with large per-sample weights.
    ///
    /// With the old u16 layout, a sample weight of ~260M would generate ceil(260M / 65535) = 3969
    /// bins per key, causing bin count explosion and an encoder panic. With u32, the same weight
    /// fits in a single bin (260M < u32::MAX ≈ 4.3B), so one insert_n call produces exactly one
    /// bin per key and the bin limit is trivially respected.
    ///
    /// This test inserts several values with a weight representative of what ADP receives when
    /// clamping an incoming sample rate of 3e-9 to its minimum of 3.845e-9 (~260M per sample),
    /// then asserts the bin count never exceeds DDSKETCH_CONF_BIN_LIMIT.
    #[test]
    fn trim_left_respects_bin_limit_with_large_weights() {
        // Weight corresponding to ADP's minimum safe sample rate (1 / 3.845e-9 ≈ 260_078_024).
        // With u32 bins, this fits in a single bin per key (260_078_024 < u32::MAX).
        let weight: u64 = 260_078_024;
        let bin_limit = usize::from(DDSKETCH_CONF_BIN_LIMIT);

        let mut sketch = DDSketch::default();

        // Insert enough distinct values to repeatedly trigger trim_left. Ten values is more
        // than sufficient; two already exceed the bin limit without the fix.
        for i in 1..=10_i32 {
            sketch.insert_n(f64::from(i), weight);
            assert!(
                sketch.bins().len() <= bin_limit,
                "bin count {} exceeded limit {} after inserting {} value(s) at weight {}",
                sketch.bins().len(),
                bin_limit,
                i,
                weight,
            );
        }
    }

    /// When the accumulated missing mass plus the first kept bin's existing count exceeds
    /// u32::MAX, increment saturates at u32::MAX and the remainder is discarded.
    ///
    /// Input: [(0, u32::MAX), (1, 1)] limit=1 → remove 1 bin
    /// missing = u32::MAX; bins[1].increment(u32::MAX): next = u32::MAX+1 > u32::MAX
    /// → n = u32::MAX, remainder = 1 (discarded)
    /// Final: [(1, u32::MAX)] — 1 observation lost
    #[test]
    fn trim_left_saturates_first_kept_bin_and_discards_remainder() {
        let mut bins = make_bins(&[(0, u32::MAX), (1, 1)]);
        trim_left(&mut bins, 1);
        assert_eq!(bins.len(), 1);
        assert_eq!(bins[0].k, 1);
        assert_eq!(bins[0].n, u32::MAX);
    }

    /// Collapsing works correctly with negative bin keys. The highest key always survives;
    /// lower (more negative) keys collapse into the first kept key.
    /// Adapted from TestAddIntDatasets in sketches-go which covers negative indices.
    ///
    /// Input: [(-3,5), (-2,3), (-1,2), (0,1)] limit=2 → remove 2 bins
    /// missing = 5+3=8; bins[-1].increment(8): 2+8=10 < u32::MAX → n=10
    /// Final: [(-1,10), (0,1)]
    #[test]
    fn trim_left_collapses_negative_keys_correctly() {
        let mut bins = make_bins(&[(-3, 5), (-2, 3), (-1, 2), (0, 1)]);
        trim_left(&mut bins, 2);
        assert_eq!(to_pairs(&bins), vec![(-1, 10), (0, 1)]);
    }

    /// Monotonic ascending sequence: inserting keys 0..N where N > limit collapses the
    /// lowest keys into the first kept key, with their total mass summed there.
    /// Adapted from TestAddMonotonous in sketches-go.
    ///
    /// Input: [(0,1),(1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)] limit=4
    /// num_to_remove=6; missing=6; bins[6].increment(6): 1+6=7 → n=7
    /// Final: [(6,7),(7,1),(8,1),(9,1)] — only top 4 keys kept, collapsed mass in first
    #[test]
    fn trim_left_monotonic_ascending_keeps_top_keys() {
        let pairs: Vec<(i16, u32)> = (0..10).map(|i| (i, 1)).collect();
        let mut bins = make_bins(&pairs);
        trim_left(&mut bins, 4);
        assert_eq!(to_pairs(&bins), vec![(6, 7), (7, 1), (8, 1), (9, 1)]);
    }
}
856
/// Property-based tests for trim_left, adapted from TestAddFuzzy / TestAddIntFuzzy /
/// TestMergeFuzzy in sketches-go/ddsketch/store/store_test.go.
///
/// The sketches-go suite generates random (index, count) inputs, passes them through
/// CollapsingLowestDenseStore, and asserts structural invariants using the `collapsingLowest`
/// oracle transform. We do the same here: generate random sorted distinct-key bins, run
/// trim_left, and check the same invariants.
#[cfg(test)]
mod property_tests {
    use proptest::prelude::*;

    use super::*;

    /// Strategy: a non-empty sorted vec of distinct (k: i16, n: u32) pairs.
    /// Keys are drawn from i16 without repetition; counts are 1..=u32::MAX.
    fn arb_bins(max_len: usize) -> impl Strategy<Value = SmallVec<[Bin; 4]>> {
        // A BTreeMap iterates in ascending key order with unique keys, so the resulting bins are
        // automatically sorted and duplicate-free -- the invariant trim_left expects.
        proptest::collection::btree_map(any::<i16>(), 1u32..=u32::MAX, 1..=max_len)
            .prop_map(|map| map.into_iter().map(|(k, n)| Bin { k, n }).collect())
    }

    /// Strategy: a bin_limit in 1..=32 (small enough to exercise collapsing frequently).
    fn arb_limit() -> impl Strategy<Value = u16> {
        1u16..=32
    }

    proptest! {
        /// After trim_left, the bin count must never exceed bin_limit.
        ///
        /// Mirrors the core bin-count invariant checked throughout the sketches-go suite:
        /// every store operation must leave the store within its configured capacity.
        #[test]
        fn prop_bin_count_never_exceeds_limit(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            trim_left(&mut bins, limit);
            prop_assert!(
                bins.len() <= limit as usize,
                "bin count {} exceeded limit {}",
                bins.len(),
                limit,
            );
        }

        /// After trim_left, bins must remain sorted by key with no duplicate keys.
        ///
        /// trim_left only drains a prefix and modifies bins[num_to_remove].n in place;
        /// it must not disturb the ordering of the surviving bins.
        #[test]
        fn prop_output_bins_are_sorted_and_distinct(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            trim_left(&mut bins, limit);
            // Strict `<` between neighbors checks both sortedness and distinctness at once.
            for window in bins.windows(2) {
                prop_assert!(
                    window[0].k < window[1].k,
                    "bins not strictly sorted after trim_left: {:?} >= {:?}",
                    window[0].k,
                    window[1].k,
                );
            }
        }

        /// When bins.len() <= limit, trim_left is a no-op: bins is unchanged.
        #[test]
        fn prop_no_op_when_within_limit(
            bins in arb_bins(16),
            extra in 0u16..=16,
        ) {
            let limit = bins.len() as u16 + extra;
            let original = bins.clone();
            let mut bins = bins;
            trim_left(&mut bins, limit);
            prop_assert_eq!(bins, original);
        }

        /// Total count is preserved exactly when no u32 overflow occurs.
        ///
        /// This is the key invariant from sketches-go's assertEncodeBins:
        /// store.TotalCount() must equal the sum of all inserted counts.
        /// We restrict counts to keep the collapsed sum safely below u32::MAX
        /// (sketches-go never loses mass because it uses float64; we match that
        /// guarantee for all inputs where the collapsed bin doesn't overflow u32).
        ///
        /// Strategy: counts capped at 1000 so that even if all 64 bins collapse
        /// into one the sum (≤ 64_000) is far below u32::MAX.
        #[test]
        fn prop_total_count_preserved_when_no_overflow(
            mut bins in proptest::collection::btree_map(any::<i16>(), 1u32..=1000, 1..=64usize)
                .prop_map(|map| -> SmallVec<[Bin; 4]> {
                    map.into_iter().map(|(k, n)| Bin { k, n }).collect()
                }),
            limit in arb_limit(),
        ) {
            let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
            trim_left(&mut bins, limit);
            let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
            prop_assert_eq!(
                total_before, total_after,
                "total count changed: before={}, after={}",
                total_before, total_after,
            );
        }

        /// The surviving bins are always the bin_limit highest-key bins from the input.
        ///
        /// This is the direct encoding of the collapsingLowest oracle from sketches-go:
        /// minCollapsedIndex = maxIndex - limit + 1; all bins with key < minCollapsedIndex
        /// are removed (their mass folds into minCollapsedIndex). The output keys must
        /// exactly match the top min(len, limit) keys of the input.
        #[test]
        fn prop_output_keys_are_highest_from_input(
            mut bins in arb_bins(64),
            limit in arb_limit(),
        ) {
            let all_keys: Vec<i16> = bins.iter().map(|b| b.k).collect();
            let expected_len = all_keys.len().min(limit as usize);
            let expected_keys: Vec<i16> = all_keys[all_keys.len() - expected_len..].to_vec();

            trim_left(&mut bins, limit);

            let actual_keys: Vec<i16> = bins.iter().map(|b| b.k).collect();
            prop_assert_eq!(
                actual_keys, expected_keys,
                "output keys don't match top {} keys of input",
                limit,
            );
        }
    }
}