1use std::fmt;
2
3use datadog_protos::metrics::{MetricPayload, MetricType, SketchPayload};
4use ddsketch_agent::DDSketch;
5use float_cmp::ApproxEqRatio as _;
6use saluki_error::{generic_error, GenericError};
7use serde::{Deserialize, Serialize};
8
9#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
11pub struct MetricContext {
12 name: String,
13 tags: Vec<String>,
14}
15
16impl MetricContext {
17 pub fn name(&self) -> &str {
19 &self.name
20 }
21
22 pub fn tags(&self) -> &[String] {
24 &self.tags
25 }
26
27 pub fn into_parts(self) -> (String, Vec<String>) {
29 (self.name, self.tags)
30 }
31}
32
33impl fmt::Display for MetricContext {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 write!(f, "{}", self.name)?;
36
37 if !self.tags.is_empty() {
38 write!(f, " {{{}}}", self.tags.join(", "))?;
39 }
40
41 Ok(())
42 }
43}
44
45#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
47pub struct Metric {
48 context: MetricContext,
49 values: Vec<(u64, MetricValue)>,
50}
51
52impl Metric {
53 pub fn context(&self) -> &MetricContext {
55 &self.context
56 }
57
58 pub fn values(&self) -> &[(u64, MetricValue)] {
60 &self.values
61 }
62}
63
64#[derive(Clone, Debug, Deserialize, Serialize)]
87#[serde(tag = "mtype")]
88pub enum MetricValue {
89 Count {
91 value: f64,
93 },
94
95 Rate {
100 interval: u64,
102
103 value: f64,
105 },
106
107 Gauge {
109 value: f64,
111 },
112
113 Sketch {
115 sketch: DDSketch,
117 },
118}
119
120impl PartialEq for MetricValue {
121 fn eq(&self, other: &Self) -> bool {
122 const RATIO_ERROR: f64 = 0.00000001;
124
125 match (self, other) {
126 (MetricValue::Count { value: value_a }, MetricValue::Count { value: value_b }) => {
127 value_a.approx_eq_ratio(value_b, RATIO_ERROR)
128 }
129 (
130 MetricValue::Rate {
131 interval: interval_a,
132 value: value_a,
133 },
134 MetricValue::Rate {
135 interval: interval_b,
136 value: value_b,
137 },
138 ) => interval_a == interval_b && value_a.approx_eq_ratio(value_b, RATIO_ERROR),
139 (MetricValue::Gauge { value: value_a }, MetricValue::Gauge { value: value_b }) => {
140 value_a.approx_eq_ratio(value_b, RATIO_ERROR)
141 }
142 (MetricValue::Sketch { sketch: sketch_a }, MetricValue::Sketch { sketch: sketch_b }) => {
143 approx_eq_ratio_optional(sketch_a.min(), sketch_b.min(), RATIO_ERROR)
144 && approx_eq_ratio_optional(sketch_a.max(), sketch_b.max(), RATIO_ERROR)
145 && approx_eq_ratio_optional(sketch_a.avg(), sketch_b.avg(), RATIO_ERROR)
146 && approx_eq_ratio_optional(sketch_a.sum(), sketch_b.sum(), RATIO_ERROR)
147 && sketch_a.count() == sketch_b.count()
148 && sketch_a.bin_count() == sketch_b.bin_count()
149 }
150 _ => false,
151 }
152 }
153}
154
155impl Eq for MetricValue {}
156
157impl Metric {
158 pub fn try_from_series(payload: MetricPayload) -> Result<Vec<Self>, GenericError> {
164 let mut metrics = Vec::new();
165
166 for series in payload.series {
167 let name = series.metric().to_string();
168 let tags = series.tags().iter().map(|tag| tag.to_string()).collect();
169 let mut values = Vec::new();
170
171 match series.type_() {
172 MetricType::UNSPECIFIED => {
173 return Err(generic_error!("Received metric series with UNSPECIFIED type."));
174 }
175 MetricType::COUNT => {
176 for point in series.points {
177 let timestamp = u64::try_from(point.timestamp)
178 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
179 values.push((timestamp, MetricValue::Count { value: point.value }));
180 }
181 }
182 MetricType::RATE => {
183 for point in series.points {
184 let timestamp = u64::try_from(point.timestamp)
185 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
186 values.push((
187 timestamp,
188 MetricValue::Rate {
189 interval: series.interval as u64,
190 value: point.value,
191 },
192 ));
193 }
194 }
195 MetricType::GAUGE => {
196 for point in series.points {
197 let timestamp = u64::try_from(point.timestamp)
198 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
199 values.push((timestamp, MetricValue::Gauge { value: point.value }));
200 }
201 }
202 }
203
204 metrics.push(Metric {
205 context: MetricContext { name, tags },
206 values,
207 })
208 }
209
210 Ok(metrics)
211 }
212
213 pub fn try_from_sketch(payload: SketchPayload) -> Result<Vec<Self>, GenericError> {
219 let mut metrics = Vec::new();
220
221 for sketch in payload.sketches {
222 let name = sketch.metric().to_string();
223 let tags = sketch.tags().iter().map(|tag| tag.to_string()).collect();
224 let mut values = Vec::new();
225
226 for dogsketch in sketch.dogsketches {
227 let timestamp = u64::try_from(dogsketch.ts)
228 .map_err(|_| generic_error!("Invalid timestamp for sketch: {}", dogsketch.ts))?;
229 let sketch = DDSketch::try_from(dogsketch)
230 .map_err(|e| generic_error!("Failed to convert DogSketch to DDSketch: {}", e))?;
231 values.push((timestamp, MetricValue::Sketch { sketch }));
232 }
233
234 metrics.push(Metric {
235 context: MetricContext { name, tags },
236 values,
237 })
238 }
239
240 Ok(metrics)
241 }
242}
243
244fn approx_eq_ratio_optional(a: Option<f64>, b: Option<f64>, ratio: f64) -> bool {
245 match (a, b) {
246 (Some(a), Some(b)) => a.approx_eq_ratio(&b, ratio),
247 (None, None) => true,
248 _ => false,
249 }
250}