1use nom::{
2 branch::alt,
3 bytes::complete::{tag, take_while1},
4 combinator::{all_consuming, map, map_res},
5 error::{Error, ErrorKind},
6 number::complete::double,
7 sequence::{preceded, separated_pair, terminated},
8 IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11use saluki_core::data_model::event::metric::*;
12use tracing::warn;
13
14use super::{helpers::*, DogstatsdCodecConfiguration, NomParserError};
15
/// The metric kind named by the type field of a DogStatsD line (the text after the first `|`).
///
/// `Timer` (`ms`) and `Histogram` (`h`) are distinct on the wire, but both are decoded into histogram values.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum MetricType {
    /// `c`
    Count,
    /// `g`
    Gauge,
    /// `s`
    Set,
    /// `ms`
    Timer,
    /// `h`
    Histogram,
    /// `d`
    Distribution,
}
24
/// A single DogStatsD metric line, decoded by [`parse_dogstatsd_metric`].
///
/// The borrowed string fields point into the input buffer that was parsed.
pub struct MetricPacket<'a> {
    /// Metric name: the text before the first `:` of the line.
    pub metric_name: &'a str,
    /// Tags from the `#` field, or empty tags when the line has no `#` field.
    pub tags: RawTags<'a>,
    /// Decoded metric values. Any `@` sample rate has already been handed to the values constructor, and any
    /// honored `T` timestamp has already been applied to them.
    pub values: MetricValues,
    /// Number of raw values on the line (e.g. `1:2:3` yields three); always 1 for sets.
    pub num_points: u64,
    /// Unix timestamp from the `T` field; only populated when timestamp parsing is enabled in the codec configuration.
    pub timestamp: Option<u64>,
    /// Origin local data from the `c:` field, if present.
    pub local_data: Option<&'a str>,
    /// Origin external data from the `e:` field, if present.
    pub external_data: Option<&'a str>,
    /// Origin tag cardinality from the cardinality field, if present.
    pub cardinality: Option<OriginTagCardinality>,
}
36
37#[inline]
38pub fn parse_dogstatsd_metric<'a>(
39 input: &'a [u8], config: &DogstatsdCodecConfiguration,
40) -> IResult<&'a [u8], MetricPacket<'a>> {
41 let metric_name_parser = if config.permissive {
44 permissive_metric_name
45 } else {
46 ascii_alphanum_and_seps
47 };
48 let (remaining, (metric_name, (metric_type, raw_metric_values))) =
49 separated_pair(metric_name_parser, tag(":"), raw_metric_values).parse(input)?;
50
51 let mut maybe_sample_rate = None;
57 let mut maybe_tags = None;
58 let mut maybe_local_data = None;
59 let mut maybe_timestamp = None;
60 let mut maybe_external_data = None;
61 let mut maybe_cardinality = None;
62
63 let remaining = if !remaining.is_empty() {
64 let (mut remaining, _) = tag("|")(remaining)?;
65
66 while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
67 if chunk.is_empty() {
68 break;
69 }
70
71 match chunk[0] {
72 b'@' => {
75 let (_, sample_rate) =
76 all_consuming(preceded(tag("@"), map_res(double, sample_rate(metric_name, config))))
77 .parse(chunk)?;
78
79 maybe_sample_rate = Some(sample_rate);
80 }
81 b'#' => {
83 let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
84 maybe_tags = Some(tags);
85 }
86 b'c' if chunk.len() > 1 && chunk[1] == b':' => {
88 let (_, local_data) = all_consuming(preceded(tag("c:"), local_data)).parse(chunk)?;
89 maybe_local_data = Some(local_data);
90 }
91 b'T' => {
93 if config.timestamps {
94 let (_, timestamp) = all_consuming(preceded(tag("T"), unix_timestamp)).parse(chunk)?;
95 maybe_timestamp = Some(timestamp);
96 }
97 }
98 b'e' if chunk.len() > 1 && chunk[1] == b':' => {
100 let (_, external_data) = all_consuming(preceded(tag("e:"), external_data)).parse(chunk)?;
101 maybe_external_data = Some(external_data);
102 }
103 b'c' if chunk.starts_with(CARDINALITY_PREFIX) => {
105 let (_, cardinality) = cardinality(chunk)?;
106 maybe_cardinality = cardinality;
107 }
108 _ => {
109 }
113 }
114
115 remaining = tail;
116 }
117
118 remaining
122 } else {
123 remaining
124 };
125
126 let (num_points, mut metric_values) = metric_values_from_raw(raw_metric_values, metric_type, maybe_sample_rate)?;
127
128 if let Some(timestamp) = maybe_timestamp {
130 metric_values.set_timestamp(timestamp);
131 }
132
133 let tags = maybe_tags.unwrap_or_else(RawTags::empty);
134
135 Ok((
136 remaining,
137 MetricPacket {
138 metric_name,
139 tags,
140 values: metric_values,
141 num_points,
142 timestamp: maybe_timestamp,
143 local_data: maybe_local_data,
144 external_data: maybe_external_data,
145 cardinality: maybe_cardinality,
146 },
147 ))
148}
149
150#[inline]
151fn permissive_metric_name(input: &[u8]) -> IResult<&[u8], &str> {
152 let valid_char = |c: u8| c > 31 && c < 128 && c != b':';
154 map(take_while1(valid_char), |b| {
155 unsafe { std::str::from_utf8_unchecked(b) }
158 })
159 .parse(input)
160}
161
162#[inline]
163fn sample_rate<'a>(
164 metric_name: &'a str, config: &'a DogstatsdCodecConfiguration,
165) -> impl Fn(f64) -> Result<SampleRate, &'static str> + 'a {
166 let minimum_sample_rate = config.minimum_sample_rate;
167
168 move |mut raw_sample_rate| {
169 if raw_sample_rate < minimum_sample_rate {
170 raw_sample_rate = minimum_sample_rate;
171 warn!(
172 "Sample rate for metric '{}' is below minimum of {}. Clamping to minimum.",
173 metric_name, minimum_sample_rate
174 );
175 }
176 SampleRate::try_from(raw_sample_rate)
177 }
178}
179
180#[inline]
181fn raw_metric_values(input: &[u8]) -> IResult<&[u8], (MetricType, &[u8])> {
182 let (remaining, raw_values) = terminated(take_while1(|b| b != b'|'), tag("|")).parse(input)?;
183 let (remaining, raw_kind) = alt((tag("g"), tag("c"), tag("ms"), tag("h"), tag("s"), tag("d"))).parse(remaining)?;
184
185 if raw_values.is_empty() || simdutf8::basic::from_utf8(raw_values).is_err() {
187 return Err(nom::Err::Error(Error::new(raw_values, ErrorKind::Verify)));
188 }
189
190 let metric_type = match raw_kind {
191 b"c" => MetricType::Count,
192 b"g" => MetricType::Gauge,
193 b"s" => MetricType::Set,
194 b"ms" => MetricType::Timer,
195 b"h" => MetricType::Histogram,
196 b"d" => MetricType::Distribution,
197 _ => unreachable!("should be constrained by alt parser"),
198 };
199
200 Ok((remaining, (metric_type, raw_values)))
201}
202
203#[inline]
204fn metric_values_from_raw(
205 input: &[u8], metric_type: MetricType, sample_rate: Option<SampleRate>,
206) -> Result<(u64, MetricValues), NomParserError<'_>> {
207 let mut num_points = 0;
208 let floats = FloatIter::new(input).inspect(|_| num_points += 1);
209
210 let values = match metric_type {
211 MetricType::Count => MetricValues::counter_sampled_fallible(floats, sample_rate)?,
212 MetricType::Gauge => MetricValues::gauge_fallible(floats)?,
213 MetricType::Set => {
214 num_points = 1;
215
216 let value = unsafe { std::str::from_utf8_unchecked(input) };
218 MetricValues::set(value.to_string())
219 }
220 MetricType::Timer | MetricType::Histogram => MetricValues::histogram_sampled_fallible(floats, sample_rate)?,
221 MetricType::Distribution => MetricValues::distribution_sampled_fallible(floats, sample_rate)?,
222 };
223
224 Ok((num_points, values))
225}
226
/// Iterator over the `:`-separated values of a metric payload (for example `1.5:2:3`), yielding each as an `f64`.
struct FloatIter<'a> {
    /// Bytes that have not been consumed yet.
    raw_values: &'a [u8],
}

