1#![deny(warnings)]
4#![deny(missing_docs)]
5
6use std::fmt;
7
8use datadog_protos::metrics::{MetricPayload, MetricType, SketchPayload};
9use ddsketch_agent::DDSketch;
10use float_cmp::ApproxEqRatio as _;
11use saluki_error::{generic_error, GenericError};
12use serde::{Deserialize, Serialize};
13
14#[derive(Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
16pub struct MetricContext {
17 name: String,
18 tags: Vec<String>,
19}
20
21impl MetricContext {
22 pub fn name(&self) -> &str {
24 &self.name
25 }
26
27 pub fn tags(&self) -> &[String] {
29 &self.tags
30 }
31
32 pub fn into_parts(self) -> (String, Vec<String>) {
34 (self.name, self.tags)
35 }
36}
37
38impl fmt::Display for MetricContext {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 write!(f, "{}", self.name)?;
41
42 if !self.tags.is_empty() {
43 write!(f, " {{{}}}", self.tags.join(", "))?;
44 }
45
46 Ok(())
47 }
48}
49
50#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
52pub struct Metric {
53 context: MetricContext,
54 values: Vec<(u64, MetricValue)>,
55}
56
57impl Metric {
58 pub fn context(&self) -> &MetricContext {
60 &self.context
61 }
62
63 pub fn values(&self) -> &[(u64, MetricValue)] {
65 &self.values
66 }
67}
68
69#[derive(Clone, Debug, Deserialize, Serialize)]
92#[serde(tag = "mtype")]
93pub enum MetricValue {
94 Count {
96 value: f64,
98 },
99
100 Rate {
105 interval: u64,
107
108 value: f64,
110 },
111
112 Gauge {
114 value: f64,
116 },
117
118 Sketch {
120 sketch: DDSketch,
122 },
123}
124
125impl PartialEq for MetricValue {
126 fn eq(&self, other: &Self) -> bool {
127 const RATIO_ERROR: f64 = 0.00000001;
129
130 match (self, other) {
131 (MetricValue::Count { value: value_a }, MetricValue::Count { value: value_b }) => {
132 value_a.approx_eq_ratio(value_b, RATIO_ERROR)
133 }
134 (
135 MetricValue::Rate {
136 interval: interval_a,
137 value: value_a,
138 },
139 MetricValue::Rate {
140 interval: interval_b,
141 value: value_b,
142 },
143 ) => interval_a == interval_b && value_a.approx_eq_ratio(value_b, RATIO_ERROR),
144 (MetricValue::Gauge { value: value_a }, MetricValue::Gauge { value: value_b }) => {
145 value_a.approx_eq_ratio(value_b, RATIO_ERROR)
146 }
147 (MetricValue::Sketch { sketch: sketch_a }, MetricValue::Sketch { sketch: sketch_b }) => {
148 approx_eq_ratio_optional(sketch_a.min(), sketch_b.min(), RATIO_ERROR)
149 && approx_eq_ratio_optional(sketch_a.max(), sketch_b.max(), RATIO_ERROR)
150 && approx_eq_ratio_optional(sketch_a.avg(), sketch_b.avg(), RATIO_ERROR)
151 && approx_eq_ratio_optional(sketch_a.sum(), sketch_b.sum(), RATIO_ERROR)
152 && sketch_a.count() == sketch_b.count()
153 && sketch_a.bin_count() == sketch_b.bin_count()
154 }
155 _ => false,
156 }
157 }
158}
159
160impl Eq for MetricValue {}
161
162impl Metric {
163 pub fn try_from_series(payload: MetricPayload) -> Result<Vec<Self>, GenericError> {
169 let mut metrics = Vec::new();
170
171 for series in payload.series {
172 let name = series.metric().to_string();
173 let tags = series.tags().iter().map(|tag| tag.to_string()).collect();
174 let mut values = Vec::new();
175
176 match series.type_() {
177 MetricType::UNSPECIFIED => {
178 return Err(generic_error!("Received metric series with UNSPECIFIED type."));
179 }
180 MetricType::COUNT => {
181 for point in series.points {
182 let timestamp = u64::try_from(point.timestamp)
183 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
184 values.push((timestamp, MetricValue::Count { value: point.value }));
185 }
186 }
187 MetricType::RATE => {
188 for point in series.points {
189 let timestamp = u64::try_from(point.timestamp)
190 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
191 values.push((
192 timestamp,
193 MetricValue::Rate {
194 interval: series.interval as u64,
195 value: point.value,
196 },
197 ));
198 }
199 }
200 MetricType::GAUGE => {
201 for point in series.points {
202 let timestamp = u64::try_from(point.timestamp)
203 .map_err(|_| generic_error!("Invalid timestamp for point: {}", point.timestamp))?;
204 values.push((timestamp, MetricValue::Gauge { value: point.value }));
205 }
206 }
207 }
208
209 metrics.push(Metric {
210 context: MetricContext { name, tags },
211 values,
212 })
213 }
214
215 Ok(metrics)
216 }
217
218 pub fn try_from_sketch(payload: SketchPayload) -> Result<Vec<Self>, GenericError> {
224 let mut metrics = Vec::new();
225
226 for sketch in payload.sketches {
227 let name = sketch.metric().to_string();
228 let tags = sketch.tags().iter().map(|tag| tag.to_string()).collect();
229 let mut values = Vec::new();
230
231 for dogsketch in sketch.dogsketches {
232 let timestamp = u64::try_from(dogsketch.ts)
233 .map_err(|_| generic_error!("Invalid timestamp for sketch: {}", dogsketch.ts))?;
234 let sketch = DDSketch::try_from(dogsketch)
235 .map_err(|e| generic_error!("Failed to convert DogSketch to DDSketch: {}", e))?;
236 values.push((timestamp, MetricValue::Sketch { sketch }));
237 }
238
239 metrics.push(Metric {
240 context: MetricContext { name, tags },
241 values,
242 })
243 }
244
245 Ok(metrics)
246 }
247}
248
249fn approx_eq_ratio_optional(a: Option<f64>, b: Option<f64>, ratio: f64) -> bool {
250 match (a, b) {
251 (Some(a), Some(b)) => a.approx_eq_ratio(&b, ratio),
252 (None, None) => true,
253 _ => false,
254 }
255}