Skip to main content

saluki_io/deser/codec/dogstatsd/
mod.rs

1use std::fmt;
2
3mod event;
4pub use self::event::EventPacket;
5
6mod helpers;
7pub use self::helpers::{parse_message_type, MessageType};
8
9mod metric;
10pub use self::metric::MetricPacket;
11
12mod service_check;
13pub use self::service_check::ServiceCheckPacket;
14
// Error type yielded by `nom` parsers that operate over byte-slice input. It is converted into `ParseError` through
// the `From` implementation in this module.
type NomParserError<'a> = nom::Err<nom::error::Error<&'a [u8]>>;
16
// Lowest sample rate that is considered "safe" under typical DogStatsD default settings.
//
// How the value was chosen:
// - DogStatsD payloads are capped at 8KiB by default
// - a valid distribution metric can therefore carry a multi-value payload of roughly 4093 values (each value being
//   `1`, once protocol overhead is accounted for)
// - for the resulting sketch not to overflow, the total count across all values has to fit in a `u64`, i.e. stay
//   below 2^64
// - 2^64 / 4093 is roughly 4.5e15, which is far larger than anything we need to worry about
// - the DDSketch implementation we write into, however, is effectively capped at ~270M (at most 4096 bins, with a
//   `u16` count per bin, so 4096 * 2^16 = 268,435,456)
// - we use ~260M to leave headroom, which expressed as a sample rate is roughly 1 / 260,000,000
//
// Written out in plain decimal notation, the value below is 0.000000003845.
const MINIMUM_SAFE_DEFAULT_SAMPLE_RATE: f64 = 3.845e-9;
29
/// Parser error.
///
/// Holds the `nom` error kind that was reported together with a string copy of the input that was being processed
/// when the error occurred.
#[derive(Debug)]
pub struct ParseError {
    /// The `nom` error kind reported by the failing parser.
    kind: nom::error::ErrorKind,
    /// The parser input at the point of failure, stored as an owned string.
    data: String,
}
36
37impl fmt::Display for ParseError {
38    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
39        write!(
40            f,
41            "encountered error '{:?}' while processing message '{}'",
42            self.kind, self.data
43        )
44    }
45}
46
// No methods are overridden: the default `Error` behavior is used, with `Display` providing the message.
impl std::error::Error for ParseError {}
48
49impl<'a> From<NomParserError<'a>> for ParseError {
50    fn from(err: NomParserError<'a>) -> Self {
51        match err {
52            nom::Err::Error(e) | nom::Err::Failure(e) => Self {
53                kind: e.code,
54                data: String::from_utf8_lossy(e.input).to_string(),
55            },
56            nom::Err::Incomplete(_) => unreachable!("DogStatsD codec only supports complete payloads"),
57        }
58    }
59}
60
/// A DogStatsD packet.
///
/// This is the result of [`DogStatsDCodec::decode_packet`], with one variant per supported message type. The lifetime
/// `'a` is tied to the raw input the packet was decoded from.
pub enum ParsedPacket<'a> {
    /// Metric.
    Metric(MetricPacket<'a>),

    /// Event.
    Event(EventPacket<'a>),

    /// Service check.
    ServiceCheck(ServiceCheckPacket<'a>),
}
72
/// DogStatsD codec configuration.
///
/// Individual settings are changed through the `with_*` builder methods, starting from the [`Default`] value.
#[derive(Clone, Debug)]
pub struct DogStatsDCodecConfiguration {
    /// Whether the codec operates in permissive mode.
    permissive: bool,
    /// Maximum number of bytes allowed for a single tag.
    maximum_tag_length: usize,
    /// Maximum number of tags allowed for a single metric.
    maximum_tag_count: usize,
    /// Whether timestamps are read from metrics.
    timestamps: bool,
    /// Minimum sample rate allowed for a metric payload.
    minimum_sample_rate: f64,
    /// Whether client-provided origin detection fields are parsed.
    client_origin_detection: bool,
}
83
84impl DogStatsDCodecConfiguration {
85    /// Sets whether or not the codec should operate in permissive mode.
86    ///
87    /// In permissive mode, the codec will attempt to parse as much of the input as possible, relying solely on
88    /// structural markers (specific delimiting characters) to determine the boundaries of different parts of the
89    /// payload. This allows for decoding payloads with invalid contents (for example, characters that are valid UTF-8, but
90    /// aren't within ASCII bounds, etc) such that the data plane can attempt to process them further.
91    ///
92    /// Permissive mode does not allow for decoding payloads with structural errors (for example, missing delimiters, etc) or
93    /// that cannot be safely handled internally (for example, invalid UTF-8 characters for the metric name or tags).
94    ///
95    /// Defaults to `false`.
96    pub fn with_permissive_mode(mut self, permissive: bool) -> Self {
97        self.permissive = permissive;
98        self
99    }
100
101    /// Sets the maximum tag length.
102    ///
103    /// This controls the number of bytes that are allowed for a single tag. If a tag exceeds this limit, it is
104    /// truncated to the closest previous UTF-8 character boundary, in order to preserve UTF-8 validity.
105    ///
106    /// Defaults to no limit.
107    pub fn with_maximum_tag_length(mut self, maximum_tag_length: usize) -> Self {
108        self.maximum_tag_length = maximum_tag_length;
109        self
110    }
111
112    /// Sets the maximum tag count.
113    ///
114    /// This is the maximum number of tags allowed for a single metric. If the number of tags exceeds this limit,
115    /// remaining tags are simply ignored.
116    ///
117    /// Defaults to no limit.
118    pub fn with_maximum_tag_count(mut self, maximum_tag_count: usize) -> Self {
119        self.maximum_tag_count = maximum_tag_count;
120        self
121    }
122
123    /// Sets whether or not timestamps are read from metrics.
124    ///
125    /// This is generally used in conjunction with aggregating metrics pipelines to control whether or not metrics are
126    /// able to specify their own timestamp in order to be forwarded immediately without aggregation.
127    ///
128    /// Defaults to `true`.
129    pub fn with_timestamps(mut self, timestamps: bool) -> Self {
130        self.timestamps = timestamps;
131        self
132    }
133
134    /// Sets the minimum sample rate.
135    ///
136    /// This is the minimum sample rate that is allowed for a metric payload. If the sample rate is less than this limit,
137    /// the sample rate is clamped to this value and a log message is emitted.
138    ///
139    /// Defaults to `0.000000003845`.
140    pub fn with_minimum_sample_rate(mut self, minimum_sample_rate: f64) -> Self {
141        self.minimum_sample_rate = minimum_sample_rate;
142        self
143    }
144
145    /// Sets whether client-provided origin detection fields are parsed.
146    ///
147    /// When disabled, the `c:` (Local Data), `e:` (External Data), and `card:` (Cardinality) fields are ignored even if
148    /// present in the payload.
149    ///
150    /// Defaults to `false`.
151    pub fn with_client_origin_detection(mut self, enabled: bool) -> Self {
152        self.client_origin_detection = enabled;
153        self
154    }
155}
156
157impl Default for DogStatsDCodecConfiguration {
158    fn default() -> Self {
159        Self {
160            maximum_tag_length: usize::MAX,
161            maximum_tag_count: usize::MAX,
162            timestamps: true,
163            permissive: false,
164            minimum_sample_rate: MINIMUM_SAFE_DEFAULT_SAMPLE_RATE,
165            client_origin_detection: false,
166        }
167    }
168}
169
/// A [DogStatsD][dsd] codec.
///
/// This codec is used to parse the DogStatsD protocol, which is a superset of the StatsD protocol. DogStatsD adds a
/// number of additional features, such as the ability to specify tags, send histograms directly, send service checks
/// and events (Datadog-specific), and more.
///
/// [dsd]: https://docs.datadoghq.com/developers/dogstatsd/
#[derive(Clone, Debug)]
pub struct DogStatsDCodec {
    /// Configuration that is handed to the underlying parsers on every decode call.
    config: DogStatsDCodecConfiguration,
}
181
182impl DogStatsDCodec {
183    /// Sets the given configuration for the codec.
184    ///
185    /// Different aspects of the codec's behavior (such as tag length, tag count, and timestamp parsing) can be
186    /// controlled through its configuration. See [`DogStatsDCodecConfiguration`] for more information.
187    pub fn from_configuration(config: DogStatsDCodecConfiguration) -> Self {
188        Self { config }
189    }
190
191    /// Decodes a DogStatsD packet from the given raw data.
192    ///
193    /// # Errors
194    ///
195    /// If the raw data is not a valid DogStatsD packet, an error is returned.
196    pub fn decode_packet<'a>(&self, data: &'a [u8]) -> Result<ParsedPacket<'a>, ParseError> {
197        match parse_message_type(data) {
198            MessageType::Event => self.decode_event(data).map(ParsedPacket::Event),
199            MessageType::ServiceCheck => self.decode_service_check(data).map(ParsedPacket::ServiceCheck),
200            MessageType::MetricSample => self.decode_metric(data).map(ParsedPacket::Metric),
201        }
202    }
203
204    fn decode_metric<'a>(&self, data: &'a [u8]) -> Result<MetricPacket<'a>, ParseError> {
205        // Decode the payload and get the representative parts of the metric.
206        // TODO: Can probably assert remaining is empty now.
207        let (_remaining, metric) = self::metric::parse_dogstatsd_metric(data, &self.config)?;
208        Ok(metric)
209    }
210
211    fn decode_event<'a>(&self, data: &'a [u8]) -> Result<EventPacket<'a>, ParseError> {
212        let (_remaining, event) = self::event::parse_dogstatsd_event(data, &self.config)?;
213        Ok(event)
214    }
215
216    fn decode_service_check<'a>(&self, data: &'a [u8]) -> Result<ServiceCheckPacket<'a>, ParseError> {
217        let (_remaining, service_check) = self::service_check::parse_dogstatsd_service_check(data, &self.config)?;
218        Ok(service_check)
219    }
220}