1use datadog_protos::sketches::DDSketch as ProtoDDSketch;
4
5use super::error::ProtoConversionError;
6use super::mapping::{IndexMapping, LogarithmicMapping};
7use super::store::{CollapsingLowestDenseStore, Store};
8
9#[derive(Clone, Debug)]
33pub struct DDSketch<M: IndexMapping = LogarithmicMapping, S: Store = CollapsingLowestDenseStore> {
34 mapping: M,
36
37 positive_store: S,
39
40 negative_store: S,
42
43 zero_count: u64,
45}
46
47impl DDSketch<LogarithmicMapping, CollapsingLowestDenseStore> {
48 pub fn with_relative_accuracy(relative_accuracy: f64) -> Result<Self, &'static str> {
56 let mapping = LogarithmicMapping::new(relative_accuracy)?;
57 Ok(Self::new(
58 mapping,
59 CollapsingLowestDenseStore::default(),
60 CollapsingLowestDenseStore::default(),
61 ))
62 }
63}
64
65impl<M: IndexMapping, S: Store> DDSketch<M, S> {
66 pub fn new(mapping: M, positive_store: S, negative_store: S) -> Self {
68 Self {
69 mapping,
70 positive_store,
71 negative_store,
72 zero_count: 0,
73 }
74 }
75
76 pub fn add(&mut self, value: f64) {
78 self.add_n(value, 1);
79 }
80
81 pub fn add_n(&mut self, value: f64, n: u64) {
85 if n == 0 {
86 return;
87 }
88
89 if value > self.mapping.min_indexable_value() {
90 let index = self.mapping.index(value);
91 self.positive_store.add(index, n);
92 } else if value < -self.mapping.min_indexable_value() {
93 let index = self.mapping.index(-value);
94 self.negative_store.add(index, n);
95 } else {
96 self.zero_count += n;
97 }
98 }
99
100 pub fn quantile(&self, q: f64) -> Option<f64> {
107 if self.is_empty() {
108 return None;
109 }
110
111 if !(0.0..=1.0).contains(&q) {
112 return None;
113 }
114
115 let rank = (q * (self.count() - 1) as f64).round_ties_even() as u64;
116
117 let negative_count = self.negative_store.total_count();
118 let total_negative_and_zero = negative_count + self.zero_count;
119
120 if rank < negative_count {
121 let reverse_rank = negative_count - rank - 1;
123 if let Some(index) = self.negative_store.key_at_rank(reverse_rank) {
124 return Some(-self.mapping.value(index));
125 }
126 } else if rank < total_negative_and_zero {
127 return Some(0.0);
128 } else {
129 let positive_rank = rank - total_negative_and_zero;
130 if let Some(index) = self.positive_store.key_at_rank(positive_rank) {
131 return Some(self.mapping.value(index));
132 }
133 }
134
135 unreachable!("rank out of bounds on non-empty sketch")
136 }
137
138 pub fn merge(&mut self, other: &Self)
142 where
143 M: PartialEq,
144 {
145 if other.is_empty() {
146 return;
147 }
148
149 self.positive_store.merge(&other.positive_store);
150 self.negative_store.merge(&other.negative_store);
151 self.zero_count += other.zero_count;
152 }
153
154 pub fn is_empty(&self) -> bool {
156 self.count() == 0
157 }
158
159 pub fn count(&self) -> u64 {
161 self.negative_store().total_count() + self.positive_store().total_count() + self.zero_count
162 }
163
164 pub fn clear(&mut self) {
166 self.positive_store.clear();
167 self.negative_store.clear();
168 self.zero_count = 0;
169 }
170
171 pub fn mapping(&self) -> &M {
173 &self.mapping
174 }
175
176 pub fn positive_store(&self) -> &S {
178 &self.positive_store
179 }
180
181 pub fn negative_store(&self) -> &S {
183 &self.negative_store
184 }
185
186 pub fn zero_count(&self) -> u64 {
188 self.zero_count
189 }
190
191 pub fn relative_accuracy(&self) -> f64 {
193 self.mapping.relative_accuracy()
194 }
195
196 pub fn from_proto(proto: &ProtoDDSketch, mapping: M) -> Result<Self, ProtoConversionError>
221 where
222 S: Default,
223 {
224 let proto_mapping = proto.mapping.as_ref().ok_or(ProtoConversionError::MissingMapping)?;
226 mapping.validate_proto_mapping(proto_mapping)?;
227
228 let zero_count = if proto.zeroCount < 0.0 {
230 return Err(ProtoConversionError::NegativeZeroCount { count: proto.zeroCount });
231 } else if proto.zeroCount.fract() != 0.0 {
232 return Err(ProtoConversionError::NonIntegerZeroCount { count: proto.zeroCount });
233 } else {
234 proto.zeroCount as u64
235 };
236
237 let mut positive_store = S::default();
238 if let Some(proto_positive) = proto.positiveValues.as_ref() {
239 positive_store.merge_from_proto(proto_positive)?;
240 }
241
242 let mut negative_store = S::default();
243 if let Some(proto_negative) = proto.negativeValues.as_ref() {
244 negative_store.merge_from_proto(proto_negative)?;
245 }
246
247 Ok(Self {
248 mapping,
249 positive_store,
250 negative_store,
251 zero_count,
252 })
253 }
254
255 pub fn to_proto(&self) -> ProtoDDSketch {
262 let mut proto = ProtoDDSketch::new();
263
264 proto.set_mapping(self.mapping.to_proto());
265
266 if !self.positive_store().is_empty() {
267 proto.set_positiveValues(self.positive_store.to_proto());
268 }
269
270 if !self.negative_store().is_empty() {
271 proto.set_negativeValues(self.negative_store.to_proto());
272 }
273
274 proto.set_zeroCount(self.zero_count as f64);
275
276 proto
277 }
278}
279
280impl<M: IndexMapping + PartialEq, S: Store + PartialEq> PartialEq for DDSketch<M, S> {
281 fn eq(&self, other: &Self) -> bool {
282 self.mapping == other.mapping
283 && self.positive_store == other.positive_store
284 && self.negative_store == other.negative_store
285 && self.zero_count == other.zero_count
286 }
287}
288
289impl<M: IndexMapping + PartialEq, S: Store + PartialEq> Eq for DDSketch<M, S> {}
290
291impl<M: IndexMapping + Default, S: Store + Default> Default for DDSketch<M, S> {
292 fn default() -> Self {
293 Self::new(M::default(), S::default(), S::default())
294 }
295}
296
#[cfg(test)]
mod tests {
    use ndarray::{Array, Axis};
    use ndarray_stats::{
        interpolate::{Higher, Lower},
        QuantileExt,
    };
    use noisy_float::types::N64;
    use num_traits::ToPrimitive as _;

