stele/
metrics.rs

1use std::fmt;
2
3use datadog_protos::metrics::{MetricPayload, MetricType, SketchPayload};
4use ddsketch_agent::DDSketch;
5use float_cmp::ApproxEqRatio as _;
6use saluki_error::{generic_error, GenericError};
7use serde::{Deserialize, Serialize};
8
9/// A metric's unique identifier.
10#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
11pub struct MetricContext {
12    name: String,
13    tags: Vec<String>,
14}
15
16impl MetricContext {
17    /// Returns the name of the context.
18    pub fn name(&self) -> &str {
19        &self.name
20    }
21
22    /// Returns the tags of the context.
23    pub fn tags(&self) -> &[String] {
24        &self.tags
25    }
26
27    /// Consumes this context, returning the name and tags.
28    pub fn into_parts(self) -> (String, Vec<String>) {
29        (self.name, self.tags)
30    }
31}
32
33impl fmt::Display for MetricContext {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        write!(f, "{}", self.name)?;
36
37        if !self.tags.is_empty() {
38            write!(f, " {{{}}}", self.tags.join(", "))?;
39        }
40
41        Ok(())
42    }
43}
44
45/// A simplified metric representation.
46#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
47pub struct Metric {
48    context: MetricContext,
49    values: Vec<(u64, MetricValue)>,
50}
51
52impl Metric {
53    /// Returns the context of the metric.
54    pub fn context(&self) -> &MetricContext {
55        &self.context
56    }
57
58    /// Returns the values associated with the metric.
59    pub fn values(&self) -> &[(u64, MetricValue)] {
60        &self.values
61    }
62}
63
/// A metric value.
///
/// # Equality
///
/// `MetricValue` implements `PartialEq` and `Eq`, the majority of which involves comparing floating-point (`f64`)
/// numbers. Comparing floating-point numbers for equality is inherently tricky ([this][bitbanging_io] is just one blog
/// post/article out of thousands on the subject). In the equality implementation for `MetricValue`, we use a
/// ratio-based approach.
///
/// This means that when comparing two floating-point numbers, we look at their _ratio_ to one another, with an upper
/// bound on the allowed difference. For example, if we compare 99 to 100, there's a difference of 1% (`1 - (99/100) =
/// 0.01 = 1%`), while the difference between 99.999 and 100 is only 0.001% (`1 - (99.999/100) = 0.00001 = 0.001%`). As
/// most comparisons are expected to be close, only differing by a few ULPs (units in the last place) due to slight
/// differences in how floating-point numbers are implemented between Go and Rust, this approach is sufficient to
/// compensate for the inherent imprecision while not falling victim to relying on ULPs or epsilon directly, whose
/// applicability depends on the number range being compared.
///
/// Specifically, we compare floating-point numbers using a ratio of `0.00000001` (0.000001%), meaning the smaller of
/// the two values being compared must be within 99.999999% to 100% of the larger number, which is sufficiently precise
/// for our concerns.
///
/// [bitbanging_io]: https://bitbashing.io/comparing-floats.html
86#[derive(Clone, Debug, Deserialize, Serialize)]
87#[serde(tag = "mtype")]
88pub enum MetricValue {
89    /// A count.
90    Count {
91        /// The value of the count.
92        value: f64,
93    },
94
95    /// A rate.
96    ///
97    /// Rates are per-second adjusted counts. For example, a count that increased by 100 over 10 seconds would be
98    /// represented as a rate with an interval of 10 (seconds) and a value of 10 (`100 / 10 = 10`).
99    Rate {
100        /// The interval of the rate, in seconds.
101        interval: u64,
102
103        /// The per-second value of the rate.
104        value: f64,
105    },
106
107    /// A gauge.
108    Gauge {
109        /// The value of the gauge.
110        value: f64,
111    },
112
113    /// A sketch.
114    Sketch {
115        /// The sketch data.
116        sketch: DDSketch,
117    },
118}
119
120impl PartialEq for MetricValue {
121    fn eq(&self, other: &Self) -> bool {
122        // When comparing two values, the smaller value cannot deviate by more than 0.0000001% of the larger value.
123        const RATIO_ERROR: f64 = 0.00000001;
124
125        match (self, other) {
126            (MetricValue::Count { value: value_a }, MetricValue::Count { value: value_b }) => {
127                value_a.approx_eq_ratio(value_b, RATIO_ERROR)
128            }
129            (
130                MetricValue::Rate {
131                    interval: interval_a,
132                    value: value_a,
133                },
134                MetricValue::Rate {
135                    interval: interval_b,
136                    value: value_b,
137                },
138            ) => interval_a == interval_b && value_a.approx_eq_ratio(value_b, RATIO_ERROR),
139            (MetricValue::Gauge { value: value_a }, MetricValue::Gauge { value: value_b }) => {
140                value_a.approx_eq_ratio(value_b, RATIO_ERROR)
141            }
142            (MetricValue::Sketch { sketch: sketch_a }, MetricValue::Sketch { sketch: sketch_b }) => {
143                approx_eq_ratio_optional(sketch_a.min(), sketch_b.min(), RATIO_ERROR)
144                    && approx_eq_ratio_optional(sketch_a.max(), sketch_b.max(), RATIO_ERROR)
145                    && approx_eq_ratio_optional(sketch_a.avg(), sketch_b.avg(), RATIO_ERROR)
146                    && approx_eq_ratio_optional(sketch_a.sum(), sketch_b.sum(), RATIO_ERROR)
147                    && sketch_a.count() == sketch_b.count()
148                    && sketch_a.bin_count() == sketch_b.bin_count()
149            }
150            _ => false,
151        }
152    }
153}
154
155impl Eq for MetricValue {}
156
157impl Metric {
158    /// Attempts to parse metrics from a series payload.
159    ///
160    /// # Errors
161    ///
162    /// If the metric payload contains invalid data, an error will be returned.
163    pub fn try_from_series(payload: MetricPayload) -> Result<Vec<Self>, GenericError> {
164        let mut metrics = Vec::new();
165
166        for series in payload.series {
167            let name = series.metric().to_string();
168            let tags = series.tags().iter().map(|tag| tag.to_string()).collect();
169            let mut values = Vec::new();
170
171            match series.type_() {
172                MetricType::UNSPECIFIED => {
173                    return Err(generic_error!("Received metric series with UNSPECIFIED type."));
174                }
175                MetricType::COUNT => {
176                    for point in series.points {
177                        let timestamp = u64::try_from(point.timestamp)
178                            .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
179                        values.push((timestamp, MetricValue::Count { value: point.value }));
180                    }
181                }
182                MetricType::RATE => {
183                    for point in series.points {
184                        let timestamp = u64::try_from(point.timestamp)
185                            .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
186                        values.push((
187                            timestamp,
188                            MetricValue::Rate {
189                                interval: series.interval as u64,
190                                value: point.value,
191                            },
192                        ));
193                    }
194                }
195                MetricType::GAUGE => {
196                    for point in series.points {
197                        let timestamp = u64::try_from(point.timestamp)
198                            .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
199                        values.push((timestamp, MetricValue::Gauge { value: point.value }));
200                    }
201                }
202            }
203
204            metrics.push(Metric {
205                context: MetricContext { name, tags },
206                values,
207            })
208        }
209
210        Ok(metrics)
211    }
212
213    /// Attempts to parse metrics from a sketch payload.
214    ///
215    /// # Errors
216    ///
217    /// If the sketch payload contains invalid data, an error will be returned.
218    pub fn try_from_sketch(payload: SketchPayload) -> Result<Vec<Self>, GenericError> {
219        let mut metrics = Vec::new();
220
221        for sketch in payload.sketches {
222            let name = sketch.metric().to_string();
223            let tags = sketch.tags().iter().map(|tag| tag.to_string()).collect();
224            let mut values = Vec::new();
225
226            for dogsketch in sketch.dogsketches {
227                let timestamp = u64::try_from(dogsketch.ts)
228                    .map_err(|_| generic_error!("Invalid timestamp for sketch: {}", dogsketch.ts))?;
229                let sketch = DDSketch::try_from(dogsketch)
230                    .map_err(|e| generic_error!("Failed to convert DogSketch to DDSketch: {}", e))?;
231                values.push((timestamp, MetricValue::Sketch { sketch }));
232            }
233
234            metrics.push(Metric {
235                context: MetricContext { name, tags },
236                values,
237            })
238        }
239
240        Ok(metrics)
241    }
242}
243
244fn approx_eq_ratio_optional(a: Option<f64>, b: Option<f64>, ratio: f64) -> bool {
245    match (a, b) {
246        (Some(a), Some(b)) => a.approx_eq_ratio(&b, ratio),
247        (None, None) => true,
248        _ => false,
249    }
250}