saluki_io/deser/codec/dogstatsd/helpers.rs
1use nom::{
2 branch::alt,
3 bytes::complete::{tag, take_while1},
4 character::complete::u64 as parse_u64,
5 combinator::{all_consuming, map},
6 error::{Error, ErrorKind},
7 sequence::preceded,
8 IResult, Parser as _,
9};
10use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
11
12use super::DogstatsdCodecConfiguration;
13
/// DogStatsD message type.
///
/// Produced by [`parse_message_type`] from the leading bytes of a raw payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MessageType {
    /// A metric sample: any payload that is neither an event nor a service check.
    MetricSample,
    /// An event: the payload starts with [`EVENT_PREFIX`].
    Event,
    /// A service check: the payload starts with [`SERVICE_CHECK_PREFIX`].
    ServiceCheck,
}
21
/// Leading bytes that identify an event payload (checked by `parse_message_type`).
pub const EVENT_PREFIX: &[u8] = b"_e{";
/// Leading bytes that identify a service check payload (checked by `parse_message_type`).
pub const SERVICE_CHECK_PREFIX: &[u8] = b"_sc|";

// Field prefixes. Apart from `CARDINALITY_PREFIX`, none of these are consumed in this part of the file, so the
// descriptions below are taken from the constant names -- confirm against the parsers that use them.

