// saluki_io/deser/codec/dogstatsd/helpers.rs
use nom::{
2 bytes::complete::{tag, take_while1},
3 character::complete::u64 as parse_u64,
4 combinator::{all_consuming, map, rest},
5 error::{Error, ErrorKind},
6 sequence::preceded,
7 IResult, Parser as _,
8};
9use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
10
11use super::DogStatsDCodecConfiguration;
12
/// Kind of DogStatsD message a payload carries.
///
/// [`parse_message_type`] produces this by inspecting the leading bytes of a payload.
/// The enum is a plain, field-less tag, so it is cheap to copy and can be printed with
/// `{:?}` in logs and test assertions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MessageType {
    /// A metric sample. This is the fallback when no other prefix matches.
    MetricSample,
    /// An event (payload starts with [`EVENT_PREFIX`]).
    Event,
    /// A service check (payload starts with [`SERVICE_CHECK_PREFIX`]).
    ServiceCheck,
}
20
/// Leading bytes that mark a payload as an event (checked by `parse_message_type`).
pub const EVENT_PREFIX: &[u8] = b"_e{";
/// Leading bytes that mark a payload as a service check (checked by `parse_message_type`).
pub const SERVICE_CHECK_PREFIX: &[u8] = b"_sc|";

// Byte prefixes that introduce individual fields. Which message types accept which field is
// decided by the callers and is not visible in this module.

/// Prefix introducing a timestamp field.
pub const TIMESTAMP_PREFIX: &[u8] = b"d:";
/// Prefix introducing a hostname field.
pub const HOSTNAME_PREFIX: &[u8] = b"h:";
/// Prefix introducing an aggregation key field.
pub const AGGREGATION_KEY_PREFIX: &[u8] = b"k:";
/// Prefix introducing a priority field.
pub const PRIORITY_PREFIX: &[u8] = b"p:";
/// Prefix introducing a source type field.
pub const SOURCE_TYPE_PREFIX: &[u8] = b"s:";
/// Prefix introducing an alert type field.
pub const ALERT_TYPE_PREFIX: &[u8] = b"t:";
/// Prefix introducing a tags field.
pub const TAGS_PREFIX: &[u8] = b"#";
/// Prefix introducing a service check message field.
pub const SERVICE_CHECK_MESSAGE_PREFIX: &[u8] = b"m:";
/// Prefix introducing a local data field.
pub const LOCAL_DATA_PREFIX: &[u8] = b"c:";
/// Prefix introducing an external data field.
pub const EXTERNAL_DATA_PREFIX: &[u8] = b"e:";
/// Prefix introducing a cardinality field (consumed by `cardinality`).
pub const CARDINALITY_PREFIX: &[u8] = b"card:";
35
36#[inline]
40pub fn parse_message_type(data: &[u8]) -> MessageType {
41 if data.starts_with(EVENT_PREFIX) {
42 return MessageType::Event;
43 } else if data.starts_with(SERVICE_CHECK_PREFIX) {
44 return MessageType::ServiceCheck;
45 }
46 MessageType::MetricSample
47}
48
49#[inline]
54pub fn split_at_delimiter(input: &[u8], delimiter: u8) -> Option<(&[u8], &[u8])> {
55 match memchr::memchr(delimiter, input) {
56 Some(index) => Some((&input[0..index], &input[index + 1..input.len()])),
57 None => {
58 if input.is_empty() {
59 None
60 } else {
61 Some((input, &[]))
62 }
63 }
64 }
65}
66
67#[inline]
73pub fn utf8(input: &[u8]) -> IResult<&[u8], &str> {
74 match simdutf8::basic::from_utf8(input) {
75 Ok(s) => Ok((&[], s)),
76 Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
77 }
78}
79
80#[inline]
88pub fn ascii_alphanum_and_seps(input: &[u8]) -> IResult<&[u8], &str> {
89 let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b' ' || c == b'_' || c == b'-' || c == b'.';
90 map(take_while1(valid_char), |b| {
91 unsafe { std::str::from_utf8_unchecked(b) }
94 })
95 .parse(input)
96}
97
98#[inline]
107pub fn tags(config: &DogStatsDCodecConfiguration) -> impl Fn(&[u8]) -> IResult<&[u8], RawTags<'_>> {
108 let max_tag_count = config.maximum_tag_count;
109 let max_tag_len = config.maximum_tag_length;
110
111 move |input| match simdutf8::basic::from_utf8(input) {
112 Ok(tags) => Ok((&[], RawTags::new(tags, max_tag_count, max_tag_len))),
113 Err(_) => Err(nom::Err::Error(Error::new(input, ErrorKind::Verify))),
114 }
115}
116
/// Parses a timestamp as an unsigned 64-bit integer from the start of `input`.
///
/// This is a thin wrapper around nom's `character::complete::u64` (imported in this file as
/// `parse_u64`); success, failure and the remaining input are exactly what that parser reports.
/// No range or unit check is applied to the parsed value here.
#[inline]
pub fn unix_timestamp(input: &[u8]) -> IResult<&[u8], u64> {
    parse_u64(input)
}
126
127#[inline]
133pub fn local_data(input: &[u8]) -> IResult<&[u8], &str> {
134 let valid_char = |c: u8| c.is_ascii_alphanumeric() || c == b'-' || c == b',';
140 map(take_while1(valid_char), |b| {
141 unsafe { std::str::from_utf8_unchecked(b) }
144 })
145 .parse(input)
146}
147
148#[inline]
154pub fn external_data(input: &[u8]) -> IResult<&[u8], &str> {
155 let valid_char = |c: u8| c.is_ascii_lowercase() || c.is_ascii_digit() || c == b'-' || c == b',';
162 map(take_while1(valid_char), |b| {
163 unsafe { std::str::from_utf8_unchecked(b) }
166 })
167 .parse(input)
168}
169
170#[inline]
175pub fn cardinality(input: &[u8]) -> IResult<&[u8], Option<OriginTagCardinality>> {
176 let (remaining, raw_bytes) = all_consuming(preceded(tag(CARDINALITY_PREFIX), rest)).parse(input)?;
177
178 let cardinality = simdutf8::basic::from_utf8(raw_bytes)
182 .ok()
183 .and_then(|s| OriginTagCardinality::try_from(s).ok());
184
185 Ok((remaining, cardinality))
186}
187
#[cfg(test)]
mod tests {
    use saluki_context::origin::OriginTagCardinality;

