Skip to main content

stele/
metrics.rs

1use std::fmt;
2
3use datadog_protos::metrics::{MetricPayload, MetricType, SketchPayload};
4use ddsketch::DDSketch;
5use float_cmp::ApproxEqRatio as _;
6use saluki_error::{generic_error, GenericError};
7use serde::{Deserialize, Serialize};
8
9/// JSON envelope for the legacy V1 series intake (`/api/v1/series`).
10#[derive(Deserialize)]
11struct V1SeriesEnvelope {
12    series: Vec<V1Serie>,
13}
14
15/// A single `Serie` entry in a V1 series payload.
16///
17/// Mirrors the Datadog Agent's wire format (see `pkg/metrics/series.go`). `omitempty` fields default to empty.
18// Other fields present in the JSON envelope (`source_type_name`, `unit`) are intentionally not deserialized;
19// serde silently ignores unknown JSON fields, so omitting them here is sufficient.
20#[derive(Deserialize)]
21struct V1Serie {
22    metric: String,
23    points: Vec<(i64, f64)>,
24    #[serde(default)]
25    tags: Vec<String>,
26    #[serde(default)]
27    host: String,
28    #[serde(default)]
29    device: String,
30    #[serde(rename = "type", default)]
31    mtype: String,
32    #[serde(default)]
33    interval: i64,
34}
35
36/// A metric's unique identifier.
37///
38/// The host is normalized into the tag list as a `host:<value>` tag rather than carried as a separate field; the
39/// Datadog backend treats host as a first-class dimension on the time series, equivalent to any other tag. This
40/// keeps comparison logic uniform regardless of which wire format the metric originated from.
41#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
42pub struct MetricContext {
43    name: String,
44    tags: Vec<String>,
45}
46
47impl MetricContext {
48    /// Returns the name of the context.
49    pub fn name(&self) -> &str {
50        &self.name
51    }
52
53    /// Returns the tags of the context.
54    ///
55    /// When the underlying payload carried a host, it appears here as a `host:<value>` tag.
56    pub fn tags(&self) -> &[String] {
57        &self.tags
58    }
59
60    /// Consumes this context, returning the name and tags.
61    pub fn into_parts(self) -> (String, Vec<String>) {
62        (self.name, self.tags)
63    }
64}
65
66impl fmt::Display for MetricContext {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        write!(f, "{}", self.name)?;
69
70        if !self.tags.is_empty() {
71            write!(f, " {{{}}}", self.tags.join(", "))?;
72        }
73
74        Ok(())
75    }
76}
77
78/// A simplified metric representation.
79#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
80pub struct Metric {
81    context: MetricContext,
82    values: Vec<(u64, MetricValue)>,
83}
84
85impl Metric {
86    /// Returns the context of the metric.
87    pub fn context(&self) -> &MetricContext {
88        &self.context
89    }
90
91    /// Returns the values associated with the metric.
92    pub fn values(&self) -> &[(u64, MetricValue)] {
93        &self.values
94    }
95}
96
97/// A metric value.
98///
99/// # Equality
100///
101/// `MetricValue` implements `PartialEq` and `Eq`, the majority of which involves comparing floating-point (`f64`)
102/// numbers. Comparing floating-point numbers for equality is inherently tricky ([this][bitbanging_io] is just one blog
103/// post/article out of thousands on the subject). In the equality implementation for `MetricValue`, we use a
104/// ratio-based approach.
105///
106/// This means that when comparing two floating-point numbers, we look at their _ratio_ to one another, with an upper
107/// bound on the allowed difference. For example, if we compare 99 to 100, there's a difference of 1% (`1 - (99/100) =
108/// 0.01 = 1%`), while the difference between 99.999 and 100 is only 0.001% (`1 - (99.999/100) = 0.00001 = 0.001%`). As
109/// most comparisons are expected to be close, only differing by a few ULPs (units in the last place) due to slight
110/// differences in how floating-point numbers are implemented between Go and Rust, this approach is sufficient to
111/// compensate for the inherent imprecision while not falling victim to relying on ULPs or epsilon directly, whose
112/// applicability depends on the number range being compared.
113///
114/// Specifically, we compare floating-point numbers using a ratio of `0.00000001` (0.0000001%), meaning the smaller of
115/// the two values being compared must be within 99.999999% to 100% of the larger number, which is sufficiently precise
116/// for our concerns.
117///
118/// [bitbanging_io]: https://bitbashing.io/comparing-floats.html
119#[derive(Clone, Debug, Deserialize, Serialize)]
120#[serde(tag = "mtype")]
121pub enum MetricValue {
122    /// A count.
123    Count {
124        /// The value of the count.
125        value: f64,
126    },
127
128    /// A rate.
129    ///
130    /// Rates are per-second adjusted counts. For example, a count that increased by 100 over 10 seconds would be
131    /// represented as a rate with an interval of 10 (seconds) and a value of 10 (`100 / 10 = 10`).
132    Rate {
133        /// The interval of the rate, in seconds.
134        interval: u64,
135
136        /// The per-second value of the rate.
137        value: f64,
138    },
139
140    /// A gauge.
141    Gauge {
142        /// The value of the gauge.
143        value: f64,
144    },
145
146    /// A sketch.
147    Sketch {
148        /// The sketch data.
149        sketch: DDSketch,
150    },
151}
152
153impl PartialEq for MetricValue {
154    fn eq(&self, other: &Self) -> bool {
155        // When comparing two values, the smaller value cannot deviate by more than 0.0000001% of the larger value.
156        const RATIO_ERROR: f64 = 0.00000001;
157
158        match (self, other) {
159            (MetricValue::Count { value: value_a }, MetricValue::Count { value: value_b }) => {
160                value_a.approx_eq_ratio(value_b, RATIO_ERROR)
161            }
162            (
163                MetricValue::Rate {
164                    interval: interval_a,
165                    value: value_a,
166                },
167                MetricValue::Rate {
168                    interval: interval_b,
169                    value: value_b,
170                },
171            ) => interval_a == interval_b && value_a.approx_eq_ratio(value_b, RATIO_ERROR),
172            (MetricValue::Gauge { value: value_a }, MetricValue::Gauge { value: value_b }) => {
173                value_a.approx_eq_ratio(value_b, RATIO_ERROR)
174            }
175            (MetricValue::Sketch { sketch: sketch_a }, MetricValue::Sketch { sketch: sketch_b }) => {
176                approx_eq_ratio_optional(sketch_a.min(), sketch_b.min(), RATIO_ERROR)
177                    && approx_eq_ratio_optional(sketch_a.max(), sketch_b.max(), RATIO_ERROR)
178                    && approx_eq_ratio_optional(sketch_a.avg(), sketch_b.avg(), RATIO_ERROR)
179                    && approx_eq_ratio_optional(sketch_a.sum(), sketch_b.sum(), RATIO_ERROR)
180                    && sketch_a.count() == sketch_b.count()
181                    && sketch_a.bin_count() == sketch_b.bin_count()
182            }
183            _ => false,
184        }
185    }
186}
187
188impl Eq for MetricValue {}
189
190impl Metric {
191    /// Attempts to parse metrics from a series v1 payload.
192    ///
193    /// V1 keeps a separate `device` JSON field rather than a `device:<value>` tag like the V2 protobuf encoder. To
194    /// keep the post-conversion `stele::Metric` representation comparable between V1 and V2 payloads, this re-injects
195    /// `device:<value>` into the tag list when the JSON `device` field is non-empty.
196    ///
197    /// # Errors
198    ///
199    /// If the JSON can't be deserialized, contains invalid data (for example, an unknown `type`), or has out-of-range
200    /// timestamps, an error is returned.
201    pub fn try_from_series_v1(payload: &[u8]) -> Result<Vec<Self>, GenericError> {
202        let envelope: V1SeriesEnvelope = serde_json::from_slice(payload)
203            .map_err(|e| generic_error!("Failed to parse V1 series JSON payload: {}", e))?;
204
205        let mut metrics = Vec::with_capacity(envelope.series.len());
206
207        for serie in envelope.series {
208            let mut tags = serie.tags;
209            if !serie.host.is_empty() {
210                tags.push(format!("host:{}", serie.host));
211            }
212            if !serie.device.is_empty() {
213                tags.push(format!("device:{}", serie.device));
214            }
215
216            let mut values = Vec::with_capacity(serie.points.len());
217            for (ts, value) in serie.points {
218                let timestamp =
219                    u64::try_from(ts).map_err(|_| generic_error!("Invalid timestamp in V1 series payload: {}", ts))?;
220
221                let metric_value = match serie.mtype.as_str() {
222                    "count" => MetricValue::Count { value },
223                    "rate" => MetricValue::Rate {
224                        interval: serie.interval as u64,
225                        value,
226                    },
227                    "gauge" => MetricValue::Gauge { value },
228                    other => {
229                        return Err(generic_error!(
230                            "Unknown metric type '{}' in V1 series payload (metric '{}')",
231                            other,
232                            serie.metric
233                        ));
234                    }
235                };
236                values.push((timestamp, metric_value));
237            }
238
239            metrics.push(Metric {
240                context: MetricContext {
241                    name: serie.metric,
242                    tags,
243                },
244                values,
245            });
246        }
247
248        Ok(metrics)
249    }
250
251    /// Attempts to parse metrics from a series v2 payload.
252    ///
253    /// # Errors
254    ///
255    /// If the metric payload contains invalid data, an error will be returned.
256    pub fn try_from_series_v2(payload: MetricPayload) -> Result<Vec<Self>, GenericError> {
257        let mut metrics = Vec::new();
258
259        for series in payload.series {
260            let name = series.metric().to_string();
261            let mut tags: Vec<String> = series.tags().iter().map(|tag| tag.to_string()).collect();
262            // V2 protobuf encodes the hostname as a resource with type="host". The Datadog Agent's wire format
263            // contract requires at least one such resource per series, but we still tolerate its absence. The host
264            // is appended to the tag list (as `host:<value>`) to match the backend's representation and to keep
265            // comparison logic uniform with the V1 JSON path.
266            if let Some(host) = series.resources.iter().find(|r| r.type_() == "host") {
267                let host_name = host.name();
268                if !host_name.is_empty() {
269                    tags.push(format!("host:{}", host_name));
270                }
271            }
272            let mut values = Vec::new();
273
274            match series.type_() {
275                MetricType::UNSPECIFIED => {
276                    return Err(generic_error!("Received metric series with UNSPECIFIED type."));
277                }
278                MetricType::COUNT => {
279                    for point in series.points {
280                        let timestamp = u64::try_from(point.timestamp)
281                            .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
282                        values.push((timestamp, MetricValue::Count { value: point.value }));
283                    }
284                }
285                MetricType::RATE => {
286                    for point in series.points {
287                        let timestamp = u64::try_from(point.timestamp)
288                            .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
289                        values.push((
290                            timestamp,
291                            MetricValue::Rate {
292                                interval: series.interval as u64,
293                                value: point.value,
294                            },
295                        ));
296                    }
297                }
298                MetricType::GAUGE => {
299                    for point in series.points {
300                        let timestamp = u64::try_from(point.timestamp)
301                            .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
302                        values.push((timestamp, MetricValue::Gauge { value: point.value }));
303                    }
304                }
305            }
306
307            metrics.push(Metric {
308                context: MetricContext { name, tags },
309                values,
310            })
311        }
312
313        Ok(metrics)
314    }
315
316    /// Attempts to parse metrics from a sketch payload.
317    ///
318    /// # Errors
319    ///
320    /// If the sketch payload contains invalid data, an error will be returned.
321    pub fn try_from_sketch(payload: SketchPayload) -> Result<Vec<Self>, GenericError> {
322        let mut metrics = Vec::new();
323
324        for sketch in payload.sketches {
325            let name = sketch.metric().to_string();
326            let mut tags: Vec<String> = sketch.tags().iter().map(|tag| tag.to_string()).collect();
327            // V2 sketches carry the host on the sketch message itself rather than via a resources list. Fold it
328            // into the tag list (as `host:<value>`) for comparison parity with the V1 JSON / V2 series paths.
329            let host = sketch.host();
330            if !host.is_empty() {
331                tags.push(format!("host:{}", host));
332            }
333            let mut values = Vec::new();
334
335            for dogsketch in sketch.dogsketches {
336                let timestamp = u64::try_from(dogsketch.ts)
337                    .map_err(|_| generic_error!("Invalid timestamp for sketch: {}", dogsketch.ts))?;
338                let sketch = DDSketch::try_from(dogsketch)
339                    .map_err(|e| generic_error!("Failed to convert DogSketch to DDSketch: {}", e))?;
340                values.push((timestamp, MetricValue::Sketch { sketch }));
341            }
342
343            metrics.push(Metric {
344                context: MetricContext { name, tags },
345                values,
346            })
347        }
348
349        Ok(metrics)
350    }
351}
352
353fn approx_eq_ratio_optional(a: Option<f64>, b: Option<f64>, ratio: f64) -> bool {
354    match (a, b) {
355        (Some(a), Some(b)) => a.approx_eq_ratio(&b, ratio),
356        (None, None) => true,
357        _ => false,
358    }
359}
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when `metric` carries a tag exactly equal to `expected`.
    fn has_tag(metric: &Metric, expected: &str) -> bool {
        metric.context.tags.iter().any(|tag| tag.as_str() == expected)
    }

    /// Returns `true` when any tag on `metric` begins with `prefix`.
    fn has_tag_with_prefix(metric: &Metric, prefix: &str) -> bool {
        metric.context.tags.iter().any(|tag| tag.starts_with(prefix))
    }

    #[test]
    fn try_from_series_v1_parses_count_gauge_rate() {
        let payload = br#"{"series":[
            {"metric":"a.count","points":[[100,5.0]],"tags":["env:prod"],"host":"h","type":"count","interval":0},
            {"metric":"a.gauge","points":[[101,12.0]],"tags":[],"host":"h","type":"gauge","interval":0},
            {"metric":"a.rate","points":[[102,3.0]],"tags":[],"host":"h","type":"rate","interval":10}
        ]}"#;

        let parsed = Metric::try_from_series_v1(payload).expect("parse should succeed");
        assert_eq!(parsed.len(), 3);
        let (count, gauge, rate) = (&parsed[0], &parsed[1], &parsed[2]);

        // Count series: keeps its own tag and gains the host tag.
        assert_eq!(count.context.name, "a.count");
        assert!(has_tag(count, "env:prod"));
        assert!(has_tag(count, "host:h"));
        assert_eq!(count.values, vec![(100, MetricValue::Count { value: 5.0 })]);

        // Gauge series: the host tag is the only tag.
        assert_eq!(gauge.context.name, "a.gauge");
        assert_eq!(gauge.context.tags, vec!["host:h".to_string()]);
        assert_eq!(gauge.values, vec![(101, MetricValue::Gauge { value: 12.0 })]);

        // Rate series: the interval is carried through from the payload.
        assert_eq!(rate.context.name, "a.rate");
        assert_eq!(rate.context.tags, vec!["host:h".to_string()]);
        let expected_rate = MetricValue::Rate {
            interval: 10,
            value: 3.0,
        };
        assert_eq!(rate.values, vec![(102, expected_rate)]);
    }

    #[test]
    fn try_from_series_v1_omits_host_tag_when_empty() {
        // Payloads lacking `host` are rare in practice, but the parser stays permissive: no `host:` tag is emitted
        // at all, as opposed to one with an empty value.
        let payload = br#"{"series":[
            {"metric":"m","points":[[1,1.0]],"tags":[],"type":"count","interval":0}
        ]}"#;

        let parsed = Metric::try_from_series_v1(payload).expect("parse should succeed");
        assert!(!has_tag_with_prefix(&parsed[0], "host:"));
    }

    #[test]
    fn try_from_series_v1_reinjects_device_tag() {
        let payload = br#"{"series":[
            {"metric":"m","points":[[1,1.0]],"tags":["env:prod"],"host":"h","device":"eth0","type":"count","interval":0}
        ]}"#;

        let parsed = Metric::try_from_series_v1(payload).expect("parse should succeed");
        let first = &parsed[0];
        assert!(has_tag(first, "env:prod"));
        assert!(has_tag(first, "device:eth0"));
    }

    #[test]
    fn try_from_series_v1_rejects_unknown_type() {
        let payload = br#"{"series":[
            {"metric":"m","points":[[1,1.0]],"tags":[],"host":"h","type":"weird","interval":0}
        ]}"#;

        let outcome = Metric::try_from_series_v1(payload);
        assert!(outcome.is_err());
    }

    #[test]
    fn try_from_series_v2_folds_host_into_tags() {
        use datadog_protos::metrics::metric_payload::{
            MetricPoint, MetricSeries, MetricType as ProtoMetricType, Resource,
        };
        use datadog_protos::metrics::MetricPayload;

        let mut host_resource = Resource::new();
        host_resource.set_type("host".into());
        host_resource.set_name("server-1".into());

        // A non-host resource, which must not end up in the tag list.
        let mut device_resource = Resource::new();
        device_resource.set_type("device".into());
        device_resource.set_name("eth0".into());

        let mut sample = MetricPoint::new();
        sample.value = 1.0;
        sample.timestamp = 1;

        let mut series = MetricSeries::new();
        series.set_metric("my.metric".into());
        series.set_type(ProtoMetricType::COUNT);
        series.tags.push("env:prod".into());
        series.resources.push(host_resource);
        series.resources.push(device_resource);
        series.points.push(sample);

        let mut payload = MetricPayload::new();
        payload.series.push(series);

        let parsed = Metric::try_from_series_v2(payload).expect("parse should succeed");
        assert_eq!(parsed.len(), 1);
        let only = &parsed[0];
        assert!(has_tag(only, "host:server-1"));
        assert!(has_tag(only, "env:prod"));
        assert!(!has_tag_with_prefix(only, "device:"));
    }

    #[test]
    fn try_from_sketch_folds_host_into_tags() {
        use datadog_protos::metrics::sketch_payload::Sketch;
        use datadog_protos::metrics::SketchPayload;

        let mut entry = Sketch::new();
        entry.set_metric("my.metric".into());
        entry.set_host("server-1".into());
        entry.tags.push("env:prod".into());

        let mut payload = SketchPayload::new();
        payload.sketches.push(entry);

        let parsed = Metric::try_from_sketch(payload).expect("parse should succeed");
        assert_eq!(parsed.len(), 1);
        let only = &parsed[0];
        assert!(has_tag(only, "host:server-1"));
        assert!(has_tag(only, "env:prod"));
    }
}