impl<'a> FloatIter<'a> {
    /// Creates an iterator over `raw_values`.
    ///
    /// `raw_values` must be valid UTF-8, because iteration reinterprets each value as a `str` without re-checking.
    /// Within this module that holds because the bytes originate from `raw_metric_values`, which validates them.
    fn new(raw_values: &'a [u8]) -> FloatIter<'a> {
        FloatIter { raw_values }
    }
}
236
237impl<'a> Iterator for FloatIter<'a> {
238 type Item = Result<f64, NomParserError<'a>>;
239
240 fn next(&mut self) -> Option<Self::Item> {
241 if self.raw_values.is_empty() {
242 return None;
243 }
244
245 let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
246 self.raw_values = tail;
247
248 let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
251 match value_s.parse::<f64>() {
252 Ok(value) => Some(Ok(value)),
253 Err(_) => Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
254 }
255 }
256}
257
#[cfg(test)]
mod tests {
    use proptest::{collection::vec as arb_vec, prelude::*};
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag},
        Context,
    };
    use saluki_core::data_model::event::metric::*;

    use super::{parse_dogstatsd_metric, DogstatsdCodecConfiguration};

    /// Outcome of a test parse: a nom error if parsing failed, otherwise an optional decoded value.
    type OptionalNomResult<'input, T> = Result<Option<T>, nom::Err<nom::error::Error<&'input [u8]>>>;

    /// Parses `input` with the default codec configuration; see [`parse_dsd_metric_with_conf`].
    fn parse_dsd_metric(input: &[u8]) -> OptionalNomResult<'_, Metric> {
        let default_config = DogstatsdCodecConfiguration::default();
        parse_dsd_metric_with_conf(input, &default_config)
    }

    /// Parses `input` with `config` and assembles a `Metric` from the packet's name, tags and values.
    ///
    /// Panics if the parser leaves any input unconsumed. The metric always gets `MetricMetadata::default()`, and the
    /// origin fields (local/external data, cardinality) are not carried over. Tests that care about those fields
    /// inspect the `MetricPacket` returned by `parse_dogstatsd_metric` directly. On success this always returns `Some`.
    fn parse_dsd_metric_with_conf<'input>(
        input: &'input [u8], config: &DogstatsdCodecConfiguration,
    ) -> OptionalNomResult<'input, Metric> {
        let (remaining, packet) = parse_dogstatsd_metric(input, config)?;
        assert!(remaining.is_empty());

        let tags = packet.tags.into_iter().map(Tag::from).collect::<SharedTagSet>();
        let context = Context::from_parts(packet.metric_name, tags);

        Ok(Some(Metric::from_parts(
            context,
            packet.values,
            MetricMetadata::default(),
        )))
    }

    /// Asserts that `actual` is present and matches `expected` in context, values and metadata, then returns it so
    /// callers can make further assertions.
    #[track_caller]
    fn check_basic_metric_eq(expected: Metric, actual: Option<Metric>) -> Metric {
        let actual = actual.expect("event should not have been None");
        assert_eq!(expected.context(), actual.context());
        assert_eq!(expected.values(), actual.values());
        assert_eq!(expected.metadata(), actual.metadata());
        actual
    }

    #[test]
    fn basic_metric() {
        // One single-valued line per supported metric type.
        let name = "my.counter";
        let value = 1.0;
        let raw = format!("{}:{}|c", name, value);
        let expected = Metric::counter(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let value = 2.0;
        let raw = format!("{}:{}|g", name, value);
        let expected = Metric::gauge(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Timers (`ms`) and histograms (`h`) both decode to histogram values.
        let name = "my.timer_or_histogram";
        let value = 3.0;
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, value, kind);
            let expected = Metric::histogram(name, value);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let distribution_name = "my.distribution";
        let distribution_value = 3.0;
        let distribution_raw = format!("{}:{}|d", distribution_name, distribution_value);
        let distribution_expected = Metric::distribution(distribution_name, distribution_value);
        let distribution_actual = parse_dsd_metric(distribution_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(distribution_expected, distribution_actual);

        // Set members are arbitrary text, not numbers.
        let set_name = "my.set";
        let set_value = "value";
        let set_raw = format!("{}:{}|s", set_name, set_value);
        let set_expected = Metric::set(set_name, set_value);
        let set_actual = parse_dsd_metric(set_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(set_expected, set_actual);
    }

    #[test]
    fn metric_tags() {
        let name = "my.counter";
        let value = 1.0;
        let tags = ["tag1", "tag2"];
        let raw = format!("{}:{}|c|#{}", name, value, tags.join(","));
        let expected = Metric::counter((name, &tags[..]), value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_sample_rate() {
        // A counter sampled at 0.5 is expected to be scaled by 1 / 0.5.
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let raw = format!("{}:{}|c|@{}", name, value, sample_rate);

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let expected = Metric::counter(name, value_sample_rate_adjusted);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        // Flatten to (timestamp, value) pairs, mapping a missing timestamp to 0.
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (0, value_sample_rate_adjusted));
    }

    #[test]
    fn metric_local_data() {
        let name = "my.counter";
        let value = 1.0;
        let local_data = "abcdef123456";
        let raw = format!("{}:{}|c|c:{}", name, value, local_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Local data is not part of the converted `Metric`, so check the raw packet.
        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
    }

    #[test]
    fn metric_unix_timestamp() {
        let name = "my.counter";
        let value = 1.0;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|T{}", name, value, timestamp);
        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_external_data() {
        let name = "my.counter";
        let value = 1.0;
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("{}:{}|c|e:{}", name, value, external_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // External data is not part of the converted `Metric`, so check the raw packet.
        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.external_data, Some(external_data));
    }

    #[test]
    fn metric_cardinality() {
        let name = "my.counter";
        let value = 1.0;
        let cardinality = "high";
        let raw = format!("{}:{}|c|card:{}", name, value, cardinality);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Cardinality is not part of the converted `Metric`, so check the raw packet.
        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::High));
    }

    #[test]
    fn metric_multiple_extensions() {
        // Every optional field at once, in an order different from the parser's match arms.
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let tags = ["tag1", "tag2"];
        let local_data = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let cardinality = "orchestrator";
        let timestamp = 1234567890;
        let raw = format!(
            "{}:{}|c|#{}|@{}|c:{}|e:{}|card:{}|T{}",
            name,
            value,
            tags.join(","),
            sample_rate,
            local_data,
            external_data,
            cardinality,
            timestamp
        );

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let mut expected = Metric::counter((name, &tags[..]), value_sample_rate_adjusted);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        // Flatten to (timestamp, value) pairs, mapping a missing timestamp to 0.
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value_sample_rate_adjusted));

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(local_data));
        assert_eq!(packet.external_data, Some(external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Orchestrator));
    }

    #[test]
    fn multivalue_metrics() {
        // Several `:`-joined values on one line, for every numeric metric type.
        let name = "my.counter";
        let values = [1.0, 2.0, 3.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|c", name, values_stringified.join(":"));
        let expected = Metric::counter(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let values = [42.0, 5.0, -18.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|g", name, values_stringified.join(":"));
        let expected = Metric::gauge(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.timer_or_histogram";
        let values = [27.5, 4.20, 80.085];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, values_stringified.join(":"), kind);
            let expected = Metric::histogram(name, values);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let name = "my.distribution";
        let raw = format!("{}:{}|d", name, values_stringified.join(":"));
        let expected = Metric::distribution(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn respects_maximum_tag_count() {
        // Tags beyond the configured maximum are dropped rather than failing the parse.
        let input = b"foo:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let cases = [3, 2, 1];
        for max_tag_count in cases {
            let config = DogstatsdCodecConfiguration::default().with_maximum_tag_count(max_tag_count);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            assert_eq!(metric.context().tags().len(), max_tag_count);
        }
    }

    #[test]
    fn respects_maximum_tag_length() {
        // Every tag that survives parsing must fit within the configured maximum length.
        let input = b"foo:1|c|#tag1:short,tag2:medium,tag3:longlong";

        let cases = [6, 5, 4];
        for max_tag_length in cases {
            let config = DogstatsdCodecConfiguration::default().with_maximum_tag_length(max_tag_length);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            for tag in metric.context().tags().into_iter() {
                assert!(tag.len() <= max_tag_length);
            }
        }
    }

    #[test]
    fn respects_read_timestamps() {
        // With timestamps disabled, a `T` field is skipped and the values carry no timestamp (mapped to 0 below).
        let input = b"foo:1|c|T1234567890";

        let config = DogstatsdCodecConfiguration::default().with_timestamps(false);

        let metric = parse_dsd_metric_with_conf(input, &config)
            .expect("should not fail to parse")
            .expect("should not fail to intern");

        let value_timestamps = match metric.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, _)| ts.map(|v| v.get()).unwrap_or(0))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(value_timestamps.len(), 1);
        assert_eq!(value_timestamps[0], 0);
    }

    #[test]
    fn permissive_mode() {
        // A name containing spaces and quotes must be accepted once permissive mode is enabled.
        let payload = b"codeheap 'non-nmethods'.usage:0.3054|g|#env:dev,service:foobar,datacenter:localhost.dev";

        let config = DogstatsdCodecConfiguration::default().with_permissive_mode(true);
        match parse_dsd_metric_with_conf(payload, &config) {
            Ok(result) => assert!(result.is_some(), "should not fail to materialize metric after decoding"),
            Err(e) => panic!("should not have errored: {:?}", e),
        }
    }

    #[test]
    fn minimum_sample_rate() {
        let minimum_sample_rate = SampleRate::try_from(0.01).unwrap();
        let config = DogstatsdCodecConfiguration::default().with_minimum_sample_rate(minimum_sample_rate.rate());

        // Rates below the minimum (including 0), exactly at it, above it, and absent. In every case the sketch must
        // not count more samples than the minimum rate's weight allows.
        let cases = [
            "test:1|d|@0".to_string(),
            "test:1|d|@0.001".to_string(),
            "test:1|d|@0.0005".to_string(),
            "test:1|d|@0.00001".to_string(),
            format!("test:1|d|@{}", minimum_sample_rate.rate()),
            "test:1|d|@0.1".to_string(),
            "test:1|d|@0.5".to_string(),
            "test:1|d".to_string(),
        ];

        for input in cases {
            let metric = parse_dsd_metric_with_conf(input.as_bytes(), &config)
                .expect("Should not fail to parse metric.")
                .expect("Metric should be present.");

            let sketch = match metric.values() {
                MetricValues::Distribution(points) => {
                    points
                        .into_iter()
                        .next()
                        .expect("Should have at least one sketch point.")
                        .1
                }
                _ => panic!("Unexpected metric type."),
            };

            assert!(sketch.count() as u64 <= minimum_sample_rate.weight());
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]
        #[test]
        fn property_test_malicious_input_non_exhaustive(input in arb_vec(0..=255u8, 0..1000)) {
            // Arbitrary bytes, drawn from the full 0..=255 range, must never make the parser panic. Parse errors are
            // expected and ignored. The parser is called directly, not through `parse_dsd_metric`: that helper
            // asserts the input was fully consumed, which a successfully parsed packet followed by an empty field
            // (e.g. `a:1|c||x`) legitimately does not satisfy.
            let config = DogstatsdCodecConfiguration::default();
            if let Ok((_, packet)) = parse_dogstatsd_metric(&input, &config) {
                // Still exercise the same tag/context conversion the test helper performs.
                let tags = packet.tags.into_iter().map(Tag::from).collect::<SharedTagSet>();
                let _ = Context::from_parts(packet.metric_name, tags);
            }
        }
    }
}