1mod iter;
2
3use std::{collections::HashSet, fmt, num::NonZeroU64, time::Duration};
4
5use ddsketch::DDSketch;
6use ordered_float::OrderedFloat;
7use smallvec::SmallVec;
8
9mod sketch;
10pub use self::sketch::SketchPoints;
11
12mod histogram;
13pub use self::histogram::{Histogram, HistogramPoints, HistogramSummary};
14
15mod scalar;
16pub use self::scalar::ScalarPoints;
17
18mod set;
19pub use self::set::SetPoints;
20use super::SampleRate;
21
22#[derive(Clone, Debug, Eq, PartialEq)]
23struct TimestampedValue<T> {
24 timestamp: Option<NonZeroU64>,
25 value: T,
26}
27
28impl<T> From<T> for TimestampedValue<T> {
29 fn from(value: T) -> Self {
30 Self { timestamp: None, value }
31 }
32}
33
34impl<T> From<(u64, T)> for TimestampedValue<T> {
35 fn from((timestamp, value): (u64, T)) -> Self {
36 Self {
37 timestamp: NonZeroU64::new(timestamp),
38 value,
39 }
40 }
41}
42
43impl From<(Option<NonZeroU64>, f64)> for TimestampedValue<OrderedFloat<f64>> {
44 fn from((timestamp, value): (Option<NonZeroU64>, f64)) -> Self {
45 Self {
46 timestamp,
47 value: OrderedFloat(value),
48 }
49 }
50}
51
52impl<T> From<(Option<NonZeroU64>, T)> for TimestampedValue<T> {
53 fn from((timestamp, value): (Option<NonZeroU64>, T)) -> Self {
54 Self { timestamp, value }
55 }
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
59struct TimestampedValues<T, const N: usize> {
60 values: SmallVec<[TimestampedValue<T>; N]>,
61}
62
63impl<T, const N: usize> TimestampedValues<T, N> {
64 fn all_timestamped(&self) -> bool {
65 self.values.iter().all(|value| value.timestamp.is_some())
66 }
67
68 fn any_timestamped(&self) -> bool {
69 self.values.iter().any(|value| value.timestamp.is_some())
70 }
71
72 fn drain_timestamped(&mut self) -> Self {
73 Self {
74 values: self.values.drain_filter(|value| value.timestamp.is_some()).collect(),
75 }
76 }
77
78 fn sort_by_timestamp(&mut self) {
79 self.values.sort_by_key(|value| value.timestamp);
80 }
81
82 fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
83 if self.values.is_empty() {
84 return None;
85 }
86
87 if let Some(first) = self.values.first() {
90 if let Some(first_ts) = first.timestamp {
91 if first_ts.get() > timestamp {
92 return None;
93 }
94 }
95 }
96
97 let new_values = self
98 .values
99 .drain_filter(|value| value.timestamp.is_some_and(|ts| ts.get() <= timestamp));
100 Some(Self::from(new_values))
101 }
102
103 fn set_timestamp(&mut self, timestamp: u64) {
104 let timestamp = NonZeroU64::new(timestamp);
105 for value in &mut self.values {
106 value.timestamp = timestamp;
107 }
108 }
109
110 fn collapse_non_timestamped<F>(&mut self, timestamp: u64, merge: F)
111 where
112 F: Fn(&mut T, &mut T),
113 {
114 self.values.dedup_by(|a, b| {
115 if a.timestamp.is_none() && b.timestamp.is_none() {
116 merge(&mut b.value, &mut a.value);
117 true
118 } else {
119 false
120 }
121 });
122
123 if let Some(first) = self.values.first_mut() {
127 first.timestamp = first.timestamp.or(NonZeroU64::new(timestamp));
128 }
129 }
130}
131
132impl<T, const N: usize> Default for TimestampedValues<T, N> {
133 fn default() -> Self {
134 Self {
135 values: SmallVec::new(),
136 }
137 }
138}
139
140impl<T, const N: usize> From<TimestampedValue<T>> for TimestampedValues<T, N> {
141 fn from(value: TimestampedValue<T>) -> Self {
142 let mut values = SmallVec::new();
143 values.push(value);
144
145 Self { values }
146 }
147}
148
149impl<I, IT, T, const N: usize> From<I> for TimestampedValues<T, N>
150where
151 I: IntoIterator<Item = IT>,
152 IT: Into<TimestampedValue<T>>,
153{
154 fn from(value: I) -> Self {
155 let mut values = SmallVec::new();
156 values.extend(value.into_iter().map(Into::into));
157
158 let mut values = Self { values };
159 values.sort_by_timestamp();
160
161 values
162 }
163}
164
165#[derive(Clone, Debug, Eq, PartialEq)]
167pub enum MetricValues {
168 Counter(ScalarPoints),
172
173 Rate(ScalarPoints, Duration),
180
181 Gauge(ScalarPoints),
186
187 Set(SetPoints),
191
192 Histogram(HistogramPoints),
198
199 Distribution(SketchPoints),
208}
209
210impl MetricValues {
211 pub fn counter<V>(values: V) -> Self
213 where
214 V: Into<ScalarPoints>,
215 {
216 Self::Counter(values.into())
217 }
218
219 pub fn counter_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
224 where
225 I: Iterator<Item = Result<f64, E>>,
226 {
227 let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
228
229 let mut points = ScalarPoints::new();
230 for value in iter {
231 let value = value?;
232 points.add_point(None, value * sample_rate.raw_weight());
233 }
234 Ok(Self::Counter(points))
235 }
236
237 pub fn gauge<V>(values: V) -> Self
239 where
240 V: Into<ScalarPoints>,
241 {
242 Self::Gauge(values.into())
243 }
244
245 pub fn gauge_fallible<I, E>(iter: I) -> Result<Self, E>
247 where
248 I: Iterator<Item = Result<f64, E>>,
249 {
250 let mut points = ScalarPoints::new();
251 for value in iter {
252 points.add_point(None, value?);
253 }
254 Ok(Self::Gauge(points))
255 }
256
257 pub fn set<V>(values: V) -> Self
259 where
260 V: Into<SetPoints>,
261 {
262 Self::Set(values.into())
263 }
264
265 pub fn histogram<V>(values: V) -> Self
267 where
268 V: Into<HistogramPoints>,
269 {
270 Self::Histogram(values.into())
271 }
272
273 pub fn histogram_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
278 where
279 I: Iterator<Item = Result<f64, E>>,
280 {
281 let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
282
283 let mut histogram = Histogram::default();
284 for value in iter {
285 let value = value?;
286 histogram.insert(value, sample_rate);
287 }
288 Ok(Self::Histogram(histogram.into()))
289 }
290
291 pub fn distribution<V>(values: V) -> Self
293 where
294 V: Into<SketchPoints>,
295 {
296 Self::Distribution(values.into())
297 }
298
299 pub fn distribution_sampled_fallible<I, E>(iter: I, sample_rate: Option<SampleRate>) -> Result<Self, E>
304 where
305 I: Iterator<Item = Result<f64, E>>,
306 {
307 let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
308
309 let mut sketch = DDSketch::default();
310 for value in iter {
311 let value = value?;
312 sketch.insert_n(value, sample_rate.weight());
313 }
314 Ok(Self::Distribution(sketch.into()))
315 }
316
317 pub fn rate<V>(values: V, interval: Duration) -> Self
319 where
320 V: Into<ScalarPoints>,
321 {
322 Self::Rate(values.into(), interval)
323 }
324
325 pub fn is_empty(&self) -> bool {
327 match self {
328 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.is_empty(),
329 Self::Set(points) => points.is_empty(),
330 Self::Histogram(points) => points.is_empty(),
331 Self::Distribution(points) => points.is_empty(),
332 }
333 }
334
335 pub fn len(&self) -> usize {
337 match self {
338 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.len(),
339 Self::Set(points) => points.len(),
340 Self::Histogram(points) => points.len(),
341 Self::Distribution(points) => points.len(),
342 }
343 }
344
345 pub fn all_timestamped(&self) -> bool {
347 match self {
348 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().all_timestamped(),
349 Self::Set(points) => points.inner().all_timestamped(),
350 Self::Histogram(points) => points.inner().all_timestamped(),
351 Self::Distribution(points) => points.inner().all_timestamped(),
352 }
353 }
354
355 pub fn any_timestamped(&self) -> bool {
357 match self {
358 Self::Counter(points) | Self::Rate(points, _) | Self::Gauge(points) => points.inner().any_timestamped(),
359 Self::Set(points) => points.inner().any_timestamped(),
360 Self::Histogram(points) => points.inner().any_timestamped(),
361 Self::Distribution(points) => points.inner().any_timestamped(),
362 }
363 }
364
365 pub fn set_timestamp(&mut self, timestamp: u64) {
370 match self {
371 Self::Counter(points) | Self::Gauge(points) | Self::Rate(points, _) => {
372 points.inner_mut().set_timestamp(timestamp)
373 }
374 Self::Set(points) => points.inner_mut().set_timestamp(timestamp),
375 Self::Histogram(points) => points.inner_mut().set_timestamp(timestamp),
376 Self::Distribution(points) => points.inner_mut().set_timestamp(timestamp),
377 }
378 }
379
380 pub fn split_timestamped(&mut self) -> Self {
382 match self {
383 Self::Counter(points) => Self::Counter(points.drain_timestamped()),
384 Self::Rate(points, interval) => Self::Rate(points.drain_timestamped(), *interval),
385 Self::Gauge(points) => Self::Gauge(points.drain_timestamped()),
386 Self::Set(points) => Self::Set(points.drain_timestamped()),
387 Self::Histogram(points) => Self::Histogram(points.drain_timestamped()),
388 Self::Distribution(points) => Self::Distribution(points.drain_timestamped()),
389 }
390 }
391
392 pub fn split_at_timestamp(&mut self, timestamp: u64) -> Option<Self> {
395 match self {
396 Self::Counter(points) => points.split_at_timestamp(timestamp).map(Self::Counter),
397 Self::Rate(points, interval) => points
398 .split_at_timestamp(timestamp)
399 .map(|points| Self::Rate(points, *interval)),
400 Self::Gauge(points) => points.split_at_timestamp(timestamp).map(Self::Gauge),
401 Self::Set(points) => points.split_at_timestamp(timestamp).map(Self::Set),
402 Self::Histogram(points) => points.split_at_timestamp(timestamp).map(Self::Histogram),
403 Self::Distribution(points) => points.split_at_timestamp(timestamp).map(Self::Distribution),
404 }
405 }
406
407 pub fn collapse_non_timestamped(&mut self, timestamp: u64) {
410 match self {
411 Self::Counter(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
413 Self::Rate(points, _) => points.inner_mut().collapse_non_timestamped(timestamp, merge_scalar_sum),
414 Self::Gauge(points) => points
416 .inner_mut()
417 .collapse_non_timestamped(timestamp, merge_scalar_latest),
418 Self::Set(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_set),
420 Self::Histogram(points) => points.inner_mut().collapse_non_timestamped(timestamp, merge_histogram),
421 Self::Distribution(sketches) => sketches.inner_mut().collapse_non_timestamped(timestamp, merge_sketch),
422 }
423 }
424
425 pub fn merge(&mut self, other: Self) {
434 match (self, other) {
435 (Self::Counter(a), Self::Counter(b)) => a.merge(b, merge_scalar_sum),
436 (Self::Rate(a_points, a_interval), Self::Rate(b_points, b_interval)) => {
437 if *a_interval != b_interval {
438 *a_points = b_points;
439 *a_interval = b_interval;
440 } else {
441 a_points.merge(b_points, merge_scalar_sum);
442 }
443 }
444 (Self::Gauge(a), Self::Gauge(b)) => a.merge(b, merge_scalar_latest),
445 (Self::Set(a), Self::Set(b)) => a.merge(b),
446 (Self::Histogram(a), Self::Histogram(b)) => a.merge(b),
447 (Self::Distribution(a), Self::Distribution(b)) => a.merge(b),
448
449 (dest, src) => drop(std::mem::replace(dest, src)),
451 }
452 }
453
454 pub fn as_str(&self) -> &'static str {
456 match self {
457 Self::Counter(_) => "counter",
458 Self::Rate(_, _) => "rate",
459 Self::Gauge(_) => "gauge",
460 Self::Set(_) => "set",
461 Self::Histogram(_) => "histogram",
462 Self::Distribution(_) => "distribution",
463 }
464 }
465
466 pub fn is_serie(&self) -> bool {
468 matches!(
469 self,
470 Self::Counter(_) | Self::Rate(_, _) | Self::Gauge(_) | Self::Set(_) | Self::Histogram(_)
471 )
472 }
473
474 pub fn is_sketch(&self) -> bool {
476 matches!(self, Self::Distribution(_))
477 }
478}
479
480impl fmt::Display for MetricValues {
481 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482 match self {
483 Self::Counter(points) => write!(f, "{}", points),
484 Self::Rate(points, interval) => write!(f, "{} over {:?}", points, interval),
485 Self::Gauge(points) => write!(f, "{}", points),
486 Self::Set(points) => write!(f, "{}", points),
487 Self::Histogram(points) => write!(f, "{}", points),
488 Self::Distribution(points) => write!(f, "{}", points),
489 }
490 }
491}
492
493fn merge_scalar_sum(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
494 *dest += *src;
495}
496
497fn merge_scalar_latest(dest: &mut OrderedFloat<f64>, src: &mut OrderedFloat<f64>) {
498 *dest = *src;
499}
500
501fn merge_set(dest: &mut HashSet<String>, src: &mut HashSet<String>) {
502 dest.extend(src.drain());
503}
504
505fn merge_histogram(dest: &mut Histogram, src: &mut Histogram) {
506 dest.merge(src);
507}
508
509fn merge_sketch(dest: &mut DDSketch, src: &mut DDSketch) {
510 dest.merge(src);
511}
512
513#[cfg(test)]
514mod tests {
515 use std::time::Duration;
516
517 use super::{HistogramPoints, MetricValues, ScalarPoints, SetPoints, SketchPoints};
518
519 #[test]
520 fn merge_counters() {
521 let cases = [
522 (
524 ScalarPoints::from((1, 1.0)),
525 ScalarPoints::from((1, 2.0)),
526 ScalarPoints::from((1, 3.0)),
527 ),
528 (
530 ScalarPoints::from((1, 1.0)),
531 ScalarPoints::from(2.0),
532 ScalarPoints::from([(0, 2.0), (1, 1.0)]),
533 ),
534 (
536 ScalarPoints::from(5.0),
537 ScalarPoints::from(6.0),
538 ScalarPoints::from(11.0),
539 ),
540 ];
541
542 for (a, b, expected) in cases {
543 let mut merged = MetricValues::Counter(a.clone());
544 merged.merge(MetricValues::Counter(b.clone()));
545
546 assert_eq!(
547 merged,
548 MetricValues::Counter(expected.clone()),
549 "merged {} with {}, expected {} but got {}",
550 a,
551 b,
552 expected,
553 merged
554 );
555 }
556 }
557
558 #[test]
559 fn merge_gauges() {
560 let cases = [
561 (
563 ScalarPoints::from((1, 1.0)),
564 ScalarPoints::from((1, 2.0)),
565 ScalarPoints::from((1, 2.0)),
566 ),
567 (
569 ScalarPoints::from((1, 1.0)),
570 ScalarPoints::from(2.0),
571 ScalarPoints::from([(0, 2.0), (1, 1.0)]),
572 ),
573 (
575 ScalarPoints::from(5.0),
576 ScalarPoints::from(6.0),
577 ScalarPoints::from(6.0),
578 ),
579 ];
580
581 for (a, b, expected) in cases {
582 let mut merged = MetricValues::Gauge(a.clone());
583 merged.merge(MetricValues::Gauge(b.clone()));
584
585 assert_eq!(
586 merged,
587 MetricValues::Gauge(expected.clone()),
588 "merged {} with {}, expected {} but got {}",
589 a,
590 b,
591 expected,
592 merged
593 );
594 }
595 }
596
597 #[test]
598 fn merge_rates() {
599 const FIVE_SECS: Duration = Duration::from_secs(5);
600 const TEN_SECS: Duration = Duration::from_secs(10);
601
602 let cases = [
603 (
605 ScalarPoints::from((1, 1.0)),
606 FIVE_SECS,
607 ScalarPoints::from((1, 2.0)),
608 FIVE_SECS,
609 ScalarPoints::from((1, 3.0)),
610 FIVE_SECS,
611 ),
612 (
614 ScalarPoints::from((1, 1.0)),
615 FIVE_SECS,
616 ScalarPoints::from(2.0),
617 FIVE_SECS,
618 ScalarPoints::from([(0, 2.0), (1, 1.0)]),
619 FIVE_SECS,
620 ),
621 (
623 ScalarPoints::from(5.0),
624 FIVE_SECS,
625 ScalarPoints::from(6.0),
626 FIVE_SECS,
627 ScalarPoints::from(11.0),
628 FIVE_SECS,
629 ),
630 (
634 ScalarPoints::from((1, 1.0)),
635 FIVE_SECS,
636 ScalarPoints::from((1, 2.0)),
637 TEN_SECS,
638 ScalarPoints::from((1, 2.0)),
639 TEN_SECS,
640 ),
641 (
642 ScalarPoints::from((1, 3.0)),
643 FIVE_SECS,
644 ScalarPoints::from(4.0),
645 TEN_SECS,
646 ScalarPoints::from((0, 4.0)),
647 TEN_SECS,
648 ),
649 (
650 ScalarPoints::from(7.0),
651 TEN_SECS,
652 ScalarPoints::from(9.0),
653 FIVE_SECS,
654 ScalarPoints::from(9.0),
655 FIVE_SECS,
656 ),
657 ];
658
659 for (a, a_interval, b, b_interval, expected, expected_interval) in cases {
660 let mut merged = MetricValues::Rate(a.clone(), a_interval);
661 merged.merge(MetricValues::Rate(b.clone(), b_interval));
662
663 assert_eq!(
664 merged,
665 MetricValues::Rate(expected.clone(), expected_interval),
666 "merged {}/{:?} with {}/{:?}, expected {} but got {}",
667 a,
668 a_interval,
669 b,
670 b_interval,
671 expected,
672 merged
673 );
674 }
675 }
676
677 #[test]
678 fn merge_sets() {
679 let cases = [
680 (
682 SetPoints::from((1, "foo")),
683 SetPoints::from((1, "bar")),
684 SetPoints::from((1, ["foo", "bar"])),
685 ),
686 (
689 SetPoints::from((1, "foo")),
690 SetPoints::from("bar"),
691 SetPoints::from([(0, "bar"), (1, "foo")]),
692 ),
693 (
695 SetPoints::from("foo"),
696 SetPoints::from("bar"),
697 SetPoints::from(["foo", "bar"]),
698 ),
699 ];
700
701 for (a, b, expected) in cases {
702 let mut merged = MetricValues::Set(a.clone());
703 merged.merge(MetricValues::Set(b.clone()));
704
705 assert_eq!(
706 merged,
707 MetricValues::Set(expected.clone()),
708 "merged {} with {}, expected {} but got {}",
709 a,
710 b,
711 expected,
712 merged
713 );
714 }
715 }
716
717 #[test]
718 fn merge_histograms() {
719 let cases = [
720 (
722 HistogramPoints::from((1, 1.0)),
723 HistogramPoints::from((1, 2.0)),
724 HistogramPoints::from((1, [1.0, 2.0])),
725 ),
726 (
728 HistogramPoints::from((1, 1.0)),
729 HistogramPoints::from(2.0),
730 HistogramPoints::from([(0, 2.0), (1, 1.0)]),
731 ),
732 (
734 HistogramPoints::from(5.0),
735 HistogramPoints::from(6.0),
736 HistogramPoints::from([5.0, 6.0]),
737 ),
738 ];
739
740 for (a, b, expected) in cases {
741 let mut merged = MetricValues::Histogram(a.clone());
742 merged.merge(MetricValues::Histogram(b.clone()));
743
744 assert_eq!(
745 merged,
746 MetricValues::Histogram(expected.clone()),
747 "merged {} with {}, expected {} but got {}",
748 a,
749 b,
750 expected,
751 merged
752 );
753 }
754 }
755
756 #[test]
757 fn merge_distributions() {
758 let cases = [
759 (
761 SketchPoints::from((1, 1.0)),
762 SketchPoints::from((1, 2.0)),
763 SketchPoints::from((1, [1.0, 2.0])),
764 ),
765 (
767 SketchPoints::from((1, 1.0)),
768 SketchPoints::from(2.0),
769 SketchPoints::from([(0, 2.0), (1, 1.0)]),
770 ),
771 (
773 SketchPoints::from(5.0),
774 SketchPoints::from(6.0),
775 SketchPoints::from([5.0, 6.0]),
776 ),
777 ];
778
779 for (a, b, expected) in cases {
780 let mut merged = MetricValues::Distribution(a.clone());
781 merged.merge(MetricValues::Distribution(b.clone()));
782
783 assert_eq!(
784 merged,
785 MetricValues::Distribution(expected.clone()),
786 "merged {} with {}, expected {} but got {}",
787 a,
788 b,
789 expected,
790 merged
791 );
792 }
793 }
794}