ddsketch/agent/sketch.rs
//! Agent-specific DDSketch implementation.

use std::cmp::Ordering;

use datadog_protos::metrics::Dogsketch;
use ordered_float::OrderedFloat;
use smallvec::SmallVec;

use super::bin::Bin;
use super::bucket::Bucket;
use super::config::{
    Config, DDSKETCH_CONF_BIN_LIMIT, DDSKETCH_CONF_GAMMA_LN, DDSKETCH_CONF_GAMMA_V, DDSKETCH_CONF_NORM_BIAS,
    DDSKETCH_CONF_NORM_MIN,
};
use crate::canonical::mapping::LogarithmicMapping;
use crate::common::float_eq;

static SKETCH_CONFIG: Config = Config::new(
    DDSKETCH_CONF_BIN_LIMIT,
    DDSKETCH_CONF_GAMMA_V,
    DDSKETCH_CONF_GAMMA_LN,
    DDSKETCH_CONF_NORM_MIN,
    DDSKETCH_CONF_NORM_BIAS,
);
const MAX_BIN_WIDTH: u32 = u32::MAX;

/// [DDSketch][ddsketch] implementation based on the [Datadog Agent][ddagent].
///
/// This implementation is subtly different from the open-source implementations of `DDSketch`, as Datadog made some
/// slight tweaks to configuration values and in-memory layout to optimize it for insertion performance within the
/// agent.
///
/// We've mimicked the agent version of `DDSketch` here in order to support a future where we can take sketches shipped
/// by the agent, handle them internally, merge them, and so on, without any loss of accuracy, eventually forwarding
/// them to Datadog ourselves.
///
/// As such, this implementation is constrained in the same ways: the configuration parameters cannot be changed, the
/// collapsing strategy is fixed, and we support a limited number of methods for inserting into the sketch.
///
/// Importantly, we have a special function, again taken from the agent version, that lets us interpolate histograms --
/// specifically, our own aggregated histograms -- into a sketch so that we can emit useful default quantiles. Without
/// it, we would have to ship the raw buckets (upper bound and count) to a downstream system, which may have no native
/// way to interpolate them, and thus no way to render useful data from them.
///
/// # Features
///
/// This crate exposes a single feature, `serde`, which enables serialization and deserialization of `DDSketch` with
/// `serde`. This feature is not enabled by default, as it can be slightly risky to use. This is primarily due to the
/// fact that the format of `DDSketch` is not promised to be stable over time. If you enable this feature, you should
/// take care to avoid storing serialized `DDSketch` data for long periods of time, as deserializing it in the future
/// may work but could lead to incorrect/unexpected behavior or issues with correctness.
///
/// [ddsketch]: https://www.vldb.org/pvldb/vol12/p2195-masson.pdf
/// [ddagent]: https://github.com/DataDog/datadog-agent
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct DDSketch {
    /// The bins within the sketch.
    bins: SmallVec<[Bin; 4]>,

    /// The number of observations within the sketch.
    count: u64,

    /// The minimum value of all observations within the sketch.
    min: f64,

    /// The maximum value of all observations within the sketch.
    max: f64,

    /// The sum of all observations within the sketch.
    sum: f64,

    /// The average value of all observations within the sketch.
    avg: f64,
}

impl DDSketch {
    /// Returns the canonical DDSketch mapping that aligns with the agent sketch's key space.
    pub fn remap_mapping() -> LogarithmicMapping {
        LogarithmicMapping::new_with_gamma_and_offset(DDSKETCH_CONF_GAMMA_V, f64::from(DDSKETCH_CONF_NORM_BIAS) + 0.5)
            .expect("agent sketch gamma and offset are always valid")
    }

    /// Returns the representative value for the given agent sketch key.
    pub fn value_for_key(key: i16) -> f64 {
        SKETCH_CONFIG.bin_lower_bound(key)
    }

    /// Returns the number of bins in the sketch.
    pub fn bin_count(&self) -> usize {
        self.bins.len()
    }

    /// Whether or not this sketch is empty.
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Number of samples currently represented by this sketch.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Overrides the sample count tracked by this sketch.
    pub fn set_count(&mut self, count: u64) {
        self.count = count;
    }

    /// Overrides the sum of all values tracked by this sketch.
    pub fn set_sum(&mut self, sum: f64) {
        self.sum = sum;
    }

    /// Overrides the average value tracked by this sketch.
    pub fn set_avg(&mut self, avg: f64) {
        self.avg = avg;
    }

    /// Overrides the minimum value tracked by this sketch.
    pub fn set_min(&mut self, min: f64) {
        self.min = min;
    }

    /// Overrides the maximum value tracked by this sketch.
    pub fn set_max(&mut self, max: f64) {
        self.max = max;
    }

    /// Minimum value seen by this sketch.
    ///
    /// Returns `None` if the sketch is empty.
    pub fn min(&self) -> Option<f64> {
        if self.is_empty() {
            None
        } else {
            Some(self.min)
        }
    }

    /// Maximum value seen by this sketch.
    ///
    /// Returns `None` if the sketch is empty.
    pub fn max(&self) -> Option<f64> {
        if self.is_empty() {
            None
        } else {
            Some(self.max)
        }
    }

    /// Sum of all values seen by this sketch.
    ///
    /// Returns `None` if the sketch is empty.
    pub fn sum(&self) -> Option<f64> {
        if self.is_empty() {
            None
        } else {
            Some(self.sum)
        }
    }

    /// Average value seen by this sketch.
    ///
    /// Returns `None` if the sketch is empty.
    pub fn avg(&self) -> Option<f64> {
        if self.is_empty() {
            None
        } else {
            Some(self.avg)
        }
    }

    /// Returns the current bins of this sketch.
    pub fn bins(&self) -> &[Bin] {
        &self.bins
    }

    /// Clears the sketch, removing all bins and resetting all statistics.
    pub fn clear(&mut self) {
        self.count = 0;
        self.min = f64::MAX;
        self.max = f64::MIN;
        self.avg = 0.0;
        self.sum = 0.0;
        self.bins.clear();
    }

    fn adjust_basic_stats(&mut self, v: f64, n: u64) {
        if v < self.min {
            self.min = v;
        }

        if v > self.max {
            self.max = v;
        }

        self.count += n;
        self.sum += v * n as f64;

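        // Incremental mean update: avg' = avg + (v - avg) * n / count. This folds the new observations into the
        // running mean without maintaining a separate sum just for averaging.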
        if n == 1 {
            self.avg += (v - self.avg) / self.count as f64;
        } else {
            // TODO: From the Agent source code, this method apparently loses precision when the
            // two averages -- v and self.avg -- are close. Is there a better approach?
            self.avg = self.avg + (v - self.avg) * n as f64 / self.count as f64;
        }
    }

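    /// Merges sorted `(key, count)` pairs into the sketch's bins.
    ///
    /// Both `self.bins` and `counts` must be sorted by key in ascending order. The two sequences are combined
    /// linearly, like the merge step of a merge sort: matching keys have their counts summed (splitting into
    /// multiple bins via `generate_bins` if the combined count exceeds `MAX_BIN_WIDTH`), and the result is
    /// collapsed back down to the configured bin limit.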
    fn insert_key_counts(&mut self, counts: &[(i16, u64)]) {
        let mut temp = SmallVec::<[Bin; 4]>::new();

        let mut bins_idx = 0;
        let mut key_idx = 0;
        let bins_len = self.bins.len();
        let counts_len = counts.len();

        // PERF TODO: There's probably a fast path to be had where we check whether all of the counts have existing
        // bins that aren't yet full, and update them directly. We'd still be doing a linear scan to find them, since
        // keys aren't 1:1 with their position in `self.bins`, and using this method just to update one or two bins is
        // clearly suboptimal: we wouldn't want to scan them all only to have to back out and do the non-fast path
        // anyway. Maybe a first pass could check whether the first/last key falls within our known min/max key; if it
        // doesn't, we know we have to take the non-fast path, and if it does, we do the scan to see if we can just
        // update bins directly?
        while bins_idx < bins_len && key_idx < counts_len {
            let bin = self.bins[bins_idx];
            let vk = counts[key_idx].0;
            let kn = counts[key_idx].1;

            match bin.k.cmp(&vk) {
                Ordering::Greater => {
                    generate_bins(&mut temp, vk, kn);
                    key_idx += 1;
                }
                Ordering::Less => {
                    temp.push(bin);
                    bins_idx += 1;
                }
                Ordering::Equal => {
                    generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
                    bins_idx += 1;
                    key_idx += 1;
                }
            }
        }

        temp.extend_from_slice(&self.bins[bins_idx..]);

        while key_idx < counts_len {
            let vk = counts[key_idx].0;
            let kn = counts[key_idx].1;
            generate_bins(&mut temp, vk, kn);
            key_idx += 1;
        }

        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);

        // PERF TODO: This is where we might do a mem::swap instead, so that we could shove the bin vector into an
        // object pool, but I'm not sure this actually matters at the moment.
        self.bins = temp;
    }

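    /// Merges a batch of (unsorted) keys into the sketch's bins.
    ///
    /// Keys are sorted and then run-length counted, so each distinct key is merged into the existing (sorted) bins
    /// in a single linear pass, with the result collapsed back down to the configured bin limit.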
    fn insert_keys(&mut self, mut keys: Vec<i16>) {
        // Updating more than 4 billion keys would be very, very weird, and likely indicative of something horribly
        // broken.
        //
        // TODO: I don't actually understand why I wrote this assertion in this way. Either the code can handle
        // collapsing values in order to maintain the relative error bounds, or we have to cap it to the maximum
        // allowed number of bins. Gotta think about this some more.
        assert!(keys.len() <= u32::MAX.try_into().expect("we don't support 16-bit systems"));

        keys.sort_unstable();

        let mut temp = SmallVec::<[Bin; 4]>::new();

        let mut bins_idx = 0;
        let mut key_idx = 0;
        let bins_len = self.bins.len();
        let keys_len = keys.len();

        // PERF TODO: See the identical note in `insert_key_counts` about a possible fast path for updating existing,
        // non-full bins directly.
        while bins_idx < bins_len && key_idx < keys_len {
            let bin = self.bins[bins_idx];
            let vk = keys[key_idx];

            match bin.k.cmp(&vk) {
                Ordering::Greater => {
                    let kn = buf_count_leading_equal(&keys, key_idx);
                    generate_bins(&mut temp, vk, kn);
                    key_idx += kn as usize;
                }
                Ordering::Less => {
                    temp.push(bin);
                    bins_idx += 1;
                }
                Ordering::Equal => {
                    let kn = buf_count_leading_equal(&keys, key_idx);
                    generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
                    bins_idx += 1;
                    key_idx += kn as usize;
                }
            }
        }

        temp.extend_from_slice(&self.bins[bins_idx..]);

        while key_idx < keys_len {
            let vk = keys[key_idx];
            let kn = buf_count_leading_equal(&keys, key_idx);
            generate_bins(&mut temp, vk, kn);
            key_idx += kn as usize;
        }

        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);

        // PERF TODO: This is where we might do a mem::swap instead, so that we could shove the bin vector into an
        // object pool, but I'm not sure this actually matters at the moment.
        self.bins = temp;
    }

    /// Inserts a single value into the sketch.
    pub fn insert(&mut self, v: f64) {
        // TODO: This should return a result that makes sure we have enough room to actually add 1 more sample without
        // hitting `self.config.max_count()`.
        self.adjust_basic_stats(v, 1);

        let key = SKETCH_CONFIG.key(v);

        let mut insert_at = None;

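        // Bins are kept sorted by key: scan for either an existing bin with this key (incrementing it if it isn't
        // full) or the first bin with a greater key, which becomes the insertion point for a new bin.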
        for (bin_idx, b) in self.bins.iter_mut().enumerate() {
            if b.k == key {
                if b.n < MAX_BIN_WIDTH {
                    // Fast path for adding to an existing bin without overflow.
                    b.n += 1;
                    return;
                } else {
                    insert_at = Some(bin_idx);
                    break;
                }
            }
            if b.k > key {
                insert_at = Some(bin_idx);
                break;
            }
        }

        if let Some(bin_idx) = insert_at {
            self.bins.insert(bin_idx, Bin { k: key, n: 1 });
        } else {
            self.bins.push(Bin { k: key, n: 1 });
        }
        trim_left(&mut self.bins, SKETCH_CONFIG.bin_limit);
    }

    /// Inserts many values into the sketch.
    pub fn insert_many(&mut self, vs: &[f64]) {
        // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
        // hitting `self.config.bin_limit`.
        let mut keys = Vec::with_capacity(vs.len());
        for v in vs {
            self.adjust_basic_stats(*v, 1);
            keys.push(SKETCH_CONFIG.key(*v));
        }
        self.insert_keys(keys);
    }

    /// Inserts a single value into the sketch `n` times.
    pub fn insert_n(&mut self, v: f64, n: u64) {
        // TODO: This should return a result that makes sure we have enough room to actually add N more samples without
        // hitting `self.config.max_count()`.
        if n == 1 {
            self.insert(v);
        } else {
            self.adjust_basic_stats(v, n);

            let key = SKETCH_CONFIG.key(v);
            self.insert_key_counts(&[(key, n)]);
        }
    }

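    /// Distributes `count` observations across the sketch keys spanned by `[lower, upper]`.
    ///
    /// Each sketch bin in the span receives a share of `count` proportional to how much of the `upper - lower`
    /// distance it covers, with fractional remainders carried forward so the total count is preserved. For example,
    /// a bucket covering `[0.0, 10.0]` with a count of 100 gives a sketch bin spanning one tenth of that distance
    /// roughly 10 of the observations.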
    fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u64) {
        // Find the keys for the bins where the lower bound and upper bound would end up, and collect all of the keys
        // in between, inclusive.
        let lower_key = SKETCH_CONFIG.key(lower);
        let upper_key = SKETCH_CONFIG.key(upper);
        let keys = (lower_key..=upper_key).collect::<Vec<_>>();

        let mut key_counts = Vec::new();
        let mut remaining_count = count;
        let distance = upper - lower;
        let mut start_idx = 0;
        let mut end_idx = 1;
        let mut lower_bound = SKETCH_CONFIG.bin_lower_bound(keys[start_idx]);
        let mut remainder = 0.0;

        while end_idx < keys.len() && remaining_count > 0 {
            // For each key, map the total distance between the input lower/upper bound against the sketch lower/upper
            // bound for the current sketch bin, which tells us how much of the input count to apply to the current
            // sketch bin.
            let upper_bound = SKETCH_CONFIG.bin_lower_bound(keys[end_idx]);
            let fkn = ((upper_bound - lower_bound) / distance) * count as f64;
            if fkn > 1.0 {
                remainder += fkn - fkn.trunc();
            }

            // SAFETY: This integer cast is intentional: we want to get the non-fractional part, as we've captured the
            // fractional part in the above conditional.
            #[allow(clippy::cast_possible_truncation)]
            let mut kn = fkn as u64;
            if remainder > 1.0 {
                kn += 1;
                remainder -= 1.0;
            }

            if kn > 0 {
                if kn > remaining_count {
                    kn = remaining_count;
                }

                self.adjust_basic_stats(lower_bound, kn);
                key_counts.push((keys[start_idx], kn));

                remaining_count -= kn;
                start_idx = end_idx;
                lower_bound = upper_bound;
            }

            end_idx += 1;
        }

        if remaining_count > 0 {
            let last_key = keys[start_idx];
            lower_bound = SKETCH_CONFIG.bin_lower_bound(last_key);
            self.adjust_basic_stats(lower_bound, remaining_count);
            key_counts.push((last_key, remaining_count));
        }

        // Sort the key counts first, as that's required by `insert_key_counts`.
        key_counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));

        self.insert_key_counts(&key_counts);
    }

    /// Inserts histogram buckets into the sketch via linear interpolation.
    ///
    /// ## Errors
    ///
    /// Returns an error if a bucket size is greater than `u32::MAX`.
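    ///
    /// ## Example
    ///
    /// A minimal sketch of feeding aggregated histogram buckets, assuming `Bucket` can be built as a plain struct
    /// literal with `upper_limit` and `count` fields (only the field names used in this module are assumed); marked
    /// `ignore` because the crate path is not assumed here:
    ///
    /// ```rust,ignore
    /// let mut sketch = DDSketch::default();
    /// let buckets = vec![
    ///     Bucket { upper_limit: 1.0, count: 10 },
    ///     Bucket { upper_limit: 5.0, count: 20 },
    ///     Bucket { upper_limit: f64::INFINITY, count: 5 },
    /// ];
    /// sketch.insert_interpolate_buckets(buckets).expect("counts fit in u32");
    /// assert_eq!(sketch.count(), 35);
    /// ```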
    pub fn insert_interpolate_buckets(&mut self, mut buckets: Vec<Bucket>) -> Result<(), &'static str> {
        // Buckets need to be sorted from lowest to highest so that we can properly calculate the rolling lower/upper
        // bounds.
        buckets.sort_by(|a, b| {
            let oa = OrderedFloat(a.upper_limit);
            let ob = OrderedFloat(b.upper_limit);

            oa.cmp(&ob)
        });

        let mut lower = f64::NEG_INFINITY;

        if buckets.iter().any(|bucket| bucket.count > u64::from(u32::MAX)) {
            return Err("bucket size greater than u32::MAX");
        }

        for bucket in buckets {
            let mut upper = bucket.upper_limit;
            if upper.is_sign_positive() && upper.is_infinite() {
                upper = lower;
            } else if lower.is_sign_negative() && lower.is_infinite() {
                lower = upper;
            }

            self.insert_interpolate_bucket(lower, upper, bucket.count);
            lower = bucket.upper_limit;
        }

        Ok(())
    }

    /// Adds a bin directly into the sketch.
    ///
    /// Used only for unit testing so that we can create a sketch with an exact layout, which allows testing around the
    /// resulting bins when feeding in specific values, as well as generating explicitly bad layouts for testing.
    #[allow(dead_code)]
    pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u32) {
        let v = SKETCH_CONFIG.bin_lower_bound(k);
        self.adjust_basic_stats(v, u64::from(n));
        self.bins.push(Bin { k, n });
    }

    /// Gets the value at a given quantile.
    pub fn quantile(&self, q: f64) -> Option<f64> {
        if self.count == 0 {
            return None;
        }

        if q <= 0.0 {
            return Some(self.min);
        }

        if q >= 1.0 {
            return Some(self.max);
        }

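        // Walk the bins in ascending key order, accumulating counts until we pass the rank of the wanted quantile,
        // then linearly interpolate between that bin's lower and upper bounds based on how far past the rank we went.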
        let mut n = 0.0;
        let mut estimated = None;
        let wanted_rank = rank(self.count, q);

        for (i, bin) in self.bins.iter().enumerate() {
            n += f64::from(bin.n);
            if n <= wanted_rank {
                continue;
            }

            let weight = (n - wanted_rank) / f64::from(bin.n);
            let mut v_low = SKETCH_CONFIG.bin_lower_bound(bin.k);
            let mut v_high = v_low * SKETCH_CONFIG.gamma_v;

            if i == self.bins.len() - 1 {
                v_high = self.max;
            } else if i == 0 {
                v_low = self.min;
            }

            estimated = Some(v_low * weight + v_high * (1.0 - weight));
            break;
        }

        estimated.map(|v| v.clamp(self.min, self.max)).or(Some(f64::NAN))
    }

    /// Merges another sketch into this sketch, without a loss of accuracy.
    ///
    /// All samples present in the other sketch will be correctly represented in this sketch, and summary statistics
    /// such as the sum, average, count, min, and max, will represent the sum of samples from both sketches.
    pub fn merge(&mut self, other: &DDSketch) {
        // Merge the basic statistics together.
        self.count += other.count;
        if other.max > self.max {
            self.max = other.max;
        }
        if other.min < self.min {
            self.min = other.min;
        }
        self.sum += other.sum;
        self.avg = self.avg + (other.avg - self.avg) * other.count as f64 / self.count as f64;

        // Now merge the bins.
        let mut temp = SmallVec::<[Bin; 4]>::new();

        let mut bins_idx = 0;
        for other_bin in &other.bins {
            let start = bins_idx;
            while bins_idx < self.bins.len() && self.bins[bins_idx].k < other_bin.k {
                bins_idx += 1;
            }

            temp.extend_from_slice(&self.bins[start..bins_idx]);

            if bins_idx >= self.bins.len() || self.bins[bins_idx].k > other_bin.k {
                temp.push(*other_bin);
            } else if self.bins[bins_idx].k == other_bin.k {
                generate_bins(
                    &mut temp,
                    other_bin.k,
                    u64::from(other_bin.n) + u64::from(self.bins[bins_idx].n),
                );
                bins_idx += 1;
            }
        }

        temp.extend_from_slice(&self.bins[bins_idx..]);
        trim_left(&mut temp, SKETCH_CONFIG.bin_limit);

        self.bins = temp;
    }

    /// Merges this sketch into the `Dogsketch` Protocol Buffers representation.
    pub fn merge_to_dogsketch(&self, dogsketch: &mut Dogsketch) {
        dogsketch.set_cnt(i64::try_from(self.count).unwrap_or(i64::MAX));
        dogsketch.set_min(self.min);
        dogsketch.set_max(self.max);
        dogsketch.set_avg(self.avg);
        dogsketch.set_sum(self.sum);

        let mut k = Vec::new();
        let mut n = Vec::new();

        for bin in &self.bins {
            k.push(i32::from(bin.k));
            n.push(bin.n);
        }

        dogsketch.set_k(k);
        dogsketch.set_n(n);
    }
}

impl PartialEq for DDSketch {
    fn eq(&self, other: &Self) -> bool {
        // We skip checking the configuration because we don't allow creating configurations by hand, and it's always
        // locked to the constants used by the Datadog Agent. We only check the configuration equality manually in
        // `DDSketch::merge`, to protect ourselves in the future if different configurations become allowed.
        //
        // Additionally, we also use floating-point-specific relative comparisons for sum/avg because they can be
        // minimally different between sketches purely due to floating-point behavior, despite being fed the same exact
        // data in terms of recorded samples.
        self.count == other.count
            && float_eq(self.min, other.min)
            && float_eq(self.max, other.max)
            && float_eq(self.sum, other.sum)
            && float_eq(self.avg, other.avg)
            && self.bins == other.bins
    }
}

impl Default for DDSketch {
    fn default() -> Self {
        Self {
            bins: SmallVec::new(),
            count: 0,
            min: f64::MAX,
            max: f64::MIN,
            sum: 0.0,
            avg: 0.0,
        }
    }
}

impl Eq for DDSketch {}

impl TryFrom<Dogsketch> for DDSketch {
    type Error = &'static str;

    fn try_from(value: Dogsketch) -> Result<Self, Self::Error> {
        let mut sketch = DDSketch {
            count: u64::try_from(value.cnt).map_err(|_| "sketch count overflows u64 or is negative")?,
            min: value.min,
            max: value.max,
            avg: value.avg,
            sum: value.sum,
            ..Default::default()
        };

        let k = value.k;
        let n = value.n;

        if k.len() != n.len() {
            return Err("k and n bin vectors have differing lengths");
        }

        for (k, n) in k.into_iter().zip(n.into_iter()) {
            let k = i16::try_from(k).map_err(|_| "bin key overflows i16")?;

            sketch.bins.push(Bin { k, n });
        }

        Ok(sketch)
    }
}

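/// Computes the zero-based rank targeted by quantile `q` over `count` samples.
///
/// For example, `rank(10, 0.5)` is `0.5 * 9 = 4.5`, which rounds (ties to even) to `4.0`.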
fn rank(count: u64, q: f64) -> f64 {
    let rank = q * (count - 1) as f64;
    rank.round_ties_even()
}

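/// Counts how many consecutive entries of `keys`, starting at `start_idx`, are equal to `keys[start_idx]`.
///
/// `keys` must be sorted, so this yields the run length of the key at `start_idx`.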
673#[allow(clippy::cast_possible_truncation)]
674fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u64 {
675 if start_idx == keys.len() - 1 {
676 return 1;
677 }
678
679 let mut idx = start_idx;
680 while idx < keys.len() && keys[idx] == keys[start_idx] {
681 idx += 1;
682 }
683
684 // SAFETY: We limit the size of the vector (used to provide the slice given to us here) to be no larger than 2^32,
685 // so we can't exceed u64 here.
686 (idx - start_idx) as u64
687}
688
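/// Collapses `bins` down to at most `bin_limit` bins by folding the lowest-keyed bins into the first surviving bin.
///
/// Mirrors the "collapsing lowest" strategy used by the Datadog Agent: the highest-keyed bins always survive, so
/// accuracy is preserved at the upper end of the distribution at the cost of the lower end.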
fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
    // We won't ever support Vector running on anything smaller than a 32-bit platform, I imagine, so widening the
    // u16 bin limit to usize should always be safe.
    let bin_limit = bin_limit as usize;
    if bin_limit == 0 || bins.len() <= bin_limit {
        return;
    }

    let num_to_remove = bins.len() - bin_limit;
    let mut missing: u64 = 0;

    // Sum all mass from the bins being removed. Per CollapsingLowestDenseStore in sketches-go,
    // all removed mass collapses into the first kept bin (the new minimum index).
    for bin in bins.iter().take(num_to_remove) {
        missing += u64::from(bin.n);
    }

    // Fold the accumulated mass into the first kept bin, matching Go's `bins[newMinIndex] += n`.
    // Any remainder that overflows u32::MAX is discarded; this requires >4B observations in a
    // single collapsed bin, and is an intentional divergence from the Datadog Agent (which uses
    // float64 counts and never loses mass).
    bins[num_to_remove].increment(missing);

    // Drop the removed prefix, leaving exactly bin_limit bins.
    bins.drain(0..num_to_remove);
}

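/// Pushes bins for `n` observations of key `k`, splitting across multiple bins when `n` exceeds `MAX_BIN_WIDTH`.
///
/// For example, `n = u32::MAX as u64 + 1` produces one bin holding the remainder (1) followed by one full bin of
/// `u32::MAX`, so the total count is preserved exactly.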
716#[allow(clippy::cast_possible_truncation)]
717fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
718 if n < u64::from(MAX_BIN_WIDTH) {
719 // SAFETY: `n < MAX_BIN_WIDTH = u32::MAX`, so it fits in u32.
720 bins.push(Bin { k, n: n as u32 });
721 } else {
722 let overflow = n % u64::from(MAX_BIN_WIDTH);
723 if overflow != 0 {
724 bins.push(Bin {
725 k,
726 // SAFETY: `overflow = n % u32::MAX`, so overflow <= u32::MAX - 1, which fits in u32.
727 n: overflow as u32,
728 });
729 }
730
731 for _ in 0..(n / u64::from(MAX_BIN_WIDTH)) {
732 bins.push(Bin { k, n: MAX_BIN_WIDTH });
733 }
734 }
735}
736
737#[cfg(test)]
738mod tests {
739 use super::*;
740
741 // Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k.
742 fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> {
743 pairs.iter().map(|&(k, n)| Bin { k, n }).collect()
744 }
745
746 // Helper: extract (k, n) pairs from a SmallVec for easy assertion.
747 fn to_pairs(bins: &SmallVec<[Bin; 4]>) -> Vec<(i16, u32)> {
748 bins.iter().map(|b| (b.k, b.n)).collect()
749 }
750
751 /// Basic collapse: when bins exceed the limit, the mass from removed bins is merged
752 /// into the first kept bin (lowest surviving key). This mirrors the CollapsingLowestDenseStore
753 /// semantics from sketches-go: all bins with index < (maxIndex - limit + 1) collapse into
754 /// the bin at (maxIndex - limit + 1).
755 ///
756 /// Input: [(0,2), (1,3), (2,4), (3,5)] limit=2 → remove 2 bins
757 /// missing = n[0] + n[1] = 2 + 3 = 5
758 /// first kept bin: k=2, n=4 → increment by 5 → n=9
759 /// Result: [(2,9), (3,5)]
760 #[test]
761 fn trim_left_collapses_removed_mass_into_first_kept_bin() {
762 let mut bins = make_bins(&[(0, 2), (1, 3), (2, 4), (3, 5)]);
763 trim_left(&mut bins, 2);
764 assert_eq!(to_pairs(&bins), vec![(2, 9), (3, 5)]);
765 }
766
767 /// Total count is preserved exactly when the collapse fits within a single u32 bin.
768 ///
769 /// Input: [(10,5), (20,3), (30,7)] limit=2 → remove 1 bin
770 /// missing = 5; first kept bin k=20, n=3 → n=8
771 /// Result: [(20,8), (30,7)], total=15 == 5+3+7
772 #[test]
773 fn trim_left_preserves_total_count_when_no_overflow() {
774 let mut bins = make_bins(&[(10, 5), (20, 3), (30, 7)]);
775 let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
776 trim_left(&mut bins, 2);
777 let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
778 assert_eq!(to_pairs(&bins), vec![(20, 8), (30, 7)]);
779 assert_eq!(total_before, total_after);
780 }
781
782 /// With u32 bin counts, collapsed mass from multiple removed bins fits in a single bin
783 /// without saturation for typical weights, fully preserving the total count.
784 ///
785 /// Input: [(0,50000), (1,50000), (2,1)] limit=1 → remove 2 bins
786 /// missing = 100000; bins[2].increment(100000): 100001 < u32::MAX → n=100001
787 /// bins.drain(0..2). Final: [(2,100001)], all mass preserved.
788 ///
789 /// With the old u16 layout, this same input would have saturated at 65535 and discarded
790 /// 34466 observations. u32 eliminates that loss for any per-bin count below ~4.3 billion.
791 #[test]
792 fn trim_left_preserves_exact_count_with_u32_bins() {
793 let mut bins = make_bins(&[(0, 50000), (1, 50000), (2, 1)]);
794 let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
795 trim_left(&mut bins, 1);
796 let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
797 assert_eq!(bins.len(), 1);
798 assert_eq!(bins[0].k, 2);
799 assert_eq!(bins[0].n, 100001);
800 assert_eq!(total_before, total_after, "all mass must be preserved with u32 bins");
801 }
802
803 /// When already at or under the limit, trim_left is a no-op.
804 #[test]
805 fn trim_left_no_op_when_within_limit() {
806 let original = make_bins(&[(5, 10), (6, 20)]);
807 let mut bins = original.clone();
808 trim_left(&mut bins, 2);
809 assert_eq!(to_pairs(&bins), to_pairs(&original));
810 trim_left(&mut bins, 3);
811 assert_eq!(to_pairs(&bins), to_pairs(&original));
812 }
813
814 /// Regression test for trim_left bin count with large per-sample weights.
815 ///
816 /// With the old u16 layout, a sample weight of ~260M would generate ceil(260M / 65535) = 3969
817 /// bins per key, causing bin count explosion and an encoder panic. With u32, the same weight
818 /// fits in a single bin (260M < u32::MAX ≈ 4.3B), so one insert_n call produces exactly one
819 /// bin per key and the bin limit is trivially respected.
820 ///
821 /// This test inserts several values with a weight representative of what ADP receives when
822 /// clamping an incoming sample rate of 3e-9 to its minimum of 3.845e-9 (~260M per sample),
823 /// then asserts the bin count never exceeds DDSKETCH_CONF_BIN_LIMIT.
824 #[test]
825 fn trim_left_respects_bin_limit_with_large_weights() {
826 // Weight corresponding to ADP's minimum safe sample rate (1 / 3.845e-9 ≈ 260_078_024).
827 // With u32 bins, this fits in a single bin per key (260_078_024 < u32::MAX).
828 let weight: u64 = 260_078_024;
829 let bin_limit = usize::from(DDSKETCH_CONF_BIN_LIMIT);
830
831 let mut sketch = DDSketch::default();
832
833 // Insert enough distinct values to repeatedly trigger trim_left. Ten values is more
834 // than sufficient; two already exceed the bin limit without the fix.
835 for i in 1..=10_i32 {
836 sketch.insert_n(f64::from(i), weight);
837 assert!(
838 sketch.bins().len() <= bin_limit,
839 "bin count {} exceeded limit {} after inserting {} value(s) at weight {}",
840 sketch.bins().len(),
841 bin_limit,
842 i,
843 weight,
844 );
845 }
846 }
847
848 /// When the accumulated missing mass plus the first kept bin's existing count exceeds
849 /// u32::MAX, increment saturates at u32::MAX and the remainder is discarded.
850 ///
851 /// Input: [(0, u32::MAX), (1, 1)] limit=1 → remove 1 bin
852 /// missing = u32::MAX; bins[1].increment(u32::MAX): next = u32::MAX+1 > u32::MAX
853 /// → n = u32::MAX, remainder = 1 (discarded)
854 /// Final: [(1, u32::MAX)] — 1 observation lost
855 #[test]
856 fn trim_left_saturates_first_kept_bin_and_discards_remainder() {
857 let mut bins = make_bins(&[(0, u32::MAX), (1, 1)]);
858 trim_left(&mut bins, 1);
859 assert_eq!(bins.len(), 1);
860 assert_eq!(bins[0].k, 1);
861 assert_eq!(bins[0].n, u32::MAX);
862 }
863
864 /// Collapsing works correctly with negative bin keys. The highest key always survives;
865 /// lower (more negative) keys collapse into the first kept key.
866 /// Adapted from TestAddIntDatasets in sketches-go which covers negative indices.
867 ///
868 /// Input: [(-3,5), (-2,3), (-1,2), (0,1)] limit=2 → remove 2 bins
869 /// missing = 5+3=8; bins[-1].increment(8): 2+8=10 < u32::MAX → n=10
870 /// Final: [(-1,10), (0,1)]
871 #[test]
872 fn trim_left_collapses_negative_keys_correctly() {
873 let mut bins = make_bins(&[(-3, 5), (-2, 3), (-1, 2), (0, 1)]);
874 trim_left(&mut bins, 2);
875 assert_eq!(to_pairs(&bins), vec![(-1, 10), (0, 1)]);
876 }
877
878 /// Monotonic ascending sequence: inserting keys 0..N where N > limit collapses the
879 /// lowest keys into the first kept key, with their total mass summed there.
880 /// Adapted from TestAddMonotonous in sketches-go.
881 ///
882 /// Input: [(0,1),(1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)] limit=4
883 /// num_to_remove=6; missing=6; bins[6].increment(6): 1+6=7 → n=7
884 /// Final: [(6,7),(7,1),(8,1),(9,1)] — only top 4 keys kept, collapsed mass in first
885 #[test]
886 fn trim_left_monotonic_ascending_keeps_top_keys() {
887 let pairs: Vec<(i16, u32)> = (0..10).map(|i| (i, 1)).collect();
888 let mut bins = make_bins(&pairs);
889 trim_left(&mut bins, 4);
890 assert_eq!(to_pairs(&bins), vec![(6, 7), (7, 1), (8, 1), (9, 1)]);
891 }
892}
893
894/// Property-based tests for trim_left, adapted from TestAddFuzzy / TestAddIntFuzzy /
895/// TestMergeFuzzy in sketches-go/ddsketch/store/store_test.go.
896///
897/// The sketches-go suite generates random (index, count) inputs, passes them through
898/// CollapsingLowestDenseStore, and asserts structural invariants using the `collapsingLowest`
899/// oracle transform. We do the same here: generate random sorted distinct-key bins, run
900/// trim_left, and check the same invariants.
901#[cfg(test)]
902mod property_tests {
903 use proptest::prelude::*;
904
905 use super::*;
906
907 /// Strategy: a non-empty sorted vec of distinct (k: i16, n: u32) pairs.
908 /// Keys are drawn from i16 without repetition; counts are 1..=u32::MAX.
909 fn arb_bins(max_len: usize) -> impl Strategy<Value = SmallVec<[Bin; 4]>> {
910 proptest::collection::btree_map(any::<i16>(), 1u32..=u32::MAX, 1..=max_len)
911 .prop_map(|map| map.into_iter().map(|(k, n)| Bin { k, n }).collect())
912 }
913
914 /// Strategy: a bin_limit in 1..=32 (small enough to exercise collapsing frequently).
915 fn arb_limit() -> impl Strategy<Value = u16> {
916 1u16..=32
917 }
918
919 proptest! {
920 /// After trim_left, the bin count must never exceed bin_limit.
921 ///
922 /// Mirrors the core bin-count invariant checked throughout the sketches-go suite:
923 /// every store operation must leave the store within its configured capacity.
924 #[test]
925 fn prop_bin_count_never_exceeds_limit(
926 mut bins in arb_bins(64),
927 limit in arb_limit(),
928 ) {
929 trim_left(&mut bins, limit);
930 prop_assert!(
931 bins.len() <= limit as usize,
932 "bin count {} exceeded limit {}",
933 bins.len(),
934 limit,
935 );
936 }
937
938 /// After trim_left, bins must remain sorted by key with no duplicate keys.
939 ///
940 /// trim_left only drains a prefix and modifies bins[num_to_remove].n in place;
941 /// it must not disturb the ordering of the surviving bins.
942 #[test]
943 fn prop_output_bins_are_sorted_and_distinct(
944 mut bins in arb_bins(64),
945 limit in arb_limit(),
946 ) {
947 trim_left(&mut bins, limit);
948 for window in bins.windows(2) {
949 prop_assert!(
950 window[0].k < window[1].k,
951 "bins not strictly sorted after trim_left: {:?} >= {:?}",
952 window[0].k,
953 window[1].k,
954 );
955 }
956 }
957
958 /// When bins.len() <= limit, trim_left is a no-op: bins is unchanged.
959 #[test]
960 fn prop_no_op_when_within_limit(
961 bins in arb_bins(16),
962 extra in 0u16..=16,
963 ) {
964 let limit = bins.len() as u16 + extra;
965 let original = bins.clone();
966 let mut bins = bins;
967 trim_left(&mut bins, limit);
968 prop_assert_eq!(bins, original);
969 }
970
971 /// Total count is preserved exactly when no u32 overflow occurs.
972 ///
973 /// This is the key invariant from sketches-go's assertEncodeBins:
974 /// store.TotalCount() must equal the sum of all inserted counts.
975 /// We restrict counts to keep the collapsed sum safely below u32::MAX
976 /// (sketches-go never loses mass because it uses float64; we match that
977 /// guarantee for all inputs where the collapsed bin doesn't overflow u32).
978 ///
979 /// Strategy: counts capped at 1000 so that even if all 64 bins collapse
980 /// into one the sum (≤ 64_000) is far below u32::MAX.
981 #[test]
982 fn prop_total_count_preserved_when_no_overflow(
983 mut bins in proptest::collection::btree_map(any::<i16>(), 1u32..=1000, 1..=64usize)
984 .prop_map(|map| -> SmallVec<[Bin; 4]> {
985 map.into_iter().map(|(k, n)| Bin { k, n }).collect()
986 }),
987 limit in arb_limit(),
988 ) {
989 let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
990 trim_left(&mut bins, limit);
991 let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
992 prop_assert_eq!(
993 total_before, total_after,
994 "total count changed: before={}, after={}",
995 total_before, total_after,
996 );
997 }
998
999 /// The surviving bins are always the bin_limit highest-key bins from the input.
1000 ///
1001 /// This is the direct encoding of the collapsingLowest oracle from sketches-go:
1002 /// minCollapsedIndex = maxIndex - limit + 1; all bins with key < minCollapsedIndex
1003 /// are removed (their mass folds into minCollapsedIndex). The output keys must
1004 /// exactly match the top min(len, limit) keys of the input.
1005 #[test]
1006 fn prop_output_keys_are_highest_from_input(
1007 mut bins in arb_bins(64),
1008 limit in arb_limit(),
1009 ) {
1010 let all_keys: Vec<i16> = bins.iter().map(|b| b.k).collect();
1011 let expected_len = all_keys.len().min(limit as usize);
1012 let expected_keys: Vec<i16> = all_keys[all_keys.len() - expected_len..].to_vec();
1013
1014 trim_left(&mut bins, limit);
1015
1016 let actual_keys: Vec<i16> = bins.iter().map(|b| b.k).collect();
1017 prop_assert_eq!(
1018 actual_keys, expected_keys,
1019 "output keys don't match top {} keys of input",
1020 limit,
1021 );
1022 }
1023 }
1024}