saluki_io/deser/codec/dogstatsd/mod.rs
1use std::fmt;
2
3mod event;
4pub use self::event::EventPacket;
5
6mod helpers;
7pub use self::helpers::{parse_message_type, MessageType};
8
9mod metric;
10pub use self::metric::MetricPacket;
11
12mod service_check;
13pub use self::service_check::ServiceCheckPacket;
14
/// The nom error type over borrowed byte-slice input.
///
/// Converted into [`ParseError`] through its `From` implementation.
type NomParserError<'a> = nom::Err<nom::error::Error<&'a [u8]>>;
16
// Lowest sample rate we treat as "safe" under typical DogStatsD default settings.
//
// Derivation:
// - By default, DogStatsD payloads are capped at 8KiB.
// - After protocol overhead, one valid multi-value distribution payload can carry roughly 4093 values (each `1`).
// - The derivation assumes each value contributes 1/sample_rate to the total count. That total must stay below
//   2^32 so the resulting sketch does not overflow.
// - 2^32 / 4093 ≈ 1,049,344, which is rounded down to 1,000,000 for headroom.
// - 1 / 1,000,000 = 0.000001, written below as `1e-6`.
const MINIMUM_SAFE_DEFAULT_SAMPLE_RATE: f64 = 1e-6;
26
27/// Parser error.
28#[derive(Debug)]
29pub struct ParseError {
30 kind: nom::error::ErrorKind,
31 data: String,
32}
33
34impl fmt::Display for ParseError {
35 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36 write!(
37 f,
38 "encountered error '{:?}' while processing message '{}'",
39 self.kind, self.data
40 )
41 }
42}
43
44impl std::error::Error for ParseError {}
45
46impl<'a> From<NomParserError<'a>> for ParseError {
47 fn from(err: NomParserError<'a>) -> Self {
48 match err {
49 nom::Err::Error(e) | nom::Err::Failure(e) => Self {
50 kind: e.code,
51 data: String::from_utf8_lossy(e.input).to_string(),
52 },
53 nom::Err::Incomplete(_) => unreachable!("dogstatsd codec only supports complete payloads"),
54 }
55 }
56}
57
58/// A DogStatsD packet.
59pub enum ParsedPacket<'a> {
60 /// Metric.
61 Metric(MetricPacket<'a>),
62
63 /// Event.
64 Event(EventPacket<'a>),
65
66 /// Service check.
67 ServiceCheck(ServiceCheckPacket<'a>),
68}
69
70/// DogStatsD codec configuration.
71#[derive(Clone, Debug)]
72pub struct DogstatsdCodecConfiguration {
73 permissive: bool,
74 maximum_tag_length: usize,
75 maximum_tag_count: usize,
76 timestamps: bool,
77 minimum_sample_rate: f64,
78}
79
80impl DogstatsdCodecConfiguration {
81 /// Sets whether or not the codec should operate in permissive mode.
82 ///
83 /// In permissive mode, the codec will attempt to parse as much of the input as possible, relying solely on
84 /// structural markers (specific delimiting characters) to determine the boundaries of different parts of the
85 /// payload. This allows for decoding payloads with invalid contents (e.g., characters that are valid UTF-8, but
86 /// aren't within ASCII bounds, etc) such that the data plane can attempt to process them further.
87 ///
88 /// Permissive mode does not allow for decoding payloads with structural errors (e.g., missing delimiters, etc) or
89 /// that cannot be safely handled internally (e.g., invalid UTF-8 characters for the metric name or tags).
90 ///
91 /// Defaults to `false`.
92 pub fn with_permissive_mode(mut self, permissive: bool) -> Self {
93 self.permissive = permissive;
94 self
95 }
96
97 /// Sets the maximum tag length.
98 ///
99 /// This controls the number of bytes that are allowed for a single tag. If a tag exceeds this limit, it is
100 /// truncated to the closest previous UTF-8 character boundary, in order to preserve UTF-8 validity.
101 ///
102 /// Defaults to no limit.
103 pub fn with_maximum_tag_length(mut self, maximum_tag_length: usize) -> Self {
104 self.maximum_tag_length = maximum_tag_length;
105 self
106 }
107
108 /// Sets the maximum tag count.
109 ///
110 /// This is the maximum number of tags allowed for a single metric. If the number of tags exceeds this limit,
111 /// remaining tags are simply ignored.
112 ///
113 /// Defaults to no limit.
114 pub fn with_maximum_tag_count(mut self, maximum_tag_count: usize) -> Self {
115 self.maximum_tag_count = maximum_tag_count;
116 self
117 }
118
119 /// Sets whether or not timestamps are read from metrics.
120 ///
121 /// This is generally used in conjunction with aggregating metrics pipelines to control whether or not metrics are
122 /// able to specify their own timestamp in order to be forwarded immediately without aggregation.
123 ///
124 /// Defaults to `true`.
125 pub fn with_timestamps(mut self, timestamps: bool) -> Self {
126 self.timestamps = timestamps;
127 self
128 }
129
130 /// Sets the minimum sample rate.
131 ///
132 /// This is the minimum sample rate that is allowed for a metric payload. If the sample rate is less than this limit,
133 /// the sample rate is clamped to this value and a log message is emitted.
134 ///
135 /// Defaults to `0.000001`.
136 pub fn with_minimum_sample_rate(mut self, minimum_sample_rate: f64) -> Self {
137 self.minimum_sample_rate = minimum_sample_rate;
138 self
139 }
140}
141
142impl Default for DogstatsdCodecConfiguration {
143 fn default() -> Self {
144 Self {
145 maximum_tag_length: usize::MAX,
146 maximum_tag_count: usize::MAX,
147 timestamps: true,
148 permissive: false,
149 minimum_sample_rate: MINIMUM_SAFE_DEFAULT_SAMPLE_RATE,
150 }
151 }
152}
153
154/// A [DogStatsD][dsd] codec.
155///
156/// This codec is used to parse the DogStatsD protocol, which is a superset of the StatsD protocol. DogStatsD adds a
157/// number of additional features, such as the ability to specify tags, send histograms directly, send service checks
158/// and events (DataDog-specific), and more.
159///
160/// [dsd]: https://docs.datadoghq.com/developers/dogstatsd/
161#[derive(Clone, Debug)]
162pub struct DogstatsdCodec {
163 config: DogstatsdCodecConfiguration,
164}
165
166impl DogstatsdCodec {
167 /// Sets the given configuration for the codec.
168 ///
169 /// Different aspects of the codec's behavior (such as tag length, tag count, and timestamp parsing) can be
170 /// controlled through its configuration. See [`DogstatsdCodecConfiguration`] for more information.
171 pub fn from_configuration(config: DogstatsdCodecConfiguration) -> Self {
172 Self { config }
173 }
174
175 /// Decodes a DogStatsD packet from the given raw data.
176 ///
177 /// # Errors
178 ///
179 /// If the raw data is not a valid DogStatsD packet, an error is returned.
180 pub fn decode_packet<'a>(&self, data: &'a [u8]) -> Result<ParsedPacket<'a>, ParseError> {
181 match parse_message_type(data) {
182 MessageType::Event => self.decode_event(data).map(ParsedPacket::Event),
183 MessageType::ServiceCheck => self.decode_service_check(data).map(ParsedPacket::ServiceCheck),
184 MessageType::MetricSample => self.decode_metric(data).map(ParsedPacket::Metric),
185 }
186 }
187
188 fn decode_metric<'a>(&self, data: &'a [u8]) -> Result<MetricPacket<'a>, ParseError> {
189 // Decode the payload and get the representative parts of the metric.
190 // TODO: Can probably assert remaining is empty now.
191 let (_remaining, metric) = self::metric::parse_dogstatsd_metric(data, &self.config)?;
192 Ok(metric)
193 }
194
195 fn decode_event<'a>(&self, data: &'a [u8]) -> Result<EventPacket<'a>, ParseError> {
196 let (_remaining, event) = self::event::parse_dogstatsd_event(data, &self.config)?;
197 Ok(event)
198 }
199
200 fn decode_service_check<'a>(&self, data: &'a [u8]) -> Result<ServiceCheckPacket<'a>, ParseError> {
201 let (_remaining, service_check) = self::service_check::parse_dogstatsd_service_check(data, &self.config)?;
202 Ok(service_check)
203 }
204}