1use nom::{
2 branch::alt,
3 bytes::complete::{tag, take_while1},
4 combinator::{all_consuming, map, map_res},
5 error::{Error, ErrorKind},
6 number::complete::double,
7 sequence::{preceded, separated_pair, terminated},
8 IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11use saluki_core::data_model::event::metric::*;
12
13use super::{helpers::*, DogstatsdCodecConfiguration, NomParserError};
14
/// Metric kinds, identified by the type code that follows the `|` after a metric's value list.
enum MetricType {
    /// `g`: gauge.
    Gauge,
    /// `c`: counter.
    Count,
    /// `ms`: timer; decoded exactly like a histogram.
    Timer,
    /// `h`: histogram.
    Histogram,
    /// `d`: distribution.
    Distribution,
    /// `s`: set; the raw value is kept as a string rather than parsed as numbers.
    Set,
}
23
/// A metric decoded by [`parse_dogstatsd_metric`], borrowing string data from the input buffer.
pub struct MetricPacket<'a> {
    /// Metric name, borrowed from the input.
    pub metric_name: &'a str,
    /// Tags from the `#` extension; empty when the extension was absent.
    pub tags: RawTags<'a>,
    /// Decoded value(s); a `T` timestamp, when parsed, has already been applied to them.
    pub values: MetricValues,
    /// Number of points decoded: the count of `:`-separated values consumed, or 1 for sets.
    pub num_points: u64,
    /// Unix timestamp from the `T` extension, if present and timestamp parsing is enabled.
    pub timestamp: Option<u64>,
    /// Container ID from the `c:` extension, if present.
    pub container_id: Option<&'a str>,
    /// External data from the `e:` extension, if present.
    pub external_data: Option<&'a str>,
    /// Origin tag cardinality from the cardinality extension, if present.
    pub cardinality: Option<OriginTagCardinality>,
}
35
36#[inline]
37pub fn parse_dogstatsd_metric<'a>(
38 input: &'a [u8], config: &DogstatsdCodecConfiguration,
39) -> IResult<&'a [u8], MetricPacket<'a>> {
40 let metric_name_parser = if config.permissive {
43 permissive_metric_name
44 } else {
45 ascii_alphanum_and_seps
46 };
47 let (remaining, (metric_name, (metric_type, raw_metric_values))) =
48 separated_pair(metric_name_parser, tag(":"), raw_metric_values).parse(input)?;
49
50 let mut maybe_sample_rate = None;
56 let mut maybe_tags = None;
57 let mut maybe_container_id = None;
58 let mut maybe_timestamp = None;
59 let mut maybe_external_data = None;
60 let mut maybe_cardinality = None;
61
62 let remaining = if !remaining.is_empty() {
63 let (mut remaining, _) = tag("|")(remaining)?;
64
65 while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
66 if chunk.is_empty() {
67 break;
68 }
69
70 match chunk[0] {
71 b'@' => {
74 let (_, sample_rate) =
75 all_consuming(preceded(tag("@"), map_res(double, SampleRate::try_from))).parse(chunk)?;
76 maybe_sample_rate = Some(sample_rate);
77 }
78 b'#' => {
80 let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
81 maybe_tags = Some(tags);
82 }
83 b'c' if chunk.len() > 1 && chunk[1] == b':' => {
85 let (_, container_id) = all_consuming(preceded(tag("c:"), container_id)).parse(chunk)?;
86 maybe_container_id = Some(container_id);
87 }
88 b'T' => {
90 if config.timestamps {
91 let (_, timestamp) = all_consuming(preceded(tag("T"), unix_timestamp)).parse(chunk)?;
92 maybe_timestamp = Some(timestamp);
93 }
94 }
95 b'e' if chunk.len() > 1 && chunk[1] == b':' => {
97 let (_, external_data) = all_consuming(preceded(tag("e:"), external_data)).parse(chunk)?;
98 maybe_external_data = Some(external_data);
99 }
100 b'c' if chunk.starts_with(CARDINALITY_PREFIX) => {
102 let (_, cardinality) = cardinality(chunk)?;
103 maybe_cardinality = cardinality;
104 }
105 _ => {
106 }
110 }
111
112 remaining = tail;
113 }
114
115 remaining
119 } else {
120 remaining
121 };
122
123 let (num_points, mut metric_values) = metric_values_from_raw(raw_metric_values, metric_type, maybe_sample_rate)?;
124
125 if let Some(timestamp) = maybe_timestamp {
127 metric_values.set_timestamp(timestamp);
128 }
129
130 let tags = maybe_tags.unwrap_or_else(RawTags::empty);
131
132 Ok((
133 remaining,
134 MetricPacket {
135 metric_name,
136 tags,
137 values: metric_values,
138 num_points,
139 timestamp: maybe_timestamp,
140 container_id: maybe_container_id,
141 external_data: maybe_external_data,
142 cardinality: maybe_cardinality,
143 },
144 ))
145}
146
147#[inline]
148fn permissive_metric_name(input: &[u8]) -> IResult<&[u8], &str> {
149 let valid_char = |c: u8| c > 31 && c < 128 && c != b':';
151 map(take_while1(valid_char), |b| {
152 unsafe { std::str::from_utf8_unchecked(b) }
155 })
156 .parse(input)
157}
158
159#[inline]
160fn raw_metric_values(input: &[u8]) -> IResult<&[u8], (MetricType, &[u8])> {
161 let (remaining, raw_values) = terminated(take_while1(|b| b != b'|'), tag("|")).parse(input)?;
162 let (remaining, raw_kind) = alt((tag("g"), tag("c"), tag("ms"), tag("h"), tag("s"), tag("d"))).parse(remaining)?;
163
164 if raw_values.is_empty() || simdutf8::basic::from_utf8(raw_values).is_err() {
166 return Err(nom::Err::Error(Error::new(raw_values, ErrorKind::Verify)));
167 }
168
169 let metric_type = match raw_kind {
170 b"c" => MetricType::Count,
171 b"g" => MetricType::Gauge,
172 b"s" => MetricType::Set,
173 b"ms" => MetricType::Timer,
174 b"h" => MetricType::Histogram,
175 b"d" => MetricType::Distribution,
176 _ => unreachable!("should be constrained by alt parser"),
177 };
178
179 Ok((remaining, (metric_type, raw_values)))
180}
181
182#[inline]
183fn metric_values_from_raw(
184 input: &[u8], metric_type: MetricType, sample_rate: Option<SampleRate>,
185) -> Result<(u64, MetricValues), NomParserError<'_>> {
186 let mut num_points = 0;
187 let floats = FloatIter::new(input).inspect(|_| num_points += 1);
188
189 let values = match metric_type {
190 MetricType::Count => MetricValues::counter_sampled_fallible(floats, sample_rate)?,
191 MetricType::Gauge => MetricValues::gauge_fallible(floats)?,
192 MetricType::Set => {
193 num_points = 1;
194
195 let value = unsafe { std::str::from_utf8_unchecked(input) };
197 MetricValues::set(value.to_string())
198 }
199 MetricType::Timer | MetricType::Histogram => MetricValues::histogram_sampled_fallible(floats, sample_rate)?,
200 MetricType::Distribution => MetricValues::distribution_sampled_fallible(floats, sample_rate)?,
201 };
202
203 Ok((num_points, values))
204}
205
/// Iterator over the `:`-separated numeric values of a single metric.
///
/// Nothing is parsed up front; each value is parsed when the iterator is advanced.
struct FloatIter<'a> {
    /// The portion of the value list that has not been consumed yet.
    raw_values: &'a [u8],
}

