Skip to main content

saluki_io/deser/codec/dogstatsd/
helpers.rs

1use nom::{
2    bytes::complete::{tag, take_while1},
3    character::complete::u64 as parse_u64,
4    combinator::{all_consuming, map, rest},
5    error::{Error, ErrorKind},
6    sequence::preceded,
7    IResult, Parser as _,
8};
9use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
10
11use super::DogStatsDCodecConfiguration;
12
/// DogStatsD message type.
///
/// This is the classification produced by [`parse_message_type`] from the leading bytes of a raw payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MessageType {
    /// A metric sample. This is the fallback for any payload that carries neither of the recognized prefixes.
    MetricSample,
    /// An event: the payload starts with `_e{`.
    Event,
    /// A service check: the payload starts with `_sc|`.
    ServiceCheck,
}
20
// Payload-level prefixes: these are matched against the very start of a raw payload to classify it.

/// Leading bytes that mark a payload as an event.
pub const EVENT_PREFIX: &[u8] = b"_e{";
/// Leading bytes that mark a payload as a service check.
pub const SERVICE_CHECK_PREFIX: &[u8] = b"_sc|";

// Field-level prefixes. Each constant is named after the field it introduces; apart from the cardinality
// prefix, the parsers that consume them live outside this file.

/// Prefix of the timestamp field.
pub const TIMESTAMP_PREFIX: &[u8] = b"d:";
/// Prefix of the hostname field.
pub const HOSTNAME_PREFIX: &[u8] = b"h:";
/// Prefix of the aggregation key field.
pub const AGGREGATION_KEY_PREFIX: &[u8] = b"k:";
/// Prefix of the priority field.
pub const PRIORITY_PREFIX: &[u8] = b"p:";
/// Prefix of the source type field.
pub const SOURCE_TYPE_PREFIX: &[u8] = b"s:";
/// Prefix of the alert type field.
pub const ALERT_TYPE_PREFIX: &[u8] = b"t:";
/// Prefix of the tags field.
pub const TAGS_PREFIX: &[u8] = b"#";
/// Prefix of the service check message field.
pub const SERVICE_CHECK_MESSAGE_PREFIX: &[u8] = b"m:";
/// Prefix of the Local Data field.
pub const LOCAL_DATA_PREFIX: &[u8] = b"c:";
/// Prefix of the External Data field.
pub const EXTERNAL_DATA_PREFIX: &[u8] = b"e:";
/// Prefix of the cardinality field.
pub const CARDINALITY_PREFIX: &[u8] = b"card:";
35
36/// Parses the given raw payload and returns the DogStatsD message type.
37///
38/// If the payload isn't an event or service check, it's assumed to be a metric.
39#[inline]
40pub fn parse_message_type(data: &[u8]) -> MessageType {
41    if data.starts_with(EVENT_PREFIX) {
42        return MessageType::Event;
43    } else if data.starts_with(SERVICE_CHECK_PREFIX) {
44        return MessageType::ServiceCheck;
45    }
46    MessageType::MetricSample
47}
48
49/// Splits the input buffer at the given delimiter.
50///
51/// If the delimiter isn't found, or the input buffer is empty, `None` is returned. Otherwise, the buffer is
52/// split into two parts at the delimiter, and the delimiter is _not_ included.
53#[inline]
54pub fn split_at_delimiter(input: &[u8], delimiter: u8) -> Option<(&[u8], &[u8])> {
55    match memchr::memchr(delimiter, input) {
56        Some(index) => Some((&input[0..index], &input[index + 1..input.len()])),
57        None => {
58            if input.is_empty() {
59                None
60            } else {
61                Some((input, &[]))
62            }
63        }
64    }
65}
66
67/// Maps the input slice as a UTF-8 string.
68///
69/// # Errors
70///
71/// If the input slice isn't valid UTF-8, an error is returned.
72#[inline]
73pub fn utf8(input: &[u8]) -> IResult<&[u8], &str> {
74    match simdutf8::basic::from_utf8(input) {
75        Ok(s) => Ok((&[], s)),
76        Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
77    }
78}
79
80/// Returns the longest input slice that contains only ASCII alphanumeric characters and "separators" as a UTF-8 string.
81///
82/// Separators are defined as spaces, underscores, hyphens, and periods.
83///
84/// # Errors
85///
86/// If the input slice doesn't at least one byte of valid characters, an error is returned.
87#[inline]
88pub fn ascii_alphanum_and_seps(input: &[u8]) -> IResult<&[u8], &str> {
89    let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b' ' || c == b'_' || c == b'-' || c == b'.';
90    map(take_while1(valid_char), |b| {
91        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
92        // interpret the bytes directly as UTF-8.
93        unsafe { std::str::from_utf8_unchecked(b) }
94    })
95    .parse(input)
96}
97
98/// Extracts as many raw tags from the input slice as possible, up to the configured limit.
99///
100/// Tags can be limited by length as well as count. If any tags exceed the maximum length, they're dropped. If the number
101/// of tags exceeds the maximum count, the excess tags are dropped. The remaining slice doesn't contain any dropped tags.
102///
103/// # Errors
104///
105/// If the input slice isn't at least one byte long, or if it's not valid UTF-8, an error is returned.
106#[inline]
107pub fn tags(config: &DogStatsDCodecConfiguration) -> impl Fn(&[u8]) -> IResult<&[u8], RawTags<'_>> {
108    let max_tag_count = config.maximum_tag_count;
109    let max_tag_len = config.maximum_tag_length;
110
111    move |input| match simdutf8::basic::from_utf8(input) {
112        Ok(tags) => Ok((&[], RawTags::new(tags, max_tag_count, max_tag_len))),
113        Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
114    }
115}
116
117/// Parses a Unix timestamp from the input slice.
118///
119/// # Errors
120///
121/// If the input slice isn't a valid unsigned 64-bit integer, an error is returned.
122#[inline]
123pub fn unix_timestamp(input: &[u8]) -> IResult<&[u8], u64> {
124    parse_u64(input)
125}
126
127/// Parses Local Data from the input slice.
128///
129/// # Errors
130///
131/// If the input slice doesn't contain at least one byte of valid characters, an error is returned.
132#[inline]
133pub fn local_data(input: &[u8]) -> IResult<&[u8], &str> {
134    // Local Data is only meant to be able to represent container IDs (which arelong hexadecimal strings), or in special
135    // cases, the inode number of the cgroup controller that contains the container sending the metrics, where the value
136    // will look like `in-<integer value>`.
137    //
138    // In some cases, it might contain _multiple_ of these values, separated by a comma.
139    let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b'-' || c == b',';
140    map(take_while1(valid_char), |b| {
141        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
142        // interpret the bytes directly as UTF-8.
143        unsafe { std::str::from_utf8_unchecked(b) }
144    })
145    .parse(input)
146}
147
148/// Parses External Data from the input slice.
149///
150/// # Errors
151///
152/// If the input slice doesn't contain at least one byte of valid characters, an error is returned.
153#[inline]
154pub fn external_data(input: &[u8]) -> IResult<&[u8], &str> {
155    // External Data is only meant to be able to represent origin information, which includes container names, pod UIDs,
156    // and the like... which are constrained by the RFC 1123 definition of a DNS label: lowercase ASCII letters,
157    // numbers, and hyphens.
158    //
159    // We don't go the full nine yards with enforcing the "starts with a letter and number" bit.. but we _do_ allow
160    // commas since individual items in the External Data string are comma-separated.
161    let valid_char = |c: u8| c.is_ascii_lowercase() || c.is_ascii_digit() || c == b'-' || c == b',';
162    map(take_while1(valid_char), |b| {
163        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
164        // interpret the bytes directly as UTF-8.
165        unsafe { std::str::from_utf8_unchecked(b) }
166    })
167    .parse(input)
168}
169
170/// Parses `OriginTagCardinality` from the input slice.
171///
172/// Unknown cardinality values are accepted and returned as `None` rather than failing the parse.
173/// This matches the behavior of the core Datadog Agent, which silently ignores unrecognized values.
174#[inline]
175pub fn cardinality(input: &[u8]) -> IResult<&[u8], Option<OriginTagCardinality>> {
176    let (remaining, raw_bytes) = all_consuming(preceded(tag(CARDINALITY_PREFIX), rest)).parse(input)?;
177
178    // Use simdutf8 (consistent with other UTF-8 checks in this codec) for checked conversion.
179    // Non-UTF-8 bytes are treated as an unrecognized value — return None so the frame continues
180    // processing rather than hard-failing.
181    let cardinality = simdutf8::basic::from_utf8(raw_bytes)
182        .ok()
183        .and_then(|s| OriginTagCardinality::try_from(s).ok());
184
185    Ok((remaining, cardinality))
186}
187
#[cfg(test)]
mod tests {
    use saluki_context::origin::OriginTagCardinality;

