saluki_io/deser/codec/dogstatsd/
mod.rs1use std::fmt;
2
3mod event;
4pub use self::event::EventPacket;
5
6mod helpers;
7pub use self::helpers::{parse_message_type, MessageType};
8
9mod metric;
10pub use self::metric::MetricPacket;
11
12mod service_check;
13pub use self::service_check::ServiceCheckPacket;
14
15type NomParserError<'a> = nom::Err<nom::error::Error<&'a [u8]>>;
16
17#[derive(Debug)]
19pub struct ParseError {
20 kind: nom::error::ErrorKind,
21 data: String,
22}
23
24impl fmt::Display for ParseError {
25 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26 write!(
27 f,
28 "encountered error '{:?}' while processing message '{}'",
29 self.kind, self.data
30 )
31 }
32}
33
34impl std::error::Error for ParseError {}
35
36impl<'a> From<NomParserError<'a>> for ParseError {
37 fn from(err: NomParserError<'a>) -> Self {
38 match err {
39 nom::Err::Error(e) | nom::Err::Failure(e) => Self {
40 kind: e.code,
41 data: String::from_utf8_lossy(e.input).to_string(),
42 },
43 nom::Err::Incomplete(_) => unreachable!("dogstatsd codec only supports complete payloads"),
44 }
45 }
46}
47
48pub enum ParsedPacket<'a> {
50 Metric(MetricPacket<'a>),
52
53 Event(EventPacket<'a>),
55
56 ServiceCheck(ServiceCheckPacket<'a>),
58}
59
60#[derive(Clone, Debug)]
62pub struct DogstatsdCodecConfiguration {
63 permissive: bool,
64 maximum_tag_length: usize,
65 maximum_tag_count: usize,
66 timestamps: bool,
67}
68
69impl DogstatsdCodecConfiguration {
70 pub fn with_permissive_mode(mut self, permissive: bool) -> Self {
82 self.permissive = permissive;
83 self
84 }
85
86 pub fn with_maximum_tag_length(mut self, maximum_tag_length: usize) -> Self {
93 self.maximum_tag_length = maximum_tag_length;
94 self
95 }
96
97 pub fn with_maximum_tag_count(mut self, maximum_tag_count: usize) -> Self {
104 self.maximum_tag_count = maximum_tag_count;
105 self
106 }
107
108 pub fn with_timestamps(mut self, timestamps: bool) -> Self {
115 self.timestamps = timestamps;
116 self
117 }
118}
119
120impl Default for DogstatsdCodecConfiguration {
121 fn default() -> Self {
122 Self {
123 maximum_tag_length: usize::MAX,
124 maximum_tag_count: usize::MAX,
125 timestamps: true,
126 permissive: false,
127 }
128 }
129}
130
131#[derive(Clone, Debug)]
139pub struct DogstatsdCodec {
140 config: DogstatsdCodecConfiguration,
141}
142
143impl DogstatsdCodec {
144 pub fn from_configuration(config: DogstatsdCodecConfiguration) -> Self {
149 Self { config }
150 }
151
152 pub fn decode_packet<'a>(&self, data: &'a [u8]) -> Result<ParsedPacket<'a>, ParseError> {
158 match parse_message_type(data) {
159 MessageType::Event => self.decode_event(data).map(ParsedPacket::Event),
160 MessageType::ServiceCheck => self.decode_service_check(data).map(ParsedPacket::ServiceCheck),
161 MessageType::MetricSample => self.decode_metric(data).map(ParsedPacket::Metric),
162 }
163 }
164
165 fn decode_metric<'a>(&self, data: &'a [u8]) -> Result<MetricPacket<'a>, ParseError> {
166 let (_remaining, metric) = self::metric::parse_dogstatsd_metric(data, &self.config)?;
169 Ok(metric)
170 }
171
172 fn decode_event<'a>(&self, data: &'a [u8]) -> Result<EventPacket<'a>, ParseError> {
173 let (_remaining, event) = self::event::parse_dogstatsd_event(data, &self.config)?;
174 Ok(event)
175 }
176
177 fn decode_service_check<'a>(&self, data: &'a [u8]) -> Result<ServiceCheckPacket<'a>, ParseError> {
178 let (_remaining, service_check) = self::service_check::parse_dogstatsd_service_check(data, &self.config)?;
179 Ok(service_check)
180 }
181}