    use super::*;

    /// Quantiles probed by every integer-range accuracy test.
    const ACCURACY_QUANTILES: [f64; 7] = [0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99];

    /// Asserts that `$actual` lies within `[$expected_lower, $expected_upper]` once both ends have been widened
    /// by the relative accuracy `$rel_acc`.
    macro_rules! assert_rel_acc_range_eq {
        ($quantile:expr, $rel_acc:expr, $expected_lower:expr, $expected_upper:expr, $actual:expr) => {{
            let lower = $expected_lower.to_f64().unwrap();
            let upper = $expected_upper.to_f64().unwrap();
            let observed = $actual.to_f64().unwrap();

            // Widening always moves a bound away from the expected range, so the scaling factor depends on
            // the sign of the bound.
            let min_allowed = if lower > 0.0 {
                lower * (1.0 - $rel_acc)
            } else {
                lower * (1.0 + $rel_acc)
            };
            let max_allowed = if upper > 0.0 {
                upper * (1.0 + $rel_acc)
            } else {
                upper * (1.0 - $rel_acc)
            };

            assert!(
                (min_allowed..=max_allowed).contains(&observed),
                "mismatch at q={}: expected {} - {} ({}% relative accuracy), got {}",
                $quantile,
                min_allowed,
                max_allowed,
                $rel_acc * 100.0,
                observed
            );
        }};
    }

    /// Asserts that `$actual` is within the relative accuracy `$rel_acc` of a single expected value.
    macro_rules! assert_rel_acc_eq {
        ($quantile:expr, $rel_acc:expr, $expected:expr, $actual:expr) => {{
            assert_rel_acc_range_eq!($quantile, $rel_acc, $expected, $expected, $actual);
        }};
    }

    /// The raw values fed to a sketch together with the sketch itself, so exact quantiles can be compared with
    /// the sketch's estimates.
    struct Dataset<M: IndexMapping, S: Store> {
        raw_data: Vec<N64>,
        sketch: DDSketch<M, S>,
    }

    impl<M: IndexMapping, S: Store + Default> Dataset<M, S> {
        /// Feeds every value to a fresh sketch while keeping a copy of it for later comparison.
        fn new<V>(index_mapping: M, values: V) -> Self
        where
            V: Iterator<Item = f64>,
        {
            let mut sketch = DDSketch::new(index_mapping, S::default(), S::default());
            let raw_data = values
                .map(|value| {
                    let recorded = N64::new(value);
                    sketch.add(value);
                    recorded
                })
                .collect();

            Self { raw_data, sketch }
        }

