saluki_io/deser/codec/dogstatsd/
metric.rs

1use nom::{
2    branch::alt,
3    bytes::complete::{tag, take_while1},
4    combinator::{all_consuming, map, map_res},
5    error::{Error, ErrorKind},
6    number::complete::double,
7    sequence::{preceded, separated_pair, terminated},
8    IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11use saluki_core::data_model::event::metric::*;
12
13use super::{helpers::*, DogstatsdCodecConfiguration, NomParserError};
14
/// The kind of metric declared by the type token that follows the value list in a DogStatsD payload.
///
/// `Timer` and `Histogram` are kept as distinct kinds at this level even though both are converted into histogram
/// values when the metric values are built.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricType {
    /// Declared with the `c` type token.
    Count,
    /// Declared with the `g` type token.
    Gauge,
    /// Declared with the `s` type token.
    Set,
    /// Declared with the `ms` type token.
    Timer,
    /// Declared with the `h` type token.
    Histogram,
    /// Declared with the `d` type token.
    Distribution,
}
23
/// A DogStatsD metric packet.
///
/// This is the value produced by `parse_dogstatsd_metric`. All borrowed fields share the lifetime of the input
/// buffer that was parsed.
pub struct MetricPacket<'a> {
    /// Metric name: the part of the payload that precedes the `:` separating the name from the value list.
    pub metric_name: &'a str,
    /// Tags taken from the `#` extension field, or an empty set when the payload carried none.
    pub tags: RawTags<'a>,
    /// Metric values built from the payload's value list according to the declared metric type. If the payload
    /// carried a timestamp that was accepted, it has already been set on these values.
    pub values: MetricValues,
    /// Number of values read from the payload's value list. Always 1 for sets.
    pub num_points: u64,
    /// Timestamp taken from the `T` extension field (seconds since the Unix epoch), if present and if timestamp
    /// parsing is enabled in the codec configuration.
    pub timestamp: Option<u64>,
    /// Container ID taken from the `c:` extension field, if present.
    pub container_id: Option<&'a str>,
    /// External data taken from the `e:` extension field, if present.
    pub external_data: Option<&'a str>,
    /// Origin tag cardinality taken from the cardinality extension field, if present.
    pub cardinality: Option<OriginTagCardinality>,
}
35
36#[inline]
37pub fn parse_dogstatsd_metric<'a>(
38    input: &'a [u8], config: &DogstatsdCodecConfiguration,
39) -> IResult<&'a [u8], MetricPacket<'a>> {
40    // We always parse the metric name and value(s) first, where value is both the kind (counter, gauge, etc) and the
41    // actual value itself.
42    let metric_name_parser = if config.permissive {
43        permissive_metric_name
44    } else {
45        ascii_alphanum_and_seps
46    };
47    let (remaining, (metric_name, (metric_type, raw_metric_values))) =
48        separated_pair(metric_name_parser, tag(":"), raw_metric_values).parse(input)?;
49
50    // At this point, we may have some of this additional data, and if so, we also then would have a pipe separator at
51    // the very front, which we'd want to consume before going further.
52    //
53    // After that, we simply split the remaining bytes by the pipe separator, and then try and parse each chunk to see
54    // if it's any of the protocol extensions we know of.
55    let mut maybe_sample_rate = None;
56    let mut maybe_tags = None;
57    let mut maybe_container_id = None;
58    let mut maybe_timestamp = None;
59    let mut maybe_external_data = None;
60    let mut maybe_cardinality = None;
61
62    let remaining = if !remaining.is_empty() {
63        let (mut remaining, _) = tag("|")(remaining)?;
64
65        while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
66            if chunk.is_empty() {
67                break;
68            }
69
70            match chunk[0] {
71                // Sample rate: indicates client-side sampling of this metric which will need to be "reinflated" at some
72                // point downstream to calculate the true metric value.
73                b'@' => {
74                    let (_, sample_rate) =
75                        all_consuming(preceded(tag("@"), map_res(double, SampleRate::try_from))).parse(chunk)?;
76                    maybe_sample_rate = Some(sample_rate);
77                }
78                // Tags: additional tags to be added to the metric.
79                b'#' => {
80                    let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
81                    maybe_tags = Some(tags);
82                }
83                // Container ID: client-provided container ID for the container that this metric originated from.
84                b'c' if chunk.len() > 1 && chunk[1] == b':' => {
85                    let (_, container_id) = all_consuming(preceded(tag("c:"), container_id)).parse(chunk)?;
86                    maybe_container_id = Some(container_id);
87                }
88                // Timestamp: client-provided timestamp for the metric, relative to the Unix epoch, in seconds.
89                b'T' => {
90                    if config.timestamps {
91                        let (_, timestamp) = all_consuming(preceded(tag("T"), unix_timestamp)).parse(chunk)?;
92                        maybe_timestamp = Some(timestamp);
93                    }
94                }
95                // External Data: client-provided data used for resolving the entity ID that this metric originated from.
96                b'e' if chunk.len() > 1 && chunk[1] == b':' => {
97                    let (_, external_data) = all_consuming(preceded(tag("e:"), external_data)).parse(chunk)?;
98                    maybe_external_data = Some(external_data);
99                }
100                // Cardinality: client-provided cardinality for the metric.
101                b'c' if chunk.starts_with(CARDINALITY_PREFIX) => {
102                    let (_, cardinality) = cardinality(chunk)?;
103                    maybe_cardinality = cardinality;
104                }
105                _ => {
106                    // We don't know what this is, so we just skip it.
107                    //
108                    // TODO: Should we throw an error, warn, or be silently permissive?
109                }
110            }
111
112            remaining = tail;
113        }
114
115        // TODO: Similarly to the above comment, should having any remaining data here cause us to throw an error, warn,
116        // or be silently permissive?
117
118        remaining
119    } else {
120        remaining
121    };
122
123    let (num_points, mut metric_values) = metric_values_from_raw(raw_metric_values, metric_type, maybe_sample_rate)?;
124
125    // If we got a timestamp, apply it to all metric values.
126    if let Some(timestamp) = maybe_timestamp {
127        metric_values.set_timestamp(timestamp);
128    }
129
130    let tags = maybe_tags.unwrap_or_else(RawTags::empty);
131
132    Ok((
133        remaining,
134        MetricPacket {
135            metric_name,
136            tags,
137            values: metric_values,
138            num_points,
139            timestamp: maybe_timestamp,
140            container_id: maybe_container_id,
141            external_data: maybe_external_data,
142            cardinality: maybe_cardinality,
143        },
144    ))
145}
146
147#[inline]
148fn permissive_metric_name(input: &[u8]) -> IResult<&[u8], &str> {
149    // Essentially, any ASCII character that is printable and isn't `:` is allowed here.
150    let valid_char = |c: u8| c > 31 && c < 128 && c != b':';
151    map(take_while1(valid_char), |b| {
152        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
153        // interpret the bytes directly as UTF-8.
154        unsafe { std::str::from_utf8_unchecked(b) }
155    })
156    .parse(input)
157}
158
159#[inline]
160fn raw_metric_values(input: &[u8]) -> IResult<&[u8], (MetricType, &[u8])> {
161    let (remaining, raw_values) = terminated(take_while1(|b| b != b'|'), tag("|")).parse(input)?;
162    let (remaining, raw_kind) = alt((tag("g"), tag("c"), tag("ms"), tag("h"), tag("s"), tag("d"))).parse(remaining)?;
163
164    // Make sure the raw value(s) are valid UTF-8 before we use them later on.
165    if raw_values.is_empty() || simdutf8::basic::from_utf8(raw_values).is_err() {
166        return Err(nom::Err::Error(Error::new(raw_values, ErrorKind::Verify)));
167    }
168
169    let metric_type = match raw_kind {
170        b"c" => MetricType::Count,
171        b"g" => MetricType::Gauge,
172        b"s" => MetricType::Set,
173        b"ms" => MetricType::Timer,
174        b"h" => MetricType::Histogram,
175        b"d" => MetricType::Distribution,
176        _ => unreachable!("should be constrained by alt parser"),
177    };
178
179    Ok((remaining, (metric_type, raw_values)))
180}
181
182#[inline]
183fn metric_values_from_raw(
184    input: &[u8], metric_type: MetricType, sample_rate: Option<SampleRate>,
185) -> Result<(u64, MetricValues), NomParserError<'_>> {
186    let mut num_points = 0;
187    let floats = FloatIter::new(input).inspect(|_| num_points += 1);
188
189    let values = match metric_type {
190        MetricType::Count => MetricValues::counter_sampled_fallible(floats, sample_rate)?,
191        MetricType::Gauge => MetricValues::gauge_fallible(floats)?,
192        MetricType::Set => {
193            num_points = 1;
194
195            // SAFETY: We've already checked above that `input` is valid UTF-8.
196            let value = unsafe { std::str::from_utf8_unchecked(input) };
197            MetricValues::set(value.to_string())
198        }
199        MetricType::Timer | MetricType::Histogram => MetricValues::histogram_sampled_fallible(floats, sample_rate)?,
200        MetricType::Distribution => MetricValues::distribution_sampled_fallible(floats, sample_rate)?,
201    };
202
203    Ok((num_points, values))
204}
205
206struct FloatIter<'a> {
207    raw_values: &'a [u8],
208}
209
210impl<'a> FloatIter<'a> {
211    fn new(raw_values: &'a [u8]) -> Self {
212        Self { raw_values }
213    }
214}
215
216impl<'a> Iterator for FloatIter<'a> {
217    type Item = Result<f64, NomParserError<'a>>;
218
219    fn next(&mut self) -> Option<Self::Item> {
220        if self.raw_values.is_empty() {
221            return None;
222        }
223
224        let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
225        self.raw_values = tail;
226
227        // SAFETY: The caller that creates `ValueIter` is responsible for ensuring that the entire byte slice is valid
228        // UTF-8.
229        let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
230        match value_s.parse::<f64>() {
231            Ok(value) => Some(Ok(value)),
232            Err(_) => Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
233        }
234    }
235}
236
#[cfg(test)]
mod tests {
    use proptest::{collection::vec as arb_vec, prelude::*};
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag},
        Context,
    };
    use saluki_core::data_model::event::metric::*;

    use super::{parse_dogstatsd_metric, DogstatsdCodecConfiguration};

    type OptionalNomResult<'input, T> = Result<Option<T>, nom::Err<nom::error::Error<&'input [u8]>>>;

    /// Parses `input` with the default codec configuration and converts the packet into a `Metric`.
    fn parse_dsd_metric(input: &[u8]) -> OptionalNomResult<'_, Metric> {
        let default_config = DogstatsdCodecConfiguration::default();
        parse_dsd_metric_with_conf(input, &default_config)
    }

    /// Parses `input` with the given codec configuration and converts the packet into a `Metric`.
    ///
    /// Panics if the parser succeeds but leaves unconsumed input behind. On success the result is always `Some`.
    fn parse_dsd_metric_with_conf<'input>(
        input: &'input [u8], config: &DogstatsdCodecConfiguration,
    ) -> OptionalNomResult<'input, Metric> {
        let (remaining, packet) = parse_dogstatsd_metric(input, config)?;
        assert!(remaining.is_empty());

        let tags = packet.tags.into_iter().map(Tag::from).collect::<SharedTagSet>();
        let context = Context::from_parts(packet.metric_name, tags);

        Ok(Some(Metric::from_parts(
            context,
            packet.values,
            MetricMetadata::default(),
        )))
    }

    /// Asserts that `actual` is present and matches `expected` in context, values and metadata, then returns it.
    #[track_caller]
    fn check_basic_metric_eq(expected: Metric, actual: Option<Metric>) -> Metric {
        let actual = actual.expect("event should not have been None");
        assert_eq!(expected.context(), actual.context());
        assert_eq!(expected.values(), actual.values());
        assert_eq!(expected.metadata(), actual.metadata());
        actual
    }

    /// Single-value payloads of every metric type, with no extension fields.
    #[test]
    fn basic_metric() {
        let name = "my.counter";
        let value = 1.0;
        let raw = format!("{}:{}|c", name, value);
        let expected = Metric::counter(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let value = 2.0;
        let raw = format!("{}:{}|g", name, value);
        let expected = Metric::gauge(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Special case where we check this for both timers and histograms since we treat them both the same when
        // parsing.
        let name = "my.timer_or_histogram";
        let value = 3.0;
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, value, kind);
            let expected = Metric::histogram(name, value);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let distribution_name = "my.distribution";
        let distribution_value = 3.0;
        let distribution_raw = format!("{}:{}|d", distribution_name, distribution_value);
        let distribution_expected = Metric::distribution(distribution_name, distribution_value);
        let distribution_actual = parse_dsd_metric(distribution_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(distribution_expected, distribution_actual);

        let set_name = "my.set";
        let set_value = "value";
        let set_raw = format!("{}:{}|s", set_name, set_value);
        let set_expected = Metric::set(set_name, set_value);
        let set_actual = parse_dsd_metric(set_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(set_expected, set_actual);
    }

    /// The `#` extension field populates the metric's tags.
    #[test]
    fn metric_tags() {
        let name = "my.counter";
        let value = 1.0;
        let tags = ["tag1", "tag2"];
        let raw = format!("{}:{}|c|#{}", name, value, tags.join(","));
        let expected = Metric::counter((name, &tags[..]), value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    /// The `@` extension field scales counter values by the inverse of the sample rate.
    #[test]
    fn metric_sample_rate() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let raw = format!("{}:{}|c|@{}", name, value, sample_rate);

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let expected = Metric::counter(name, value_sample_rate_adjusted);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (0, value_sample_rate_adjusted));
    }

    /// The `c:` extension field is surfaced as the packet's container ID.
    #[test]
    fn metric_container_id() {
        let name = "my.counter";
        let value = 1.0;
        let container_id = "abcdef123456";
        let raw = format!("{}:{}|c|c:{}", name, value, container_id);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.container_id, Some(container_id));
    }

    /// The `T` extension field is applied to the metric's values as their timestamp.
    #[test]
    fn metric_unix_timestamp() {
        let name = "my.counter";
        let value = 1.0;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|T{}", name, value, timestamp);
        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    /// The `e:` extension field is surfaced as the packet's external data.
    #[test]
    fn metric_external_data() {
        let name = "my.counter";
        let value = 1.0;
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("{}:{}|c|e:{}", name, value, external_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.external_data, Some(external_data));
    }

    /// The `card:` extension field is surfaced as the packet's cardinality.
    #[test]
    fn metric_cardinality() {
        let name = "my.counter";
        let value = 1.0;
        let cardinality = "high";
        let raw = format!("{}:{}|c|card:{}", name, value, cardinality);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::High));
    }

    /// All supported extension fields can appear together in a single payload.
    #[test]
    fn metric_multiple_extensions() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let tags = ["tag1", "tag2"];
        let container_id = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let cardinality = "orchestrator";
        let timestamp = 1234567890;
        let raw = format!(
            "{}:{}|c|#{}|@{}|c:{}|e:{}|card:{}|T{}",
            name,
            value,
            tags.join(","),
            sample_rate,
            container_id,
            external_data,
            cardinality,
            timestamp
        );

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let mut expected = Metric::counter((name, &tags[..]), value_sample_rate_adjusted);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value_sample_rate_adjusted));

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.container_id, Some(container_id));
        assert_eq!(packet.external_data, Some(external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Orchestrator));
    }

    /// Payloads carrying several `:`-separated values produce a single metric holding all of them.
    #[test]
    fn multivalue_metrics() {
        let name = "my.counter";
        let values = [1.0, 2.0, 3.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|c", name, values_stringified.join(":"));
        let expected = Metric::counter(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let values = [42.0, 5.0, -18.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|g", name, values_stringified.join(":"));
        let expected = Metric::gauge(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Special case where we check this for both timers and histograms since we treat them both the same when
        // parsing.
        //
        // Additionally, we have an optimization to return a single distribution metric from multi-value payloads, so we
        // also check here that only one metric is generated for multi-value timers/histograms/distributions.
        let name = "my.timer_or_histogram";
        let values = [27.5, 4.20, 80.085];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, values_stringified.join(":"), kind);
            let expected = Metric::histogram(name, values);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let name = "my.distribution";
        let raw = format!("{}:{}|d", name, values_stringified.join(":"));
        let expected = Metric::distribution(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    /// The configured maximum tag count caps how many tags end up on the metric.
    #[test]
    fn respects_maximum_tag_count() {
        let input = b"foo:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let cases = [3, 2, 1];
        for max_tag_count in cases {
            let config = DogstatsdCodecConfiguration::default().with_maximum_tag_count(max_tag_count);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            assert_eq!(metric.context().tags().len(), max_tag_count);
        }
    }

    /// No tag on the metric is longer than the configured maximum tag length.
    #[test]
    fn respects_maximum_tag_length() {
        let input = b"foo:1|c|#tag1:short,tag2:medium,tag3:longlong";

        let cases = [6, 5, 4];
        for max_tag_length in cases {
            let config = DogstatsdCodecConfiguration::default().with_maximum_tag_length(max_tag_length);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            for tag in metric.context().tags().into_iter() {
                assert!(tag.len() <= max_tag_length);
            }
        }
    }

    /// With timestamp parsing disabled, a `T` extension field is ignored and values carry no timestamp.
    #[test]
    fn respects_read_timestamps() {
        let input = b"foo:1|c|T1234567890";

        let config = DogstatsdCodecConfiguration::default().with_timestamps(false);

        let metric = parse_dsd_metric_with_conf(input, &config)
            .expect("should not fail to parse")
            .expect("should not fail to intern");

        let value_timestamps = match metric.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, _)| ts.map(|v| v.get()).unwrap_or(0))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(value_timestamps.len(), 1);
        assert_eq!(value_timestamps[0], 0);
    }

    /// Permissive mode accepts metric names containing characters such as spaces and quotes.
    #[test]
    fn permissive_mode() {
        let payload = b"codeheap 'non-nmethods'.usage:0.3054|g|#env:dev,service:foobar,datacenter:localhost.dev";

        let config = DogstatsdCodecConfiguration::default().with_permissive_mode(true);
        match parse_dsd_metric_with_conf(payload, &config) {
            Ok(result) => assert!(result.is_some(), "should not fail to materialize metric after decoding"),
            Err(e) => panic!("should not have errored: {:?}", e),
        }
    }

    // The byte strategy is `any::<u8>()` rather than a half-open range such as `0..255u8`, so that every byte value,
    // including 0xFF, can show up in the generated input.
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]
        #[test]
        fn property_test_malicious_input_non_exhaustive(input in arb_vec(any::<u8>(), 0..1000)) {
            // We're testing that the parser is resilient to malicious input, which means that it should not panic or
            // crash when given input that's not well-formed.
            //
            // As this is a property test, it is _not_ exhaustive but generally should catch simple issues that manage
            // to escape the unit tests. This is left here for the sole reason of incrementally running this every time
            // all tests are run, in the hopes of potentially catching an issue that might have been missed.
            //
            // TODO: True exhaustive-style testing a la afl/honggfuzz.
            let _ = parse_dsd_metric(&input);
        }
    }
}