impl<'a> FloatIter<'a> {
    /// Wraps `raw_values` for lazy, value-by-value parsing.
    const fn new(raw_values: &'a [u8]) -> Self {
        Self { raw_values }
    }
}
215
216impl<'a> Iterator for FloatIter<'a> {
217 type Item = Result<f64, NomParserError<'a>>;
218
219 fn next(&mut self) -> Option<Self::Item> {
220 if self.raw_values.is_empty() {
221 return None;
222 }
223
224 let (raw_value, tail) = split_at_delimiter(self.raw_values, b':')?;
225 self.raw_values = tail;
226
227 let value_s = unsafe { std::str::from_utf8_unchecked(raw_value) };
230 match value_s.parse::<f64>() {
231 Ok(value) => Some(Ok(value)),
232 Err(_) => Some(Err(nom::Err::Error(Error::new(raw_value, ErrorKind::Float)))),
233 }
234 }
235}
236
#[cfg(test)]
mod tests {
    use proptest::{collection::vec as arb_vec, prelude::*};
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag},
        Context,
    };
    use saluki_core::data_model::event::metric::*;

    use super::{parse_dogstatsd_metric, DogstatsdCodecConfiguration};

    type OptionalNomResult<'input, T> = Result<Option<T>, nom::Err<nom::error::Error<&'input [u8]>>>;

    /// Parses `input` using the default codec configuration.
    fn parse_dsd_metric(input: &[u8]) -> OptionalNomResult<'_, Metric> {
        let default_config = DogstatsdCodecConfiguration::default();
        parse_dsd_metric_with_conf(input, &default_config)
    }

    /// Parses `input` using `config`, asserts that the whole input was consumed, and converts the
    /// resulting packet into a `Metric`.
    fn parse_dsd_metric_with_conf<'input>(
        input: &'input [u8], config: &DogstatsdCodecConfiguration,
    ) -> OptionalNomResult<'input, Metric> {
        let (remaining, packet) = parse_dogstatsd_metric(input, config)?;
        assert!(remaining.is_empty());

        let tags = packet.tags.into_iter().map(Tag::from).collect::<SharedTagSet>();
        let context = Context::from_parts(packet.metric_name, tags);

        Ok(Some(Metric::from_parts(
            context,
            packet.values,
            MetricMetadata::default(),
        )))
    }

    /// Asserts that `actual` is present and matches `expected` in context, values, and metadata,
    /// then returns it for further inspection.
    #[track_caller]
    fn check_basic_metric_eq(expected: Metric, actual: Option<Metric>) -> Metric {
        let actual = actual.expect("event should not have been None");
        assert_eq!(expected.context(), actual.context());
        assert_eq!(expected.values(), actual.values());
        assert_eq!(expected.metadata(), actual.metadata());
        actual
    }

    #[test]
    fn basic_metric() {
        let name = "my.counter";
        let value = 1.0;
        let raw = format!("{}:{}|c", name, value);
        let expected = Metric::counter(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let value = 2.0;
        let raw = format!("{}:{}|g", name, value);
        let expected = Metric::gauge(name, value);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Timers (`ms`) and histograms (`h`) both decode to histogram values.
        let name = "my.timer_or_histogram";
        let value = 3.0;
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, value, kind);
            let expected = Metric::histogram(name, value);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let distribution_name = "my.distribution";
        let distribution_value = 3.0;
        let distribution_raw = format!("{}:{}|d", distribution_name, distribution_value);
        let distribution_expected = Metric::distribution(distribution_name, distribution_value);
        let distribution_actual = parse_dsd_metric(distribution_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(distribution_expected, distribution_actual);

        let set_name = "my.set";
        let set_value = "value";
        let set_raw = format!("{}:{}|s", set_name, set_value);
        let set_expected = Metric::set(set_name, set_value);
        let set_actual = parse_dsd_metric(set_raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(set_expected, set_actual);
    }

    #[test]
    fn metric_tags() {
        let name = "my.counter";
        let value = 1.0;
        let tags = ["tag1", "tag2"];
        let raw = format!("{}:{}|c|#{}", name, value, tags.join(","));
        let expected = Metric::counter((name, &tags[..]), value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_sample_rate() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let raw = format!("{}:{}|c|@{}", name, value, sample_rate);

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let expected = Metric::counter(name, value_sample_rate_adjusted);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (0, value_sample_rate_adjusted));
    }

    #[test]
    fn metric_container_id() {
        let name = "my.counter";
        let value = 1.0;
        let container_id = "abcdef123456";
        let raw = format!("{}:{}|c|c:{}", name, value, container_id);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.container_id, Some(container_id));
    }

    #[test]
    fn metric_unix_timestamp() {
        let name = "my.counter";
        let value = 1.0;
        let timestamp = 1234567890;
        let raw = format!("{}:{}|c|T{}", name, value, timestamp);
        let mut expected = Metric::counter(name, value);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn metric_external_data() {
        let name = "my.counter";
        let value = 1.0;
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("{}:{}|c|e:{}", name, value, external_data);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.external_data, Some(external_data));
    }

    #[test]
    fn metric_cardinality() {
        let name = "my.counter";
        let value = 1.0;
        let cardinality = "high";
        let raw = format!("{}:{}|c|card:{}", name, value, cardinality);
        let expected = Metric::counter(name, value);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::High));
    }

    #[test]
    fn metric_multiple_extensions() {
        let name = "my.counter";
        let value = 1.0;
        let sample_rate = 0.5;
        let tags = ["tag1", "tag2"];
        let container_id = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let cardinality = "orchestrator";
        let timestamp = 1234567890;
        let raw = format!(
            "{}:{}|c|#{}|@{}|c:{}|e:{}|card:{}|T{}",
            name,
            value,
            tags.join(","),
            sample_rate,
            container_id,
            external_data,
            cardinality,
            timestamp
        );

        let value_sample_rate_adjusted = value * (1.0 / sample_rate);
        let mut expected = Metric::counter((name, &tags[..]), value_sample_rate_adjusted);
        expected.values_mut().set_timestamp(timestamp);

        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        let actual = check_basic_metric_eq(expected, actual);
        let values = match actual.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, v)| (ts.map(|v| v.get()).unwrap_or(0), v))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(values.len(), 1);
        assert_eq!(values[0], (timestamp, value_sample_rate_adjusted));

        let config = DogstatsdCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_metric(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.container_id, Some(container_id));
        assert_eq!(packet.external_data, Some(external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Orchestrator));
    }

    #[test]
    fn multivalue_metrics() {
        let name = "my.counter";
        let values = [1.0, 2.0, 3.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|c", name, values_stringified.join(":"));
        let expected = Metric::counter(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        let name = "my.gauge";
        let values = [42.0, 5.0, -18.0];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        let raw = format!("{}:{}|g", name, values_stringified.join(":"));
        let expected = Metric::gauge(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);

        // Timers (`ms`) and histograms (`h`) both decode to histogram values.
        let name = "my.timer_or_histogram";
        let values = [27.5, 4.20, 80.085];
        let values_stringified = values.iter().map(|v| v.to_string()).collect::<Vec<_>>();
        for kind in &["ms", "h"] {
            let raw = format!("{}:{}|{}", name, values_stringified.join(":"), kind);
            let expected = Metric::histogram(name, values);
            let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
            check_basic_metric_eq(expected, actual);
        }

        let name = "my.distribution";
        let raw = format!("{}:{}|d", name, values_stringified.join(":"));
        let expected = Metric::distribution(name, values);
        let actual = parse_dsd_metric(raw.as_bytes()).expect("should not fail to parse");
        check_basic_metric_eq(expected, actual);
    }

    #[test]
    fn respects_maximum_tag_count() {
        let input = b"foo:1|c|#tag1:value1,tag2:value2,tag3:value3";

        let cases = [3, 2, 1];
        for max_tag_count in cases {
            let config = DogstatsdCodecConfiguration::default().with_maximum_tag_count(max_tag_count);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            assert_eq!(metric.context().tags().len(), max_tag_count);
        }
    }

    #[test]
    fn respects_maximum_tag_length() {
        let input = b"foo:1|c|#tag1:short,tag2:medium,tag3:longlong";

        let cases = [6, 5, 4];
        for max_tag_length in cases {
            let config = DogstatsdCodecConfiguration::default().with_maximum_tag_length(max_tag_length);

            let metric = parse_dsd_metric_with_conf(input, &config)
                .expect("should not fail to parse")
                .expect("should not fail to intern");
            for tag in metric.context().tags().into_iter() {
                assert!(tag.len() <= max_tag_length);
            }
        }
    }

    #[test]
    fn respects_read_timestamps() {
        let input = b"foo:1|c|T1234567890";

        let config = DogstatsdCodecConfiguration::default().with_timestamps(false);

        let metric = parse_dsd_metric_with_conf(input, &config)
            .expect("should not fail to parse")
            .expect("should not fail to intern");

        let value_timestamps = match metric.values() {
            MetricValues::Counter(values) => values
                .into_iter()
                .map(|(ts, _)| ts.map(|v| v.get()).unwrap_or(0))
                .collect::<Vec<_>>(),
            _ => panic!("expected counter values"),
        };

        assert_eq!(value_timestamps.len(), 1);
        assert_eq!(value_timestamps[0], 0);
    }

    #[test]
    fn permissive_mode() {
        let payload = b"codeheap 'non-nmethods'.usage:0.3054|g|#env:dev,service:foobar,datacenter:localhost.dev";

        let config = DogstatsdCodecConfiguration::default().with_permissive_mode(true);
        match parse_dsd_metric_with_conf(payload, &config) {
            Ok(result) => assert!(result.is_some(), "should not fail to materialize metric after decoding"),
            Err(e) => panic!("should not have errored: {:?}", e),
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]
        #[test]
        fn property_test_malicious_input_non_exhaustive(input in arb_vec(any::<u8>(), 0..1000)) {
            // Only checks that arbitrary bytes never cause a panic; the parse result is irrelevant.
            // `any::<u8>()` covers the full byte range. The previous `0..255u8` was half-open, so it
            // never produced 0xFF, a byte that is never valid in UTF-8.
            let _ = parse_dsd_metric(&input);
        }
    }
}