/// Prefix of the timestamp field.
pub const TIMESTAMP_PREFIX: &[u8] = b"d:";
/// Prefix of the hostname field.
pub const HOSTNAME_PREFIX: &[u8] = b"h:";
/// Prefix of the aggregation key field.
pub const AGGREGATION_KEY_PREFIX: &[u8] = b"k:";
/// Prefix of the priority field.
pub const PRIORITY_PREFIX: &[u8] = b"p:";
/// Prefix of the source type field.
pub const SOURCE_TYPE_PREFIX: &[u8] = b"s:";
/// Prefix of the alert type field.
pub const ALERT_TYPE_PREFIX: &[u8] = b"t:";
/// Prefix of the tags field.
pub const TAGS_PREFIX: &[u8] = b"#";
/// Prefix of the service check message field.
pub const SERVICE_CHECK_MESSAGE_PREFIX: &[u8] = b"m:";
/// Prefix of the container ID field.
pub const CONTAINER_ID_PREFIX: &[u8] = b"c:";
/// Prefix of the External Data field.
pub const EXTERNAL_DATA_PREFIX: &[u8] = b"e:";
/// Prefix of the cardinality field; the `cardinality` parser requires its input to start with it.
pub const CARDINALITY_PREFIX: &[u8] = b"card:";
36
37/// Parses the given raw payload and returns the DogStatsD message type.
38///
39/// If the payload is not an event or service check, it is assumed to be a metric.
40#[inline]
41pub fn parse_message_type(data: &[u8]) -> MessageType {
42 if data.starts_with(EVENT_PREFIX) {
43 return MessageType::Event;
44 } else if data.starts_with(SERVICE_CHECK_PREFIX) {
45 return MessageType::ServiceCheck;
46 }
47 MessageType::MetricSample
48}
49
50/// Splits the input buffer at the given delimiter.
51///
52/// If the delimiter is not found, or the input buffer is empty, `None` is returned. Otherwise, the buffer is
53/// split into two parts at the delimiter, and the delimiter is _not_ included.
54#[inline]
55pub fn split_at_delimiter(input: &[u8], delimiter: u8) -> Option<(&[u8], &[u8])> {
56 match memchr::memchr(delimiter, input) {
57 Some(index) => Some((&input[0..index], &input[index + 1..input.len()])),
58 None => {
59 if input.is_empty() {
60 None
61 } else {
62 Some((input, &[]))
63 }
64 }
65 }
66}
67
68/// Maps the input slice as a UTF-8 string.
69///
70/// # Errors
71///
72/// If the input slice is not valid UTF-8, an error is returned.
73#[inline]
74pub fn utf8(input: &[u8]) -> IResult<&[u8], &str> {
75 match simdutf8::basic::from_utf8(input) {
76 Ok(s) => Ok((&[], s)),
77 Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
78 }
79}
80
81/// Returns the longest input slice that contains only ASCII alphanumeric characters and "separators" as a UTF-8 string.
82///
83/// Separators are defined as spaces, underscores, hyphens, and periods.
84///
85/// # Errors
86///
87/// If the input slice does not at least one byte of valid characters, an error is returned.
88#[inline]
89pub fn ascii_alphanum_and_seps(input: &[u8]) -> IResult<&[u8], &str> {
90 let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b' ' || c == b'_' || c == b'-' || c == b'.';
91 map(take_while1(valid_char), |b| {
92 // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
93 // interpret the bytes directly as UTF-8.
94 unsafe { std::str::from_utf8_unchecked(b) }
95 })
96 .parse(input)
97}
98
99/// Extracts as many raw tags from the input slice as possible, up to the configured limit.
100///
101/// Tags can be limited by length as well as count. If any tags exceed the maximum length, they are dropped. If the number
102/// of tags exceeds the maximum count, the excess tags are dropped. The remaining slice does not contain any dropped tags.
103///
104/// # Errors
105///
106/// If the input slice is not at least one byte long, or if it is not valid UTF-8, an error is returned.
107#[inline]
108pub fn tags(config: &DogstatsdCodecConfiguration) -> impl Fn(&[u8]) -> IResult<&[u8], RawTags<'_>> {
109 let max_tag_count = config.maximum_tag_count;
110 let max_tag_len = config.maximum_tag_length;
111
112 move |input| match simdutf8::basic::from_utf8(input) {
113 Ok(tags) => Ok((&[], RawTags::new(tags, max_tag_count, max_tag_len))),
114 Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
115 }
116}
117
118/// Parses a Unix timestamp from the input slice.
119///
120/// # Errors
121///
122/// If the input slice is not a valid unsigned 64-bit integer, an error is returned.
123#[inline]
124pub fn unix_timestamp(input: &[u8]) -> IResult<&[u8], u64> {
125 parse_u64(input)
126}
127
128/// Parses a container ID from the input slice.
129///
130/// # Errors
131///
132/// If the input slice does not contain at least one byte of valid characters, an error is returned.
133#[inline]
134pub fn container_id(input: &[u8]) -> IResult<&[u8], &str> {
135 // We generally only expect container IDs to be either long hexadecimal strings (like 64 characters), or in special
136 // cases, the inode number of the cgroup controller that contains the container sending the metrics, where the value
137 // will look like `in-<integer value>`.
138 let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b'-';
139 map(take_while1(valid_char), |b| {
140 // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
141 // interpret the bytes directly as UTF-8.
142 unsafe { std::str::from_utf8_unchecked(b) }
143 })
144 .parse(input)
145}
146
147/// Parses External Data from the input slice.
148///
149/// # Errors
150///
151/// If the input slice does not contain at least one byte of valid characters, an error is returned.
152#[inline]
153pub fn external_data(input: &[u8]) -> IResult<&[u8], &str> {
154 // External Data is only meant to be able to represent origin information, which includes container names, pod UIDs,
155 // and the like... which are constrained by the RFC 1123 definition of a DNS label: lowercase ASCII letters,
156 // numbers, and hyphens.
157 //
158 // We don't go the full nine yards with enforcing the "starts with a letter and number" bit.. but we _do_ allow
159 // commas since individual items in the External Data string are comma-separated.
160 let valid_char = |c: u8| c.is_ascii_lowercase() || c.is_ascii_digit() || c == b'-' || c == b',';
161 map(take_while1(valid_char), |b| {
162 // SAFETY: We know the bytes in `b` can only be comprised of ASCII characters, which ensures that it's valid to
163 // interpret the bytes directly as UTF-8.
164 unsafe { std::str::from_utf8_unchecked(b) }
165 })
166 .parse(input)
167}
168
169/// Parses `OriginTagCardinality` from the input slice.
170///
171/// # Errors
172///
173///
174#[inline]
175pub fn cardinality(input: &[u8]) -> IResult<&[u8], Option<OriginTagCardinality>> {
176 // Cardinality is a string that can be one of the following values:
177 // - "none"
178 // - "low"
179 // - "orchestrator"
180 // - "high"
181 let (remaining, raw_cardinality) = map(
182 all_consuming(preceded(
183 tag(CARDINALITY_PREFIX),
184 alt((tag("none"), tag("low"), tag("orchestrator"), tag("high"))),
185 )),
186 |b| {
187 // SAFETY: We know the bytes in `b` can only be comprised of UTF-8 characters, because our tags are all based on valid
188 // UTF-8 strings, which ensures that it's valid to interpret the bytes directly as UTF-8.
189 unsafe { std::str::from_utf8_unchecked(b) }
190 },
191 )
192 .parse(input)?;
193
194 OriginTagCardinality::try_from(raw_cardinality)
195 .map(|cardinality| (remaining, Some(cardinality)))
196 .map_err(|_| nom::Err::Error(Error::new(input, ErrorKind::Verify)))
197}