saluki_io/deser/codec/dogstatsd/
helpers.rs

1use nom::{
2    branch::alt,
3    bytes::complete::{tag, take_while1},
4    character::complete::u64 as parse_u64,
5    combinator::{all_consuming, map},
6    error::{Error, ErrorKind},
7    sequence::preceded,
8    IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11
12use super::DogstatsdCodecConfiguration;
13
/// DogStatsD message type.
///
/// This is a fieldless enum, so it is trivially copyable and comparable. `Debug` is derived so that values can be
/// logged and used directly in assertions such as `assert_eq!`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MessageType {
    /// A metric sample. This is the fallback classification for any payload that is neither an event nor a
    /// service check.
    MetricSample,
    /// An event.
    Event,
    /// A service check.
    ServiceCheck,
}
21
/// Leading bytes that mark a payload as an event (checked by `parse_message_type`).
pub const EVENT_PREFIX: &[u8] = b"_e{";
/// Leading bytes that mark a payload as a service check (checked by `parse_message_type`).
pub const SERVICE_CHECK_PREFIX: &[u8] = b"_sc|";

// Prefixes that introduce individual fields within a message.
//
// NOTE(review): apart from `CARDINALITY_PREFIX`, none of these are consumed in this file, so which message types
// each prefix applies to cannot be confirmed from here; verify against the parsers that use them.
pub const TIMESTAMP_PREFIX: &[u8] = b"d:";
pub const HOSTNAME_PREFIX: &[u8] = b"h:";
pub const AGGREGATION_KEY_PREFIX: &[u8] = b"k:";
pub const PRIORITY_PREFIX: &[u8] = b"p:";
pub const SOURCE_TYPE_PREFIX: &[u8] = b"s:";
pub const ALERT_TYPE_PREFIX: &[u8] = b"t:";
pub const TAGS_PREFIX: &[u8] = b"#";
pub const SERVICE_CHECK_MESSAGE_PREFIX: &[u8] = b"m:";
pub const LOCAL_DATA_PREFIX: &[u8] = b"c:";
pub const EXTERNAL_DATA_PREFIX: &[u8] = b"e:";
/// Prefix that the `cardinality` parser requires in front of the cardinality value.
pub const CARDINALITY_PREFIX: &[u8] = b"card:";
36
37/// Parses the given raw payload and returns the DogStatsD message type.
38///
39/// If the payload is not an event or service check, it is assumed to be a metric.
40#[inline]
41pub fn parse_message_type(data: &[u8]) -> MessageType {
42    if data.starts_with(EVENT_PREFIX) {
43        return MessageType::Event;
44    } else if data.starts_with(SERVICE_CHECK_PREFIX) {
45        return MessageType::ServiceCheck;
46    }
47    MessageType::MetricSample
48}
49
50/// Splits the input buffer at the given delimiter.
51///
52/// If the delimiter is not found, or the input buffer is empty, `None` is returned. Otherwise, the buffer is
53/// split into two parts at the delimiter, and the delimiter is _not_ included.
54#[inline]
55pub fn split_at_delimiter(input: &[u8], delimiter: u8) -> Option<(&[u8], &[u8])> {
56    match memchr::memchr(delimiter, input) {
57        Some(index) => Some((&input[0..index], &input[index + 1..input.len()])),
58        None => {
59            if input.is_empty() {
60                None
61            } else {
62                Some((input, &[]))
63            }
64        }
65    }
66}
67
68/// Maps the input slice as a UTF-8 string.
69///
70/// # Errors
71///
72/// If the input slice is not valid UTF-8, an error is returned.
73#[inline]
74pub fn utf8(input: &[u8]) -> IResult<&[u8], &str> {
75    match simdutf8::basic::from_utf8(input) {
76        Ok(s) => Ok((&[], s)),
77        Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
78    }
79}
80
81/// Returns the longest input slice that contains only ASCII alphanumeric characters and "separators" as a UTF-8 string.
82///
83/// Separators are defined as spaces, underscores, hyphens, and periods.
84///
85/// # Errors
86///
87/// If the input slice does not at least one byte of valid characters, an error is returned.
88#[inline]
89pub fn ascii_alphanum_and_seps(input: &[u8]) -> IResult<&[u8], &str> {
90    let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b' ' || c == b'_' || c == b'-' || c == b'.';
91    map(take_while1(valid_char), |b| {
92        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
93        // interpret the bytes directly as UTF-8.
94        unsafe { std::str::from_utf8_unchecked(b) }
95    })
96    .parse(input)
97}
98
/// Builds a parser that wraps the entire input slice in [`RawTags`], bounded by the configured limits.
///
/// The returned closure validates the whole input as UTF-8 and passes it to `RawTags::new` together with the
/// maximum tag count and maximum tag length read from `config`. The whole input is always consumed, so the
/// remaining slice in the result is always empty.
///
/// NOTE(review): how the count and length limits are applied is decided by `RawTags`, which is not visible here.
/// Presumably over-long and excess tags are dropped; confirm against `RawTags`.
///
/// # Errors
///
/// If the input slice is not valid UTF-8, an error of kind `ErrorKind::Verify` is returned. An empty input slice is
/// not rejected by this function.
#[inline]
pub fn tags(config: &DogstatsdCodecConfiguration) -> impl Fn(&[u8]) -> IResult<&[u8], RawTags<'_>> {
    // Copy the limits out of the configuration so the returned closure does not borrow `config`.
    let max_tag_count = config.maximum_tag_count;
    let max_tag_len = config.maximum_tag_length;

    move |input| match simdutf8::basic::from_utf8(input) {
        Ok(tags) => Ok((&[], RawTags::new(tags, max_tag_count, max_tag_len))),
        Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
    }
}
117
118/// Parses a Unix timestamp from the input slice.
119///
120/// # Errors
121///
122/// If the input slice is not a valid unsigned 64-bit integer, an error is returned.
123#[inline]
124pub fn unix_timestamp(input: &[u8]) -> IResult<&[u8], u64> {
125    parse_u64(input)
126}
127
128/// Parses Local Data from the input slice.
129///
130/// # Errors
131///
132/// If the input slice does not contain at least one byte of valid characters, an error is returned.
133#[inline]
134pub fn local_data(input: &[u8]) -> IResult<&[u8], &str> {
135    // Local Data is only meant to be able to represent container IDs (which arelong hexadecimal strings), or in special
136    // cases, the inode number of the cgroup controller that contains the container sending the metrics, where the value
137    // will look like `in-<integer value>`.
138    //
139    // In some cases, it might contain _multiple_ of these values, separated by a comma.
140    let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b'-' || c == b',';
141    map(take_while1(valid_char), |b| {
142        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
143        // interpret the bytes directly as UTF-8.
144        unsafe { std::str::from_utf8_unchecked(b) }
145    })
146    .parse(input)
147}
148
149/// Parses External Data from the input slice.
150///
151/// # Errors
152///
153/// If the input slice does not contain at least one byte of valid characters, an error is returned.
154#[inline]
155pub fn external_data(input: &[u8]) -> IResult<&[u8], &str> {
156    // External Data is only meant to be able to represent origin information, which includes container names, pod UIDs,
157    // and the like... which are constrained by the RFC 1123 definition of a DNS label: lowercase ASCII letters,
158    // numbers, and hyphens.
159    //
160    // We don't go the full nine yards with enforcing the "starts with a letter and number" bit.. but we _do_ allow
161    // commas since individual items in the External Data string are comma-separated.
162    let valid_char = |c: u8| c.is_ascii_lowercase() || c.is_ascii_digit() || c == b'-' || c == b',';
163    map(take_while1(valid_char), |b| {
164        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
165        // interpret the bytes directly as UTF-8.
166        unsafe { std::str::from_utf8_unchecked(b) }
167    })
168    .parse(input)
169}
170
171/// Parses `OriginTagCardinality` from the input slice.
172///
173/// # Errors
174///
175///
176#[inline]
177pub fn cardinality(input: &[u8]) -> IResult<&[u8], Option<OriginTagCardinality>> {
178    // Cardinality is a string that can be one of the following values:
179    // - "none"
180    // - "low"
181    // - "orchestrator"
182    // - "high"
183    let (remaining, raw_cardinality) = map(
184        all_consuming(preceded(
185            tag(CARDINALITY_PREFIX),
186            alt((tag("none"), tag("low"), tag("orchestrator"), tag("high"))),
187        )),
188        |b| {
189            // SAFETY: We know the bytes in `b` can only be comprised of UTF-8 characters, because our tags are all based on valid
190            // UTF-8 strings, which ensures that it's valid to interpret the bytes directly as UTF-8.
191            unsafe { std::str::from_utf8_unchecked(b) }
192        },
193    )
194    .parse(input)?;
195
196    OriginTagCardinality::try_from(raw_cardinality)
197        .map(|cardinality| (remaining, Some(cardinality)))
198        .map_err(|_| nom::Err::Error(Error::new(input, ErrorKind::Verify)))
199}