// saluki_io/deser/codec/dogstatsd/mod.rs

1use std::fmt;
2
3mod event;
4pub use self::event::EventPacket;
5
6mod helpers;
7pub use self::helpers::{parse_message_type, MessageType};
8
9mod metric;
10pub use self::metric::MetricPacket;
11
12mod service_check;
13pub use self::service_check::ServiceCheckPacket;
14
15type NomParserError<'a> = nom::Err<nom::error::Error<&'a [u8]>>;
16
17/// Parser error.
18#[derive(Debug)]
19pub struct ParseError {
20    kind: nom::error::ErrorKind,
21    data: String,
22}
23
24impl fmt::Display for ParseError {
25    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26        write!(
27            f,
28            "encountered error '{:?}' while processing message '{}'",
29            self.kind, self.data
30        )
31    }
32}
33
34impl std::error::Error for ParseError {}
35
36impl<'a> From<NomParserError<'a>> for ParseError {
37    fn from(err: NomParserError<'a>) -> Self {
38        match err {
39            nom::Err::Error(e) | nom::Err::Failure(e) => Self {
40                kind: e.code,
41                data: String::from_utf8_lossy(e.input).to_string(),
42            },
43            nom::Err::Incomplete(_) => unreachable!("dogstatsd codec only supports complete payloads"),
44        }
45    }
46}
47
48/// A DogStatsD packet.
49pub enum ParsedPacket<'a> {
50    /// Metric.
51    Metric(MetricPacket<'a>),
52
53    /// Event.
54    Event(EventPacket<'a>),
55
56    /// Service check.
57    ServiceCheck(ServiceCheckPacket<'a>),
58}
59
/// Settings that control how the DogStatsD codec decodes payloads.
///
/// Build one from [`Default`] and adjust it with the `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct DogstatsdCodecConfiguration {
    /// Whether decoding relies only on structural delimiters (permissive mode).
    permissive: bool,

    /// Maximum number of bytes allowed in a single tag.
    maximum_tag_length: usize,

    /// Maximum number of tags kept for a single metric.
    maximum_tag_count: usize,

    /// Whether metric timestamps are read from the payload.
    timestamps: bool,
}
68
69impl DogstatsdCodecConfiguration {
70    /// Sets whether or not the codec should operate in permissive mode.
71    ///
72    /// In permissive mode, the codec will attempt to parse as much of the input as possible, relying solely on
73    /// structural markers (specific delimiting characters) to determine the boundaries of different parts of the
74    /// payload. This allows for decoding payloads with invalid contents (e.g., characters that are valid UTF-8, but
75    /// aren't within ASCII bounds, etc) such that the data plane can attempt to process them further.
76    ///
77    /// Permissive mode does not allow for decoding payloads with structural errors (e.g., missing delimiters, etc) or
78    /// that cannot be safely handled internally (e.g., invalid UTF-8 characters for the metric name or tags).
79    ///
80    /// Defaults to `false`.
81    pub fn with_permissive_mode(mut self, permissive: bool) -> Self {
82        self.permissive = permissive;
83        self
84    }
85
86    /// Sets the maximum tag length.
87    ///
88    /// This controls the number of bytes that are allowed for a single tag. If a tag exceeds this limit, it is
89    /// truncated to the closest previous UTF-8 character boundary, in order to preserve UTF-8 validity.
90    ///
91    /// Defaults to no limit.
92    pub fn with_maximum_tag_length(mut self, maximum_tag_length: usize) -> Self {
93        self.maximum_tag_length = maximum_tag_length;
94        self
95    }
96
97    /// Sets the maximum tag count.
98    ///
99    /// This is the maximum number of tags allowed for a single metric. If the number of tags exceeds this limit,
100    /// remaining tags are simply ignored.
101    ///
102    /// Defaults to no limit.
103    pub fn with_maximum_tag_count(mut self, maximum_tag_count: usize) -> Self {
104        self.maximum_tag_count = maximum_tag_count;
105        self
106    }
107
108    /// Sets whether or not timestamps are read from metrics.
109    ///
110    /// This is generally used in conjunction with aggregating metrics pipelines to control whether or not metrics are
111    /// able to specify their own timestamp in order to be forwarded immediately without aggregation.
112    ///
113    /// Defaults to `true`.
114    pub fn with_timestamps(mut self, timestamps: bool) -> Self {
115        self.timestamps = timestamps;
116        self
117    }
118}
119
120impl Default for DogstatsdCodecConfiguration {
121    fn default() -> Self {
122        Self {
123            maximum_tag_length: usize::MAX,
124            maximum_tag_count: usize::MAX,
125            timestamps: true,
126            permissive: false,
127        }
128    }
129}
130
131/// A [DogStatsD][dsd] codec.
132///
133/// This codec is used to parse the DogStatsD protocol, which is a superset of the StatsD protocol. DogStatsD adds a
134/// number of additional features, such as the ability to specify tags, send histograms directly, send service checks
135/// and events (DataDog-specific), and more.
136///
137/// [dsd]: https://docs.datadoghq.com/developers/dogstatsd/
138#[derive(Clone, Debug)]
139pub struct DogstatsdCodec {
140    config: DogstatsdCodecConfiguration,
141}
142
143impl DogstatsdCodec {
144    /// Sets the given configuration for the codec.
145    ///
146    /// Different aspects of the codec's behavior (such as tag length, tag count, and timestamp parsing) can be
147    /// controlled through its configuration. See [`DogstatsdCodecConfiguration`] for more information.
148    pub fn from_configuration(config: DogstatsdCodecConfiguration) -> Self {
149        Self { config }
150    }
151
152    /// Decodes a DogStatsD packet from the given raw data.
153    ///
154    /// # Errors
155    ///
156    /// If the raw data is not a valid DogStatsD packet, an error is returned.
157    pub fn decode_packet<'a>(&self, data: &'a [u8]) -> Result<ParsedPacket<'a>, ParseError> {
158        match parse_message_type(data) {
159            MessageType::Event => self.decode_event(data).map(ParsedPacket::Event),
160            MessageType::ServiceCheck => self.decode_service_check(data).map(ParsedPacket::ServiceCheck),
161            MessageType::MetricSample => self.decode_metric(data).map(ParsedPacket::Metric),
162        }
163    }
164
165    fn decode_metric<'a>(&self, data: &'a [u8]) -> Result<MetricPacket<'a>, ParseError> {
166        // Decode the payload and get the representative parts of the metric.
167        // TODO: Can probably assert remaining is empty now.
168        let (_remaining, metric) = self::metric::parse_dogstatsd_metric(data, &self.config)?;
169        Ok(metric)
170    }
171
172    fn decode_event<'a>(&self, data: &'a [u8]) -> Result<EventPacket<'a>, ParseError> {
173        let (_remaining, event) = self::event::parse_dogstatsd_event(data, &self.config)?;
174        Ok(event)
175    }
176
177    fn decode_service_check<'a>(&self, data: &'a [u8]) -> Result<ServiceCheckPacket<'a>, ParseError> {
178        let (_remaining, service_check) = self::service_check::parse_dogstatsd_service_check(data, &self.config)?;
179        Ok(service_check)
180    }
181}