Skip to main content

saluki_io/deser/codec/dogstatsd/
metric.rs

1use nom::{
2    branch::alt,
3    bytes::complete::{tag, take_while1},
4    combinator::{all_consuming, map, map_res},
5    error::{Error, ErrorKind},
6    number::complete::double,
7    sequence::{preceded, separated_pair, terminated},
8    IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11use saluki_core::data_model::event::metric::*;
12use tracing::{debug, warn};
13
14use super::{helpers::*, DogStatsDCodecConfiguration, NomParserError};
15
/// The DogStatsD metric type, as given by the type suffix that follows the value(s) on the wire.
///
/// The suffix-to-variant mapping lives in `raw_metric_values`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum MetricType {
    /// Suffix `c`: a counter.
    Count,
    /// Suffix `g`: a gauge.
    Gauge,
    /// Suffix `s`: a set; the raw value is kept as a string member rather than parsed as a float.
    Set,
    /// Suffix `ms`: a timer. Parsed into histogram values and tagged with an implicit `millisecond` unit.
    Timer,
    /// Suffix `h`: a histogram.
    Histogram,
    /// Suffix `d`: a distribution.
    Distribution,
}
24
25/// A DogStatsD metric packet.
26///
27/// See the [DogStatsD datagram format][datagram] reference for the wire format and the protocol versions that
28/// introduced each field.
29///
30/// [datagram]: https://docs.datadoghq.com/extend/dogstatsd/datagram_shell/?tab=metrics
31pub struct MetricPacket<'a> {
32    /// Name of the metric.
33    pub metric_name: &'a str,
34
35    /// Tags attached to the metric.
36    pub tags: RawTags<'a>,
37
38    /// The metric kind (counter, gauge, rate, etc.) and its sample points.
39    pub values: MetricValues,
40
41    /// Number of sample points represented by `values`.
42    pub num_points: u64,
43
44    /// Optional Unix timestamp for the sample, in seconds (protocol v1.3).
45    pub timestamp: Option<u64>,
46
47    /// Local Data attached to the metric, carried in the `c:` field (protocol v1.2, extended in v1.4).
48    ///
49    /// Identifies the workload that emitted the metric. Carries a container ID (`ci-<id>`) or, when unavailable, a
50    /// cgroup node inode (`in-<inode>`).
51    pub local_data: Option<&'a str>,
52
53    /// External Data attached to the metric, carried in the `e:` field (protocol v1.5).
54    ///
55    /// Used to convey a richer blob of workload identity data resolved by the receiver.
56    pub external_data: Option<&'a str>,
57
58    /// Cardinality hint for origin tag enrichment, carried in the `card:` field (protocol v1.6).
59    ///
60    /// Specifies which origin tags the receiver should attach to the metric.
61    pub cardinality: Option<OriginTagCardinality>,
62
63    /// Unit for this metric, if any.
64    pub unit: Option<&'static str>,
65}
66
67#[inline]
68pub fn parse_dogstatsd_metric<'a>(
69    input: &'a [u8], config: &DogStatsDCodecConfiguration,
70) -> IResult<&'a [u8], MetricPacket<'a>> {
71    // We always parse the metric name and value(s) first, where value is both the kind (counter, gauge, etc) and the
72    // actual value itself.
73    let metric_name_parser = if config.permissive {
74        permissive_metric_name
75    } else {
76        ascii_alphanum_and_seps
77    };
78    let (remaining, (metric_name, (metric_type, raw_metric_values))) =
79        separated_pair(metric_name_parser, tag(":"), raw_metric_values).parse(input)?;
80
81    // At this point, we may have some of this additional data, and if so, we also then would have a pipe separator at
82    // the very front, which we'd want to consume before going further.
83    //
84    // After that, we simply split the remaining bytes by the pipe separator, and then try and parse each chunk to see
85    // if it's any of the protocol extensions we know of.
86    let mut maybe_sample_rate = None;
87    let mut maybe_tags = None;
88    let mut maybe_local_data = None;
89    let mut maybe_timestamp = None;
90    let mut maybe_external_data = None;
91    let mut maybe_cardinality = None;
92
93    let remaining = if !remaining.is_empty() {
94        let (mut remaining, _) = tag("|")(remaining)?;
95
96        while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
97            if chunk.is_empty() {
98                break;
99            }
100
101            match chunk[0] {
102                // Sample rate: indicates client-side sampling of this metric which will need to be "reinflated" at some
103                // point downstream to calculate the true metric value.
104                b'@' => {
105                    let (_, sample_rate) =
106                        all_consuming(preceded(tag("@"), map_res(double, sample_rate(metric_name, config))))
107                            .parse(chunk)?;
108
109                    maybe_sample_rate = Some(sample_rate);
110                }
111                // Tags: additional tags to be added to the metric.
112                b'#' => {
113                    let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
114                    maybe_tags = Some(tags);
115                }
116                // Local Data: client-provided data used for resolving the entity ID that this metric originated from.
117                b'c' if chunk.len() > 1 && chunk[1] == b':' && config.client_origin_detection => {
118                    let (_, local_data) = all_consuming(preceded(tag("c:"), local_data)).parse(chunk)?;
119                    maybe_local_data = Some(local_data);
120                }
121                // Timestamp: client-provided timestamp for the metric, relative to the Unix epoch, in seconds.
122                b'T' if config.timestamps => {
123                    let (_, timestamp) = all_consuming(preceded(tag("T"), unix_timestamp)).parse(chunk)?;
124                    maybe_timestamp = Some(timestamp);
125                }
126                // External Data: client-provided data used for resolving the entity ID that this metric originated from.
127                b'e' if chunk.len() > 1 && chunk[1] == b':' && config.client_origin_detection => {
128                    let (_, external_data) = all_consuming(preceded(tag("e:"), external_data)).parse(chunk)?;
129                    maybe_external_data = Some(external_data);
130                }
131                // Cardinality: client-provided cardinality for the metric.
132                b'c' if chunk.starts_with(CARDINALITY_PREFIX) && config.client_origin_detection => {
133                    let (_, cardinality) = cardinality(chunk)?;
134                    maybe_cardinality = cardinality;
135                }
136                _ => {
137                    // We don't know what this is, so we just skip it.
138                    //
139                    // TODO: Should we throw an error, warn, or be silently permissive?
140                }
141            }
142
143            remaining = tail;
144        }
145
146        // TODO: Similarly to the above comment, should having any remaining data here cause us to throw an error, warn,
147        // or be silently permissive?
148
149        remaining
150    } else {
151        remaining
152    };
153
154    // Capture the unit from the metric type before it is erased into a MetricValues variant.
155    // Only timing metrics carry an implicit unit; all other types have no unit.
156    let maybe_unit = if matches!(metric_type, MetricType::Timer) {
157        Some("millisecond")
158    } else {
159        None
160    };
161
162    let effective_sample_rate = if matches!(&metric_type, MetricType::Count) && maybe_timestamp.is_some() {
163        // Match the Datadog Agent no-aggregation pipeline: timestamped DogStatsD counts are forwarded as
164        // pre-aggregated points, so their sample rate is not used to reinflate the count value.
165        None
166    } else {
167        maybe_sample_rate
168    };
169
170    let (num_points, mut metric_values) =
171        metric_values_from_raw(raw_metric_values, metric_type, effective_sample_rate)?;
172
173    // If we got a timestamp, apply it to all metric values.
174    if let Some(timestamp) = maybe_timestamp {
175        metric_values.set_timestamp(timestamp);
176    }
177
178    let tags = maybe_tags.unwrap_or_else(RawTags::empty);
179
180    Ok((
181        remaining,
182        MetricPacket {
183            metric_name,
184            tags,
185            values: metric_values,
186            num_points,
187            timestamp: maybe_timestamp,
188            local_data: maybe_local_data,
189            external_data: maybe_external_data,
190            cardinality: maybe_cardinality,
191            unit: maybe_unit,
192        },
193    ))
194}
195
196#[inline]
197fn permissive_metric_name(input: &[u8]) -> IResult<&[u8], &str> {
198    // Essentially, any ASCII character that is printable and isn't `:` is allowed here.
199    let valid_char = |c: u8| c > 31 && c < 128 && c != b':';
200    map(take_while1(valid_char), |b| {
201        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
202        // interpret the bytes directly as UTF-8.
203        unsafe { std::str::from_utf8_unchecked(b) }
204    })
205    .parse(input)
206}
207
208#[inline]
209fn sample_rate<'a>(
210    metric_name: &'a str, config: &'a DogStatsDCodecConfiguration,
211) -> impl Fn(f64) -> Result<SampleRate, &'static str> + 'a {
212    let minimum_sample_rate = config.minimum_sample_rate;
213
214    move |mut raw_sample_rate| {
215        if raw_sample_rate < minimum_sample_rate {
216            raw_sample_rate = minimum_sample_rate;
217            warn!(
218                "Sample rate for metric '{}' is below minimum of {}. Clamping to minimum.",
219                metric_name, minimum_sample_rate
220            );
221        }
222        SampleRate::try_from(raw_sample_rate)
223    }
224}
225
226#[inline]
227fn raw_metric_values(input: &[u8]) -> IResult<&[u8], (MetricType, &[u8])> {
228    let (remaining, raw_values) = terminated(take_while1(|b| b != b'|'), tag("|")).parse(input)?;
229    let (remaining, raw_kind) = alt((tag("g"), tag("c"), tag("ms"), tag("h"), tag("s"), tag("d"))).parse(remaining)?;
230
231    // Make sure the raw value(s) are valid UTF-8 before we use them later on.
232    if raw_values.is_empty() || simdutf8::basic::from_utf8(raw_values).is_err() {
233        return Err(nom::Err::Error(Error::new(raw_values, ErrorKind::Verify)));
234    }
235
236    let metric_type = match raw_kind {
237        b"c" => MetricType::Count,
238        b"g" => MetricType::Gauge,
239        b"s" => MetricType::Set,
240        b"ms" => MetricType::Timer,
241        b"h" => MetricType::Histogram,
242        b"d" => MetricType::Distribution,
243        _ => unreachable!("should be constrained by alt parser"),
244    };
245
246    Ok((remaining, (metric_type, raw_values)))
247}
248
249#[inline]
250fn metric_values_from_raw(
251    input: &[u8], metric_type: MetricType, sample_rate: Option<SampleRate>,
252) -> Result<(u64, MetricValues), NomParserError<'_>> {
253    let mut num_points = 0;
254    let floats = FloatIter::new(input).inspect(|_| num_points += 1);
255
256    let values = match metric_type {
257        MetricType::Count => MetricValues::counter_sampled_fallible(floats, sample_rate)?,
258        MetricType::Gauge => MetricValues::gauge_fallible(floats)?,
259        MetricType::Set => {
260            num_points = 1;
261
262            // SAFETY: We've already checked above that `input` is valid UTF-8.
263            let value = unsafe { std::str::from_utf8_unchecked(input) };
264            MetricValues::set(value.to_string())
265        }
266        MetricType::Timer | MetricType::Histogram => MetricValues::histogram_sampled_fallible(floats, sample_rate)?,
267        MetricType::Distribution => MetricValues::distribution_sampled_fallible(floats, sample_rate)?,
268    };
269
270    Ok((num_points, values))
271}
272
/// Iterator over the `:`-separated floating-point values in a metric's raw value section.
///
/// Each item is either a finite `f64` or a nom `Float` error for a value that does not parse as a float. Values that
/// parse but are not finite (NaN, ±infinity) are skipped with a debug log.
struct FloatIter<'a> {
    /// The not-yet-consumed part of the value section. Must be valid UTF-8 (see [`FloatIter::new`]).
    raw_values: &'a [u8],
}

