Skip to main content

saluki_io/deser/codec/dogstatsd/
service_check.rs

1use nom::{
2    bytes::complete::tag,
3    character::complete::u8 as parse_u8,
4    combinator::all_consuming,
5    error::{Error, ErrorKind},
6    sequence::{preceded, separated_pair},
7    IResult, Parser as _,
8};
9use saluki_context::{origin::OriginTagCardinality, tags::RawTags};
10use saluki_core::data_model::event::service_check::*;
11use stringtheory::MetaString;
12
13use super::{helpers::*, DogStatsDCodecConfiguration};
14
15/// A DogStatsD service check packet.
16pub struct ServiceCheckPacket<'a> {
17    pub name: MetaString,
18    pub status: CheckStatus,
19    pub timestamp: Option<u64>,
20    pub hostname: Option<&'a str>,
21    pub message: Option<&'a str>,
22    pub tags: RawTags<'a>,
23    pub local_data: Option<&'a str>,
24    pub external_data: Option<&'a str>,
25    pub cardinality: Option<OriginTagCardinality>,
26}
27
28pub fn parse_dogstatsd_service_check<'a>(
29    input: &'a [u8], config: &DogStatsDCodecConfiguration,
30) -> IResult<&'a [u8], ServiceCheckPacket<'a>> {
31    let (remaining, (name, raw_check_status)) = preceded(
32        tag(SERVICE_CHECK_PREFIX),
33        separated_pair(ascii_alphanum_and_seps, tag("|"), parse_u8),
34    )
35    .parse(input)?;
36
37    let check_status =
38        CheckStatus::try_from(raw_check_status).map_err(|_| nom::Err::Error(Error::new(input, ErrorKind::Verify)))?;
39
40    let mut maybe_timestamp = None;
41    let mut maybe_hostname = None;
42    let mut maybe_tags = None;
43    let mut maybe_message = None;
44    let mut maybe_local_data = None;
45    let mut maybe_external_data = None;
46    let mut maybe_cardinality = None;
47
48    let remaining = if !remaining.is_empty() {
49        let (mut remaining, _) = tag("|")(remaining)?;
50        while let Some((chunk, tail)) = split_at_delimiter(remaining, b'|') {
51            if chunk.len() < 2 {
52                break;
53            }
54
55            match &chunk[..2] {
56                // Timestamp: client-provided timestamp for the event, relative to the Unix epoch, in seconds.
57                TIMESTAMP_PREFIX => {
58                    let (_, timestamp) = all_consuming(preceded(tag(TIMESTAMP_PREFIX), unix_timestamp)).parse(chunk)?;
59                    maybe_timestamp = Some(timestamp);
60                }
61                // Hostname: client-provided hostname for the host that this service check originated from.
62                HOSTNAME_PREFIX => {
63                    let (_, hostname) =
64                        all_consuming(preceded(tag(HOSTNAME_PREFIX), ascii_alphanum_and_seps)).parse(chunk)?;
65                    maybe_hostname = Some(hostname);
66                }
67                // Local Data: client-provided data used for resolving the entity ID that this service check originated from.
68                LOCAL_DATA_PREFIX => {
69                    if config.client_origin_detection {
70                        let (_, local_data) =
71                            all_consuming(preceded(tag(LOCAL_DATA_PREFIX), local_data)).parse(chunk)?;
72                        maybe_local_data = Some(local_data);
73                    }
74                }
75                // External Data: client-provided data used for resolving the entity ID that this service check originated from.
76                EXTERNAL_DATA_PREFIX => {
77                    if config.client_origin_detection {
78                        let (_, external_data) =
79                            all_consuming(preceded(tag(EXTERNAL_DATA_PREFIX), external_data)).parse(chunk)?;
80                        maybe_external_data = Some(external_data);
81                    }
82                }
83                // Tags: additional tags to be added to the service check.
84                _ if chunk.starts_with(TAGS_PREFIX) => {
85                    let (_, tags) = all_consuming(preceded(tag(TAGS_PREFIX), tags(config))).parse(chunk)?;
86                    maybe_tags = Some(tags);
87                }
88                // Message: A message describing the current state of the service check.
89                SERVICE_CHECK_MESSAGE_PREFIX => {
90                    let (_, message) = all_consuming(preceded(tag(SERVICE_CHECK_MESSAGE_PREFIX), utf8)).parse(chunk)?;
91                    maybe_message = Some(message);
92                }
93                // Cardinality: client-provided cardinality for the service check.
94                _ if chunk.starts_with(CARDINALITY_PREFIX) => {
95                    if config.client_origin_detection {
96                        let (_, cardinality) = cardinality(chunk)?;
97                        maybe_cardinality = cardinality;
98                    }
99                }
100                _ => {
101                    // We don't know what this is, so we just skip it.
102                    //
103                    // TODO: Should we throw an error, warn, or be silently permissive?
104                }
105            }
106            remaining = tail;
107        }
108        remaining
109    } else {
110        remaining
111    };
112
113    let tags = maybe_tags.unwrap_or_else(RawTags::empty);
114
115    let service_check_packet = ServiceCheckPacket {
116        name: name.into(),
117        status: check_status,
118        tags,
119        timestamp: maybe_timestamp,
120        hostname: maybe_hostname,
121        message: maybe_message,
122        local_data: maybe_local_data,
123        external_data: maybe_external_data,
124        cardinality: maybe_cardinality,
125    };
126    Ok((remaining, service_check_packet))
127}
128
129#[cfg(test)]
130mod tests {
131    use nom::IResult;
132    use saluki_context::{
133        origin::OriginTagCardinality,
134        tags::{SharedTagSet, TagSet},
135    };
136    use saluki_core::data_model::event::service_check::{CheckStatus, ServiceCheck};
137    use stringtheory::MetaString;
138
139    use super::{parse_dogstatsd_service_check, DogStatsDCodecConfiguration};
140
141    type NomResult<'input, T> = Result<T, nom::Err<nom::error::Error<&'input [u8]>>>;
142
143    fn parse_dsd_service_check(input: &[u8]) -> NomResult<'_, ServiceCheck> {
144        let default_config = DogStatsDCodecConfiguration::default();
145        parse_dsd_service_check_with_conf(input, &default_config)
146    }
147
148    fn parse_dsd_service_check_with_conf<'input>(
149        input: &'input [u8], config: &DogStatsDCodecConfiguration,
150    ) -> NomResult<'input, ServiceCheck> {
151        let (remaining, service_check) = parse_dsd_service_check_direct(input, config)?;
152        assert!(remaining.is_empty());
153
154        Ok(service_check)
155    }
156
157    fn parse_dsd_service_check_direct<'input>(
158        input: &'input [u8], config: &DogStatsDCodecConfiguration,
159    ) -> IResult<&'input [u8], ServiceCheck> {
160        let (remaining, packet) = parse_dogstatsd_service_check(input, config)?;
161        assert!(remaining.is_empty());
162
163        let mut service_check_tags = TagSet::default();
164        for tag in packet.tags.into_iter() {
165            service_check_tags.insert_tag(tag);
166        }
167
168        let service_check = ServiceCheck::new(packet.name, packet.status)
169            .with_timestamp(packet.timestamp)
170            .with_hostname(packet.hostname.map(|s| s.into()))
171            .with_tags(service_check_tags)
172            .with_message(packet.message.map(|s| s.into()));
173
174        Ok((remaining, service_check))
175    }
176
177    #[track_caller]
178    fn check_basic_service_check_eq(expected: ServiceCheck, actual: ServiceCheck) {
179        assert_eq!(expected.name(), actual.name());
180        assert_eq!(expected.status(), actual.status());
181        assert_eq!(expected.timestamp(), actual.timestamp());
182        assert_eq!(expected.hostname(), actual.hostname());
183        assert_eq!(expected.tags(), actual.tags());
184        assert_eq!(expected.message(), actual.message());
185        assert_eq!(expected.origin_tags(), actual.origin_tags());
186    }
187
188    #[test]
189    fn basic_service_checks() {
190        let name = "testsvc";
191        let sc_status = CheckStatus::Warning;
192        let raw = format!("_sc|{}|{}", name, sc_status.as_u8());
193        let actual = parse_dsd_service_check(raw.as_bytes()).unwrap();
194        let expected = ServiceCheck::new(name, sc_status);
195        check_basic_service_check_eq(expected, actual);
196    }
197
198    #[test]
199    fn service_check_timestamp() {
200        let name = "testsvc";
201        let sc_status = CheckStatus::Warning;
202        let sc_timestamp = 1234567890;
203        let raw = format!("_sc|{}|{}|d:{}", name, sc_status.as_u8(), sc_timestamp);
204        let actual = parse_dsd_service_check(raw.as_bytes()).unwrap();
205        let expected = ServiceCheck::new(name, sc_status).with_timestamp(sc_timestamp);
206        check_basic_service_check_eq(expected, actual);
207    }
208
209    #[test]
210    fn service_check_tags() {
211        let name = "testsvc";
212        let sc_status = CheckStatus::Warning;
213        let tags = ["tag1", "tag2"];
214        let raw = format!("_sc|{}|{}|#{}", name, sc_status.as_u8(), tags.join(","));
215        let actual = parse_dsd_service_check(raw.as_bytes()).unwrap();
216        let shared_tag_set = SharedTagSet::from(TagSet::from_iter(tags.iter().map(|&s| s.into())));
217        let expected = ServiceCheck::new(name, sc_status).with_tags(shared_tag_set);
218        check_basic_service_check_eq(expected, actual);
219    }
220
221    #[test]
222    fn service_check_message() {
223        let name = "testsvc";
224        let sc_status = CheckStatus::Ok;
225        let sc_message = MetaString::from("service running properly");
226        let raw = format!("_sc|{}|{}|m:{}", name, sc_status.as_u8(), sc_message);
227        let actual = parse_dsd_service_check(raw.as_bytes()).unwrap();
228        let expected = ServiceCheck::new(name, sc_status).with_message(sc_message);
229        check_basic_service_check_eq(expected, actual);
230    }
231
232    #[test]
233    fn service_check_fields_after_message() {
234        let name = "testsvc";
235        let sc_status = CheckStatus::Ok;
236        let sc_message = MetaString::from("service running properly");
237        let sc_local_data = "ci-1234567890";
238        let sc_external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
239        let raw = format!(
240            "_sc|{}|{}|#tag1,tag2|m:{}|c:{}|e:{}",
241            name,
242            sc_status.as_u8(),
243            sc_message,
244            sc_local_data,
245            sc_external_data,
246        );
247        let result = parse_dsd_service_check(raw.as_bytes()).unwrap();
248        let tags = ["tag1", "tag2"];
249        let shared_tag_set = SharedTagSet::from(TagSet::from_iter(tags.iter().map(|&s| s.into())));
250        let expected = ServiceCheck::new(name, sc_status)
251            .with_message(sc_message)
252            .with_tags(shared_tag_set);
253        check_basic_service_check_eq(expected, result);
254
255        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
256        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
257        let (_, packet) = parse_dogstatsd_service_check(raw.as_bytes(), &config).expect("should not fail to parse");
258        assert_eq!(packet.local_data, Some(sc_local_data));
259        assert_eq!(packet.external_data, Some(sc_external_data));
260    }
261
262    #[test]
263    fn service_check_multiple_extensions() {
264        let name = "testsvc";
265        let sc_status = CheckStatus::Unknown;
266        let sc_timestamp = 1234567890;
267        let sc_hostname = MetaString::from("myhost");
268        let sc_local_data = "abcdef123456";
269        let sc_external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
270        let tags = ["tag1", "tag2"];
271        let sc_message = MetaString::from("service status unknown");
272        let sc_cardinality = "none";
273        let raw = format!(
274            "_sc|{}|{}|d:{}|h:{}|c:{}|e:{}|card:{}|#{}|m:{}",
275            name,
276            sc_status.as_u8(),
277            sc_timestamp,
278            sc_hostname,
279            sc_local_data,
280            sc_external_data,
281            sc_cardinality,
282            tags.join(","),
283            sc_message
284        );
285        let shared_tag_set = SharedTagSet::from(TagSet::from_iter(tags.iter().map(|&s| s.into())));
286        let actual = parse_dsd_service_check(raw.as_bytes()).unwrap();
287        let expected = ServiceCheck::new(name, sc_status)
288            .with_timestamp(sc_timestamp)
289            .with_hostname(sc_hostname)
290            .with_tags(shared_tag_set)
291            .with_message(sc_message);
292        check_basic_service_check_eq(expected, actual);
293
294        // We need client_origin_detection on in order to parse local_data, external_data, and cardinality fields
295        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(true);
296        let (_, packet) = parse_dogstatsd_service_check(raw.as_bytes(), &config).expect("should not fail to parse");
297        assert_eq!(packet.local_data, Some(sc_local_data));
298        assert_eq!(packet.external_data, Some(sc_external_data));
299        assert_eq!(packet.cardinality, Some(OriginTagCardinality::None));
300    }
301
302    #[test]
303    fn client_origin_fields_ignored_when_disabled() {
304        let local_data = "abcdef123456";
305        let external_data = "it-false,cn-redis,pu-810fe89d-da47-410b-8979-9154a40f8183";
306        let raw = format!("_sc|testsvc|0|c:{}|e:{}|card:low", local_data, external_data);
307        let config = DogStatsDCodecConfiguration::default().with_client_origin_detection(false);
308        let (_, packet) = parse_dogstatsd_service_check(raw.as_bytes(), &config).expect("should not fail to parse");
309        assert_eq!(packet.local_data, None);
310        assert_eq!(packet.external_data, None);
311        assert_eq!(packet.cardinality, None);
312    }
313
314    #[test]
315    fn service_check_semi_real_payload_kafka() {
316        let raw_payload = "_sc|kafka.can_connect|2|#env:staging,service:datadog-agent,dd.internal.entity_id:none,dd.internal.card:none,instance:kafka-127.0.0.1-9999,jmx_server:127.0.0.1|m:Unable to instantiate or initialize instance 127.0.0.1:9999. Is the target JMX Server or JVM running? Failed to retrieve RMIServer stub: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: 127.0.0.1; nested exception is: \\n\tjava.net.ConnectException: Connection refused (Connection refused)]";
317        let _ = parse_dsd_service_check(raw_payload.as_bytes()).unwrap();
318    }
319}