Skip to main content

saluki_io/deser/codec/dogstatsd/
metric.rs

1use nom::{
2    branch::alt,
3    bytes::complete::{tag, take_while1},
4    combinator::{all_consuming, map, map_res},
5    error::{Error, ErrorKind},
6    number::complete::double,
7    sequence::{preceded, separated_pair, terminated},
8    IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11use saluki_core::data_model::event::metric::*;
12use tracing::{debug, warn};
13
14use super::{helpers::*, DogStatsDCodecConfiguration, NomParserError};
15
/// The metric type declared in the type field of a DogStatsD metric payload.
///
/// This is an intermediate representation used only while parsing: it is consumed when the raw value(s) are converted
/// into a `MetricValues`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetricType {
    /// `c`: counter.
    Count,
    /// `g`: gauge.
    Gauge,
    /// `s`: set.
    Set,
    /// `ms`: timer. Converted the same way as a histogram, but additionally carries an implicit millisecond unit.
    Timer,
    /// `h`: histogram.
    Histogram,
    /// `d`: distribution.
    Distribution,
}
24
/// A DogStatsD metric packet.
///
/// See the [DogStatsD datagram format][datagram] reference for the wire format and the protocol versions that
/// introduced each field.
///
/// String fields borrow directly from the input buffer that was parsed, so a packet cannot outlive that buffer.
///
/// [datagram]: https://docs.datadoghq.com/extend/dogstatsd/datagram_shell/?tab=metrics
pub struct MetricPacket<'a> {
    /// Name of the metric.
    pub metric_name: &'a str,

    /// Tags attached to the metric.
    ///
    /// Empty when the payload carried no `#` field.
    pub tags: RawTags<'a>,

    /// The metric kind (counter, gauge, rate, etc.) and its sample points.
    pub values: MetricValues,

    /// Number of sample points represented by `values`.
    ///
    /// Always `1` for sets. For the other metric types, non-finite values (NaN, ±Inf) are dropped while parsing and
    /// are not counted, so this can be `0`.
    pub num_points: u64,

    /// Optional Unix timestamp for the sample, in seconds (protocol v1.3).
    ///
    /// Carried in the `T` field. Only populated when timestamp handling is enabled in the codec configuration.
    pub timestamp: Option<u64>,

    /// Local Data attached to the metric, carried in the `c:` field (protocol v1.2, extended in v1.4).
    ///
    /// Identifies the workload that emitted the metric. Carries a container ID (`ci-<id>`) or, when unavailable, a
    /// cgroup node inode (`in-<inode>`).
    ///
    /// Only populated when client origin detection is enabled in the codec configuration.
    pub local_data: Option<&'a str>,

    /// External Data attached to the metric, carried in the `e:` field (protocol v1.5).
    ///
    /// Used to convey a richer blob of workload identity data resolved by the receiver.
    ///
    /// Only populated when client origin detection is enabled in the codec configuration.
    pub external_data: Option<&'a str>,

    /// Cardinality hint for origin tag enrichment, carried in the `card:` field (protocol v1.6).
    ///
    /// Specifies which origin tags the receiver should attach to the metric.
    ///
    /// Only populated when client origin detection is enabled in the codec configuration.
    pub cardinality: Option<OriginTagCardinality>,

    /// Unit for this metric, if any.
    ///
    /// Set to `"millisecond"` for timers (`ms`), and `None` for every other metric type.
    pub unit: Option<&'static str>,
}
66
67#[inline]
68pub fn parse_dogstatsd_metric<'a>(
69    input: &'a [u8], config: &DogStatsDCodecConfiguration,
70) -> IResult<&'a [u8], MetricPacket<'a>> {
71    // We always parse the metric name and value(s) first, where value is both the kind (counter, gauge, etc) and the
72    // actual value itself.
73    let metric_name_parser = if config.permissive {
74        permissive_metric_name
75    } else {
76        ascii_alphanum_and_seps
77    };
78    let (remaining, (metric_name, (metric_type, raw_metric_values))) =
79        separated_pair(metric_name_parser, tag(":"), raw_metric_values).parse(input)?;
80
81    // At this point, we may have some of this additional data, and if so, we also then would have a pipe separator at
82    // the very front, which we'd want to consume before going further.
83    //
84    // After that, we simply split the remaining bytes by the pipe separator, and then try and parse each chunk to see
85    // if it's any of the protocol extensions we know of.
86    let mut maybe_sample_rate = None;
87    let mut maybe_tags = None;
88    let mut maybe_local_data = None;
89    let mut maybe_timestamp = None;
90    let mut maybe_external_data = None;
91    let mut maybe_cardinality = None;
92
93    let remaining = if !remaining.is_empty() {
94        let (mut remaining, _) = tag("|")(remaining)?;
95
96        while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
97            if chunk.is_empty() {
98                break;
99            }
100
101            match chunk[0] {
102                // Sample rate: indicates client-side sampling of this metric which will need to be "reinflated" at some
103                // point downstream to calculate the true metric value.
104                b'@' => {
105                    let (_, sample_rate) =
106                        all_consuming(preceded(tag("@"), map_res(double, sample_rate(metric_name, config))))
107                            .parse(chunk)?;
108
109                    maybe_sample_rate = Some(sample_rate);
110                }
111                // Tags: additional tags to be added to the metric.
112                b'#' => {
113                    let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
114                    maybe_tags = Some(tags);
115                }
116                // Local Data: client-provided data used for resolving the entity ID that this metric originated from.
117                b'c' if chunk.len() > 1 && chunk[1] == b':' => {
118                    if config.client_origin_detection {
119                        let (_, local_data) = all_consuming(preceded(tag("c:"), local_data)).parse(chunk)?;
120                        maybe_local_data = Some(local_data);
121                    }
122                }
123                // Timestamp: client-provided timestamp for the metric, relative to the Unix epoch, in seconds.
124                b'T' => {
125                    if config.timestamps {
126                        let (_, timestamp) = all_consuming(preceded(tag("T"), unix_timestamp)).parse(chunk)?;
127                        maybe_timestamp = Some(timestamp);
128                    }
129                }
130                // External Data: client-provided data used for resolving the entity ID that this metric originated from.
131                b'e' if chunk.len() > 1 && chunk[1] == b':' => {
132                    if config.client_origin_detection {
133                        let (_, external_data) = all_consuming(preceded(tag("e:"), external_data)).parse(chunk)?;
134                        maybe_external_data = Some(external_data);
135                    }
136                }
137                // Cardinality: client-provided cardinality for the metric.
138                b'c' if chunk.starts_with(CARDINALITY_PREFIX) => {
139                    if config.client_origin_detection {
140                        let (_, cardinality) = cardinality(chunk)?;
141                        maybe_cardinality = cardinality;
142                    }
143                }
144                _ => {
145                    // We don't know what this is, so we just skip it.
146                    //
147                    // TODO: Should we throw an error, warn, or be silently permissive?
148                }
149            }
150
151            remaining = tail;
152        }
153
154        // TODO: Similarly to the above comment, should having any remaining data here cause us to throw an error, warn,
155        // or be silently permissive?
156
157        remaining
158    } else {
159        remaining
160    };
161
162    // Capture the unit from the metric type before it is erased into a MetricValues variant.
163    // Only timing metrics carry an implicit unit; all other types have no unit.
164    let maybe_unit = if matches!(metric_type, MetricType::Timer) {
165        Some("millisecond")
166    } else {
167        None
168    };
169
170    let (num_points, mut metric_values) = metric_values_from_raw(raw_metric_values, metric_type, maybe_sample_rate)?;
171
172    // If we got a timestamp, apply it to all metric values.
173    if let Some(timestamp) = maybe_timestamp {
174        metric_values.set_timestamp(timestamp);
175    }
176
177    let tags = maybe_tags.unwrap_or_else(RawTags::empty);
178
179    Ok((
180        remaining,
181        MetricPacket {
182            metric_name,
183            tags,
184            values: metric_values,
185            num_points,
186            timestamp: maybe_timestamp,
187            local_data: maybe_local_data,
188            external_data: maybe_external_data,
189            cardinality: maybe_cardinality,
190            unit: maybe_unit,
191        },
192    ))
193}
194
195#[inline]
196fn permissive_metric_name(input: &[u8]) -> IResult<&[u8], &str> {
197    // Essentially, any ASCII character that is printable and isn't `:` is allowed here.
198    let valid_char = |c: u8| c > 31 && c < 128 && c != b':';
199    map(take_while1(valid_char), |b| {
200        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
201        // interpret the bytes directly as UTF-8.
202        unsafe { std::str::from_utf8_unchecked(b) }
203    })
204    .parse(input)
205}
206
207#[inline]
208fn sample_rate<'a>(
209    metric_name: &'a str, config: &'a DogStatsDCodecConfiguration,
210) -> impl Fn(f64) -> Result<SampleRate, &'static str> + 'a {
211    let minimum_sample_rate = config.minimum_sample_rate;
212
213    move |mut raw_sample_rate| {
214        if raw_sample_rate < minimum_sample_rate {
215            raw_sample_rate = minimum_sample_rate;
216            warn!(
217                "Sample rate for metric '{}' is below minimum of {}. Clamping to minimum.",
218                metric_name, minimum_sample_rate
219            );
220        }
221        SampleRate::try_from(raw_sample_rate)
222    }
223}
224
225#[inline]
226fn raw_metric_values(input: &[u8]) -> IResult<&[u8], (MetricType, &[u8])> {
227    let (remaining, raw_values) = terminated(take_while1(|b| b != b'|'), tag("|")).parse(input)?;
228    let (remaining, raw_kind) = alt((tag("g"), tag("c"), tag("ms"), tag("h"), tag("s"), tag("d"))).parse(remaining)?;
229
230    // Make sure the raw value(s) are valid UTF-8 before we use them later on.
231    if raw_values.is_empty() || simdutf8::basic::from_utf8(raw_values).is_err() {
232        return Err(nom::Err::Error(Error::new(raw_values, ErrorKind::Verify)));
233    }
234
235    let metric_type = match raw_kind {
236        b"c" => MetricType::Count,
237        b"g" => MetricType::Gauge,
238        b"s" => MetricType::Set,
239        b"ms" => MetricType::Timer,
240        b"h" => MetricType::Histogram,
241        b"d" => MetricType::Distribution,
242        _ => unreachable!("should be constrained by alt parser"),
243    };
244
245    Ok((remaining, (metric_type, raw_values)))
246}
247
248#[inline]
249fn metric_values_from_raw(
250    input: &[u8], metric_type: MetricType, sample_rate: Option<SampleRate>,
251) -> Result<(u64, MetricValues), NomParserError<'_>> {
252    let mut num_points = 0;
253    let floats = FloatIter::new(input).inspect(|_| num_points += 1);
254
255    let values = match metric_type {
256        MetricType::Count => MetricValues::counter_sampled_fallible(floats, sample_rate)?,
257        MetricType::Gauge => MetricValues::gauge_fallible(floats)?,
258        MetricType::Set => {
259            num_points = 1;
260
261            // SAFETY: We've already checked above that `input` is valid UTF-8.
262            let value = unsafe { std::str::from_utf8_unchecked(input) };
263            MetricValues::set(value.to_string())
264        }
265        MetricType::Timer | MetricType::Histogram => MetricValues::histogram_sampled_fallible(floats, sample_rate)?,
266        MetricType::Distribution => MetricValues::distribution_sampled_fallible(floats, sample_rate)?,
267    };
268
269    Ok((num_points, values))
270}
271
/// Iterator over the `:`-separated float values of a metric payload.
struct FloatIter<'a> {
    /// The portion of the raw value bytes that has not been consumed yet.
    raw_values: &'a [u8],
}

