Skip to main content

saluki_io/deser/codec/dogstatsd/
event.rs

1use nom::{
2    bytes::complete::{tag, take},
3    character::complete::u32 as parse_u32,
4    combinator::all_consuming,
5    error::{Error, ErrorKind},
6    sequence::{delimited, preceded, separated_pair},
7    IResult, Parser as _,
8};
9use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
10use saluki_core::data_model::event::eventd::*;
11use stringtheory::MetaString;
12
13use super::{helpers::*, DogStatsDCodecConfiguration};
14
15/// A DogStatsD event packet.
16pub struct EventPacket<'a> {
17    pub title: MetaString,
18    pub text: MetaString,
19    pub timestamp: Option<u64>,
20    pub hostname: Option<&'a str>,
21    pub aggregation_key: Option<&'a str>,
22    pub priority: Option<Priority>,
23    pub alert_type: Option<AlertType>,
24    pub source_type_name: Option<&'a str>,
25    pub tags: RawTags<'a>,
26    pub local_data: Option<&'a str>,
27    pub external_data: Option<&'a str>,
28    pub cardinality: Option<OriginTagCardinality>,
29}
30
31#[inline]
32pub fn parse_dogstatsd_event<'a>(
33    input: &'a [u8], config: &DogStatsDCodecConfiguration,
34) -> IResult<&'a [u8], EventPacket<'a>> {
35    // We parse the title length and text length from `_e{<TITLE_UTF8_LENGTH>,<TEXT_UTF8_LENGTH>}:`
36    let (remaining, (title_len, text_len)) = delimited(
37        tag(EVENT_PREFIX),
38        separated_pair(parse_u32, tag(","), parse_u32),
39        tag("}:"),
40    )
41    .parse(input)?;
42
43    // Title and Text are the required fields of an event.
44    if title_len == 0 || text_len == 0 {
45        return Err(nom::Err::Error(Error::new(input, ErrorKind::Verify)));
46    }
47
48    let (remaining, (raw_title, raw_text)) =
49        separated_pair(take(title_len), tag("|"), take(text_len)).parse(remaining)?;
50
51    let title = match simdutf8::basic::from_utf8(raw_title) {
52        Ok(title) => title.replace("\\n", "\n"),
53        Err(_) => return Err(nom::Err::Error(Error::new(raw_title, ErrorKind::Verify))),
54    };
55
56    let text = match simdutf8::basic::from_utf8(raw_text) {
57        Ok(text) => text.replace("\\n", "\n"),
58        Err(_) => return Err(nom::Err::Error(Error::new(raw_text, ErrorKind::Verify))),
59    };
60
61    // At this point, we may have some of this additional data, and if so, we also then would have a pipe separator at
62    // the very front, which we'd want to consume before going further.
63    //
64    // After that, we simply split the remaining bytes by the pipe separator, and then try and parse each chunk to see
65    // if it's any of the protocol extensions we know of.
66    //
67    // Priority and Alert Type have default values
68    let mut maybe_priority = Some(Priority::Normal);
69    let mut maybe_alert_type = Some(AlertType::Info);
70    let mut maybe_timestamp = None;
71    let mut maybe_hostname = None;
72    let mut maybe_aggregation_key = None;
73    let mut maybe_source_type = None;
74    let mut maybe_tags = None;
75    let mut maybe_local_data = None;
76    let mut maybe_external_data = None;
77    let mut maybe_cardinality = None;
78
79    let remaining = if !remaining.is_empty() {
80        let (mut remaining, _) = tag("|")(remaining)?;
81        while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
82            if chunk.len() < 2 {
83                break;
84            }
85            match &chunk[..2] {
86                // Timestamp: client-provided timestamp for the event, relative to the Unix epoch, in seconds.
87                TIMESTAMP_PREFIX => {
88                    let (_, timestamp) = all_consuming(preceded(tag(TIMESTAMP_PREFIX), unix_timestamp)).parse(chunk)?;
89                    maybe_timestamp = Some(timestamp);
90                }
91                // Hostname: client-provided hostname for the host that this event originated from.
92                HOSTNAME_PREFIX if chunk != HOSTNAME_PREFIX => {
93                    let (_, hostname) =
94                        all_consuming(preceded(tag(HOSTNAME_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
95                    maybe_hostname = Some(hostname);
96                }
97                // Aggregation key: key to be used to group this event with others that have the same key.
98                AGGREGATION_KEY_PREFIX if chunk != AGGREGATION_KEY_PREFIX => {
99                    let (_, aggregation_key) =
100                        all_consuming(preceded(tag(AGGREGATION_KEY_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
101                    maybe_aggregation_key = Some(aggregation_key);
102                }
103                // Priority: client-provided priority of the event.
104                PRIORITY_PREFIX => {
105                    let (_, priority) =
106                        all_consuming(preceded(tag(PRIORITY_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
107                    maybe_priority = Priority::try_from_string(priority);
108                }
109                // Source type name: client-provided source type name of the event.
110                SOURCE_TYPE_PREFIX if chunk != SOURCE_TYPE_PREFIX => {
111                    let (_, source_type) =
112                        all_consuming(preceded(tag(SOURCE_TYPE_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
113                    maybe_source_type = Some(source_type);
114                }
115                // Alert type: client-provided alert type of the event.
116                ALERT_TYPE_PREFIX => {
117                    let (_, alert_type) =
118                        all_consuming(preceded(tag(ALERT_TYPE_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
119                    maybe_alert_type = AlertType::try_from_string(alert_type);
120                }
121                // Local Data: client-provided data used for resolving the entity ID that this event originated from.
122                LOCAL_DATA_PREFIX if config.client_origin_detection && chunk != LOCAL_DATA_PREFIX => {
123                    let (_, local_data) = all_consuming(preceded(tag(LOCAL_DATA_PREFIX), local_data)).parse(chunk)?;
124                    maybe_local_data = Some(local_data);
125                }
126                // External Data: client-provided data used for resolving the entity ID that this event originated from.
127                EXTERNAL_DATA_PREFIX if config.client_origin_detection && chunk != EXTERNAL_DATA_PREFIX => {
128                    let (_, external_data) =
129                        all_consuming(preceded(tag(EXTERNAL_DATA_PREFIX), external_data)).parse(chunk)?;
130                    maybe_external_data = Some(external_data);
131                }
132                // Cardinality: client-provided cardinality for the event.
133                _ if chunk.starts_with(CARDINALITY_PREFIX)
134                    && config.client_origin_detection
135                    && chunk != CARDINALITY_PREFIX =>
136                {
137                    let (_, cardinality) = cardinality(chunk)?;
138                    maybe_cardinality = cardinality;
139                }
140                // Tags: additional tags to be added to the event.
141                _ if chunk.starts_with(TAGS_PREFIX) && chunk != TAGS_PREFIX => {
142                    let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
143                    maybe_tags = Some(tags);
144                }
145                _ => {
146                    // We don't know what this is, so we just skip it.
147                    //
148                    // TODO: Should we throw an error, warn, or be silently permissive?
149                }
150            }
151            remaining = tail;
152        }
153        remaining
154    } else {
155        remaining
156    };
157
158    let tags = maybe_tags.unwrap_or_else(RawTags::empty);
159
160    let eventd = EventPacket {
161        title: title.into(),
162        text: text.into(),
163        tags,
164        timestamp: maybe_timestamp,
165        hostname: maybe_hostname,
166        aggregation_key: maybe_aggregation_key,
167        priority: maybe_priority,
168        alert_type: maybe_alert_type,
169        source_type_name: maybe_source_type,
170        local_data: maybe_local_data,
171        external_data: maybe_external_data,
172        cardinality: maybe_cardinality,
173    };
174    Ok((remaining, eventd))
175}
176
#[cfg(test)]
mod tests {
    use nom::IResult;
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag, TagSet},
    };
    use saluki_core::data_model::event::eventd::{AlertType, EventD, Priority};
    use stringtheory::MetaString;

    use super::{parse_dogstatsd_event, DogStatsDCodecConfiguration};

    type NomResult<'input, T> = Result<T, nom::Err<nom::error::Error<&'input [u8]>>>;

    /// Parses `input` into an `EventD` using the default codec configuration, asserting the whole input is consumed.
    fn parse_dsd_eventd(input: &[u8]) -> NomResult<'_, EventD> {
        let default_config = DogStatsDCodecConfiguration::default();
        parse_dsd_eventd_with_conf(input, &default_config)
    }

    /// Parses `input` into an `EventD` using `config`, asserting that no input is left unconsumed.
    fn parse_dsd_eventd_with_conf<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> NomResult<'input, EventD> {
        let (remaining, eventd) = parse_dsd_eventd_direct(input, config)?;
        assert!(remaining.is_empty());
        Ok(eventd)
    }

    /// Parses `input` into an `EventD`, handing any unconsumed input back to the caller.
    ///
    /// Only the fields carried by `EventD` are transferred. The client origin fields (local data, external data,
    /// cardinality) are not, so tests that care about them inspect the raw `EventPacket` instead.
    fn parse_dsd_eventd_direct<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> IResult<&'input [u8], EventD> {
        let (remaining, packet) = parse_dogstatsd_event(input, config)?;

        let mut event_tags = TagSet::default();
        for tag in packet.tags.into_iter() {
            event_tags.insert_tag(tag);
        }

        let eventd = EventD::new(packet.title, packet.text)
            .with_timestamp(packet.timestamp)
            .with_hostname(packet.hostname.map(|s| s.into()))
            .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
            .with_priority(packet.priority)
            .with_source_type_name(packet.source_type_name.map(|s| s.into()))
            .with_alert_type(packet.alert_type)
            .with_tags(event_tags);

        Ok((remaining, eventd))
    }

    /// Asserts that every user-visible field of the two events matches.
    #[track_caller]
    fn check_basic_eventd_eq(expected: EventD, actual: EventD) {
        assert_eq!(expected.title(), actual.title());
        assert_eq!(expected.text(), actual.text());
        assert_eq!(expected.timestamp(), actual.timestamp());
        assert_eq!(expected.hostname(), actual.hostname());
        assert_eq!(expected.aggregation_key(), actual.aggregation_key());
        assert_eq!(expected.priority(), actual.priority());
        assert_eq!(expected.source_type_name(), actual.source_type_name());
        assert_eq!(expected.alert_type(), actual.alert_type());
        assert_eq!(expected.tags(), actual.tags());
        assert_eq!(expected.origin_tags(), actual.origin_tags());
    }

    #[test]
    fn basic_eventd() {
        let event_title = "my event";
        let event_text = "text";
        let raw = format!(
            "_e{{{},{}}}:{}|{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text
        );

        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        let expected = EventD::new(event_title, event_text);
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_tags() {
        let event_title = "my event";
        let event_text = "text";
        let tags = ["tag1", "tag2"];
        let shared_tag_set: SharedTagSet = tags.iter().map(|&s| Tag::from(s)).collect::<TagSet>().into_shared();
        let raw = format!(
            "_e{{{},{}}}:{}|{}|#{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            tags.join(","),
        );

        let expected = EventD::new(event_title, event_text).with_tags(shared_tag_set);
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_priority() {
        let event_title = "my event";
        let event_text = "text";
        let event_priority = Priority::Low;
        let raw = format!(
            "_e{{{},{}}}:{}|{}|p:{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            event_priority
        );

        let expected = EventD::new(event_title, event_text).with_priority(event_priority);
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_alert_type() {
        let event_title = "my event";
        let event_text = "text";
        let event_alert_type = AlertType::Warning;
        let raw = format!(
            "_e{{{},{}}}:{}|{}|t:{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            event_alert_type
        );

        let expected = EventD::new(event_title, event_text).with_alert_type(event_alert_type);
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_multiple_extensions() {
        let event_title = "my event";
        let event_text = "text";
        let event_hostname = MetaString::from("testhost");
        let event_aggregation_key = MetaString::from("testkey");
        let event_priority = Priority::Low;
        let event_source_type = MetaString::from("testsource");
        let event_alert_type = AlertType::Success;
        let event_timestamp = 1234567890;
        let event_local_data = "abcdef123456";
        let event_external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let event_cardinality = "low";
        let tags = ["tags1", "tags2"];
        let shared_tag_set = SharedTagSet::from(TagSet::from_iter(tags.iter().map(|&s| s.into())));
        let raw = format!(
            "_e{{{},{}}}:{}|{}|h:{}|k:{}|p:{}|s:{}|t:{}|d:{}|c:{}|e:{}|card:{}|#{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            event_hostname,
            event_aggregation_key,
            event_priority,
            event_source_type,
            event_alert_type,
            event_timestamp,
            event_local_data,
            event_external_data,
            event_cardinality,
            tags.join(","),
        );
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        let expected = EventD::new(event_title, event_text)
            .with_hostname(event_hostname)
            .with_aggregation_key(event_aggregation_key)
            .with_priority(event_priority)
            .with_source_type_name(event_source_type)
            .with_alert_type(event_alert_type)
            .with_timestamp(event_timestamp)
            .with_tags(shared_tag_set);
        check_basic_eventd_eq(expected, actual);

        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_event(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(event_local_data));
        assert_eq!(packet.external_data, Some(event_external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Low));
    }

    #[test]
    fn client_origin_fields_ignored_when_disabled() {
        let local_data = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("_e{{5,4}}:title|text|c:{}|e:{}|card:low", local_data, external_data);
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(false);
        let (_, packet) = parse_dogstatsd_event(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, None);
        assert_eq!(packet.external_data, None);
        assert_eq!(packet.cardinality, None);
    }

    #[test]
    fn empty_structured_fields_treated_as_missing() {
        // All optional stringy fields are empty — should parse successfully and treat them as missing.
        let raw = "_e{5,4}:title|text|h:|k:|s:|c:|e:|card:|#";
        let config = DogStatsDCodecConfiguration::default();
        let (_, packet) = parse_dogstatsd_event(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.hostname, None);
        assert_eq!(packet.aggregation_key, None);
        assert_eq!(packet.source_type_name, None);
        assert_eq!(packet.local_data, None);
        assert_eq!(packet.external_data, None);
        assert_eq!(packet.cardinality, None);
        assert!(packet.tags.into_iter().next().is_none());
    }
}