        /// Checks, for each requested quantile, that the sketch's estimate lies between the exact lower and
        /// higher interpolated quantiles of the raw data, widened by the sketch's relative accuracy.
        #[track_caller]
        fn validate(self, quantiles: &[f64]) {
            let Self { mut raw_data, sketch } = self;

            assert_eq!(raw_data.len() as u64, sketch.count());

            raw_data.sort_unstable();

            let mut data = Array::from_vec(raw_data);
            let rel_acc = sketch.relative_accuracy();

            for &q in quantiles {
                let expected_lower = data
                    .quantile_axis_mut(Axis(0), N64::new(q), &Lower)
                    .ok()
                    .map(|v| v.into_scalar());
                let expected_upper = data
                    .quantile_axis_mut(Axis(0), N64::new(q), &Higher)
                    .ok()
                    .map(|v| v.into_scalar());
                let actual = sketch.quantile(q).map(N64::new);

                match (expected_lower, expected_upper, actual) {
                    (Some(lower), Some(upper), Some(estimate)) => {
                        assert_rel_acc_range_eq!(q, rel_acc, lower, upper, estimate);
                    }
                    (None, None, None) => (),
                    _ => panic!(
                        "mismatched quantiles: expected_lower={:?}, expected_upper={:?}, actual {:?}",
                        expected_lower, expected_upper, actual
                    ),
                }
            }
        }
    }

    /// Yields every integer in `start..=end` as an `f64`.
    fn integers(start: i64, end: i64) -> impl Iterator<Item = f64> {
        (start..=end).map(|n| n as f64)
    }

    /// Builds a 1%-accuracy sketch over the integers `start..=end` and validates it at the standard quantiles.
    #[track_caller]
    fn check_integer_range(start: i64, end: i64) {
        let index_mapping = LogarithmicMapping::new(0.01).unwrap();
        let dataset = Dataset::<_, CollapsingLowestDenseStore>::new(index_mapping, integers(start, end));
        dataset.validate(&ACCURACY_QUANTILES);
    }

    #[test]
    fn test_empty_sketch() {
        let sketch = DDSketch::with_relative_accuracy(0.01).unwrap();

        assert!(sketch.is_empty());
        assert_eq!(sketch.count(), 0);
        assert_eq!(sketch.quantile(0.5), None);
    }

    #[test]
    fn test_accuracy_integers_positive_only_even_small() {
        check_integer_range(1, 10);
    }

    #[test]
    fn test_accuracy_integers_positive_only_even_medium() {
        check_integer_range(1, 250);
    }

    #[test]
    fn test_accuracy_integers_positive_only_even_large() {
        check_integer_range(1, 1000);
    }

    #[test]
    fn test_accuracy_integers_positive_only_odd_small() {
        check_integer_range(1, 11);
    }

    #[test]
    fn test_accuracy_integers_positive_only_odd_medium() {
        check_integer_range(1, 293);
    }

    #[test]
    fn test_accuracy_integers_positive_only_odd_large() {
        check_integer_range(1, 1023);
    }

    #[test]
    fn test_accuracy_integers_negative_only_even_small() {
        check_integer_range(-10, -1);
    }

    #[test]
    fn test_accuracy_integers_negative_only_even_medium() {
        check_integer_range(-250, -1);
    }

    #[test]
    fn test_accuracy_integers_negative_only_even_large() {
        check_integer_range(-1000, -1);
    }

    #[test]
    fn test_accuracy_integers_negative_only_odd_small() {
        check_integer_range(-11, -1);
    }

    #[test]
    fn test_accuracy_integers_negative_only_odd_medium() {
        check_integer_range(-293, -1);
    }

    #[test]
    fn test_accuracy_integers_negative_only_odd_large() {
        check_integer_range(-1023, -1);
    }

    #[test]
    fn test_zero_values() {
        let mut sketch = DDSketch::with_relative_accuracy(0.01).unwrap();
        for value in [0.0, 0.0, 1.0] {
            sketch.add(value);
        }

        assert_eq!(sketch.count(), 3);
        assert_eq!(sketch.zero_count(), 2);
    }

    #[test]
    fn test_merge() {
        let mut sketch1 = DDSketch::with_relative_accuracy(0.01).unwrap();
        for value in [1.0, 2.0] {
            sketch1.add(value);
        }

        let mut sketch2 = DDSketch::with_relative_accuracy(0.01).unwrap();
        for value in [3.0, 4.0] {
            sketch2.add(value);
        }

        sketch1.merge(&sketch2);

        assert_eq!(sketch1.count(), 4);
    }

