1use std::{cmp::Ordering, mem};
4
5use datadog_protos::metrics::Dogsketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9use super::bin::Bin;
10use super::bucket::Bucket;
11use super::config::{
12 Config, DDSKETCH_CONF_BIN_LIMIT, DDSKETCH_CONF_GAMMA_LN, DDSKETCH_CONF_GAMMA_V, DDSKETCH_CONF_NORM_BIAS,
13 DDSKETCH_CONF_NORM_MIN,
14};
15use crate::common::float_eq;
16
/// Sketch configuration used by the methods in this file, built from the `DDSKETCH_CONF_*` constants.
///
/// It supplies the value-to-key mapping (`key`), the key-to-value mapping (`bin_lower_bound`), the `gamma_v`
/// growth factor, and the `bin_limit` passed to `trim_left`.
static SKETCH_CONFIG: Config = Config::new(
    DDSKETCH_CONF_BIN_LIMIT,
    DDSKETCH_CONF_GAMMA_V,
    DDSKETCH_CONF_GAMMA_LN,
    DDSKETCH_CONF_NORM_MIN,
    DDSKETCH_CONF_NORM_BIAS,
);
/// Largest sample count a single bin can hold (bin counts are `u16`).
///
/// Counts above this are spread over several bins that share the same key; see `generate_bins`.
const MAX_BIN_WIDTH: u16 = u16::MAX;
25
/// A quantile sketch that stores samples as counted bins alongside running summary statistics.
///
/// Values are mapped to `i16` bin keys through `SKETCH_CONFIG`; each bin holds at most `MAX_BIN_WIDTH` samples,
/// so a heavily populated key may be represented by several consecutive bins.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct DDSketch {
    /// The bins of the sketch. The merge routines in this file walk them as an ascending-by-key sequence.
    bins: SmallVec<[Bin; 4]>,

    /// Total number of samples recorded.
    count: u64,

    /// Smallest value recorded; `f64::MAX` while the sketch is empty (see `Default` / `clear`).
    min: f64,

    /// Largest value recorded; `f64::MIN` while the sketch is empty (see `Default` / `clear`).
    max: f64,

    /// Sum of all recorded values.
    sum: f64,

    /// Running average of all recorded values, updated incrementally on every insertion and merge.
    avg: f64,
}
75
76impl DDSketch {
/// Returns the number of bins currently held by the sketch.
///
/// This is the number of stored `Bin` entries, not the number of distinct keys: a key whose count exceeds
/// `MAX_BIN_WIDTH` occupies more than one entry.
pub fn bin_count(&self) -> usize {
    self.bins.len()
}
81
/// Returns `true` if no samples have been recorded, i.e. the sample count is zero.
pub fn is_empty(&self) -> bool {
    self.count == 0
}
86
/// Returns the total number of samples recorded in the sketch.
pub fn count(&self) -> u64 {
    self.count
}
91
92 pub fn min(&self) -> Option<f64> {
96 if self.is_empty() {
97 None
98 } else {
99 Some(self.min)
100 }
101 }
102
103 pub fn max(&self) -> Option<f64> {
107 if self.is_empty() {
108 None
109 } else {
110 Some(self.max)
111 }
112 }
113
114 pub fn sum(&self) -> Option<f64> {
118 if self.is_empty() {
119 None
120 } else {
121 Some(self.sum)
122 }
123 }
124
125 pub fn avg(&self) -> Option<f64> {
129 if self.is_empty() {
130 None
131 } else {
132 Some(self.avg)
133 }
134 }
135
/// Returns the sketch's bins as a slice, in their stored order.
pub fn bins(&self) -> &[Bin] {
    &self.bins
}
140
141 pub fn clear(&mut self) {
143 self.count = 0;
144 self.min = f64::MAX;
145 self.max = f64::MIN;
146 self.avg = 0.0;
147 self.sum = 0.0;
148 self.bins.clear();
149 }
150
151 fn adjust_basic_stats(&mut self, v: f64, n: u64) {
152 if v < self.min {
153 self.min = v;
154 }
155
156 if v > self.max {
157 self.max = v;
158 }
159
160 self.count += n;
161 self.sum += v * n as f64;
162
163 if n == 1 {
164 self.avg += (v - self.avg) / self.count as f64;
165 } else {
166 self.avg = self.avg + (v - self.avg) * n as f64 / self.count as f64;
169 }
170 }
171
172 fn insert_key_counts(&mut self, counts: &[(i16, u64)]) {
173 let mut temp = SmallVec::<[Bin; 4]>::new();
174
175 let mut bins_idx = 0;
176 let mut key_idx = 0;
177 let bins_len = self.bins.len();
178 let counts_len = counts.len();
179
180 while bins_idx < bins_len && key_idx < counts_len {
188 let bin = self.bins[bins_idx];
189 let vk = counts[key_idx].0;
190 let kn = counts[key_idx].1;
191
192 match bin.k.cmp(&vk) {
193 Ordering::Greater => {
194 generate_bins(&mut temp, vk, kn);
195 key_idx += 1;
196 }
197 Ordering::Less => {
198 temp.push(bin);
199 bins_idx += 1;
200 }
201 Ordering::Equal => {
202 generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
203 bins_idx += 1;
204 key_idx += 1;
205 }
206 }
207 }
208
209 temp.extend_from_slice(&self.bins[bins_idx..]);
210
211 while key_idx < counts_len {
212 let vk = counts[key_idx].0;
213 let kn = counts[key_idx].1;
214 generate_bins(&mut temp, vk, kn);
215 key_idx += 1;
216 }
217
218 trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
219
220 self.bins = temp;
223 }
224
225 fn insert_keys(&mut self, mut keys: Vec<i16>) {
226 assert!(keys.len() <= u32::MAX.try_into().expect("we don't support 16-bit systems"));
233
234 keys.sort_unstable();
235
236 let mut temp = SmallVec::<[Bin; 4]>::new();
237
238 let mut bins_idx = 0;
239 let mut key_idx = 0;
240 let bins_len = self.bins.len();
241 let keys_len = keys.len();
242
243 while bins_idx < bins_len && key_idx < keys_len {
251 let bin = self.bins[bins_idx];
252 let vk = keys[key_idx];
253
254 match bin.k.cmp(&vk) {
255 Ordering::Greater => {
256 let kn = buf_count_leading_equal(&keys, key_idx);
257 generate_bins(&mut temp, vk, kn);
258 key_idx += kn as usize;
259 }
260 Ordering::Less => {
261 temp.push(bin);
262 bins_idx += 1;
263 }
264 Ordering::Equal => {
265 let kn = buf_count_leading_equal(&keys, key_idx);
266 generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
267 bins_idx += 1;
268 key_idx += kn as usize;
269 }
270 }
271 }
272
273 temp.extend_from_slice(&self.bins[bins_idx..]);
274
275 while key_idx < keys_len {
276 let vk = keys[key_idx];
277 let kn = buf_count_leading_equal(&keys, key_idx);
278 generate_bins(&mut temp, vk, kn);
279 key_idx += kn as usize;
280 }
281
282 trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
283
284 self.bins = temp;
287 }
288
289 pub fn insert(&mut self, v: f64) {
291 self.adjust_basic_stats(v, 1);
294
295 let key = SKETCH_CONFIG.key(v);
296
297 let mut insert_at = None;
298
299 for (bin_idx, b) in self.bins.iter_mut().enumerate() {
300 if b.k == key {
301 if b.n < MAX_BIN_WIDTH {
302 b.n += 1;
304 return;
305 } else {
306 insert_at = Some(bin_idx);
307 break;
308 }
309 }
310 if b.k > key {
311 insert_at = Some(bin_idx);
312 break;
313 }
314 }
315
316 if let Some(bin_idx) = insert_at {
317 self.bins.insert(bin_idx, Bin { k: key, n: 1 });
318 } else {
319 self.bins.push(Bin { k: key, n: 1 });
320 }
321 trim_left(&mut self.bins, SKETCH_CONFIG.bin_limit);
322 }
323
324 pub fn insert_many(&mut self, vs: &[f64]) {
326 let mut keys = Vec::with_capacity(vs.len());
329 for v in vs {
330 self.adjust_basic_stats(*v, 1);
331 keys.push(SKETCH_CONFIG.key(*v));
332 }
333 self.insert_keys(keys);
334 }
335
336 pub fn insert_n(&mut self, v: f64, n: u64) {
338 if n == 1 {
341 self.insert(v);
342 } else {
343 self.adjust_basic_stats(v, n);
344
345 let key = SKETCH_CONFIG.key(v);
346 self.insert_key_counts(&[(key, n)]);
347 }
348 }
349
/// Spreads `count` samples from a histogram bucket spanning `lower..=upper` across the sketch bins it covers.
///
/// Every key between `key(lower)` and `key(upper)` (inclusive) is a candidate. Each candidate span receives a
/// share of `count` proportional to its width relative to `upper - lower`; samples are recorded at the span's
/// lower bound, which is also the value fed into the summary statistics.
///
/// NOTE(review): `keys[start_idx]` panics if the key range is empty, i.e. if `key(lower) > key(upper)`. This
/// presumably cannot happen when `lower <= upper` -- confirm that `Config::key` is monotonic.
fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u64) {
    // Find the keys where the lower and upper bound land, and collect every key in between, inclusive.
    let lower_key = SKETCH_CONFIG.key(lower);
    let upper_key = SKETCH_CONFIG.key(upper);
    let keys = (lower_key..=upper_key).collect::<Vec<_>>();

    let mut key_counts = Vec::new();
    let mut remaining_count = count;
    let distance = upper - lower;
    // `start_idx..end_idx` is the span of keys currently being filled; it widens until it earns a sample.
    let mut start_idx = 0;
    let mut end_idx = 1;
    let mut lower_bound = SKETCH_CONFIG.bin_lower_bound(keys[start_idx]);
    // Accumulated fractional samples that have not been handed out yet.
    let mut remainder = 0.0;

    while end_idx < keys.len() && remaining_count > 0 {
        // Share of `count` owed to the current span: its width relative to the whole bucket width.
        let upper_bound = SKETCH_CONFIG.bin_lower_bound(keys[end_idx]);
        let fkn = ((upper_bound - lower_bound) / distance) * count as f64;
        if fkn > 1.0 {
            remainder += fkn - fkn.trunc();
        }

        // The truncating cast is intentional: only the whole part is wanted here, the fractional part was
        // captured in `remainder` above.
        #[allow(clippy::cast_possible_truncation)]
        let mut kn = fkn as u64;
        if remainder > 1.0 {
            // Enough fractional samples have accumulated to make up one whole sample.
            kn += 1;
            remainder -= 1.0;
        }

        if kn > 0 {
            // Never hand out more than what is left of the bucket's count.
            if kn > remaining_count {
                kn = remaining_count;
            }

            self.adjust_basic_stats(lower_bound, kn);
            key_counts.push((keys[start_idx], kn));

            remaining_count -= kn;
            // Start a new span at the key that just served as the upper edge.
            start_idx = end_idx;
            lower_bound = upper_bound;
        }

        // When `kn` was zero the span start stays put, so the next iteration covers a wider span.
        end_idx += 1;
    }

    // Whatever could not be distributed goes to the start key of the last open span.
    if remaining_count > 0 {
        let last_key = keys[start_idx];
        lower_bound = SKETCH_CONFIG.bin_lower_bound(last_key);
        self.adjust_basic_stats(lower_bound, remaining_count);
        key_counts.push((last_key, remaining_count));
    }

    // `insert_key_counts` merges in key order. Entries are pushed with increasing `start_idx` above, so this
    // sort is a safeguard rather than a reordering.
    key_counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));

    self.insert_key_counts(&key_counts);
}
412
413 pub fn insert_interpolate_buckets(&mut self, mut buckets: Vec<Bucket>) -> Result<(), &'static str> {
419 buckets.sort_by(|a, b| {
422 let oa = OrderedFloat(a.upper_limit);
423 let ob = OrderedFloat(b.upper_limit);
424
425 oa.cmp(&ob)
426 });
427
428 let mut lower = f64::NEG_INFINITY;
429
430 if buckets.iter().any(|bucket| bucket.count > u64::from(u32::MAX)) {
431 return Err("bucket size greater than u32::MAX");
432 }
433
434 for bucket in buckets {
435 let mut upper = bucket.upper_limit;
436 if upper.is_sign_positive() && upper.is_infinite() {
437 upper = lower;
438 } else if lower.is_sign_negative() && lower.is_infinite() {
439 lower = upper;
440 }
441
442 self.insert_interpolate_bucket(lower, upper, bucket.count);
443 lower = bucket.upper_limit;
444 }
445
446 Ok(())
447 }
448
449 #[allow(dead_code)]
454 pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u16) {
455 let v = SKETCH_CONFIG.bin_lower_bound(k);
456 self.adjust_basic_stats(v, u64::from(n));
457 self.bins.push(Bin { k, n });
458 }
459
460 pub fn quantile(&self, q: f64) -> Option<f64> {
462 if self.count == 0 {
463 return None;
464 }
465
466 if q <= 0.0 {
467 return Some(self.min);
468 }
469
470 if q >= 1.0 {
471 return Some(self.max);
472 }
473
474 let mut n = 0.0;
475 let mut estimated = None;
476 let wanted_rank = rank(self.count, q);
477
478 for (i, bin) in self.bins.iter().enumerate() {
479 n += f64::from(bin.n);
480 if n <= wanted_rank {
481 continue;
482 }
483
484 let weight = (n - wanted_rank) / f64::from(bin.n);
485 let mut v_low = SKETCH_CONFIG.bin_lower_bound(bin.k);
486 let mut v_high = v_low * SKETCH_CONFIG.gamma_v;
487
488 if i == self.bins.len() {
489 v_high = self.max;
490 } else if i == 0 {
491 v_low = self.min;
492 }
493
494 estimated = Some(v_low * weight + v_high * (1.0 - weight));
495 break;
496 }
497
498 estimated.map(|v| v.clamp(self.min, self.max)).or(Some(f64::NAN))
499 }
500
501 pub fn merge(&mut self, other: &DDSketch) {
506 self.count += other.count;
508 if other.max > self.max {
509 self.max = other.max;
510 }
511 if other.min < self.min {
512 self.min = other.min;
513 }
514 self.sum += other.sum;
515 self.avg = self.avg + (other.avg - self.avg) * other.count as f64 / self.count as f64;
516
517 let mut temp = SmallVec::<[Bin; 4]>::new();
519
520 let mut bins_idx = 0;
521 for other_bin in &other.bins {
522 let start = bins_idx;
523 while bins_idx < self.bins.len() && self.bins[bins_idx].k < other_bin.k {
524 bins_idx += 1;
525 }
526
527 temp.extend_from_slice(&self.bins[start..bins_idx]);
528
529 if bins_idx >= self.bins.len() || self.bins[bins_idx].k > other_bin.k {
530 temp.push(*other_bin);
531 } else if self.bins[bins_idx].k == other_bin.k {
532 generate_bins(
533 &mut temp,
534 other_bin.k,
535 u64::from(other_bin.n) + u64::from(self.bins[bins_idx].n),
536 );
537 bins_idx += 1;
538 }
539 }
540
541 temp.extend_from_slice(&self.bins[bins_idx..]);
542 trim_left(&mut temp, SKETCH_CONFIG.bin_limit);
543
544 self.bins = temp;
545 }
546
547 pub fn merge_to_dogsketch(&self, dogsketch: &mut Dogsketch) {
549 dogsketch.set_cnt(i64::try_from(self.count).unwrap_or(i64::MAX));
550 dogsketch.set_min(self.min);
551 dogsketch.set_max(self.max);
552 dogsketch.set_avg(self.avg);
553 dogsketch.set_sum(self.sum);
554
555 let mut k = Vec::new();
556 let mut n = Vec::new();
557
558 for bin in &self.bins {
559 k.push(i32::from(bin.k));
560 n.push(u32::from(bin.n));
561 }
562
563 dogsketch.set_k(k);
564 dogsketch.set_n(n);
565 }
566}
567
568impl PartialEq for DDSketch {
569 fn eq(&self, other: &Self) -> bool {
570 self.count == other.count
578 && float_eq(self.min, other.min)
579 && float_eq(self.max, other.max)
580 && float_eq(self.sum, other.sum)
581 && float_eq(self.avg, other.avg)
582 && self.bins == other.bins
583 }
584}
585
586impl Default for DDSketch {
587 fn default() -> Self {
588 Self {
589 bins: SmallVec::new(),
590 count: 0,
591 min: f64::MAX,
592 max: f64::MIN,
593 sum: 0.0,
594 avg: 0.0,
595 }
596 }
597}
598
// Marker impl: declares the `PartialEq` implementation for `DDSketch` to be a full equivalence relation.
// NOTE(review): equality compares the floating-point statistics via `float_eq`; if that helper treats NaN
// as unequal to itself, a sketch holding a NaN statistic would not equal itself -- confirm.
impl Eq for DDSketch {}
600
601impl TryFrom<Dogsketch> for DDSketch {
602 type Error = &'static str;
603
604 fn try_from(value: Dogsketch) -> Result<Self, Self::Error> {
605 let mut sketch = DDSketch {
606 count: u64::try_from(value.cnt).map_err(|_| "sketch count overflows u64 or is negative")?,
607 min: value.min,
608 max: value.max,
609 avg: value.avg,
610 sum: value.sum,
611 ..Default::default()
612 };
613
614 let k = value.k;
615 let n = value.n;
616
617 if k.len() != n.len() {
618 return Err("k and n bin vectors have differing lengths");
619 }
620
621 for (k, n) in k.into_iter().zip(n.into_iter()) {
622 let k = i16::try_from(k).map_err(|_| "bin key overflows i16")?;
623 let n = u16::try_from(n).map_err(|_| "bin count overflows u16")?;
624
625 sketch.bins.push(Bin { k, n });
626 }
627
628 Ok(sketch)
629 }
630}
631
/// Computes the zero-based rank targeted by quantile `q` in a population of `count` samples.
///
/// The rank is `q * (count - 1)`, rounded to the nearest integer with ties going to the even neighbour.
/// `count` must be at least 1; the caller (`quantile`) returns early for empty sketches.
fn rank(count: u64, q: f64) -> f64 {
    let highest_rank = (count - 1) as f64;
    (q * highest_rank).round_ties_even()
}
636
/// Counts how many consecutive entries of `keys`, starting at `start_idx`, are equal to `keys[start_idx]`.
///
/// The entry at `start_idx` itself is included, so the result is at least 1 for any in-range index. Returns 0
/// when `start_idx` is out of range, which includes any index into an empty slice. The previous
/// implementation computed `keys.len() - 1` up front and therefore underflowed (panicking in debug builds)
/// when handed an empty slice.
// The run length is bounded by the slice length; `usize` to `u64` does not truncate on supported targets.
#[allow(clippy::cast_possible_truncation)]
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u64 {
    let Some((first, rest)) = keys.get(start_idx..).and_then(<[i16]>::split_first) else {
        return 0;
    };

    let run_len = 1 + rest.iter().take_while(|&key| key == first).count();

    run_len as u64
}
652
653fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
654 let bin_limit = bin_limit as usize;
657 if bin_limit == 0 || bins.len() < bin_limit {
658 return;
659 }
660
661 let num_to_remove = bins.len() - bin_limit;
662 let mut missing = 0;
663 let mut overflow = SmallVec::<[Bin; 4]>::new();
664
665 for bin in bins.iter().take(num_to_remove) {
666 missing += u64::from(bin.n);
667
668 if missing > u64::from(MAX_BIN_WIDTH) {
669 overflow.push(Bin {
670 k: bin.k,
671 n: MAX_BIN_WIDTH,
672 });
673
674 missing -= u64::from(MAX_BIN_WIDTH);
675 }
676 }
677
678 let bin_remove = &mut bins[num_to_remove];
679 missing = bin_remove.increment(missing);
680 if missing > 0 {
681 generate_bins(&mut overflow, bin_remove.k, missing);
682 }
683
684 let overflow_len = overflow.len();
685 let (_, bins_end) = bins.split_at(num_to_remove);
686 overflow.extend_from_slice(bins_end);
687
688 overflow.truncate(bin_limit + overflow_len);
691
692 mem::swap(bins, &mut overflow);
693}
694
695#[allow(clippy::cast_possible_truncation)]
696fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
697 if n < u64::from(MAX_BIN_WIDTH) {
698 bins.push(Bin { k, n: n as u16 });
700 } else {
701 let overflow = n % u64::from(MAX_BIN_WIDTH);
702 if overflow != 0 {
703 bins.push(Bin {
704 k,
705 n: overflow as u16,
707 });
708 }
709
710 for _ in 0..(n / u64::from(MAX_BIN_WIDTH)) {
711 bins.push(Bin { k, n: MAX_BIN_WIDTH });
712 }
713 }
714}