saluki_io/deser/framing/
mod.rs

1use bytes::Bytes;
2use snafu::Snafu;
3use tracing::trace;
4
5use crate::buf::ReadIoBuffer;
6
7mod length_delimited;
8pub use self::length_delimited::LengthDelimitedFramer;
9
10mod newline;
11pub use self::newline::NewlineFramer;
12
13/// Framing error.
14#[derive(Debug, Snafu, Eq, PartialEq)]
15#[snafu(context(suffix(false)))]
16pub enum FramingError {
17    /// An invalid frame was received.
18    ///
19    /// This generally occurs if the frame is corrupted in some way, due to a bug in how the frame was encoded or
20    /// sent/received. For example, if a length-delimited frame indicates that the frame is larger than the buffer can
21    /// handle, it generally indicates that frame was created incorrectly by not respecting the maximum frame length
22    /// limitations, or the buffer is corrupt and spurious bytes are contributing to a decoded frame length that is
23    /// nonsensical.
24    #[snafu(display("invalid frame (frame length: {}, {})", frame_len, reason))]
25    InvalidFrame { frame_len: usize, reason: &'static str },
26
27    /// Failed to read frame due to a partial frame after reaching EOF.
28    ///
29    /// This generally only occurs if the peer closes their connection before sending the entire frame, or if a partial
30    /// write occurs on a connectionless stream, such as UDP, perhaps due to fragmentation.
31    #[snafu(display(
32        "partial frame after EOF (needed {} bytes, but only {} bytes remaining)",
33        needed,
34        remaining
35    ))]
36    PartialFrame { needed: usize, remaining: usize },
37}
38
39/// A trait for reading framed messages from a buffer.
40pub trait Framer {
41    /// Attempt to extract the next frame from the buffer.
42    ///
43    /// If enough data was present to extract a frame, `Ok(Some(frame))` is returned. If not enough data was present, and
44    /// EOF has not been reached, `Ok(None)` is returned.
45    ///
46    /// Behavior when EOF is reached is framer-specific and in some cases may allow for decoding a frame even when the
47    /// inherent delimiting data is not present.
48    ///
49    /// # Errors
50    ///
51    /// If an error is detected when reading the next frame, an error is returned.
52    fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError>;
53}
54
55/// A nested framer that extracts inner frames from outer frames.
56///
57/// This framer takes two input framers -- the "outer" and "inner" framers -- and extracts outer frames, and once an
58/// outer frame has been extract, extracts as many inner frames from the outer frame as possible. Callers deal
59/// exclusively with the extracted inner frames.
60pub struct NestedFramer<Inner, Outer> {
61    inner: Inner,
62    outer: Outer,
63    current_outer_frame: Option<Bytes>,
64}
65
66impl<Inner, Outer> NestedFramer<Inner, Outer> {
67    /// Creates a new `NestedFramer` from the given inner and outer framers.
68    pub fn new(inner: Inner, outer: Outer) -> Self {
69        Self {
70            inner,
71            outer,
72            current_outer_frame: None,
73        }
74    }
75}
76
77impl<Inner, Outer> Framer for NestedFramer<Inner, Outer>
78where
79    Inner: Framer,
80    Outer: Framer,
81{
82    fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
83        loop {
84            // Take our current outer frame, or if we have none, try to get the next one.
85            let outer_frame = match self.current_outer_frame.as_mut() {
86                Some(frame) => {
87                    trace!(
88                        buf_len = buf.remaining(),
89                        frame_len = frame.len(),
90                        "Using existing outer frame."
91                    );
92
93                    frame
94                }
95                None => {
96                    trace!(buf_len = buf.remaining(), "No existing outer frame.");
97
98                    match self.outer.next_frame(buf, is_eof)? {
99                        Some(frame) => {
100                            trace!(
101                                buf_len = buf.remaining(),
102                                frame_len = frame.len(),
103                                ?frame,
104                                "Extracted outer frame."
105                            );
106
107                            self.current_outer_frame.get_or_insert(frame)
108                        }
109
110                        // If we can't get another outer frame, then we're done for now.
111                        None => return Ok(None),
112                    }
113                }
114            };
115
116            // Try to get the next inner frame.
117            match self.inner.next_frame(outer_frame, true)? {
118                Some(frame) => {
119                    trace!(
120                        buf_len = buf.remaining(),
121                        outer_frame_len = outer_frame.len(),
122                        inner_frame_len = frame.len(),
123                        "Extracted inner frame."
124                    );
125
126                    return Ok(Some(frame));
127                }
128                None => {
129                    // We can't get anything else from our inner frame. If our outer frame is empty, and our input buffer
130                    // isn't empty, clear the current outer frame so that we can try to grab the next one.
131                    trace!(
132                        buf_len = buf.remaining(),
133                        outer_frame_len = outer_frame.len(),
134                        "Couldn't extract inner frame from existing outer frame."
135                    );
136
137                    if outer_frame.is_empty() && buf.remaining() != 0 {
138                        self.current_outer_frame = None;
139                        continue;
140                    } else {
141                        return Ok(None);
142                    }
143                }
144            }
145        }
146    }
147}
148
/// An iterator of framed messages over a generic buffer.
///
/// Each call to `next` asks the framer for the next frame in the buffer. It yields `Some(Ok(frame))` for each extracted
/// frame, `Some(Err(..))` when framing fails, and `None` once the framer reports that no frame is available.
///
/// The iterator is lazy: nothing is read from the buffer until it is advanced.
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct Framed<'a, F, B> {
    /// Framer used to extract each frame.
    framer: &'a mut F,

    /// Buffer that frames are extracted from.
    buffer: &'a mut B,

    /// EOF flag forwarded to the framer on every call.
    is_eof: bool,
}
155
156impl<F, B> Iterator for Framed<'_, F, B>
157where
158    F: Framer,
159    B: ReadIoBuffer,
160{
161    type Item = Result<Bytes, FramingError>;
162
163    fn next(&mut self) -> Option<Self::Item> {
164        self.framer.next_frame(self.buffer, self.is_eof).transpose()
165    }
166}
167
168/// Extension trait for ergonomically working with framers and buffers.
169pub trait FramerExt {
170    /// Creates a new `Framed` iterator over the buffer, using the given framer.
171    ///
172    /// Returns an iterator that extracts frames from the given buffer, consuming the bytes from the buffer as frames
173    /// are yielded.
174    fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self>
175    where
176        Self: ReadIoBuffer + Sized,
177        F: Framer;
178}
179
180impl<B> FramerExt for B
181where
182    B: ReadIoBuffer,
183{
184    fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self> {
185        Framed {
186            framer,
187            buffer: self,
188            is_eof,
189        }
190    }
191}
192
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::{Framer as _, FramerExt as _, LengthDelimitedFramer, NestedFramer, NewlineFramer};

    #[test]
    fn nested_framer_single_outer_multiple_inner() {
        let input_frames = &[b"frame1", b"frame2", b"frame3"];

        // We create a framer that does length-delimited payloads as the outer layer, and newline-delimited payloads as
        // the inner layer.
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Create a buffer that has a single length-delimited frame with three newline-delimited frames inside of that.
        let mut inner_frames = Vec::new();

        for inner_frame_data in input_frames {
            inner_frames.extend_from_slice(&inner_frame_data[..]);
            inner_frames.push(b'\n');
        }

        let mut buf = VecDeque::new();
        buf.extend(&(inner_frames.len() as u32).to_le_bytes());
        buf.extend(inner_frames);

        // Now we should be able to extract our original three frames from the buffer.
        for input_frame in input_frames {
            let frame = framer
                .next_frame(&mut buf, false)
                .expect("should not fail to read from payload")
                .expect("should not fail to extract frame from payload");
            assert_eq!(&frame[..], &input_frame[..]);
        }

        let maybe_frame = framer
            .next_frame(&mut buf, false)
            .expect("should not fail to read from payload");
        assert!(maybe_frame.is_none());

        // We should have consumed the entire buffer.
        assert!(buf.is_empty());
    }

    #[test]
    fn nested_framer_multiple_outer_single_inner() {
        let input_frames = &[b"frame1", b"frame2", b"frame3"];

        // We create a framer that does length-delimited payloads as the outer layer, and newline-delimited payloads as
        // the inner layer.
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Create a buffer that has three length-delimited frames, each with a single newline-delimited frame inside.
        let mut buf = VecDeque::new();

        for inner_frame_data in input_frames {
            let mut inner_frame = Vec::new();
            inner_frame.extend_from_slice(&inner_frame_data[..]);
            inner_frame.push(b'\n');

            buf.extend(&(inner_frame.len() as u32).to_le_bytes());
            buf.extend(inner_frame);
        }

        // Now we should be able to extract our original three frames from the buffer.
        for input_frame in input_frames {
            let frame = framer
                .next_frame(&mut buf, false)
                .expect("should not fail to read from payload")
                .expect("should not fail to extract frame from payload");
            assert_eq!(&frame[..], &input_frame[..]);
        }

        let maybe_frame = framer
            .next_frame(&mut buf, false)
            .expect("should not fail to read from payload");
        assert!(maybe_frame.is_none());

        // We should have consumed the entire buffer.
        assert!(buf.is_empty());
    }

    #[test]
    fn nested_framer_multiple_outer_multiple_inner() {
        let input_frames = &[b"frame1", b"frame2", b"frame3", b"frame4", b"frame5", b"frame6"];

        // We create a framer that does length-delimited payloads as the outer layer, and newline-delimited payloads as
        // the inner layer.
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Create a buffer that has three length-delimited frames, each with two newline-delimited frames inside.
        let mut buf = VecDeque::new();

        for inner_frame_data in input_frames.chunks(2) {
            let mut inner_frames = Vec::new();
            inner_frames.extend_from_slice(&inner_frame_data[0][..]);
            inner_frames.push(b'\n');
            inner_frames.extend_from_slice(&inner_frame_data[1][..]);
            inner_frames.push(b'\n');

            buf.extend(&(inner_frames.len() as u32).to_le_bytes());
            buf.extend(inner_frames);
        }

        // Now we should be able to extract our original six frames from the buffer.
        for input_frame in input_frames {
            let frame = framer
                .next_frame(&mut buf, false)
                .expect("should not fail to read from payload")
                .expect("should not fail to extract frame from payload");
            assert_eq!(&frame[..], &input_frame[..]);
        }

        let maybe_frame = framer
            .next_frame(&mut buf, false)
            .expect("should not fail to read from payload");
        assert!(maybe_frame.is_none());

        // We should have consumed the entire buffer.
        assert!(buf.is_empty());
    }

    #[test]
    fn framed_iterator_yields_all_inner_frames() {
        let input_frames = &[b"frame1", b"frame2", b"frame3", b"frame4"];

        // Length-delimited outer layer, newline-delimited inner layer.
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Two length-delimited frames, each with two newline-delimited frames inside.
        let mut buf: VecDeque<u8> = VecDeque::new();

        for pair in input_frames.chunks(2) {
            let mut payload = Vec::new();
            for inner_frame_data in pair {
                payload.extend_from_slice(&inner_frame_data[..]);
                payload.push(b'\n');
            }

            buf.extend(&(payload.len() as u32).to_le_bytes());
            buf.extend(payload);
        }

        // Driving the framer through the `FramerExt::framed` iterator should yield every inner frame, in order, and
        // then stop.
        let frames = buf
            .framed(&mut framer, false)
            .collect::<Result<Vec<_>, _>>()
            .expect("should not fail to extract frames from payload");

        assert_eq!(frames.len(), input_frames.len());
        for (frame, input_frame) in frames.iter().zip(input_frames) {
            assert_eq!(&frame[..], &input_frame[..]);
        }

        // We should have consumed the entire buffer.
        assert!(buf.is_empty());
    }

    #[test]
    fn framed_iterator_empty_buffer_yields_nothing() {
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);
        let mut buf: VecDeque<u8> = VecDeque::new();

        // With no data and no EOF, there is nothing to extract, so iteration ends immediately.
        assert!(buf.framed(&mut framer, false).next().is_none());
    }
}