    use super::{cardinality, CARDINALITY_PREFIX};

    /// Builds a cardinality payload: the field prefix followed by the bytes of `s`.
    fn card(s: &str) -> Vec<u8> {
        [CARDINALITY_PREFIX, s.as_bytes()].concat()
    }

    /// Runs `cardinality` on `payload`, requiring the parse itself to succeed.
    fn parse_ok(payload: &[u8]) -> Option<OriginTagCardinality> {
        let (_, parsed) = cardinality(payload).expect("parse should succeed");
        parsed
    }

    /// Checks every `(value, expected)` pair against the parser.
    fn check_cases<const N: usize>(cases: [(&str, Option<OriginTagCardinality>); N]) {
        for (value, expected) in cases {
            assert_eq!(parse_ok(&card(value)), expected, "failed for '{}'", value);
        }
    }

    #[test]
    fn cardinality_known_values() {
        check_cases([
            ("none", Some(OriginTagCardinality::None)),
            ("low", Some(OriginTagCardinality::Low)),
            ("orchestrator", Some(OriginTagCardinality::Orchestrator)),
            ("high", Some(OriginTagCardinality::High)),
        ]);
    }

    #[test]
    fn cardinality_unknown_value_returns_none() {
        // An unrecognized value must not fail the parse; it just yields no cardinality.
        assert_eq!(parse_ok(&card("not-a-valid-cardinality")), None);
    }

    #[test]
    fn cardinality_case_insensitive() {
        // Upper- and mixed-case spellings are expected to map to the same variants.
        check_cases([
            ("LOW", Some(OriginTagCardinality::Low)),
            ("HIGH", Some(OriginTagCardinality::High)),
            ("Orchestrator", Some(OriginTagCardinality::Orchestrator)),
            ("NONE", Some(OriginTagCardinality::None)),
        ]);
    }

    #[test]
    fn cardinality_non_utf8_bytes_returns_none() {
        // 0xff / 0xfe can never appear in valid UTF-8.
        let payload = [CARDINALITY_PREFIX, &[0xff, 0xfe]].concat();
        assert_eq!(parse_ok(&payload), None);
    }
}