    #[test]
    fn test_clear() {
        let mut sketch = DDSketch::with_relative_accuracy(0.01).unwrap();
        for value in [1.0, 2.0] {
            sketch.add(value);
        }

        sketch.clear();

        assert!(sketch.is_empty());
        assert_eq!(sketch.count(), 0);
    }

    // NOTE(review): this test is ignored and the reason is not recorded here -- confirm whether it can be
    // re-enabled.
    #[test]
    #[ignore]
    fn test_quantile_bounds() {
        let mut sketch = DDSketch::with_relative_accuracy(0.01).unwrap();
        for i in 1..=100 {
            sketch.add(i as f64);
        }

        // q = 0 should land on the smallest recorded value.
        let min_actual = sketch.quantile(0.0).unwrap();
        assert_rel_acc_eq!(0.0, 0.01, 1.0, min_actual);

        // q = 1 should land on the largest recorded value.
        let max_actual = sketch.quantile(1.0).unwrap();
        assert_rel_acc_eq!(1.0, 0.01, 100.0, max_actual);
    }

    #[test]
    fn test_add_n() {
        let mut sketch = DDSketch::with_relative_accuracy(0.01).unwrap();
        sketch.add_n(10.0, 5);

        assert_eq!(sketch.count(), 5);
    }

    #[test]
    fn test_proto_roundtrip() {
        let mut sketch = DDSketch::with_relative_accuracy(0.01).unwrap();
        for value in [1.0, 2.0, 3.0, 100.0] {
            sketch.add(value);
        }

        let proto = sketch.to_proto();
        let mapping = LogarithmicMapping::new(0.01).unwrap();
        let recovered: DDSketch = DDSketch::from_proto(&proto, mapping).unwrap();

        assert_eq!(sketch.count(), recovered.count());
        assert_eq!(sketch.zero_count(), recovered.zero_count());

        // The recovered sketch must answer quantile queries the same way as the original.
        for q in [0.25, 0.5, 0.75, 0.99] {
            let orig = sketch.quantile(q).unwrap();
            let recov = recovered.quantile(q).unwrap();
            let difference = (orig - recov).abs();
            assert!(difference < 0.001, "quantile {} mismatch: {} vs {}", q, orig, recov);
        }
    }

    #[test]
    fn test_proto_roundtrip_with_negatives() {
        let mut sketch = DDSketch::with_relative_accuracy(0.01).unwrap();
        for value in [-10.0, -5.0, 0.0, 5.0, 10.0] {
            sketch.add(value);
        }

        let proto = sketch.to_proto();
        let mapping = LogarithmicMapping::new(0.01).unwrap();
        let recovered: DDSketch = DDSketch::from_proto(&proto, mapping).unwrap();

        assert_eq!(sketch.count(), recovered.count());
        assert_eq!(sketch.zero_count(), recovered.zero_count());
    }

    #[test]
    fn test_proto_roundtrip_empty() {
        let sketch = DDSketch::with_relative_accuracy(0.01).unwrap();

        let proto = sketch.to_proto();
        let mapping = LogarithmicMapping::new(0.01).unwrap();
        let recovered: DDSketch = DDSketch::from_proto(&proto, mapping).unwrap();

        assert!(recovered.is_empty());
        assert_eq!(recovered.count(), 0);
    }

    #[test]
    fn test_proto_gamma_mismatch() {
        let mut sketch = DDSketch::with_relative_accuracy(0.01).unwrap();
        sketch.add(1.0);

        let proto = sketch.to_proto();

        // Decoding with a mapping of a different accuracy must be rejected.
        let different_mapping = LogarithmicMapping::new(0.05).unwrap();
        let result = DDSketch::<_, CollapsingLowestDenseStore>::from_proto(&proto, different_mapping);

        assert!(result.is_err());
        assert!(
            matches!(
                result,
                Err(crate::canonical::ProtoConversionError::GammaMismatch { .. })
            ),
            "Expected GammaMismatch error"
        );
    }

    #[test]
    fn test_proto_missing_mapping() {
        use datadog_protos::sketches::DDSketch as ProtoDDSketch;

        // A freshly created proto has no mapping set.
        let proto = ProtoDDSketch::new();
        let mapping = LogarithmicMapping::new(0.01).unwrap();
        let result = DDSketch::<_, CollapsingLowestDenseStore>::from_proto(&proto, mapping);

        assert!(result.is_err());
        assert!(
            matches!(result, Err(crate::canonical::ProtoConversionError::MissingMapping)),
            "Expected MissingMapping error"
        );
    }
}