impl<'a> FloatIter<'a> {
    /// Creates an iterator over `raw_values`.
    ///
    /// `raw_values` must be valid UTF-8: iteration reinterprets sub-slices of it as `&str` without re-checking. The
    /// caller in this file passes the value section already validated by `raw_metric_values`. The invariant is
    /// asserted in debug builds, so a caller that forgets to validate fails loudly in tests rather than causing
    /// undefined behavior.
    fn new(raw_values: &'a [u8]) -> Self {
        debug_assert!(
            std::str::from_utf8(raw_values).is_ok(),
            "FloatIter requires valid UTF-8 input"
        );
        Self { raw_values }
    }
}
282
283impl<'a> Iterator for FloatIter<'a> {
284    type Item = Result<f64, NomParserError<'a>>;
285
286    fn next(&mut self) -> Option<Self::Item> {
287        loop {
288            if self.raw_values.is_empty() {
289                return None;
290            }
291
292            let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
293            self.raw_values = tail;
294
295            // SAFETY: The caller that creates `ValueIter` is responsible for ensuring that the entire byte slice is valid
296            // UTF-8.
297            let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
298            match value_s.parse::<f64>() {
299                Ok(value) if value.is_finite() => return Some(Ok(value)),
300                Ok(_) => {
301                    debug!(value = value_s, "Dropping non-finite DogStatsD metric value.");
302                }
303                Err(_) => return Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
304            }
305        }
306    }
307}
308
#[cfg(test)]
mod tests {
    // Unit and property tests for `parse_dogstatsd_metric`: basic types, extensions, configuration knobs, and
    // resilience against malformed input.
    use proptest::{collection::vec as arb_vec, prelude::*};
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag},
        Context,
    };
    use saluki_core::data_model::event::metric::*;

    use super::{parse_dogstatsd_metric, DogStatsDCodecConfiguration};

    // Result type of the parsing helpers below: a nom error on parse failure, otherwise the metric. In the helpers as
    // written, the `Option` is always `Some`.
    type OptionalNomResult<'input, T> = Result<Option<T>, nom::Err<nom::error::Error<&'input [u8]>>>;

    // Parses `input` with the default codec configuration.
    fn parse_dsd_metric(input: &[u8]) -> OptionalNomResult<'_, Metric> {
        let default_config = DogStatsDCodecConfiguration::default();
        parse_dsd_metric_with_conf(input, &default_config)
    }

    // Parses `input` with `config` and builds a `Metric` from the packet's name, tags, and values with default
    // metadata.
    //
    // Panics if the parser leaves any input unconsumed. Origin fields (local/external data, cardinality) and the unit
    // are not carried over, so tests that check those inspect the `MetricPacket` directly.
    fn parse_dsd_metric_with_conf<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> OptionalNomResult<'input, Metric> {
        let (remaining, packet) = parse_dogstatsd_metric(input, config)?;
        assert!(remaining.is_empty());

        let tags = packet.tags.into_iter().map(Tag::from).collect::<SharedTagSet>();
        let context = Context::from_parts(packet.metric_name, tags);

        Ok(Some(Metric::from_parts(
            context,
            packet.values,
            MetricMetadata::default(),
        )))
    }

    // Asserts that `actual` is present and matches `expected` in context, values, and metadata; returns `actual` so
    // callers can make further assertions on it.
    #[track_caller]
    fn check_basic_metric_eq(expected: Metric, actual: Option<Metric>) -> Metric {
        let actual = actual.expect("event should not have been None");
        assert_eq!(expected.context(), actual.context());
        assert_eq!(expected.values(), actual.values());
        assert_eq!(expected.metadata(), actual.metadata());
        actual
    }

    #[test]
    fn basic_metric() {
        let name = "my.counter";
        let value = 1.0;
        let raw = format!("{}:{}|c", name, value);
        let expected = Metric::counter(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let value = 2.0;
        let raw = format!("{}:{}|g", name, value);
        let expected = Metric::gauge(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Special case where we check this for both timers and histograms since we treat them both the same when
        // parsing.
        let name = "my.timer_or_histogram";
        let value = 3.0;
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, value, kind);
            let expected = Metric::histogram(name, value);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let distribution_name = "my.distribution";
        let distribution_value = 3.0;
        let distribution_raw = format!("{}:{}|d", distribution_name, distribution_value);
        let distribution_expected = Metric::distribution(distribution_name, distribution_value);
        let distribution_actual = parse_dsd_metric(distribution_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(distribution_expected, distribution_actual);

        let set_name = "my.set";
        let set_value = "value";
        let set_raw = format!("{}:{}|s", set_name, set_value);
        let set_expected = Metric::set(set_name, set_value);
        let set_actual = parse_dsd_metric(set_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(set_expected, set_actual);
    }

    #[test]
    fn metric_unit() {
        let config = DogStatsDCodecConfiguration::default();

        // Timing metrics must carry an implicit millisecond unit.
        let (_, packet) = parse_dogstatsd_metric(b"my.timer:1.0|ms", &config).expect("should not fail to parse");
        assert_eq!(packet.unit, Some("millisecond"));

        // All other metric types must have no unit.
        for kind in &["c", "g", "h", "d", "s"] {
            let raw = format!("my.metric:1.0|{}", kind);
            let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
            assert_eq!(packet.unit, None, "expected no unit for metric type '{}'", kind);
        }
    }

    #[test]
    fn metric_tags() {
        let name = "my.counter";
        let value = 1.0;
        let tags = ["tag1", "tag2"];
        let raw = format!("{}:{}|c|#{}", name, value, tags.join(","));
        let expected = Metric::counter((name, &tags[..]), value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_sample_rate() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let raw = format!("{}:{}|c|@{}", name, value, sample_rate);

        // Untimestamped counters are reinflated by 1 / sample_rate.
        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let expected = Metric::counter(name, value_sample_rate_adjusted);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (0, value_sample_rate_adjusted));
    }

    #[test]
    fn metric_timestamped_count_sample_rate_matches_no_aggregation_pipeline() {
        // A timestamped counter keeps its raw value: the sample rate is ignored rather than used for reinflation.
        let name = "my.counter";
        let value = 2.0;
        let sample_rate = 0.25;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|@{}|T{}", name, value, sample_rate, timestamp);

        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value));
    }

    #[test]
    fn metric_local_data() {
        let name = "my.counter";
        let value = 1.0;
        let local_data = "abcdef123456";
        let raw = format!("{}:{}|c|c:{}", name, value, local_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
    }

    #[test]
    fn metric_unix_timestamp() {
        let name = "my.counter";
        let value = 1.0;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|T{}", name, value, timestamp);
        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_external_data() {
        let name = "my.counter";
        let value = 1.0;
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("{}:{}|c|e:{}", name, value, external_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.external_data, Some(external_data));
    }

    #[test]
    fn metric_cardinality() {
        let name = "my.counter";
        let value = 1.0;
        let cardinality = "high";
        let raw = format!("{}:{}|c|card:{}", name, value, cardinality);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::High));
    }

    #[test]
    fn metric_multiple_extensions() {
        // Every known extension at once; the timestamp also suppresses sample-rate reinflation of the counter.
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let tags = ["tag1", "tag2"];
        let local_data = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let cardinality = "orchestrator";
        let timestamp = 1234567890;
        let raw = format!(
            "{}:{}|c|#{}|@{}|c:{}|e:{}|card:{}|T{}",
            name,
            value,
            tags.join(","),
            sample_rate,
            local_data,
            external_data,
            cardinality,
            timestamp
        );

        let mut expected = Metric::counter((name, &tags[..]), value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value));

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
        assert_eq!(packet.external_data, Some(external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Orchestrator));
    }

    #[test]
    fn multivalue_metrics() {
        let name = "my.counter";
        let values = [1.0, 2.0, 3.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|c", name, values_stringified.join(":"));
        let expected = Metric::counter(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let values = [42.0, 5.0, -18.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|g", name, values_stringified.join(":"));
        let expected = Metric::gauge(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Special case where we check this for both timers and histograms since we treat them both the same when
        // parsing.
        //
        // Additionally, we have an optimization to return a single distribution metric from multi-value payloads, so we
        // also check here that only one metric is generated for multi-value timers/histograms/distributions.
        let name = "my.timer_or_histogram";
        let values = [27.5, 4.20, 80.085];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, values_stringified.join(":"), kind);
            let expected = Metric::histogram(name, values);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let name = "my.distribution";
        let raw = format!("{}:{}|d", name, values_stringified.join(":"));
        let expected = Metric::distribution(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn respects_maximum_tag_count() {
        // Three tags in the payload; the configured maximum caps how many survive.
        let input = b"foo:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let cases = [3, 2, 1];
        for max_tag_count in cases {
            let config = DogStatsDCodecConfiguration::default().with_maximum_tag_count(max_tag_count);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            assert_eq!(metric.context().tags().len(), max_tag_count);
        }
    }

    #[test]
    fn respects_maximum_tag_length() {
        // Tags of varying length; every surviving tag must fit within the configured maximum.
        let input = b"foo:1|c|#tag1:short,tag2:medium,tag3:longlong";

        let cases = [6, 5, 4];
        for max_tag_length in cases {
            let config = DogStatsDCodecConfiguration::default().with_maximum_tag_length(max_tag_length);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            for tag in metric.context().tags().into_iter() {
                assert!(tag.len() <= max_tag_length);
            }
        }
    }

    #[test]
    fn respects_read_timestamps() {
        // With timestamps disabled, the `T` extension is skipped and the value stays untimestamped.
        let input = b"foo:1|c|T1234567890";

        let config = DogStatsDCodecConfiguration::default().with_timestamps(false);

        let metric = parse_dsd_metric_with_conf(input, &config)
            .expect("should not fail to parse")
            .expect("should not fail to intern");

        let value_timestamps = match metric.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, _)| ts.map(|v| v.get()).unwrap_or(0))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(value_timestamps.len(), 1);
        assert_eq!(value_timestamps[0], 0);
    }

    #[test]
    fn permissive_mode() {
        // Spaces and quotes in the name are only accepted in permissive mode.
        let payload = b"codeheap 'non-nmethods'.usage:0.3054|g|#env:dev,service:foobar,datacenter:localhost.dev";

        let config = DogStatsDCodecConfiguration::default().with_permissive_mode(true);
        match parse_dsd_metric_with_conf(payload, &config) {
            Ok(result) => assert!(result.is_some(), "should not fail to materialize metric after decoding"),
            Err(e) => panic!("should not have errored: {:?}", e),
        }
    }

    #[test]
    fn minimum_sample_rate() {
        // Sample rate of 0.01 should lead to a count of 100 when handling a single value.
        let minimum_sample_rate = SampleRate::try_from(0.01).unwrap();
        let config = DogStatsDCodecConfiguration::default().with_minimum_sample_rate(minimum_sample_rate.rate());

        let cases = [
            // Worst case scenario: sample rate of zero, or "infinitely sampled".
            "test:1|d|@0".to_string(),
            // Bunch of values with different sample rates all below the minimum sample rate.
            "test:1|d|@0.001".to_string(),
            "test:1|d|@0.0005".to_string(),
            "test:1|d|@0.00001".to_string(),
            // Control: use the minimum sample rate.
            format!("test:1|d|@{}", minimum_sample_rate.rate()),
            // Bunch of values with _greater_ sampling rates than the minimum.
            "test:1|d|@0.1".to_string(),
            "test:1|d|@0.5".to_string(),
            "test:1|d".to_string(),
        ];

        for input in cases {
            let metric = parse_dsd_metric_with_conf(input.as_bytes(), &config)
                .expect("Should not fail to parse metric.")
                .expect("Metric should be present.");

            let sketch = match metric.values() {
                MetricValues::Distribution(points) => {
                    points
                        .into_iter()
                        .next()
                        .expect("Should have at least one sketch point.")
                        .1
                }
                _ => panic!("Unexpected metric type."),
            };

            // Clamping means no input can be weighted more heavily than the minimum sample rate allows.
            assert!(sketch.count() as u64 <= minimum_sample_rate.weight());
        }
    }

    #[test]
    fn client_origin_fields_ignored_when_disabled() {
        let local_data = "cn-name-a";
        let external_data = "it-false,cn-name-b,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("foo:1|c|c:{local_data}|e:{external_data}|card:high");

        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(false);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");

        assert_eq!(packet.local_data, None);
        assert_eq!(packet.external_data, None);
        assert_eq!(packet.cardinality, None);
    }

    #[test]
    fn non_finite_metric_values_are_dropped() {
        // Non-finite float values (NaN, ±Inf) are silently dropped at parse time with a debug log.
        // The Datadog Agent's trace agent sends NaN gauges (e.g. encode_ms.avg) when a flush
        // window has zero operations, producing 0.0/0.0 in Go. The parse succeeds but yields
        // zero valid points; handle_frame then returns Ok(None) for zero-point packets.
        let config = DogStatsDCodecConfiguration::default();
        let cases = ["my.gauge:NaN|g", "my.gauge:inf|g", "my.gauge:-inf|g"];

        for input in &cases {
            let (_, packet) = parse_dogstatsd_metric(input.as_bytes(), &config)
                .unwrap_or_else(|_| panic!("should parse without error: {input}"));
            assert_eq!(
                packet.num_points, 0,
                "non-finite value should be dropped, leaving 0 valid points: {input}"
            );
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]
        #[test]
        fn property_test_malicious_input_non_exhaustive(input in arb_vec(0..255u8, 0..1000)) {
            // We're testing that the parser is resilient to malicious input, which means that it should not panic or
            // crash when given input that's not well-formed.
            //
            // As this is a property test, it is _not_ exhaustive but generally should catch simple issues that manage
            // to escape the unit tests. This is left here for the sole reason of incrementally running this every time
            // all tests are run, in the hopes of potentially catching an issue that might have been missed.
            let _ = parse_dsd_metric(&input);
        }
    }
}