    use super::{cardinality, CARDINALITY_PREFIX};

    /// Builds a raw cardinality field: the cardinality prefix followed by the bytes of `s`.
    fn card(s: &str) -> Vec<u8> {
        let mut field = CARDINALITY_PREFIX.to_vec();
        field.extend_from_slice(s.as_bytes());
        field
    }

    /// Parses `value` as a cardinality field and checks the parsed result against `expected`.
    fn assert_parses_to(value: &str, expected: Option<OriginTagCardinality>) {
        let (_, result) = cardinality(&card(value)).expect("parse should succeed");
        assert_eq!(result, expected, "failed for '{}'", value);
    }

    #[test]
    fn cardinality_known_values() {
        assert_parses_to("none", Some(OriginTagCardinality::None));
        assert_parses_to("low", Some(OriginTagCardinality::Low));
        assert_parses_to("orchestrator", Some(OriginTagCardinality::Orchestrator));
        assert_parses_to("high", Some(OriginTagCardinality::High));
    }

    #[test]
    fn cardinality_unknown_value_returns_none() {
        // A value that isn't recognized must still parse, yielding `None`, so that the whole metric frame isn't
        // dropped over it.
        let field = card("not-a-valid-cardinality");
        let (_, result) = cardinality(&field).expect("parse should succeed");
        assert_eq!(result, None);
    }

    #[test]
    fn cardinality_case_insensitive() {
        // Matching is case-insensitive to align with the core Datadog Agent (StringToTagCardinality uses
        // strings.ToLower), so values in the "wrong" case should still resolve to the right cardinality.
        assert_parses_to("LOW", Some(OriginTagCardinality::Low));
        assert_parses_to("HIGH", Some(OriginTagCardinality::High));
        assert_parses_to("Orchestrator", Some(OriginTagCardinality::Orchestrator));
        assert_parses_to("NONE", Some(OriginTagCardinality::None));
    }

    #[test]
    fn cardinality_non_utf8_bytes_returns_none() {
        // Regression test: bytes after the prefix that aren't valid UTF-8 must be treated as an unrecognized value
        // (`None`). The previous implementation used `from_utf8_unchecked`, which would be undefined behavior here.
        let field: Vec<u8> = CARDINALITY_PREFIX.iter().copied().chain([0xff, 0xfe]).collect();
        let (_, result) = cardinality(&field).expect("parse should succeed");
        assert_eq!(result, None);
    }
}