saluki_io/deser/codec/dogstatsd/
service_check.rs1use nom::{
2 bytes::complete::tag,
3 character::complete::u8 as parse_u8,
4 combinator::all_consuming,
5 error::{Error, ErrorKind},
6 sequence::{preceded, separated_pair},
7 IResult, Parser as _,
8};
9use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
10use saluki_core::data_model::event::service_check::*;
11use stringtheory::MetaString;
12
13use super::{helpers::*, DogStatsDCodecConfiguration};
14
/// A single DogStatsD service check as produced by [`parse_dogstatsd_service_check`].
///
/// The `&'a str` fields and `tags` carry the lifetime `'a` of the input buffer handed to the parser.
pub struct ServiceCheckPacket<'a> {
    /// Check name, parsed from the section that directly follows the service check prefix.
    pub name: MetaString,
    /// Check status, converted from the numeric status value that follows the name.
    pub status: CheckStatus,
    /// Value of the `TIMESTAMP_PREFIX` section, if one was present.
    // NOTE(review): presumably Unix seconds, given the `unix_timestamp` helper name -- confirm.
    pub timestamp: Option<u64>,
    /// Value of the `HOSTNAME_PREFIX` section, if one was present.
    pub hostname: Option<&'a str>,
    /// Value of the `SERVICE_CHECK_MESSAGE_PREFIX` section, if one was present.
    pub message: Option<&'a str>,
    /// Tags from the `TAGS_PREFIX` section; the parser substitutes `RawTags::empty()` when there is none.
    pub tags: RawTags<'a>,
    /// Value of the `LOCAL_DATA_PREFIX` section. Only populated when client origin detection is enabled.
    pub local_data: Option<&'a str>,
    /// Value of the `EXTERNAL_DATA_PREFIX` section. Only populated when client origin detection is enabled.
    pub external_data: Option<&'a str>,
    /// Cardinality from the `CARDINALITY_PREFIX` section. Only populated when client origin detection is enabled.
    pub cardinality: Option<OriginTagCardinality>,
}
27
28pub fn parse_dogstatsd_service_check<'a>(
29 input: &'a [u8], config: &DogStatsDCodecConfiguration,
30) -> IResult<&'a [u8], ServiceCheckPacket<'a>> {
31 let (remaining, (name, raw_check_status)) = preceded(
32 tag(SERVICE_CHECK_PREFIX),
33 separated_pair(ascii_alphanum_and_seps, tag("|"), parse_u8),
34 )
35 .parse(input)?;
36
37 let check_status =
38 CheckStatus::try_from(raw_check_status).map_err(|_| nom::Err::Error(Error::new(input, ErrorKind::Verify)))?;
39
40 let mut maybe_timestamp = None;
41 let mut maybe_hostname = None;
42 let mut maybe_tags = None;
43 let mut maybe_message = None;
44 let mut maybe_local_data = None;
45 let mut maybe_external_data = None;
46 let mut maybe_cardinality = None;
47
48 let remaining = if !remaining.is_empty() {
49 let (mut remaining, _) = tag("|")(remaining)?;
50 while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
51 if chunk.len() < 2 {
52 break;
53 }
54
55 match &chunk[..2] {
56 TIMESTAMP_PREFIX => {
58 let (_, timestamp) = all_consuming(preceded(tag(TIMESTAMP_PREFIX), unix_timestamp)).parse(chunk)?;
59 maybe_timestamp = Some(timestamp);
60 }
61 HOSTNAME_PREFIX => {
63 let (_, hostname) =
64 all_consuming(preceded(tag(HOSTNAME_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
65 maybe_hostname = Some(hostname);
66 }
67 LOCAL_DATA_PREFIX => {
69 if config.client_origin_detection {
70 let (_, local_data) =
71 all_consuming(preceded(tag(LOCAL_DATA_PREFIX), local_data)).parse(chunk)?;
72 maybe_local_data = Some(local_data);
73 }
74 }
75 EXTERNAL_DATA_PREFIX => {
77 if config.client_origin_detection {
78 let (_, external_data) =
79 all_consuming(preceded(tag(EXTERNAL_DATA_PREFIX), external_data)).parse(chunk)?;
80 maybe_external_data = Some(external_data);
81 }
82 }
83 _ if chunk.starts_with(TAGS_PREFIX) => {
85 let (_, tags) = all_consuming(preceded(tag(TAGS_PREFIX), tags(config))).parse(chunk)?;
86 maybe_tags = Some(tags);
87 }
88 SERVICE_CHECK_MESSAGE_PREFIX => {
90 let (_, message) = all_consuming(preceded(tag(SERVICE_CHECK_MESSAGE_PREFIX), utf8)).parse(chunk)?;
91 maybe_message = Some(message);
92 }
93 _ if chunk.starts_with(CARDINALITY_PREFIX) => {
95 if config.client_origin_detection {
96 let (_, cardinality) = cardinality(chunk)?;
97 maybe_cardinality = cardinality;
98 }
99 }
100 _ => {
101 }
105 }
106 remaining = tail;
107 }
108 remaining
109 } else {
110 remaining
111 };
112
113 let tags = maybe_tags.unwrap_or_else(RawTags::empty);
114
115 let service_check_packet = ServiceCheckPacket {
116 name: name.into(),
117 status: check_status,
118 tags,
119 timestamp: maybe_timestamp,
120 hostname: maybe_hostname,
121 message: maybe_message,
122 local_data: maybe_local_data,
123 external_data: maybe_external_data,
124 cardinality: maybe_cardinality,
125 };
126 Ok((remaining, service_check_packet))
127}
128
#[cfg(test)]
mod tests {
    use nom::IResult;
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, TagSet},
    };
    use saluki_core::data_model::event::service_check::{CheckStatus, ServiceCheck};
    use stringtheory::MetaString;

    use super::{parse_dogstatsd_service_check, DogStatsDCodecConfiguration};

    type NomResult<'input, T> = Result<T, nom::Err<nom::error::Error<&'input [u8]>>>;

    /// Check name used by every synthetic payload in this module.
    const CHECK_NAME: &str = "testsvc";

    /// External data value used by the client origin detection tests.
    const EXTERNAL_DATA: &str = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";

    /// Builds the shared tag set a parsed service check is expected to carry for `names`.
    fn shared_tags(names: &[&'static str]) -> SharedTagSet {
        SharedTagSet::from(TagSet::from_iter(names.iter().map(|&s| s.into())))
    }

    /// Parses `payload` into a raw packet with client origin detection switched on or off.
    fn parse_packet_with_origin_detection(payload: &str, enabled: bool) -> super::ServiceCheckPacket<'_> {
        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(enabled);
        let (_, packet) = parse_dogstatsd_service_check(payload.as_bytes(), &config).expect("should not fail to parse");
        packet
    }

    /// Parses `input` with the default codec configuration.
    fn parse_dsd_service_check(input: &[u8]) -> NomResult<'_, ServiceCheck> {
        parse_dsd_service_check_with_conf(input, &DogStatsDCodecConfiguration::default())
    }

    /// Parses `input` with `config`, asserting that nothing is left over, and returns only the service check.
    fn parse_dsd_service_check_with_conf<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> NomResult<'input, ServiceCheck> {
        parse_dsd_service_check_direct(input, config).map(|(leftover, service_check)| {
            assert!(leftover.is_empty());
            service_check
        })
    }

    /// Runs the parser and converts the resulting packet into a `ServiceCheck`.
    ///
    /// Only name, status, timestamp, hostname, tags, and message are carried over.
    fn parse_dsd_service_check_direct<'input>(
        input: &'input [u8], config: &DogStatsDCodecConfiguration,
    ) -> IResult<&'input [u8], ServiceCheck> {
        let (leftover, packet) = parse_dogstatsd_service_check(input, config)?;
        assert!(leftover.is_empty());

        let mut tag_set = TagSet::default();
        packet.tags.into_iter().for_each(|raw_tag| {
            tag_set.insert_tag(raw_tag);
        });

        let converted = ServiceCheck::new(packet.name, packet.status)
            .with_timestamp(packet.timestamp)
            .with_hostname(packet.hostname.map(|s| s.into()))
            .with_tags(tag_set)
            .with_message(packet.message.map(|s| s.into()));

        Ok((leftover, converted))
    }

    /// Field-by-field comparison of two service checks.
    #[track_caller]
    fn check_basic_service_check_eq(expected: ServiceCheck, actual: ServiceCheck) {
        assert_eq!(expected.name(), actual.name());
        assert_eq!(expected.status(), actual.status());
        assert_eq!(expected.timestamp(), actual.timestamp());
        assert_eq!(expected.hostname(), actual.hostname());
        assert_eq!(expected.tags(), actual.tags());
        assert_eq!(expected.message(), actual.message());
        assert_eq!(expected.origin_tags(), actual.origin_tags());
    }

    #[test]
    fn basic_service_checks() {
        let status = CheckStatus::Warning;
        let payload = format!("_sc|{}|{}", CHECK_NAME, status.as_u8());

        let parsed = parse_dsd_service_check(payload.as_bytes()).unwrap();

        check_basic_service_check_eq(ServiceCheck::new(CHECK_NAME, status), parsed);
    }

    #[test]
    fn service_check_timestamp() {
        let status = CheckStatus::Warning;
        let timestamp = 1234567890;
        let payload = format!("_sc|{}|{}|d:{}", CHECK_NAME, status.as_u8(), timestamp);

        let parsed = parse_dsd_service_check(payload.as_bytes()).unwrap();

        let wanted = ServiceCheck::new(CHECK_NAME, status).with_timestamp(timestamp);
        check_basic_service_check_eq(wanted, parsed);
    }

    #[test]
    fn service_check_tags() {
        let status = CheckStatus::Warning;
        let tag_names = ["tag1", "tag2"];
        let payload = format!("_sc|{}|{}|#{}", CHECK_NAME, status.as_u8(), tag_names.join(","));

        let parsed = parse_dsd_service_check(payload.as_bytes()).unwrap();

        let wanted = ServiceCheck::new(CHECK_NAME, status).with_tags(shared_tags(&tag_names));
        check_basic_service_check_eq(wanted, parsed);
    }

    #[test]
    fn service_check_message() {
        let status = CheckStatus::Ok;
        let message = MetaString::from("service running properly");
        let payload = format!("_sc|{}|{}|m:{}", CHECK_NAME, status.as_u8(), message);

        let parsed = parse_dsd_service_check(payload.as_bytes()).unwrap();

        let wanted = ServiceCheck::new(CHECK_NAME, status).with_message(message);
        check_basic_service_check_eq(wanted, parsed);
    }

    #[test]
    fn service_check_fields_after_message() {
        let status = CheckStatus::Ok;
        let message = MetaString::from("service running properly");
        let local_data = "ci-1234567890";
        let payload = format!(
            "_sc|{}|{}|#tag1,tag2|m:{}|c:{}|e:{}",
            CHECK_NAME,
            status.as_u8(),
            message,
            local_data,
            EXTERNAL_DATA,
        );

        // Default configuration: the sections after the message must not break parsing.
        let parsed = parse_dsd_service_check(payload.as_bytes()).unwrap();
        let wanted = ServiceCheck::new(CHECK_NAME, status)
            .with_message(message)
            .with_tags(shared_tags(&["tag1", "tag2"]));
        check_basic_service_check_eq(wanted, parsed);

        // With origin detection enabled, the origin sections are captured on the packet.
        let packet = parse_packet_with_origin_detection(&payload, true);
        assert_eq!(packet.local_data, Some(local_data));
        assert_eq!(packet.external_data, Some(EXTERNAL_DATA));
    }

    #[test]
    fn service_check_multiple_extensions() {
        let status = CheckStatus::Unknown;
        let timestamp = 1234567890;
        let hostname = MetaString::from("myhost");
        let local_data = "abcdef123456";
        let tag_names = ["tag1", "tag2"];
        let message = MetaString::from("service status unknown");
        let cardinality = "none";
        let payload = format!(
            "_sc|{}|{}|d:{}|h:{}|c:{}|e:{}|card:{}|#{}|m:{}",
            CHECK_NAME,
            status.as_u8(),
            timestamp,
            hostname,
            local_data,
            EXTERNAL_DATA,
            cardinality,
            tag_names.join(","),
            message
        );

        let parsed = parse_dsd_service_check(payload.as_bytes()).unwrap();
        let wanted = ServiceCheck::new(CHECK_NAME, status)
            .with_timestamp(timestamp)
            .with_hostname(hostname)
            .with_tags(shared_tags(&tag_names))
            .with_message(message);
        check_basic_service_check_eq(wanted, parsed);

        // With origin detection enabled, all three origin sections are captured on the packet.
        let packet = parse_packet_with_origin_detection(&payload, true);
        assert_eq!(packet.local_data, Some(local_data));
        assert_eq!(packet.external_data, Some(EXTERNAL_DATA));
        assert_eq!(packet.cardinality, Some(OriginTagCardinality::None));
    }

    #[test]
    fn client_origin_fields_ignored_when_disabled() {
        let local_data = "abcdef123456";
        let payload = format!("_sc|testsvc|0|c:{}|e:{}|card:low", local_data, EXTERNAL_DATA);

        let packet = parse_packet_with_origin_detection(&payload, false);

        assert_eq!(packet.local_data, None);
        assert_eq!(packet.external_data, None);
        assert_eq!(packet.cardinality, None);
    }

    #[test]
    fn service_check_semi_real_payload_kafka() {
        // Only checks that a realistic payload parses; the parsed value itself is not inspected.
        let raw_payload = "_sc|kafka.can_connect|2|#env:staging,service:datadog-agent,dd.internal.entity_id:none,dd.internal.card:none,instance:kafka-127.0.0.1-9999,jmx_server:127.0.0.1|m:Unable to instantiate or initialize instance 127.0.0.1:9999. Is the target JMX Server or JVM running? Failed to retrieve RMIServer stub: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: 127.0.0.1; nested exception is: \\n\tjava.net.ConnectException: Connection refused (Connection refused)]";
        let _ = parse_dsd_service_check(raw_payload.as_bytes()).unwrap();
    }
}