1use nom::{
2 branch::alt,
3 bytes::complete::{tag, take_while1},
4 combinator::{all_consuming, map, map_res},
5 error::{Error, ErrorKind},
6 number::complete::double,
7 sequence::{preceded, separated_pair, terminated},
8 IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11use saluki_core::data_model::event::metric::*;
12use tracing::{debug, warn};
13
14use super::{helpers::*, DogStatsDCodecConfiguration, NomParserError};
15
/// The kind of metric declared by the type code that follows the value section of a payload.
///
/// Each variant corresponds to one of the type codes accepted by `raw_metric_values`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum MetricType {
    /// Type code `c`.
    Count,
    /// Type code `g`.
    Gauge,
    /// Type code `s`. The value section is kept as a string rather than parsed as numbers.
    Set,
    /// Type code `ms`. Values are stored the same way as histograms, and the packet unit is set to `millisecond`.
    Timer,
    /// Type code `h`.
    Histogram,
    /// Type code `d`.
    Distribution,
}
24
25pub struct MetricPacket<'a> {
32 pub metric_name: &'a str,
34
35 pub tags: RawTags<'a>,
37
38 pub values: MetricValues,
40
41 pub num_points: u64,
43
44 pub timestamp: Option<u64>,
46
47 pub local_data: Option<&'a str>,
52
53 pub external_data: Option<&'a str>,
57
58 pub cardinality: Option<OriginTagCardinality>,
62
63 pub unit: Option<&'static str>,
65}
66
67#[inline]
68pub fn parse_dogstatsd_metric<'a>(
69 input: &'a [u8], config: &DogStatsDCodecConfiguration,
70) -> IResult<&'a [u8], MetricPacket<'a>> {
71 let metric_name_parser = if config.permissive {
74 permissive_metric_name
75 } else {
76 ascii_alphanum_and_seps
77 };
78 let (remaining, (metric_name, (metric_type, raw_metric_values))) =
79 separated_pair(metric_name_parser, tag(":"), raw_metric_values).parse(input)?;
80
81 let mut maybe_sample_rate = None;
87 let mut maybe_tags = None;
88 let mut maybe_local_data = None;
89 let mut maybe_timestamp = None;
90 let mut maybe_external_data = None;
91 let mut maybe_cardinality = None;
92
93 let remaining = if !remaining.is_empty() {
94 let (mut remaining, _) = tag("|")(remaining)?;
95
96 while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
97 if chunk.is_empty() {
98 break;
99 }
100
101 match chunk[0] {
102 b'@' => {
105 let (_, sample_rate) =
106 all_consuming(preceded(tag("@"), map_res(double, sample_rate(metric_name, config))))
107 .parse(chunk)?;
108
109 maybe_sample_rate = Some(sample_rate);
110 }
111 b'#' => {
113 let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
114 maybe_tags = Some(tags);
115 }
116 b'c' if chunk.len() > 1 && chunk[1] == b':' && config.client_origin_detection => {
118 let (_, local_data) = all_consuming(preceded(tag("c:"), local_data)).parse(chunk)?;
119 maybe_local_data = Some(local_data);
120 }
121 b'T' if config.timestamps => {
123 let (_, timestamp) = all_consuming(preceded(tag("T"), unix_timestamp)).parse(chunk)?;
124 maybe_timestamp = Some(timestamp);
125 }
126 b'e' if chunk.len() > 1 && chunk[1] == b':' && config.client_origin_detection => {
128 let (_, external_data) = all_consuming(preceded(tag("e:"), external_data)).parse(chunk)?;
129 maybe_external_data = Some(external_data);
130 }
131 b'c' if chunk.starts_with(CARDINALITY_PREFIX) && config.client_origin_detection => {
133 let (_, cardinality) = cardinality(chunk)?;
134 maybe_cardinality = cardinality;
135 }
136 _ => {
137 }
141 }
142
143 remaining = tail;
144 }
145
146 remaining
150 } else {
151 remaining
152 };
153
154 let maybe_unit = if matches!(metric_type, MetricType::Timer) {
157 Some("millisecond")
158 } else {
159 None
160 };
161
162 let effective_sample_rate = if matches!(&metric_type, MetricType::Count) && maybe_timestamp.is_some() {
163 None
166 } else {
167 maybe_sample_rate
168 };
169
170 let (num_points, mut metric_values) =
171 metric_values_from_raw(raw_metric_values, metric_type, effective_sample_rate)?;
172
173 if let Some(timestamp) = maybe_timestamp {
175 metric_values.set_timestamp(timestamp);
176 }
177
178 let tags = maybe_tags.unwrap_or_else(RawTags::empty);
179
180 Ok((
181 remaining,
182 MetricPacket {
183 metric_name,
184 tags,
185 values: metric_values,
186 num_points,
187 timestamp: maybe_timestamp,
188 local_data: maybe_local_data,
189 external_data: maybe_external_data,
190 cardinality: maybe_cardinality,
191 unit: maybe_unit,
192 },
193 ))
194}
195
196#[inline]
197fn permissive_metric_name(input: &[u8]) -> IResult<&[u8], &str> {
198 let valid_char = |c: u8| c > 31 && c < 128 && c != b':';
200 map(take_while1(valid_char), |b| {
201 unsafe { std::str::from_utf8_unchecked(b) }
204 })
205 .parse(input)
206}
207
208#[inline]
209fn sample_rate<'a>(
210 metric_name: &'a str, config: &'a DogStatsDCodecConfiguration,
211) -> impl Fn(f64) -> Result<SampleRate, &'static str> + 'a {
212 let minimum_sample_rate = config.minimum_sample_rate;
213
214 move |mut raw_sample_rate| {
215 if raw_sample_rate < minimum_sample_rate {
216 raw_sample_rate = minimum_sample_rate;
217 warn!(
218 "Sample rate for metric '{}' is below minimum of {}. Clamping to minimum.",
219 metric_name, minimum_sample_rate
220 );
221 }
222 SampleRate::try_from(raw_sample_rate)
223 }
224}
225
226#[inline]
227fn raw_metric_values(input: &[u8]) -> IResult<&[u8], (MetricType, &[u8])> {
228 let (remaining, raw_values) = terminated(take_while1(|b| b != b'|'), tag("|")).parse(input)?;
229 let (remaining, raw_kind) = alt((tag("g"), tag("c"), tag("ms"), tag("h"), tag("s"), tag("d"))).parse(remaining)?;
230
231 if raw_values.is_empty() || simdutf8::basic::from_utf8(raw_values).is_err() {
233 return Err(nom::Err::Error(Error::new(raw_values, ErrorKind::Verify)));
234 }
235
236 let metric_type = match raw_kind {
237 b"c" => MetricType::Count,
238 b"g" => MetricType::Gauge,
239 b"s" => MetricType::Set,
240 b"ms" => MetricType::Timer,
241 b"h" => MetricType::Histogram,
242 b"d" => MetricType::Distribution,
243 _ => unreachable!("should be constrained by alt parser"),
244 };
245
246 Ok((remaining, (metric_type, raw_values)))
247}
248
249#[inline]
250fn metric_values_from_raw(
251 input: &[u8], metric_type: MetricType, sample_rate: Option<SampleRate>,
252) -> Result<(u64, MetricValues), NomParserError<'_>> {
253 let mut num_points = 0;
254 let floats = FloatIter::new(input).inspect(|_| num_points += 1);
255
256 let values = match metric_type {
257 MetricType::Count => MetricValues::counter_sampled_fallible(floats, sample_rate)?,
258 MetricType::Gauge => MetricValues::gauge_fallible(floats)?,
259 MetricType::Set => {
260 num_points = 1;
261
262 let value = unsafe { std::str::from_utf8_unchecked(input) };
264 MetricValues::set(value.to_string())
265 }
266 MetricType::Timer | MetricType::Histogram => MetricValues::histogram_sampled_fallible(floats, sample_rate)?,
267 MetricType::Distribution => MetricValues::distribution_sampled_fallible(floats, sample_rate)?,
268 };
269
270 Ok((num_points, values))
271}
272
/// Iterator over the `:`-separated numeric values in the value section of a payload.
struct FloatIter<'a> {
    /// The portion of the value section that has not been consumed yet.
    raw_values: &'a [u8],
}
276
277impl<'a> FloatIter<'a> {
278 fn new(raw_values: &'a [u8]) -> Self {
279 Self { raw_values }
280 }
281}
282
283impl<'a> Iterator for FloatIter<'a> {
284 type Item = Result<f64, NomParserError<'a>>;
285
286 fn next(&mut self) -> Option<Self::Item> {
287 loop {
288 if self.raw_values.is_empty() {
289 return None;
290 }
291
292 let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
293 self.raw_values = tail;
294
295 let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
298 match value_s.parse::<f64>() {
299 Ok(value) if value.is_finite() => return Some(Ok(value)),
300 Ok(_) => {
301 debug!(value = value_s, "Dropping non-finite DogStatsD metric value.");
302 }
303 Err(_) => return Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
304 }
305 }
306 }
307}
308
#[cfg(test)]
mod tests {
    use proptest::{collection::vec as arb_vec, prelude::*};
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag},
        Context,
    };
    use saluki_core::data_model::event::metric::*;

    use super::{parse_dogstatsd_metric, DogStatsDCodecConfiguration};

    type OptionalNomResult<'input, T> = Result<Option<T>, nom::Err<nom::error::Error<&'input [u8]>>>;

    // Parses `input` with the default codec configuration.
    fn parse_dsd_metric(input: &[u8]) -> OptionalNomResult<'_, Metric> {
        let default_config = DogStatsDCodecConfiguration::default();
        parse_dsd_metric_with_conf(input, &default_config)
    }

    // Parses `input` with `config` and turns the resulting packet into a `Metric` with default metadata.
    //
    // Asserts that the parser consumed the entire input.
    fn parse_dsd_metric_with_conf<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> OptionalNomResult<'input, Metric> {
        let (remaining, packet) = parse_dogstatsd_metric(input, config)?;
        assert!(remaining.is_empty());

        let tags = packet.tags.into_iter().map(Tag::from).collect::<SharedTagSet>();
        let context = Context::from_parts(packet.metric_name, tags);

        Ok(Some(Metric::from_parts(
            context,
            packet.values,
            MetricMetadata::default(),
        )))
    }

    // Asserts that `actual` is present and matches `expected` in context, values and metadata, then returns it.
    #[track_caller]
    fn check_basic_metric_eq(expected: Metric, actual: Option<Metric>) -> Metric {
        let actual = actual.expect("event should not have been None");
        assert_eq!(expected.context(), actual.context());
        assert_eq!(expected.values(), actual.values());
        assert_eq!(expected.metadata(), actual.metadata());
        actual
    }

    // Every metric type parses from its minimal `<name>:<value>|<type>` form.
    #[test]
    fn basic_metric() {
        let name = "my.counter";
        let value = 1.0;
        let raw = format!("{}:{}|c", name, value);
        let expected = Metric::counter(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let value = 2.0;
        let raw = format!("{}:{}|g", name, value);
        let expected = Metric::gauge(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Timers and histograms are both expected to come out as histograms.
        let name = "my.timer_or_histogram";
        let value = 3.0;
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, value, kind);
            let expected = Metric::histogram(name, value);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let distribution_name = "my.distribution";
        let distribution_value = 3.0;
        let distribution_raw = format!("{}:{}|d", distribution_name, distribution_value);
        let distribution_expected = Metric::distribution(distribution_name, distribution_value);
        let distribution_actual = parse_dsd_metric(distribution_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(distribution_expected, distribution_actual);

        let set_name = "my.set";
        let set_value = "value";
        let set_raw = format!("{}:{}|s", set_name, set_value);
        let set_expected = Metric::set(set_name, set_value);
        let set_actual = parse_dsd_metric(set_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(set_expected, set_actual);
    }

    // Only timers get a unit attached to the packet.
    #[test]
    fn metric_unit() {
        let config = DogStatsDCodecConfiguration::default();

        let (_, packet) = parse_dogstatsd_metric(b"my.timer:1.0|ms", &config).expect("should not fail to parse");
        assert_eq!(packet.unit, Some("millisecond"));

        for kind in &["c", "g", "h", "d", "s"] {
            let raw = format!("my.metric:1.0|{}", kind);
            let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
            assert_eq!(packet.unit, None, "expected no unit for metric type '{}'", kind);
        }
    }

    // Tags in the `#` field end up in the metric context.
    #[test]
    fn metric_tags() {
        let name = "my.counter";
        let value = 1.0;
        let tags = ["tag1", "tag2"];
        let raw = format!("{}:{}|c|#{}", name, value, tags.join(","));
        let expected = Metric::counter((name, &tags[..]), value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    // A sampled count without a timestamp is scaled up by the inverse of its sample rate.
    #[test]
    fn metric_sample_rate() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let raw = format!("{}:{}|c|@{}", name, value, sample_rate);

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let expected = Metric::counter(name, value_sample_rate_adjusted);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (0, value_sample_rate_adjusted));
    }

    // A count that carries a timestamp keeps its raw value: the sample rate is not applied.
    #[test]
    fn metric_timestamped_count_sample_rate_matches_no_aggregation_pipeline() {
        let name = "my.counter";
        let value = 2.0;
        let sample_rate = 0.25;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|@{}|T{}", name, value, sample_rate, timestamp);

        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value));
    }

    // The `c:` field does not alter the metric itself, and is surfaced on the packet when origin detection is on.
    #[test]
    fn metric_local_data() {
        let name = "my.counter";
        let value = 1.0;
        let local_data = "abcdef123456";
        let raw = format!("{}:{}|c|c:{}", name, value, local_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
    }

    // The `T` field sets the timestamp on the parsed values.
    #[test]
    fn metric_unix_timestamp() {
        let name = "my.counter";
        let value = 1.0;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|T{}", name, value, timestamp);
        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    // The `e:` field does not alter the metric itself, and is surfaced on the packet when origin detection is on.
    #[test]
    fn metric_external_data() {
        let name = "my.counter";
        let value = 1.0;
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("{}:{}|c|e:{}", name, value, external_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.external_data, Some(external_data));
    }

    // The cardinality field does not alter the metric itself, and is surfaced on the packet when origin detection is
    // on.
    #[test]
    fn metric_cardinality() {
        let name = "my.counter";
        let value = 1.0;
        let cardinality = "high";
        let raw = format!("{}:{}|c|card:{}", name, value, cardinality);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::High));
    }

    // All extension fields can be combined in a single payload.
    #[test]
    fn metric_multiple_extensions() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let tags = ["tag1", "tag2"];
        let local_data = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let cardinality = "orchestrator";
        let timestamp = 1234567890;
        let raw = format!(
            "{}:{}|c|#{}|@{}|c:{}|e:{}|card:{}|T{}",
            name,
            value,
            tags.join(","),
            sample_rate,
            local_data,
            external_data,
            cardinality,
            timestamp
        );

        // The payload is a timestamped count, so the expected value is the raw one despite the sample rate.
        let mut expected = Metric::counter((name, &tags[..]), value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value));

        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
        assert_eq!(packet.external_data, Some(external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Orchestrator));
    }

    // Several `:`-separated values in one payload are all picked up, for every numeric metric type.
    #[test]
    fn multivalue_metrics() {
        let name = "my.counter";
        let values = [1.0, 2.0, 3.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|c", name, values_stringified.join(":"));
        let expected = Metric::counter(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let values = [42.0, 5.0, -18.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|g", name, values_stringified.join(":"));
        let expected = Metric::gauge(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.timer_or_histogram";
        let values = [27.5, 4.20, 80.085];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, values_stringified.join(":"), kind);
            let expected = Metric::histogram(name, values);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        // Reuses the values from the timer/histogram case above.
        let name = "my.distribution";
        let raw = format!("{}:{}|d", name, values_stringified.join(":"));
        let expected = Metric::distribution(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    // The number of tags on the parsed metric is capped at the configured maximum.
    #[test]
    fn respects_maximum_tag_count() {
        let input = b"foo:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let cases = [3, 2, 1];
        for max_tag_count in cases {
            let config = DogStatsDCodecConfiguration::default().with_maximum_tag_count(max_tag_count);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            assert_eq!(metric.context().tags().len(), max_tag_count);
        }
    }

    // No tag on the parsed metric is longer than the configured maximum.
    #[test]
    fn respects_maximum_tag_length() {
        let input = b"foo:1|c|#tag1:short,tag2:medium,tag3:longlong";

        let cases = [6, 5, 4];
        for max_tag_length in cases {
            let config = DogStatsDCodecConfiguration::default().with_maximum_tag_length(max_tag_length);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            for tag in metric.context().tags().into_iter() {
                assert!(tag.len() <= max_tag_length);
            }
        }
    }

    // With timestamps disabled, the `T` field is ignored and the value carries no timestamp.
    #[test]
    fn respects_read_timestamps() {
        let input = b"foo:1|c|T1234567890";

        let config = DogStatsDCodecConfiguration::default().with_timestamps(false);

        let metric = parse_dsd_metric_with_conf(input, &config)
            .expect("should not fail to parse")
            .expect("should not fail to intern");

        let value_timestamps = match metric.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, _)| ts.map(|v| v.get()).unwrap_or(0))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(value_timestamps.len(), 1);
        assert_eq!(value_timestamps[0], 0);
    }

    // Permissive mode accepts a metric name containing spaces and quotes.
    #[test]
    fn permissive_mode() {
        let payload = b"codeheap 'non-nmethods'.usage:0.3054|g|#env:dev,service:foobar,datacenter:localhost.dev";

        let config = DogStatsDCodecConfiguration::default().with_permissive_mode(true);
        match parse_dsd_metric_with_conf(payload, &config) {
            Ok(result) => assert!(result.is_some(), "should not fail to materialize metric after decoding"),
            Err(e) => panic!("should not have errored: {:?}", e),
        }
    }

    // Sample rates below the configured minimum are clamped, which bounds the resulting sketch count.
    #[test]
    fn minimum_sample_rate() {
        let minimum_sample_rate = SampleRate::try_from(0.01).unwrap();
        let config = DogStatsDCodecConfiguration::default().with_minimum_sample_rate(minimum_sample_rate.rate());

        let cases = [
            "test:1|d|@0".to_string(),
            "test:1|d|@0.001".to_string(),
            "test:1|d|@0.0005".to_string(),
            "test:1|d|@0.00001".to_string(),
            format!("test:1|d|@{}", minimum_sample_rate.rate()),
            "test:1|d|@0.1".to_string(),
            "test:1|d|@0.5".to_string(),
            "test:1|d".to_string(),
        ];

        for input in cases {
            let metric = parse_dsd_metric_with_conf(input.as_bytes(), &config)
                .expect("Should not fail to parse metric.")
                .expect("Metric should be present.");

            let sketch = match metric.values() {
                MetricValues::Distribution(points) => {
                    points
                        .into_iter()
                        .next()
                        .expect("Should have at least one sketch point.")
                        .1
                }
                _ => panic!("Unexpected metric type."),
            };

            assert!(sketch.count() as u64 <= minimum_sample_rate.weight());
        }
    }

    // With client origin detection disabled, none of the origin-related fields are populated.
    #[test]
    fn client_origin_fields_ignored_when_disabled() {
        let local_data = "cn-name-a";
        let external_data = "it-false,cn-name-b,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("foo:1|c|c:{local_data}|e:{external_data}|card:high");

        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(false);
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");

        assert_eq!(packet.local_data, None);
        assert_eq!(packet.external_data, None);
        assert_eq!(packet.cardinality, None);
    }

    // NaN and infinite values parse without error but contribute no points.
    #[test]
    fn non_finite_metric_values_are_dropped() {
        let config = DogStatsDCodecConfiguration::default();
        let cases = ["my.gauge:NaN|g", "my.gauge:inf|g", "my.gauge:-inf|g"];

        for input in &cases {
            let (_, packet) = parse_dogstatsd_metric(input.as_bytes(), &config)
                .unwrap_or_else(|_| panic!("should parse without error: {input}"));
            assert_eq!(
                packet.num_points, 0,
                "non-finite value should be dropped, leaving 0 valid points: {input}"
            );
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]
        #[test]
        fn property_test_malicious_input_non_exhaustive(input in arb_vec(any::<u8>(), 0..1000)) {
            // `any::<u8>()` covers the full byte range; the previous half-open `0..255u8` never produced 0xFF.
            //
            // The parse result itself is irrelevant: the property is that arbitrary bytes never cause a panic.
            let _ = parse_dsd_metric(&input);
        }
    }
}