1use std::fmt;
2
3use datadog_protos::metrics::{MetricPayload, MetricType, SketchPayload};
4use ddsketch::DDSketch;
5use float_cmp::ApproxEqRatio as _;
6use saluki_error::{generic_error, GenericError};
7use serde::{Deserialize, Serialize};
8
9#[derive(Deserialize)]
11struct V1SeriesEnvelope {
12 series: Vec<V1Serie>,
13}
14
15#[derive(Deserialize)]
21struct V1Serie {
22 metric: String,
23 points: Vec<(i64, f64)>,
24 #[serde(default)]
25 tags: Vec<String>,
26 #[serde(default)]
27 host: String,
28 #[serde(default)]
29 device: String,
30 #[serde(rename = "type", default)]
31 mtype: String,
32 #[serde(default)]
33 interval: i64,
34}
35
36#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
42pub struct MetricContext {
43 name: String,
44 tags: Vec<String>,
45}
46
47impl MetricContext {
48 pub fn name(&self) -> &str {
50 &self.name
51 }
52
53 pub fn tags(&self) -> &[String] {
57 &self.tags
58 }
59
60 pub fn into_parts(self) -> (String, Vec<String>) {
62 (self.name, self.tags)
63 }
64}
65
66impl fmt::Display for MetricContext {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 write!(f, "{}", self.name)?;
69
70 if !self.tags.is_empty() {
71 write!(f, " {{{}}}", self.tags.join(", "))?;
72 }
73
74 Ok(())
75 }
76}
77
78#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
80pub struct Metric {
81 context: MetricContext,
82 values: Vec<(u64, MetricValue)>,
83}
84
85impl Metric {
86 pub fn context(&self) -> &MetricContext {
88 &self.context
89 }
90
91 pub fn values(&self) -> &[(u64, MetricValue)] {
93 &self.values
94 }
95}
96
97#[derive(Clone, Debug, Deserialize, Serialize)]
120#[serde(tag = "mtype")]
121pub enum MetricValue {
122 Count {
124 value: f64,
126 },
127
128 Rate {
133 interval: u64,
135
136 value: f64,
138 },
139
140 Gauge {
142 value: f64,
144 },
145
146 Sketch {
148 sketch: DDSketch,
150 },
151}
152
153impl PartialEq for MetricValue {
154 fn eq(&self, other: &Self) -> bool {
155 const RATIO_ERROR: f64 = 0.00000001;
157
158 match (self, other) {
159 (MetricValue::Count { value: value_a }, MetricValue::Count { value: value_b }) => {
160 value_a.approx_eq_ratio(value_b, RATIO_ERROR)
161 }
162 (
163 MetricValue::Rate {
164 interval: interval_a,
165 value: value_a,
166 },
167 MetricValue::Rate {
168 interval: interval_b,
169 value: value_b,
170 },
171 ) => interval_a == interval_b && value_a.approx_eq_ratio(value_b, RATIO_ERROR),
172 (MetricValue::Gauge { value: value_a }, MetricValue::Gauge { value: value_b }) => {
173 value_a.approx_eq_ratio(value_b, RATIO_ERROR)
174 }
175 (MetricValue::Sketch { sketch: sketch_a }, MetricValue::Sketch { sketch: sketch_b }) => {
176 approx_eq_ratio_optional(sketch_a.min(), sketch_b.min(), RATIO_ERROR)
177 && approx_eq_ratio_optional(sketch_a.max(), sketch_b.max(), RATIO_ERROR)
178 && approx_eq_ratio_optional(sketch_a.avg(), sketch_b.avg(), RATIO_ERROR)
179 && approx_eq_ratio_optional(sketch_a.sum(), sketch_b.sum(), RATIO_ERROR)
180 && sketch_a.count() == sketch_b.count()
181 && sketch_a.bin_count() == sketch_b.bin_count()
182 }
183 _ => false,
184 }
185 }
186}
187
188impl Eq for MetricValue {}
189
190impl Metric {
191 pub fn try_from_series_v1(payload: &[u8]) -> Result<Vec<Self>, GenericError> {
202 let envelope: V1SeriesEnvelope = serde_json::from_slice(payload)
203 .map_err(|e| generic_error!("Failed to parse V1 series JSON payload: {}", e))?;
204
205 let mut metrics = Vec::with_capacity(envelope.series.len());
206
207 for serie in envelope.series {
208 let mut tags = serie.tags;
209 if !serie.host.is_empty() {
210 tags.push(format!("host:{}", serie.host));
211 }
212 if !serie.device.is_empty() {
213 tags.push(format!("device:{}", serie.device));
214 }
215
216 let mut values = Vec::with_capacity(serie.points.len());
217 for (ts, value) in serie.points {
218 let timestamp =
219 u64::try_from(ts).map_err(|_| generic_error!("Invalid timestamp in V1 series payload: {}", ts))?;
220
221 let metric_value = match serie.mtype.as_str() {
222 "count" => MetricValue::Count { value },
223 "rate" => MetricValue::Rate {
224 interval: serie.interval as u64,
225 value,
226 },
227 "gauge" => MetricValue::Gauge { value },
228 other => {
229 return Err(generic_error!(
230 "Unknown metric type '{}' in V1 series payload (metric '{}')",
231 other,
232 serie.metric
233 ));
234 }
235 };
236 values.push((timestamp, metric_value));
237 }
238
239 metrics.push(Metric {
240 context: MetricContext {
241 name: serie.metric,
242 tags,
243 },
244 values,
245 });
246 }
247
248 Ok(metrics)
249 }
250
251 pub fn try_from_series_v2(payload: MetricPayload) -> Result<Vec<Self>, GenericError> {
257 let mut metrics = Vec::new();
258
259 for series in payload.series {
260 let name = series.metric().to_string();
261 let mut tags: Vec<String> = series.tags().iter().map(|tag| tag.to_string()).collect();
262 if let Some(host) = series.resources.iter().find(|r| r.type_() == "host") {
267 let host_name = host.name();
268 if !host_name.is_empty() {
269 tags.push(format!("host:{}", host_name));
270 }
271 }
272 let mut values = Vec::new();
273
274 match series.type_() {
275 MetricType::UNSPECIFIED => {
276 return Err(generic_error!("Received metric series with UNSPECIFIED type."));
277 }
278 MetricType::COUNT => {
279 for point in series.points {
280 let timestamp = u64::try_from(point.timestamp)
281 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
282 values.push((timestamp, MetricValue::Count { value: point.value }));
283 }
284 }
285 MetricType::RATE => {
286 for point in series.points {
287 let timestamp = u64::try_from(point.timestamp)
288 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
289 values.push((
290 timestamp,
291 MetricValue::Rate {
292 interval: series.interval as u64,
293 value: point.value,
294 },
295 ));
296 }
297 }
298 MetricType::GAUGE => {
299 for point in series.points {
300 let timestamp = u64::try_from(point.timestamp)
301 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
302 values.push((timestamp, MetricValue::Gauge { value: point.value }));
303 }
304 }
305 }
306
307 metrics.push(Metric {
308 context: MetricContext { name, tags },
309 values,
310 })
311 }
312
313 Ok(metrics)
314 }
315
316 pub fn try_from_sketch(payload: SketchPayload) -> Result<Vec<Self>, GenericError> {
322 let mut metrics = Vec::new();
323
324 for sketch in payload.sketches {
325 let name = sketch.metric().to_string();
326 let mut tags: Vec<String> = sketch.tags().iter().map(|tag| tag.to_string()).collect();
327 let host = sketch.host();
330 if !host.is_empty() {
331 tags.push(format!("host:{}", host));
332 }
333 let mut values = Vec::new();
334
335 for dogsketch in sketch.dogsketches {
336 let timestamp = u64::try_from(dogsketch.ts)
337 .map_err(|_| generic_error!("Invalid timestamp for sketch: {}", dogsketch.ts))?;
338 let sketch = DDSketch::try_from(dogsketch)
339 .map_err(|e| generic_error!("Failed to convert DogSketch to DDSketch: {}", e))?;
340 values.push((timestamp, MetricValue::Sketch { sketch }));
341 }
342
343 metrics.push(Metric {
344 context: MetricContext { name, tags },
345 values,
346 })
347 }
348
349 Ok(metrics)
350 }
351}
352
353fn approx_eq_ratio_optional(a: Option<f64>, b: Option<f64>, ratio: f64) -> bool {
354 match (a, b) {
355 (Some(a), Some(b)) => a.approx_eq_ratio(&b, ratio),
356 (None, None) => true,
357 _ => false,
358 }
359}
360
361#[cfg(test)]
362mod tests {
363 use super::*;
364
365 #[test]
366 fn try_from_series_v1_parses_count_gauge_rate() {
367 let body = br#"{"series":[
368 {"metric":"a.count","points":[[100,5.0]],"tags":["env:prod"],"host":"h","type":"count","interval":0},
369 {"metric":"a.gauge","points":[[101,12.0]],"tags":[],"host":"h","type":"gauge","interval":0},
370 {"metric":"a.rate","points":[[102,3.0]],"tags":[],"host":"h","type":"rate","interval":10}
371 ]}"#;
372
373 let metrics = Metric::try_from_series_v1(body).expect("parse should succeed");
374 assert_eq!(metrics.len(), 3);
375
376 assert_eq!(metrics[0].context.name, "a.count");
377 assert!(metrics[0].context.tags.contains(&"env:prod".to_string()));
378 assert!(metrics[0].context.tags.contains(&"host:h".to_string()));
379 assert_eq!(metrics[0].values, vec![(100, MetricValue::Count { value: 5.0 })]);
380
381 assert_eq!(metrics[1].context.name, "a.gauge");
382 assert_eq!(metrics[1].context.tags, vec!["host:h".to_string()]);
383 assert_eq!(metrics[1].values, vec![(101, MetricValue::Gauge { value: 12.0 })]);
384
385 assert_eq!(metrics[2].context.name, "a.rate");
386 assert_eq!(metrics[2].context.tags, vec!["host:h".to_string()]);
387 assert_eq!(
388 metrics[2].values,
389 vec![(
390 102,
391 MetricValue::Rate {
392 interval: 10,
393 value: 3.0
394 }
395 )]
396 );
397 }
398
399 #[test]
400 fn try_from_series_v1_omits_host_tag_when_empty() {
401 let body = br#"{"series":[
404 {"metric":"m","points":[[1,1.0]],"tags":[],"type":"count","interval":0}
405 ]}"#;
406
407 let metrics = Metric::try_from_series_v1(body).expect("parse should succeed");
408 assert!(!metrics[0].context.tags.iter().any(|t| t.starts_with("host:")));
409 }
410
411 #[test]
412 fn try_from_series_v1_reinjects_device_tag() {
413 let body = br#"{"series":[
414 {"metric":"m","points":[[1,1.0]],"tags":["env:prod"],"host":"h","device":"eth0","type":"count","interval":0}
415 ]}"#;
416
417 let metrics = Metric::try_from_series_v1(body).expect("parse should succeed");
418 assert!(metrics[0].context.tags.contains(&"env:prod".to_string()));
419 assert!(metrics[0].context.tags.contains(&"device:eth0".to_string()));
420 }
421
422 #[test]
423 fn try_from_series_v1_rejects_unknown_type() {
424 let body = br#"{"series":[
425 {"metric":"m","points":[[1,1.0]],"tags":[],"host":"h","type":"weird","interval":0}
426 ]}"#;
427
428 assert!(Metric::try_from_series_v1(body).is_err());
429 }
430
431 #[test]
432 fn try_from_series_v2_folds_host_into_tags() {
433 use datadog_protos::metrics::metric_payload::{
434 MetricPoint, MetricSeries, MetricType as ProtoMetricType, Resource,
435 };
436 use datadog_protos::metrics::MetricPayload;
437
438 let mut payload = MetricPayload::new();
439
440 let mut series = MetricSeries::new();
441 series.set_metric("my.metric".into());
442 series.set_type(ProtoMetricType::COUNT);
443 series.tags.push("env:prod".into());
444
445 let mut host_res = Resource::new();
446 host_res.set_type("host".into());
447 host_res.set_name("server-1".into());
448 series.resources.push(host_res);
449
450 let mut device_res = Resource::new();
452 device_res.set_type("device".into());
453 device_res.set_name("eth0".into());
454 series.resources.push(device_res);
455
456 let mut point = MetricPoint::new();
457 point.value = 1.0;
458 point.timestamp = 1;
459 series.points.push(point);
460
461 payload.series.push(series);
462
463 let metrics = Metric::try_from_series_v2(payload).expect("parse should succeed");
464 assert_eq!(metrics.len(), 1);
465 assert!(metrics[0].context.tags.contains(&"host:server-1".to_string()));
466 assert!(metrics[0].context.tags.contains(&"env:prod".to_string()));
467 assert!(!metrics[0].context.tags.iter().any(|t| t.starts_with("device:")));
468 }
469
470 #[test]
471 fn try_from_sketch_folds_host_into_tags() {
472 use datadog_protos::metrics::sketch_payload::Sketch;
473 use datadog_protos::metrics::SketchPayload;
474
475 let mut payload = SketchPayload::new();
476 let mut sketch = Sketch::new();
477 sketch.set_metric("my.metric".into());
478 sketch.set_host("server-1".into());
479 sketch.tags.push("env:prod".into());
480 payload.sketches.push(sketch);
481
482 let metrics = Metric::try_from_sketch(payload).expect("parse should succeed");
483 assert_eq!(metrics.len(), 1);
484 assert!(metrics[0].context.tags.contains(&"host:server-1".to_string()));
485 assert!(metrics[0].context.tags.contains(&"env:prod".to_string()));
486 }
487}