impl<'a> FloatIter<'a> {
    /// Creates an iterator over `raw_values`.
    ///
    /// The caller must ensure that `raw_values` is valid UTF-8, as the iterator reinterprets the bytes as a string
    /// without checking.
    fn new(raw_values: &'a [u8]) -> Self {
        FloatIter { raw_values }
    }
}
281
282impl<'a> Iterator for FloatIter<'a> {
283    type Item = Result<f64, NomParserError<'a>>;
284
285    fn next(&mut self) -> Option<Self::Item> {
286        loop {
287            if self.raw_values.is_empty() {
288                return None;
289            }
290
291            let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
292            self.raw_values = tail;
293
294            // SAFETY: The caller that creates `ValueIter` is responsible for ensuring that the entire byte slice is valid
295            // UTF-8.
296            let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
297            match value_s.parse::<f64>() {
298                Ok(value) if value.is_finite() => return Some(Ok(value)),
299                Ok(_) => {
300                    debug!(value = value_s, "Dropping non-finite DogStatsD metric value.");
301                }
302                Err(_) => return Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
303            }
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use proptest::{collection::vec as arb_vec, prelude::*};
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag},
        Context,
    };
    use saluki_core::data_model::event::metric::*;

    use super::{parse_dogstatsd_metric, DogStatsDCodecConfiguration};

    type OptionalNomResult<'input, T> = Result<Option<T>, nom::Err<nom::error::Error<&'input [u8]>>>;

    /// Parses `input` with the default codec configuration and materializes it as a `Metric`.
    fn parse_dsd_metric(input: &[u8]) -> OptionalNomResult<'_, Metric> {
        let default_config = DogStatsDCodecConfiguration::default();
        parse_dsd_metric_with_conf(input, &default_config)
    }

    /// Parses `input` with the given codec configuration and materializes it as a `Metric`.
    ///
    /// Panics if the parser succeeds but leaves input unconsumed. The resulting metric carries default metadata.
    fn parse_dsd_metric_with_conf<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> OptionalNomResult<'input, Metric> {
        let (remaining, packet) = parse_dogstatsd_metric(input, config)?;
        assert!(remaining.is_empty());

        let tags = packet.tags.into_iter().map(Tag::from).collect::<SharedTagSet>();
        let context = Context::from_parts(packet.metric_name, tags);

        Ok(Some(Metric::from_parts(
            context,
            packet.values,
            MetricMetadata::default(),
        )))
    }

    /// Asserts that `actual` is present and matches `expected` in context, values, and metadata, then returns it.
    #[track_caller]
    fn check_basic_metric_eq(expected: Metric, actual: Option<Metric>) -> Metric {
        let actual = actual.expect("event should not have been None");
        assert_eq!(expected.context(), actual.context());
        assert_eq!(expected.values(), actual.values());
        assert_eq!(expected.metadata(), actual.metadata());
        actual
    }

    #[test]
    fn basic_metric() {
        let name = "my.counter";
        let value = 1.0;
        let raw = format!("{}:{}|c", name, value);
        let expected = Metric::counter(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let value = 2.0;
        let raw = format!("{}:{}|g", name, value);
        let expected = Metric::gauge(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Special case where we check this for both timers and histograms since we treat them both the same when
        // parsing.
        let name = "my.timer_or_histogram";
        let value = 3.0;
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, value, kind);
            let expected = Metric::histogram(name, value);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let distribution_name = "my.distribution";
        let distribution_value = 3.0;
        let distribution_raw = format!("{}:{}|d", distribution_name, distribution_value);
        let distribution_expected = Metric::distribution(distribution_name, distribution_value);
        let distribution_actual = parse_dsd_metric(distribution_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(distribution_expected, distribution_actual);

        let set_name = "my.set";
        let set_value = "value";
        let set_raw = format!("{}:{}|s", set_name, set_value);
        let set_expected = Metric::set(set_name, set_value);
        let set_actual = parse_dsd_metric(set_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(set_expected, set_actual);
    }

    #[test]
    fn metric_unit() {
        let config = DogStatsDCodecConfiguration::default();

        // Timing metrics must carry an implicit millisecond unit.
        let (_, packet) = parse_dogstatsd_metric(b"my.timer:1.0|ms", &config).expect("should not fail to parse");
        assert_eq!(packet.unit, Some("millisecond"));

        // All other metric types must have no unit.
        for kind in &["c", "g", "h", "d", "s"] {
            let raw = format!("my.metric:1.0|{}", kind);
            let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
            assert_eq!(packet.unit, None, "expected no unit for metric type '{}'", kind);
        }
    }

    #[test]
    fn metric_tags() {
        let name = "my.counter";
        let value = 1.0;
        let tags = ["tag1", "tag2"];
        let raw = format!("{}:{}|c|#{}", name, value, tags.join(","));
        let expected = Metric::counter((name, &tags[..]), value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_sample_rate() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let raw = format!("{}:{}|c|@{}", name, value, sample_rate);

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let expected = Metric::counter(name, value_sample_rate_adjusted);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (0, value_sample_rate_adjusted));
    }

    #[test]
    fn metric_local_data() {
        let name = "my.counter";
        let value = 1.0;
        let local_data = "abcdef123456";
        let raw = format!("{}:{}|c|c:{}", name, value, local_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
    }

    #[test]
    fn metric_unix_timestamp() {
        let name = "my.counter";
        let value = 1.0;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|T{}", name, value, timestamp);
        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_external_data() {
        let name = "my.counter";
        let value = 1.0;
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("{}:{}|c|e:{}", name, value, external_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.external_data, Some(external_data));
    }

    #[test]
    fn metric_cardinality() {
        let name = "my.counter";
        let value = 1.0;
        let cardinality = "high";
        let raw = format!("{}:{}|c|card:{}", name, value, cardinality);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::High));
    }

    #[test]
    fn metric_multiple_extensions() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let tags = ["tag1", "tag2"];
        let local_data = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let cardinality = "orchestrator";
        let timestamp = 1234567890;
        let raw = format!(
            "{}:{}|c|#{}|@{}|c:{}|e:{}|card:{}|T{}",
            name,
            value,
            tags.join(","),
            sample_rate,
            local_data,
            external_data,
            cardinality,
            timestamp
        );

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let mut expected = Metric::counter((name, &tags[..]), value_sample_rate_adjusted);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value_sample_rate_adjusted));

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
        assert_eq!(packet.external_data, Some(external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Orchestrator));
    }

    #[test]
    fn multivalue_metrics() {
        let name = "my.counter";
        let values = [1.0, 2.0, 3.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|c", name, values_stringified.join(":"));
        let expected = Metric::counter(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let values = [42.0, 5.0, -18.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|g", name, values_stringified.join(":"));
        let expected = Metric::gauge(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Special case where we check this for both timers and histograms since we treat them both the same when
        // parsing.
        //
        // Additionally, we have an optimization to return a single distribution metric from multi-value payloads, so we
        // also check here that only one metric is generated for multi-value timers/histograms/distributions.
        let name = "my.timer_or_histogram";
        let values = [27.5, 4.20, 80.085];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, values_stringified.join(":"), kind);
            let expected = Metric::histogram(name, values);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let name = "my.distribution";
        let raw = format!("{}:{}|d", name, values_stringified.join(":"));
        let expected = Metric::distribution(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn respects_maximum_tag_count() {
        let input = b"foo:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let cases = [3, 2, 1];
        for max_tag_count in cases {
            let config = DogStatsDCodecConfiguration::default().with_maximum_tag_count(max_tag_count);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            assert_eq!(metric.context().tags().len(), max_tag_count);
        }
    }

    #[test]
    fn respects_maximum_tag_length() {
        let input = b"foo:1|c|#tag1:short,tag2:medium,tag3:longlong";

        let cases = [6, 5, 4];
        for max_tag_length in cases {
            let config = DogStatsDCodecConfiguration::default().with_maximum_tag_length(max_tag_length);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            for tag in metric.context().tags().into_iter() {
                assert!(tag.len() <= max_tag_length);
            }
        }
    }

    #[test]
    fn respects_read_timestamps() {
        let input = b"foo:1|c|T1234567890";

        let config = DogStatsDCodecConfiguration::default().with_timestamps(false);

        let metric = parse_dsd_metric_with_conf(input, &config)
            .expect("should not fail to parse")
            .expect("should not fail to intern");

        let value_timestamps = match metric.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, _)| ts.map(|v| v.get()).unwrap_or(0))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(value_timestamps.len(), 1);
        assert_eq!(value_timestamps[0], 0);
    }

    #[test]
    fn permissive_mode() {
        let payload = b"codeheap 'non-nmethods'.usage:0.3054|g|#env:dev,service:foobar,datacenter:localhost.dev";

        let config = DogStatsDCodecConfiguration::default().with_permissive_mode(true);
        match parse_dsd_metric_with_conf(payload, &config) {
            Ok(result) => assert!(result.is_some(), "should not fail to materialize metric after decoding"),
            Err(e) => panic!("should not have errored: {:?}", e),
        }
    }

    #[test]
    fn minimum_sample_rate() {
        // Sample rate of 0.01 should lead to a count of 100 when handling a single value.
        let minimum_sample_rate = SampleRate::try_from(0.01).unwrap();
        let config = DogStatsDCodecConfiguration::default().with_minimum_sample_rate(minimum_sample_rate.rate());

        let cases = [
            // Worst case scenario: sample rate of zero, or "infinitely sampled".
            "test:1|d|@0".to_string(),
            // Bunch of values with different sample rates all below the minimum sample rate.
            "test:1|d|@0.001".to_string(),
            "test:1|d|@0.0005".to_string(),
            "test:1|d|@0.00001".to_string(),
            // Control: use the minimum sample rate.
            format!("test:1|d|@{}", minimum_sample_rate.rate()),
            // Bunch of values with _greater_ sampling rates than the minimum.
            "test:1|d|@0.1".to_string(),
            "test:1|d|@0.5".to_string(),
            "test:1|d".to_string(),
        ];

        for input in cases {
            let metric = parse_dsd_metric_with_conf(input.as_bytes(), &config)
                .expect("Should not fail to parse metric.")
                .expect("Metric should be present.");

            let sketch = match metric.values() {
                MetricValues::Distribution(points) => {
                    points
                        .into_iter()
                        .next()
                        .expect("Should have at least one sketch point.")
                        .1
                }
                _ => panic!("Unexpected metric type."),
            };

            assert!(sketch.count() as u64 <= minimum_sample_rate.weight());
        }
    }

    #[test]
    fn client_origin_fields_ignored_when_disabled() {
        let local_data = "cn-name-a";
        let external_data = "it-false,cn-name-b,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("foo:1|c|c:{local_data}|e:{external_data}|card:high");

        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(false);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");

        assert_eq!(packet.local_data, None);
        assert_eq!(packet.external_data, None);
        assert_eq!(packet.cardinality, None);
    }

    #[test]
    fn non_finite_metric_values_are_dropped() {
        // Non-finite float values (NaN, ±Inf) are silently dropped at parse time with a debug log.
        // The Datadog Agent's trace agent sends NaN gauges (e.g. encode_ms.avg) when a flush
        // window has zero operations, producing 0.0/0.0 in Go. The parse succeeds but yields
        // zero valid points; handle_frame then returns Ok(None) for zero-point packets.
        let config = DogStatsDCodecConfiguration::default();
        let cases = ["my.gauge:NaN|g", "my.gauge:inf|g", "my.gauge:-inf|g"];

        for input in &cases {
            let (_, packet) = parse_dogstatsd_metric(input.as_bytes(), &config)
                .unwrap_or_else(|_| panic!("should parse without error: {input}"));
            assert_eq!(
                packet.num_points, 0,
                "non-finite value should be dropped, leaving 0 valid points: {input}"
            );
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]
        #[test]
        fn property_test_malicious_input_non_exhaustive(input in arb_vec(any::<u8>(), 0..1000)) {
            // We're testing that the parser is resilient to malicious input, which means that it should not panic or
            // crash when given input that's not well-formed.
            //
            // As this is a property test, it is _not_ exhaustive but generally should catch simple issues that manage
            // to escape the unit tests. This is left here for the sole reason of incrementally running this every time
            // all tests are run, in the hopes of potentially catching an issue that might have been missed.
            //
            // Bytes are drawn from the full `u8` range so that every possible byte value, including `0xFF`, can show
            // up in the generated input.
            let _ = parse_dsd_metric(&input);
        }
    }
}