1use nom::{
2 bytes::complete::{tag, take},
3 character::complete::u32 as parse_u32,
4 combinator::all_consuming,
5 error::{Error, ErrorKind},
6 sequence::{delimited, preceded, separated_pair},
7 IResult, Parser as _,
8};
9use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
10use saluki_core::data_model::event::eventd::*;
11use stringtheory::MetaString;
12
13use super::{helpers::*, DogStatsDCodecConfiguration};
14
/// A DogStatsD event as decoded by [`parse_dogstatsd_event`].
///
/// The title and text are owned values; the other string-like fields are tied to the lifetime `'a` of the input
/// buffer the packet was parsed from.
pub struct EventPacket<'a> {
    /// Event title, with every literal `\n` escape sequence replaced by a newline character.
    pub title: MetaString,
    /// Event text, with every literal `\n` escape sequence replaced by a newline character.
    pub text: MetaString,
    /// Value of the timestamp field, if the payload carried one.
    pub timestamp: Option<u64>,
    /// Value of the hostname field, if present and non-empty.
    pub hostname: Option<&'a str>,
    /// Value of the aggregation key field, if present and non-empty.
    pub aggregation_key: Option<&'a str>,
    /// Event priority.
    ///
    /// `Some(Priority::Normal)` when the payload has no priority field; otherwise whatever
    /// `Priority::try_from_string` returned for the field's value.
    pub priority: Option<Priority>,
    /// Event alert type.
    ///
    /// `Some(AlertType::Info)` when the payload has no alert type field; otherwise whatever
    /// `AlertType::try_from_string` returned for the field's value.
    pub alert_type: Option<AlertType>,
    /// Value of the source type field, if present and non-empty.
    pub source_type_name: Option<&'a str>,
    /// Raw tags taken from the tags field; empty when the payload has no (non-empty) tags field.
    pub tags: RawTags<'a>,
    /// Local data field; only read when client origin detection is enabled in the codec configuration.
    pub local_data: Option<&'a str>,
    /// External data field; only read when client origin detection is enabled in the codec configuration.
    pub external_data: Option<&'a str>,
    /// Origin tag cardinality; only read when client origin detection is enabled in the codec configuration.
    pub cardinality: Option<OriginTagCardinality>,
}
30
31#[inline]
32pub fn parse_dogstatsd_event<'a>(
33 input: &'a [u8], config: &DogStatsDCodecConfiguration,
34) -> IResult<&'a [u8], EventPacket<'a>> {
35 let (remaining, (title_len, text_len)) = delimited(
37 tag(EVENT_PREFIX),
38 separated_pair(parse_u32, tag(","), parse_u32),
39 tag("}:"),
40 )
41 .parse(input)?;
42
43 if title_len == 0 || text_len == 0 {
45 return Err(nom::Err::Error(Error::new(input, ErrorKind::Verify)));
46 }
47
48 let (remaining, (raw_title, raw_text)) =
49 separated_pair(take(title_len), tag("|"), take(text_len)).parse(remaining)?;
50
51 let title = match simdutf8::basic::from_utf8(raw_title) {
52 Ok(title) => title.replace("\\n", "\n"),
53 Err(_) => return Err(nom::Err::Error(Error::new(raw_title, ErrorKind::Verify))),
54 };
55
56 let text = match simdutf8::basic::from_utf8(raw_text) {
57 Ok(text) => text.replace("\\n", "\n"),
58 Err(_) => return Err(nom::Err::Error(Error::new(raw_text, ErrorKind::Verify))),
59 };
60
61 let mut maybe_priority = Some(Priority::Normal);
69 let mut maybe_alert_type = Some(AlertType::Info);
70 let mut maybe_timestamp = None;
71 let mut maybe_hostname = None;
72 let mut maybe_aggregation_key = None;
73 let mut maybe_source_type = None;
74 let mut maybe_tags = None;
75 let mut maybe_local_data = None;
76 let mut maybe_external_data = None;
77 let mut maybe_cardinality = None;
78
79 let remaining = if !remaining.is_empty() {
80 let (mut remaining, _) = tag("|")(remaining)?;
81 while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
82 if chunk.len() < 2 {
83 break;
84 }
85 match &chunk[..2] {
86 TIMESTAMP_PREFIX => {
88 let (_, timestamp) = all_consuming(preceded(tag(TIMESTAMP_PREFIX), unix_timestamp)).parse(chunk)?;
89 maybe_timestamp = Some(timestamp);
90 }
91 HOSTNAME_PREFIX if chunk != HOSTNAME_PREFIX => {
93 let (_, hostname) =
94 all_consuming(preceded(tag(HOSTNAME_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
95 maybe_hostname = Some(hostname);
96 }
97 AGGREGATION_KEY_PREFIX if chunk != AGGREGATION_KEY_PREFIX => {
99 let (_, aggregation_key) =
100 all_consuming(preceded(tag(AGGREGATION_KEY_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
101 maybe_aggregation_key = Some(aggregation_key);
102 }
103 PRIORITY_PREFIX => {
105 let (_, priority) =
106 all_consuming(preceded(tag(PRIORITY_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
107 maybe_priority = Priority::try_from_string(priority);
108 }
109 SOURCE_TYPE_PREFIX if chunk != SOURCE_TYPE_PREFIX => {
111 let (_, source_type) =
112 all_consuming(preceded(tag(SOURCE_TYPE_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
113 maybe_source_type = Some(source_type);
114 }
115 ALERT_TYPE_PREFIX => {
117 let (_, alert_type) =
118 all_consuming(preceded(tag(ALERT_TYPE_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
119 maybe_alert_type = AlertType::try_from_string(alert_type);
120 }
121 LOCAL_DATA_PREFIX if config.client_origin_detection && chunk != LOCAL_DATA_PREFIX => {
123 let (_, local_data) = all_consuming(preceded(tag(LOCAL_DATA_PREFIX), local_data)).parse(chunk)?;
124 maybe_local_data = Some(local_data);
125 }
126 EXTERNAL_DATA_PREFIX if config.client_origin_detection && chunk != EXTERNAL_DATA_PREFIX => {
128 let (_, external_data) =
129 all_consuming(preceded(tag(EXTERNAL_DATA_PREFIX), external_data)).parse(chunk)?;
130 maybe_external_data = Some(external_data);
131 }
132 _ if chunk.starts_with(CARDINALITY_PREFIX)
134 && config.client_origin_detection
135 && chunk != CARDINALITY_PREFIX =>
136 {
137 let (_, cardinality) = cardinality(chunk)?;
138 maybe_cardinality = cardinality;
139 }
140 _ if chunk.starts_with(TAGS_PREFIX) && chunk != TAGS_PREFIX => {
142 let (_, tags) = all_consuming(preceded(tag("#"), tags(config))).parse(chunk)?;
143 maybe_tags = Some(tags);
144 }
145 _ => {
146 }
150 }
151 remaining = tail;
152 }
153 remaining
154 } else {
155 remaining
156 };
157
158 let tags = maybe_tags.unwrap_or_else(RawTags::empty);
159
160 let eventd = EventPacket {
161 title: title.into(),
162 text: text.into(),
163 tags,
164 timestamp: maybe_timestamp,
165 hostname: maybe_hostname,
166 aggregation_key: maybe_aggregation_key,
167 priority: maybe_priority,
168 alert_type: maybe_alert_type,
169 source_type_name: maybe_source_type,
170 local_data: maybe_local_data,
171 external_data: maybe_external_data,
172 cardinality: maybe_cardinality,
173 };
174 Ok((remaining, eventd))
175}
176
#[cfg(test)]
mod tests {
    use nom::IResult;
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag, TagSet},
    };
    use saluki_core::data_model::event::eventd::{AlertType, EventD, Priority};
    use stringtheory::MetaString;

    use super::{parse_dogstatsd_event, DogStatsDCodecConfiguration};

    type NomResult<'input, T> = Result<T, nom::Err<nom::error::Error<&'input [u8]>>>;

    /// Parses `input` with the default codec configuration and converts the packet into an `EventD`.
    fn parse_dsd_eventd(input: &[u8]) -> NomResult<'_, EventD> {
        let default_config = DogStatsDCodecConfiguration::default();
        parse_dsd_eventd_with_conf(input, &default_config)
    }

    /// Parses `input` with `config`, asserting that the whole input was consumed.
    fn parse_dsd_eventd_with_conf<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> NomResult<'input, EventD> {
        let (remaining, eventd) = parse_dsd_eventd_direct(input, config)?;
        assert!(remaining.is_empty());
        Ok(eventd)
    }

    /// Parses `input` into an `EventPacket` and rebuilds an `EventD` from the packet's fields.
    ///
    /// The origin-related fields of the packet (local data, external data, cardinality) are not carried over.
    fn parse_dsd_eventd_direct<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> IResult<&'input [u8], EventD> {
        let (remaining, packet) = parse_dogstatsd_event(input, config)?;
        assert!(remaining.is_empty());

        let mut event_tags = TagSet::default();
        for tag in packet.tags.into_iter() {
            event_tags.insert_tag(tag);
        }

        let eventd = EventD::new(packet.title, packet.text)
            .with_timestamp(packet.timestamp)
            .with_hostname(packet.hostname.map(|s| s.into()))
            .with_aggregation_key(packet.aggregation_key.map(|s| s.into()))
            .with_priority(packet.priority)
            .with_source_type_name(packet.source_type_name.map(|s| s.into()))
            .with_alert_type(packet.alert_type)
            .with_tags(event_tags);

        Ok((remaining, eventd))
    }

    /// Asserts field by field that two events are equal, reporting failures at the caller's location.
    #[track_caller]
    fn check_basic_eventd_eq(expected: EventD, actual: EventD) {
        assert_eq!(expected.title(), actual.title());
        assert_eq!(expected.text(), actual.text());
        assert_eq!(expected.timestamp(), actual.timestamp());
        assert_eq!(expected.hostname(), actual.hostname());
        assert_eq!(expected.aggregation_key(), actual.aggregation_key());
        assert_eq!(expected.priority(), actual.priority());
        assert_eq!(expected.source_type_name(), actual.source_type_name());
        assert_eq!(expected.alert_type(), actual.alert_type());
        assert_eq!(expected.tags(), actual.tags());
        assert_eq!(expected.origin_tags(), actual.origin_tags());
    }

    #[test]
    fn basic_eventd() {
        let event_title = "my event";
        let event_text = "text";
        let raw = format!(
            "_e{{{},{}}}:{}|{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text
        );

        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        let expected = EventD::new(event_title, event_text);
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_tags() {
        let event_title = "my event";
        let event_text = "text";
        let tags = ["tag1", "tag2"];
        let shared_tag_set: SharedTagSet = tags.iter().map(|&s| Tag::from(s)).collect::<TagSet>().into_shared();
        let raw = format!(
            "_e{{{},{}}}:{}|{}|#{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            tags.join(","),
        );

        let expected = EventD::new(event_title, event_text).with_tags(shared_tag_set);
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_priority() {
        let event_title = "my event";
        let event_text = "text";
        let event_priority = Priority::Low;
        let raw = format!(
            "_e{{{},{}}}:{}|{}|p:{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            event_priority
        );

        let expected = EventD::new(event_title, event_text).with_priority(event_priority);
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_alert_type() {
        let event_title = "my event";
        let event_text = "text";
        let event_alert_type = AlertType::Warning;
        let raw = format!(
            "_e{{{},{}}}:{}|{}|t:{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            event_alert_type
        );

        let expected = EventD::new(event_title, event_text).with_alert_type(event_alert_type);
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        check_basic_eventd_eq(expected, actual);
    }

    #[test]
    fn eventd_multiple_extensions() {
        let event_title = "my event";
        let event_text = "text";
        let event_hostname = MetaString::from("testhost");
        let event_aggregation_key = MetaString::from("testkey");
        let event_priority = Priority::Low;
        let event_source_type = MetaString::from("testsource");
        let event_alert_type = AlertType::Success;
        let event_timestamp = 1234567890;
        let event_local_data = "abcdef123456";
        let event_external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let event_cardinality = "low";
        let tags = ["tags1", "tags2"];
        let shared_tag_set = SharedTagSet::from(TagSet::from_iter(tags.iter().map(|&s| s.into())));
        let raw = format!(
            "_e{{{},{}}}:{}|{}|h:{}|k:{}|p:{}|s:{}|t:{}|d:{}|c:{}|e:{}|card:{}|#{}",
            event_title.len(),
            event_text.len(),
            event_title,
            event_text,
            event_hostname,
            event_aggregation_key,
            event_priority,
            event_source_type,
            event_alert_type,
            event_timestamp,
            event_local_data,
            event_external_data,
            event_cardinality,
            tags.join(","),
        );
        let actual = parse_dsd_eventd(raw.as_bytes()).unwrap();
        let expected = EventD::new(event_title, event_text)
            .with_hostname(event_hostname)
            .with_aggregation_key(event_aggregation_key)
            .with_priority(event_priority)
            .with_source_type_name(event_source_type)
            .with_alert_type(event_alert_type)
            .with_timestamp(event_timestamp)
            .with_tags(shared_tag_set);
        check_basic_eventd_eq(expected, actual);

        // The origin-related fields are only visible on the packet itself, and only with detection enabled.
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
        let (_, packet) = parse_dogstatsd_event(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, Some(event_local_data));
        assert_eq!(packet.external_data, Some(event_external_data));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::Low));
    }

    #[test]
    fn client_origin_fields_ignored_when_disabled() {
        let local_data = "abcdef123456";
        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
        let raw = format!("_e{{5,4}}:title|text|c:{}|e:{}|card:low", local_data, external_data);
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(false);
        let (_, packet) = parse_dogstatsd_event(raw.as_bytes(), &config).expect("should not fail to parse");
        assert_eq!(packet.local_data, None);
        assert_eq!(packet.external_data, None);
        assert_eq!(packet.cardinality, None);
    }

    #[test]
    fn empty_structured_fields_treated_as_missing() {
        let raw = "_e{5,4}:title|text|h:|k:|s:|c:|e:|card:|#";

        // Run with client origin detection both off and on, so the `c:`, `e:` and `card:` assertions exercise the
        // empty-value handling itself instead of depending on whatever the default configuration happens to be.
        for client_origin_detection in [false, true] {
            let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(client_origin_detection);
            let (_, packet) = parse_dogstatsd_event(raw.as_bytes(), &config).expect("should not fail to parse");
            assert_eq!(packet.hostname, None);
            assert_eq!(packet.aggregation_key, None);
            assert_eq!(packet.source_type_name, None);
            assert_eq!(packet.local_data, None);
            assert_eq!(packet.external_data, None);
            assert_eq!(packet.cardinality, None);
            assert!(packet.tags.into_iter().next().is_none());
        }
    }
}