saluki_io/deser/codec/dogstatsd/
helpers.rs

1use nom::{
2    branch::alt,
3    bytes::complete::{tag, take_while1},
4    character::complete::u64 as parse_u64,
5    combinator::{all_consuming, map},
6    error::{Error, ErrorKind},
7    sequence::preceded,
8    IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11
12use super::DogstatsdCodecConfiguration;
13
/// DogStatsD message type.
///
/// This is the classification produced by [`parse_message_type`] from the leading bytes of a raw payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MessageType {
    /// A metric sample. This is the fallback when a payload carries neither the event nor the service check prefix.
    MetricSample,
    /// An event, identified by a payload that starts with [`EVENT_PREFIX`].
    Event,
    /// A service check, identified by a payload that starts with [`SERVICE_CHECK_PREFIX`].
    ServiceCheck,
}
21
/// Leading bytes that mark a payload as an event (see `parse_message_type`).
pub const EVENT_PREFIX: &[u8] = b"_e{";
/// Leading bytes that mark a payload as a service check (see `parse_message_type`).
pub const SERVICE_CHECK_PREFIX: &[u8] = b"_sc|";

// Field prefixes. Each constant is named after the field it introduces; which message types accept which fields is
// decided by the callers and is not visible in this file.

/// Prefix of the timestamp field.
pub const TIMESTAMP_PREFIX: &[u8] = b"d:";
/// Prefix of the hostname field.
pub const HOSTNAME_PREFIX: &[u8] = b"h:";
/// Prefix of the aggregation key field.
pub const AGGREGATION_KEY_PREFIX: &[u8] = b"k:";
/// Prefix of the priority field.
pub const PRIORITY_PREFIX: &[u8] = b"p:";
/// Prefix of the source type field.
pub const SOURCE_TYPE_PREFIX: &[u8] = b"s:";
/// Prefix of the alert type field.
pub const ALERT_TYPE_PREFIX: &[u8] = b"t:";
/// Prefix of the tags field.
pub const TAGS_PREFIX: &[u8] = b"#";
/// Prefix of the service check message field.
pub const SERVICE_CHECK_MESSAGE_PREFIX: &[u8] = b"m:";
/// Prefix of the container ID field (the value is presumably what `container_id` parses; confirm against callers).
pub const CONTAINER_ID_PREFIX: &[u8] = b"c:";
/// Prefix of the External Data field (the value is presumably what `external_data` parses; confirm against callers).
pub const EXTERNAL_DATA_PREFIX: &[u8] = b"e:";
/// Prefix of the cardinality field. Unlike the other prefixes, `cardinality` consumes this prefix itself.
pub const CARDINALITY_PREFIX: &[u8] = b"card:";
36
37/// Parses the given raw payload and returns the DogStatsD message type.
38///
39/// If the payload is not an event or service check, it is assumed to be a metric.
40#[inline]
41pub fn parse_message_type(data: &[u8]) -> MessageType {
42    if data.starts_with(EVENT_PREFIX) {
43        return MessageType::Event;
44    } else if data.starts_with(SERVICE_CHECK_PREFIX) {
45        return MessageType::ServiceCheck;
46    }
47    MessageType::MetricSample
48}
49
50/// Splits the input buffer at the given delimiter.
51///
52/// If the delimiter is not found, or the input buffer is empty, `None` is returned. Otherwise, the buffer is
53/// split into two parts at the delimiter, and the delimiter is _not_ included.
54#[inline]
55pub fn split_at_delimiter(input: &[u8], delimiter: u8) -> Option<(&[u8], &[u8])> {
56    match memchr::memchr(delimiter, input) {
57        Some(index) => Some((&input[0..index], &input[index + 1..input.len()])),
58        None => {
59            if input.is_empty() {
60                None
61            } else {
62                Some((input, &[]))
63            }
64        }
65    }
66}
67
68/// Maps the input slice as a UTF-8 string.
69///
70/// # Errors
71///
72/// If the input slice is not valid UTF-8, an error is returned.
73#[inline]
74pub fn utf8(input: &[u8]) -> IResult<&[u8], &str> {
75    match simdutf8::basic::from_utf8(input) {
76        Ok(s) => Ok((&[], s)),
77        Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
78    }
79}
80
81/// Returns the longest input slice that contains only ASCII alphanumeric characters and "separators" as a UTF-8 string.
82///
83/// Separators are defined as spaces, underscores, hyphens, and periods.
84///
85/// # Errors
86///
87/// If the input slice does not at least one byte of valid characters, an error is returned.
88#[inline]
89pub fn ascii_alphanum_and_seps(input: &[u8]) -> IResult<&[u8], &str> {
90    let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b' ' || c == b'_' || c == b'-' || c == b'.';
91    map(take_while1(valid_char), |b| {
92        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
93        // interpret the bytes directly as UTF-8.
94        unsafe { std::str::from_utf8_unchecked(b) }
95    })
96    .parse(input)
97}
98
99/// Extracts as many raw tags from the input slice as possible, up to the configured limit.
100///
101/// Tags can be limited by length as well as count. If any tags exceed the maximum length, they are dropped. If the number
102/// of tags exceeds the maximum count, the excess tags are dropped. The remaining slice does not contain any dropped tags.
103///
104/// # Errors
105///
106/// If the input slice is not at least one byte long, or if it is not valid UTF-8, an error is returned.
107#[inline]
108pub fn tags(config: &DogstatsdCodecConfiguration) -> impl Fn(&[u8]) -> IResult<&[u8], RawTags<'_>> {
109    let max_tag_count = config.maximum_tag_count;
110    let max_tag_len = config.maximum_tag_length;
111
112    move |input| match simdutf8::basic::from_utf8(input) {
113        Ok(tags) => Ok((&[], RawTags::new(tags, max_tag_count, max_tag_len))),
114        Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
115    }
116}
117
/// Parses a Unix timestamp from the input slice.
///
/// This delegates directly to nom's `character::complete::u64` parser (imported here as `parse_u64`), so the
/// timestamp is read in its textual decimal form. On success, the parsed value is returned together with whatever
/// input the parser did not consume.
///
/// # Errors
///
/// If the input slice is not a valid unsigned 64-bit integer, an error is returned.
#[inline]
pub fn unix_timestamp(input: &[u8]) -> IResult<&[u8], u64> {
    parse_u64(input)
}
127
128/// Parses a container ID from the input slice.
129///
130/// # Errors
131///
132/// If the input slice does not contain at least one byte of valid characters, an error is returned.
133#[inline]
134pub fn container_id(input: &[u8]) -> IResult<&[u8], &str> {
135    // We generally only expect container IDs to be either long hexadecimal strings (like 64 characters), or in special
136    // cases, the inode number of the cgroup controller that contains the container sending the metrics, where the value
137    // will look like `in-<integer value>`.
138    let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b'-';
139    map(take_while1(valid_char), |b| {
140        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
141        // interpret the bytes directly as UTF-8.
142        unsafe { std::str::from_utf8_unchecked(b) }
143    })
144    .parse(input)
145}
146
147/// Parses External Data from the input slice.
148///
149/// # Errors
150///
151/// If the input slice does not contain at least one byte of valid characters, an error is returned.
152#[inline]
153pub fn external_data(input: &[u8]) -> IResult<&[u8], &str> {
154    // External Data is only meant to be able to represent origin information, which includes container names, pod UIDs,
155    // and the like... which are constrained by the RFC 1123 definition of a DNS label: lowercase ASCII letters,
156    // numbers, and hyphens.
157    //
158    // We don't go the full nine yards with enforcing the "starts with a letter and number" bit.. but we _do_ allow
159    // commas since individual items in the External Data string are comma-separated.
160    let valid_char = |c: u8| c.is_ascii_lowercase() || c.is_ascii_digit() || c == b'-' || c == b',';
161    map(take_while1(valid_char), |b| {
162        // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
163        // interpret the bytes directly as UTF-8.
164        unsafe { std::str::from_utf8_unchecked(b) }
165    })
166    .parse(input)
167}
168
169/// Parses `OriginTagCardinality` from the input slice.
170///
171/// # Errors
172///
173///
174#[inline]
175pub fn cardinality(input: &[u8]) -> IResult<&[u8], Option<OriginTagCardinality>> {
176    // Cardinality is a string that can be one of the following values:
177    // - "none"
178    // - "low"
179    // - "orchestrator"
180    // - "high"
181    let (remaining, raw_cardinality) = map(
182        all_consuming(preceded(
183            tag(CARDINALITY_PREFIX),
184            alt((tag("none"), tag("low"), tag("orchestrator"), tag("high"))),
185        )),
186        |b| {
187            // SAFETY: We know the bytes in `b` can only be comprised of UTF-8 characters, because our tags are all based on valid
188            // UTF-8 strings, which ensures that it's valid to interpret the bytes directly as UTF-8.
189            unsafe { std::str::from_utf8_unchecked(b) }
190        },
191    )
192    .parse(input)?;
193
194    OriginTagCardinality::try_from(raw_cardinality)
195        .map(|cardinality| (remaining, Some(cardinality)))
196        .map_err(|_| nom::Err::Error(Error::new(input, ErrorKind::Verify)))
197}