Skip to main content

saluki_io/deser/framing/
mod.rs

1use bytes::Bytes;
2use snafu::Snafu;
3use tracing::trace;
4
5use crate::buf::ReadIoBuffer;
6
7mod length_delimited;
8pub use self::length_delimited::LengthDelimitedFramer;
9
10mod newline;
11pub use self::newline::NewlineFramer;
12
/// Framing error.
///
/// This is the error type returned by [`Framer::next_frame`] when a frame cannot be extracted from a buffer.
#[derive(Debug, Snafu, Eq, PartialEq)]
// Generate the snafu context selectors without the default `Snafu` suffix.
#[snafu(context(suffix(false)))]
pub enum FramingError {
    /// An invalid frame was received.
    ///
    /// This generally occurs if the frame is corrupted in some way, due to a bug in how the frame was encoded or
    /// sent/received. For example, if a length-delimited frame indicates that the frame is larger than the buffer can
    /// handle, it generally indicates that frame was created incorrectly by not respecting the maximum frame length
    /// limitations, or the buffer is corrupt and spurious bytes are contributing to a decoded frame length that's
    /// nonsensical.
    ///
    /// `frame_len` is the frame length reported in the error message, and `reason` is a static, human-readable
    /// explanation that is appended to that message.
    #[snafu(display("invalid frame (frame length: {}, {})", frame_len, reason))]
    InvalidFrame { frame_len: usize, reason: &'static str },

    /// Failed to read frame due to a partial frame after reaching EOF.
    ///
    /// This generally only occurs if the peer closes their connection before sending the entire frame, or if a partial
    /// write occurs on a connectionless stream, such as UDP, perhaps due to fragmentation.
    ///
    /// `needed` is the number of bytes required to complete the frame, and `remaining` is the number of bytes that
    /// were actually left, as rendered in the error message.
    #[snafu(display(
        "partial frame after EOF (needed {} bytes, but only {} bytes remaining)",
        needed,
        remaining
    ))]
    PartialFrame { needed: usize, remaining: usize },
}
38
/// A trait for reading framed messages from a buffer.
///
/// Implementations take `&mut self`, so a framer may keep state between calls.
pub trait Framer {
    /// Attempt to extract the next frame from the buffer.
    ///
    /// If enough data was present to extract a frame, `Ok(Some(frame))` is returned. If not enough data was present, and
    /// EOF hasn't been reached, `Ok(None)` is returned.
    ///
    /// Behavior when EOF is reached is framer-specific and in some cases may allow for decoding a frame even when the
    /// inherent delimiting data isn't present.
    ///
    /// # Arguments
    ///
    /// - `buf`: the buffer to extract the frame from. It is taken mutably so that the framer can consume bytes from it.
    /// - `is_eof`: whether EOF has been reached for the data feeding `buf`.
    ///
    /// # Errors
    ///
    /// If an error is detected when reading the next frame, an error is returned.
    fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError>;
}
54
/// A nested framer that extracts inner frames from outer frames.
///
/// This framer takes two input framers -- the "outer" and "inner" framers -- and extracts outer frames, and once an
/// outer frame has been extracted, extracts as many inner frames from the outer frame as possible. Callers deal
/// exclusively with the extracted inner frames.
pub struct NestedFramer<Inner, Outer> {
    /// Framer applied to the contents of each extracted outer frame.
    inner: Inner,
    /// Framer applied to the caller-provided buffer to extract outer frames.
    outer: Outer,
    /// Outer frame currently being split into inner frames, if any. It is kept across calls to `next_frame` until it
    /// has been emptied.
    current_outer_frame: Option<Bytes>,
    /// Number of outer frames that have been emptied since the counter was last reset by
    /// `take_completed_outer_frames`.
    completed_outer_frames: usize,
}
66
67impl<Inner, Outer> NestedFramer<Inner, Outer> {
68    /// Creates a new `NestedFramer` from the given inner and outer framers.
69    pub fn new(inner: Inner, outer: Outer) -> Self {
70        Self {
71            inner,
72            outer,
73            current_outer_frame: None,
74            completed_outer_frames: 0,
75        }
76    }
77
78    /// Returns the number of outer frames that have been fully consumed since the last call.
79    pub fn take_completed_outer_frames(&mut self) -> usize {
80        let completed = self.completed_outer_frames;
81        self.completed_outer_frames = 0;
82        completed
83    }
84}
85
86impl<Inner, Outer> Framer for NestedFramer<Inner, Outer>
87where
88    Inner: Framer,
89    Outer: Framer,
90{
91    fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
92        loop {
93            // Take our current outer frame, or if we have none, try to get the next one.
94            let outer_frame = match self.current_outer_frame.as_mut() {
95                Some(frame) => {
96                    trace!(
97                        buf_len = buf.remaining(),
98                        frame_len = frame.len(),
99                        "Using existing outer frame."
100                    );
101
102                    frame
103                }
104                None => {
105                    trace!(buf_len = buf.remaining(), "No existing outer frame.");
106
107                    match self.outer.next_frame(buf, is_eof)? {
108                        Some(frame) => {
109                            trace!(
110                                buf_len = buf.remaining(),
111                                frame_len = frame.len(),
112                                ?frame,
113                                "Extracted outer frame."
114                            );
115
116                            self.current_outer_frame.get_or_insert(frame)
117                        }
118
119                        // If we can't get another outer frame, then we're done for now.
120                        None => return Ok(None),
121                    }
122                }
123            };
124
125            // Try to get the next inner frame.
126            match self.inner.next_frame(outer_frame, true)? {
127                Some(frame) => {
128                    let completed_outer_frame = outer_frame.is_empty();
129
130                    trace!(
131                        buf_len = buf.remaining(),
132                        outer_frame_len = outer_frame.len(),
133                        inner_frame_len = frame.len(),
134                        "Extracted inner frame."
135                    );
136
137                    if completed_outer_frame {
138                        self.current_outer_frame = None;
139                        self.completed_outer_frames += 1;
140                    }
141
142                    return Ok(Some(frame));
143                }
144                None => {
145                    // We can't get anything else from our inner frame. If our outer frame is empty, and our input buffer
146                    // isn't empty, clear the current outer frame so that we can try to grab the next one.
147                    trace!(
148                        buf_len = buf.remaining(),
149                        outer_frame_len = outer_frame.len(),
150                        "Couldn't extract inner frame from existing outer frame."
151                    );
152
153                    if outer_frame.is_empty() {
154                        self.current_outer_frame = None;
155                        self.completed_outer_frames += 1;
156                        if buf.remaining() != 0 {
157                            continue;
158                        }
159                    }
160
161                    return Ok(None);
162                }
163            }
164        }
165    }
166}
167
/// An iterator of framed messages over a generic buffer.
///
/// Both the framer and the buffer are mutably borrowed for the lifetime `'a` of the iterator.
pub struct Framed<'a, F, B> {
    /// Framer used to extract each frame.
    framer: &'a mut F,
    /// Buffer that frames are extracted from.
    buffer: &'a mut B,
    /// EOF flag passed to the framer on every extraction attempt.
    is_eof: bool,
}
174
175impl<F, B> Iterator for Framed<'_, F, B>
176where
177    F: Framer,
178    B: ReadIoBuffer,
179{
180    type Item = Result<Bytes, FramingError>;
181
182    fn next(&mut self) -> Option<Self::Item> {
183        self.framer.next_frame(self.buffer, self.is_eof).transpose()
184    }
185}
186
/// Extension trait for ergonomically working with framers and buffers.
///
/// The trait is meant to be called on the buffer: `self` is the buffer that frames are extracted from.
pub trait FramerExt {
    /// Creates a new `Framed` iterator over the buffer, using the given framer.
    ///
    /// Returns an iterator that extracts frames from the given buffer, consuming the bytes from the buffer as frames
    /// are yielded.
    ///
    /// `is_eof` is stored in the returned iterator and passed to the framer on every extraction attempt. Both the
    /// buffer and the framer stay mutably borrowed for as long as the iterator is alive.
    fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self>
    where
        Self: ReadIoBuffer + Sized,
        F: Framer;
}
198
199impl<B> FramerExt for B
200where
201    B: ReadIoBuffer,
202{
203    fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self> {
204        Framed {
205            framer,
206            buffer: self,
207            is_eof,
208        }
209    }
210}
211
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::{Framer as _, LengthDelimitedFramer, NestedFramer, NewlineFramer};

    /// Concatenates the given inner frames into one payload, terminating each frame with a newline.
    fn newline_terminated(frames: &[&[u8; 6]]) -> Vec<u8> {
        let mut payload = Vec::new();
        for frame in frames {
            payload.extend_from_slice(&frame[..]);
            payload.push(b'\n');
        }
        payload
    }

    /// Appends `payload` to `buf` as a single outer frame: the payload length as a little-endian `u32`, followed by
    /// the payload bytes themselves.
    fn push_outer_frame(buf: &mut VecDeque<u8>, payload: &[u8]) {
        buf.extend(&(payload.len() as u32).to_le_bytes());
        buf.extend(payload);
    }

    /// Asserts that the framer has no further frame to offer and that the buffer has been consumed entirely.
    fn assert_fully_drained(framer: &mut impl super::Framer, buf: &mut VecDeque<u8>) {
        let maybe_frame = framer
            .next_frame(&mut *buf, false)
            .expect("should not fail to read from payload");
        assert!(maybe_frame.is_none());

        // We should have consumed the entire buffer.
        assert!(buf.is_empty());
    }

    #[test]
    fn nested_framer_single_outer_multiple_inner() {
        let input_frames = &[b"frame1", b"frame2", b"frame3"];

        // Length-delimited payloads form the outer layer, and newline-delimited payloads form the inner layer.
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Build a buffer holding a single length-delimited frame that wraps all three newline-delimited frames.
        let mut buf = VecDeque::new();
        push_outer_frame(&mut buf, &newline_terminated(input_frames));

        // Each of the original three frames should come back out, in order.
        for expected in input_frames.iter() {
            let frame = framer
                .next_frame(&mut buf, false)
                .expect("should not fail to read from payload")
                .expect("should not fail to extract frame from payload");
            assert_eq!(&frame[..], &expected[..]);
        }

        assert_fully_drained(&mut framer, &mut buf);
    }

    #[test]
    fn nested_framer_multiple_outer_single_inner() {
        let input_frames = &[b"frame1", b"frame2", b"frame3"];

        // Length-delimited payloads form the outer layer, and newline-delimited payloads form the inner layer.
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Build a buffer holding three length-delimited frames, each wrapping a single newline-delimited frame.
        let mut buf = VecDeque::new();
        for single in input_frames.chunks(1) {
            push_outer_frame(&mut buf, &newline_terminated(single));
        }

        // Each of the original three frames should come back out, in order.
        for expected in input_frames.iter() {
            let frame = framer
                .next_frame(&mut buf, false)
                .expect("should not fail to read from payload")
                .expect("should not fail to extract frame from payload");
            assert_eq!(&frame[..], &expected[..]);
        }

        assert_fully_drained(&mut framer, &mut buf);
    }

    #[test]
    fn nested_framer_tracks_completed_outer_frames() {
        let input_frames = &[b"frame1", b"frame2", b"frame3"];
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // One outer frame per inner frame, so every extracted inner frame also finishes an outer frame.
        let mut buf = VecDeque::new();
        for single in input_frames.chunks(1) {
            push_outer_frame(&mut buf, &newline_terminated(single));
        }

        for expected in input_frames.iter() {
            let frame = framer
                .next_frame(&mut buf, false)
                .expect("should not fail to read from payload")
                .expect("should extract frame from payload");
            assert_eq!(&frame[..], &expected[..]);
            assert_eq!(framer.take_completed_outer_frames(), 1);
        }

        // Nothing is left in the buffer, and no completion is reported a second time.
        assert!(buf.is_empty());
        assert_eq!(framer.take_completed_outer_frames(), 0);
    }

    #[test]
    fn nested_framer_waits_to_complete_outer_frame_until_last_inner_frame() {
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // A single outer frame wrapping two inner frames.
        let mut buf = VecDeque::new();
        push_outer_frame(&mut buf, b"frame1\nframe2\n");

        // After the first inner frame, the outer frame still has data left, so it isn't complete yet.
        let first_frame = framer
            .next_frame(&mut buf, false)
            .expect("should not fail to read from payload")
            .expect("should extract first frame from payload");
        assert_eq!(&first_frame[..], b"frame1");
        assert_eq!(framer.take_completed_outer_frames(), 0);

        // The second inner frame empties the outer frame, which is then counted as completed.
        let second_frame = framer
            .next_frame(&mut buf, false)
            .expect("should not fail to read from payload")
            .expect("should extract second frame from payload");
        assert_eq!(&second_frame[..], b"frame2");
        assert_eq!(framer.take_completed_outer_frames(), 1);
    }

    #[test]
    fn nested_framer_multiple_outer_multiple_inner() {
        let input_frames = &[b"frame1", b"frame2", b"frame3", b"frame4", b"frame5", b"frame6"];

        // Length-delimited payloads form the outer layer, and newline-delimited payloads form the inner layer.
        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Build a buffer holding three length-delimited frames, each wrapping two newline-delimited frames.
        let mut buf = VecDeque::new();
        for pair in input_frames.chunks(2) {
            push_outer_frame(&mut buf, &newline_terminated(pair));
        }

        // Each of the original six frames should come back out, in order.
        for expected in input_frames.iter() {
            let frame = framer
                .next_frame(&mut buf, false)
                .expect("should not fail to read from payload")
                .expect("should not fail to extract frame from payload");
            assert_eq!(&frame[..], &expected[..]);
        }

        assert_fully_drained(&mut framer, &mut buf);
    }
}
386}