1mod iter;
2
3use std::{collections::HashSet, fmt, num::NonZeroU64, time::Duration};
4
5use ddsketch_agent::DDSketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9mod sketch;
10pub use self::sketch::SketchPoints;
11
12mod histogram;
13pub use self::histogram::{Histogram, HistogramPoints, HistogramSummary};
14
15mod scalar;
16pub use self::scalar::ScalarPoints;
17
18mod set;
19pub use self::set::SetPoints;
20use super::SampleRate;
21
22#[derive(Clone, Debug, Eq, PartialEq)]
23struct TimestampedValue<T> {
24 timestamp: Option<NonZeroU64>,
25 value: T,
26}
27
28impl<T> From<T> for TimestampedValue<T> {
29 fn from(value: T) -> Self {
30 Self { timestamp: None, value }
31 }
32}
33
34impl<T> From<(u64, T)> for TimestampedValue<T> {
35 fn from((timestamp, value): (u64, T)) -> Self {
36 Self {
37 timestamp: NonZeroU64::new(timestamp),
38 value,
39 }
40 }
41}
42
43impl From<(Option<NonZeroU64>, f64)> for TimestampedValue<OrderedFloat<f64>> {
44 fn from((timestamp, value): (Option<NonZeroU64>, f64)) -> Self {
45 Self {
46 timestamp,
47 value: OrderedFloat(value),
48 }
49 }
50}
51
52impl<T> From<(Option<NonZeroU64>, T)> for TimestampedValue<T> {
53 fn from((timestamp, value): (Option<NonZeroU64>, T)) -> Self {
54 Self { timestamp, value }
55 }
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
59struct TimestampedValues<T, const N: usize> {
60 values: SmallVec<[TimestampedValue<T>; N]>,
61}
62
63impl<T, const N: usize> TimestampedValues<T, N> {
64 fn all_timestamped(&self) -> bool {
65 self.values.iter().all(|value| value.timestamp.is_some())
66 }
67
68 fn any_timestamped(&self) -> bool {
69 self.values.iter().any(|value| value.timestamp.is_some())
70 }
71
72 fn drain_timestamped(&mut self) -> Self {
73 Self {
74 values: self.values.drain_filter(|value| value.timestamp.is_some()).collect(),
75 }
76 }
77
78 fn sort_by_timestamp(&mut self) {
79 self.values.sort_by_key(|value| value.timestamp);
80 }
81
82 fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
83 if self.values.is_empty() {
84 return None;
85 }
86
87 if let Some(first) = self.values.first() {
90 if let Some(first_ts) = first.timestamp {
91 if first_ts.get() > timestamp {
92 return None;
93 }
94 }
95 }
96
97 let new_values = self
98 .values
99 .drain_filter(|value| value.timestamp.is_some_and(|ts| ts.get() <= timestamp));
100 Some(Self::from(new_values))
101 }
102
103 fn set_timestamp(&mut self, timestamp: u64) {
104 let timestamp = NonZeroU64::new(timestamp);
105 for value in &mut self.values {
106 value.timestamp = timestamp;
107 }
108 }
109
110 fn collapse_non_timestamped<F>(&mut self, timestamp: u64, merge: F)
111 where
112 F: Fn(&mut T, &mut T),
113 {
114 self.values.dedup_by(|a, b| {
115 if a.timestamp.is_none() && b.timestamp.is_none() {
116 merge(&mut b.value, &mut a.value);
117 true
118 } else {
119 false
120 }
121 });
122
123 if let Some(first) = self.values.first_mut() {
127 first.timestamp = first.timestamp.or(NonZeroU64::new(timestamp));
128 }
129 }
130}
131
132impl<T, const N: usize> Default for TimestampedValues<T, N> {
133 fn default() -> Self {
134 Self {
135 values: SmallVec::new(),
136 }
137 }
138}
139
140impl<T, const N: usize> From<TimestampedValue<T>> for TimestampedValues<T, N> {
141 fn from(value: TimestampedValue<T>) -> Self {
142 let mut values = SmallVec::new();
143 values.push(value);
144
145 Self { values }
146 }
147}
148
149impl<I, IT, T, const N: usize> From<I> for TimestampedValues<T, N>
150where
151 I: IntoIterator<Item = IT>,
152 IT: Into<TimestampedValue<T>>,
153{
154 fn from(value: I) -> Self {
155 let mut values = SmallVec::new();
156 values.extend(value.into_iter().map(Into::into));
157
158 let mut values = Self { values };
159 values.sort_by_timestamp();
160
161 values
162 }
163}
164
165#[derive(Clone, Debug, Eq, PartialEq)]
167pub enum MetricValues {
168 Counter(ScalarPoints),
172
173 Rate(ScalarPoints, Duration),
180
181 Gauge(ScalarPoints),
186
187 Set(SetPoints),
191
192 Histogram(HistogramPoints),
198
199 Distribution(SketchPoints),
208}
209
210impl MetricValues {
211 pub fn counter<V>(values: V) -> Self
213 where
214 V: Into<ScalarPoints>,
215 {
216 Self::Counter(values.into())
217 }
218
219 pub fn counter_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
224 where
225 I: Iterator<Item = Result<f64, E>>,
226 {
227 let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
228
229 let mut points = ScalarPoints::new();
230 for value in iter {
231 let value = value?;
232 points.add_point(None, value * sample_rate.raw_weight());
233 }
234 Ok(Self::Counter(points))
235 }
236
237 pub fn gauge<V>(values: V) -> Self
239 where
240 V: Into<ScalarPoints>,
241 {
242 Self::Gauge(values.into())
243 }
244
245 pub fn gauge_fallible<I, E>(iter: I) -> Result<Self, E>
247 where
248 I: Iterator<Item = Result<f64, E>>,
249 {
250 let mut points = ScalarPoints::new();
251 for value in iter {
252 points.add_point(None, value?);
253 }
254 Ok(Self::Gauge(points))
255 }
256
257 pub fn set<V>(values: V) -> Self
259 where
260 V: Into<SetPoints>,
261 {
262 Self::Set(values.into())
263 }
264
265 pub fn histogram<V>(values: V) -> Self
267 where
268 V: Into<HistogramPoints>,
269 {
270 Self::Histogram(values.into())
271 }
272
273 pub fn histogram_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
278 where
279 I: Iterator<Item = Result<f64, E>>,
280 {
281 let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
282
283 let mut histogram = Histogram::default();
284 for value in iter {
285 let value = value?;
286 histogram.insert(value, sample_rate);
287 }
288 Ok(Self::Histogram(histogram.into()))
289 }
290
291 pub fn distribution<V>(values: V) -> Self
293 where
294 V: Into<SketchPoints>,
295 {
296 Self::Distribution(values.into())
297 }
298
299 pub fn distribution_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
304 where
305 I: Iterator<Item = Result<f64, E>>,
306 {
307 let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
308 let capped_sample_rate = u32::try_from(sample_rate.weight()).unwrap_or(u32::MAX);
309
310 let mut sketch = DDSketch::default();
311 for value in iter {
312 let value = value?;
313 if capped_sample_rate == 1 {
314 sketch.insert(value);
315 } else {
316 sketch.insert_n(value, capped_sample_rate);
317 }
318 }
319 Ok(Self::Distribution(sketch.into()))
320 }
321
322 pub fn rate<V>(values: V, interval: Duration) -> Self
324 where
325 V: Into<ScalarPoints>,
326 {
327 Self::Rate(values.into(), interval)
328 }
329
330 pub fn is_empty(&self) -> bool {
332 match self {
333 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.is_empty(),
334 Self::Set(points) => points.is_empty(),
335 Self::Histogram(points) => points.is_empty(),
336 Self::Distribution(points) => points.is_empty(),
337 }
338 }
339
340 pub fn len(&self) -> usize {
342 match self {
343 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.len(),
344 Self::Set(points) => points.len(),
345 Self::Histogram(points) => points.len(),
346 Self::Distribution(points) => points.len(),
347 }
348 }
349
350 pub fn all_timestamped(&self) -> bool {
352 match self {
353 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().all_timestamped(),
354 Self::Set(points) => points.inner().all_timestamped(),
355 Self::Histogram(points) => points.inner().all_timestamped(),
356 Self::Distribution(points) => points.inner().all_timestamped(),
357 }
358 }
359
360 pub fn any_timestamped(&self) -> bool {
362 match self {
363 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().any_timestamped(),
364 Self::Set(points) => points.inner().any_timestamped(),
365 Self::Histogram(points) => points.inner().any_timestamped(),
366 Self::Distribution(points) => points.inner().any_timestamped(),
367 }
368 }
369
370 pub fn set_timestamp(&mut self, timestamp: u64) {
375 match self {
376 Self::Counter(points) | Self::Gauge(points) | Self::Rate(points, _) => {
377 points.inner_mut().set_timestamp(timestamp)
378 }
379 Self::Set(points) => points.inner_mut().set_timestamp(timestamp),
380 Self::Histogram(points) => points.inner_mut().set_timestamp(timestamp),
381 Self::Distribution(points) => points.inner_mut().set_timestamp(timestamp),
382 }
383 }
384
385 pub fn split_timestamped(&mut self) -> Self {
387 match self {
388 Self::Counter(points) => Self::Counter(points.drain_timestamped()),
389 Self::Rate(points, interval) => Self::Rate(points.drain_timestamped(), *interval),
390 Self::Gauge(points) => Self::Gauge(points.drain_timestamped()),
391 Self::Set(points) => Self::Set(points.drain_timestamped()),
392 Self::Histogram(points) => Self::Histogram(points.drain_timestamped()),
393 Self::Distribution(points) => Self::Distribution(points.drain_timestamped()),
394 }
395 }
396
397 pub fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
400 match self {
401 Self::Counter(points) => points.split_at_timestamp(timestamp).map(Self::Counter),
402 Self::Rate(points, interval) => points
403 .split_at_timestamp(timestamp)
404 .map(|points| Self::Rate(points, *interval)),
405 Self::Gauge(points) => points.split_at_timestamp(timestamp).map(Self::Gauge),
406 Self::Set(points) => points.split_at_timestamp(timestamp).map(Self::Set),
407 Self::Histogram(points) => points.split_at_timestamp(timestamp).map(Self::Histogram),
408 Self::Distribution(points) => points.split_at_timestamp(timestamp).map(Self::Distribution),
409 }
410 }
411
412 pub fn collapse_non_timestamped(&mut self, timestamp: u64) {
415 match self {
416 Self::Counter(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
418 Self::Rate(points, _) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
419 Self::Gauge(points) => points
421 .inner_mut()
422 .collapse_non_timestamped(timestamp, merge_scalar_latest),
423 Self::Set(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_set),
425 Self::Histogram(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_histogram),
426 Self::Distribution(sketches) => sketches.inner_mut().collapse_non_timestamped(timestamp, merge_sketch),
427 }
428 }
429
430 pub fn merge(&mut self, other: Self) {
439 match (self, other) {
440 (Self::Counter(a), Self::Counter(b)) => a.merge(b, merge_scalar_sum),
441 (Self::Rate(a_points, a_interval), Self::Rate(b_points, b_interval)) => {
442 if *a_interval != b_interval {
443 *a_points = b_points;
444 *a_interval = b_interval;
445 } else {
446 a_points.merge(b_points, merge_scalar_sum);
447 }
448 }
449 (Self::Gauge(a), Self::Gauge(b)) => a.merge(b, merge_scalar_latest),
450 (Self::Set(a), Self::Set(b)) => a.merge(b),
451 (Self::Histogram(a), Self::Histogram(b)) => a.merge(b),
452 (Self::Distribution(a), Self::Distribution(b)) => a.merge(b),
453
454 (dest, src) => drop(std::mem::replace(dest, src)),
456 }
457 }
458
459 pub fn as_str(&self) -> &'static str {
461 match self {
462 Self::Counter(_) => "counter",
463 Self::Rate(_, _) => "rate",
464 Self::Gauge(_) => "gauge",
465 Self::Set(_) => "set",
466 Self::Histogram(_) => "histogram",
467 Self::Distribution(_) => "distribution",
468 }
469 }
470
471 pub fn is_serie(&self) -> bool {
473 matches!(
474 self,
475 Self::Counter(_) | Self::Rate(_, _) | Self::Gauge(_) | Self::Set(_) | Self::Histogram(_)
476 )
477 }
478
479 pub fn is_sketch(&self) -> bool {
481 matches!(self, Self::Distribution(_))
482 }
483}
484
485impl fmt::Display for MetricValues {
486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487 match self {
488 Self::Counter(points) => write!(f, "{}", points),
489 Self::Rate(points, interval) => write!(f, "{} over {:?}", points, interval),
490 Self::Gauge(points) => write!(f, "{}", points),
491 Self::Set(points) => write!(f, "{}", points),
492 Self::Histogram(points) => write!(f, "{}", points),
493 Self::Distribution(points) => write!(f, "{}", points),
494 }
495 }
496}
497
498fn merge_scalar_sum(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
499 *dest += *src;
500}
501
502fn merge_scalar_latest(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
503 *dest = *src;
504}
505
506fn merge_set(dest: &mut HashSet<String>, src: &mut HashSet<String>) {
507 dest.extend(src.drain());
508}
509
510fn merge_histogram(dest: &mut Histogram, src: &mut Histogram) {
511 dest.merge(src);
512}
513
514fn merge_sketch(dest: &mut DDSketch, src: &mut DDSketch) {
515 dest.merge(src);
516}
517
518#[cfg(test)]
519mod tests {
520 use std::time::Duration;
521
522 use super::{HistogramPoints, MetricValues, ScalarPoints, SetPoints, SketchPoints};
523
524 #[test]
525 fn merge_counters() {
526 let cases = [
527 (
529 ScalarPoints::from((1, 1.0)),
530 ScalarPoints::from((1, 2.0)),
531 ScalarPoints::from((1, 3.0)),
532 ),
533 (
535 ScalarPoints::from((1, 1.0)),
536 ScalarPoints::from(2.0),
537 ScalarPoints::from([(0, 2.0), (1, 1.0)]),
538 ),
539 (
541 ScalarPoints::from(5.0),
542 ScalarPoints::from(6.0),
543 ScalarPoints::from(11.0),
544 ),
545 ];
546
547 for (a, b, expected) in cases {
548 let mut merged = MetricValues::Counter(a.clone());
549 merged.merge(MetricValues::Counter(b.clone()));
550
551 assert_eq!(
552 merged,
553 MetricValues::Counter(expected.clone()),
554 "merged {} with {}, expected {} but got {}",
555 a,
556 b,
557 expected,
558 merged
559 );
560 }
561 }
562
563 #[test]
564 fn merge_gauges() {
565 let cases = [
566 (
568 ScalarPoints::from((1, 1.0)),
569 ScalarPoints::from((1, 2.0)),
570 ScalarPoints::from((1, 2.0)),
571 ),
572 (
574 ScalarPoints::from((1, 1.0)),
575 ScalarPoints::from(2.0),
576 ScalarPoints::from([(0, 2.0), (1, 1.0)]),
577 ),
578 (
580 ScalarPoints::from(5.0),
581 ScalarPoints::from(6.0),
582 ScalarPoints::from(6.0),
583 ),
584 ];
585
586 for (a, b, expected) in cases {
587 let mut merged = MetricValues::Gauge(a.clone());
588 merged.merge(MetricValues::Gauge(b.clone()));
589
590 assert_eq!(
591 merged,
592 MetricValues::Gauge(expected.clone()),
593 "merged {} with {}, expected {} but got {}",
594 a,
595 b,
596 expected,
597 merged
598 );
599 }
600 }
601
602 #[test]
603 fn merge_rates() {
604 const FIVE_SECS: Duration = Duration::from_secs(5);
605 const TEN_SECS: Duration = Duration::from_secs(10);
606
607 let cases = [
608 (
610 ScalarPoints::from((1, 1.0)),
611 FIVE_SECS,
612 ScalarPoints::from((1, 2.0)),
613 FIVE_SECS,
614 ScalarPoints::from((1, 3.0)),
615 FIVE_SECS,
616 ),
617 (
619 ScalarPoints::from((1, 1.0)),
620 FIVE_SECS,
621 ScalarPoints::from(2.0),
622 FIVE_SECS,
623 ScalarPoints::from([(0, 2.0), (1, 1.0)]),
624 FIVE_SECS,
625 ),
626 (
628 ScalarPoints::from(5.0),
629 FIVE_SECS,
630 ScalarPoints::from(6.0),
631 FIVE_SECS,
632 ScalarPoints::from(11.0),
633 FIVE_SECS,
634 ),
635 (
639 ScalarPoints::from((1, 1.0)),
640 FIVE_SECS,
641 ScalarPoints::from((1, 2.0)),
642 TEN_SECS,
643 ScalarPoints::from((1, 2.0)),
644 TEN_SECS,
645 ),
646 (
647 ScalarPoints::from((1, 3.0)),
648 FIVE_SECS,
649 ScalarPoints::from(4.0),
650 TEN_SECS,
651 ScalarPoints::from((0, 4.0)),
652 TEN_SECS,
653 ),
654 (
655 ScalarPoints::from(7.0),
656 TEN_SECS,
657 ScalarPoints::from(9.0),
658 FIVE_SECS,
659 ScalarPoints::from(9.0),
660 FIVE_SECS,
661 ),
662 ];
663
664 for (a, a_interval, b, b_interval, expected, expected_interval) in cases {
665 let mut merged = MetricValues::Rate(a.clone(), a_interval);
666 merged.merge(MetricValues::Rate(b.clone(), b_interval));
667
668 assert_eq!(
669 merged,
670 MetricValues::Rate(expected.clone(), expected_interval),
671 "merged {}/{:?} with {}/{:?}, expected {} but got {}",
672 a,
673 a_interval,
674 b,
675 b_interval,
676 expected,
677 merged
678 );
679 }
680 }
681
682 #[test]
683 fn merge_sets() {
684 let cases = [
685 (
687 SetPoints::from((1, "foo")),
688 SetPoints::from((1, "bar")),
689 SetPoints::from((1, ["foo", "bar"])),
690 ),
691 (
694 SetPoints::from((1, "foo")),
695 SetPoints::from("bar"),
696 SetPoints::from([(0, "bar"), (1, "foo")]),
697 ),
698 (
700 SetPoints::from("foo"),
701 SetPoints::from("bar"),
702 SetPoints::from(["foo", "bar"]),
703 ),
704 ];
705
706 for (a, b, expected) in cases {
707 let mut merged = MetricValues::Set(a.clone());
708 merged.merge(MetricValues::Set(b.clone()));
709
710 assert_eq!(
711 merged,
712 MetricValues::Set(expected.clone()),
713 "merged {} with {}, expected {} but got {}",
714 a,
715 b,
716 expected,
717 merged
718 );
719 }
720 }
721
722 #[test]
723 fn merge_histograms() {
724 let cases = [
725 (
727 HistogramPoints::from((1, 1.0)),
728 HistogramPoints::from((1, 2.0)),
729 HistogramPoints::from((1, [1.0, 2.0])),
730 ),
731 (
733 HistogramPoints::from((1, 1.0)),
734 HistogramPoints::from(2.0),
735 HistogramPoints::from([(0, 2.0), (1, 1.0)]),
736 ),
737 (
739 HistogramPoints::from(5.0),
740 HistogramPoints::from(6.0),
741 HistogramPoints::from([5.0, 6.0]),
742 ),
743 ];
744
745 for (a, b, expected) in cases {
746 let mut merged = MetricValues::Histogram(a.clone());
747 merged.merge(MetricValues::Histogram(b.clone()));
748
749 assert_eq!(
750 merged,
751 MetricValues::Histogram(expected.clone()),
752 "merged {} with {}, expected {} but got {}",
753 a,
754 b,
755 expected,
756 merged
757 );
758 }
759 }
760
761 #[test]
762 fn merge_distributions() {
763 let cases = [
764 (
766 SketchPoints::from((1, 1.0)),
767 SketchPoints::from((1, 2.0)),
768 SketchPoints::from((1, [1.0, 2.0])),
769 ),
770 (
772 SketchPoints::from((1, 1.0)),
773 SketchPoints::from(2.0),
774 SketchPoints::from([(0, 2.0), (1, 1.0)]),
775 ),
776 (
778 SketchPoints::from(5.0),
779 SketchPoints::from(6.0),
780 SketchPoints::from([5.0, 6.0]),
781 ),
782 ];
783
784 for (a, b, expected) in cases {
785 let mut merged = MetricValues::Distribution(a.clone());
786 merged.merge(MetricValues::Distribution(b.clone()));
787
788 assert_eq!(
789 merged,
790 MetricValues::Distribution(expected.clone()),
791 "merged {} with {}, expected {} but got {}",
792 a,
793 b,
794 expected,
795 merged
796 );
797 }
798 }
799}