saluki_io/deser/framing/
length_delimited.rs

1use bytes::Bytes;
2use tracing::trace;
3
4use super::{Framer, FramingError};
5use crate::buf::ReadIoBuffer;
6
/// Frames incoming data by splitting it on a fixed-size length delimiter.
///
/// Every frame is prefixed with a 4-byte integer, in little-endian order, which indicates how much additional data
/// is included in the frame. This framer only supports frames that fit within the given buffer: if the delimiter
/// plus the length it describes would exceed the buffer's capacity, the frame is considered invalid.
///
/// The framer carries no state, so a single value can be reused freely.
#[derive(Debug, Default)]
pub struct LengthDelimitedFramer;
14
15impl Framer for LengthDelimitedFramer {
16    fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
17        trace!(buf_len = buf.remaining(), "Processing buffer.");
18
19        let chunk = buf.chunk();
20        if chunk.is_empty() {
21            return Ok(None);
22        }
23
24        trace!(chunk_len = chunk.len(), "Processing chunk.");
25
26        // See if there's enough data to read the frame length.
27        if chunk.len() < 4 {
28            return if is_eof {
29                Err(FramingError::PartialFrame {
30                    needed: 4,
31                    remaining: chunk.len(),
32                })
33            } else {
34                Ok(None)
35            };
36        }
37
38        // See if we have enough data to read the full frame.
39        let frame_len = u32::from_le_bytes(chunk[0..4].try_into().unwrap()) as usize;
40        let frame_len_with_length = frame_len.saturating_add(4);
41        if frame_len_with_length > buf.capacity() {
42            return Err(oversized_frame_err(frame_len));
43        }
44
45        if chunk.len() < frame_len_with_length {
46            return if is_eof {
47                // If we've hit EOF and we have a partial frame here, well, then... it's invalid.
48                Err(FramingError::PartialFrame {
49                    needed: frame_len_with_length,
50                    remaining: chunk.len(),
51                })
52            } else {
53                Ok(None)
54            };
55        }
56
57        // Split out the entire frame -- length delimiter included -- and then carve out the length delimiter from the
58        // frame that we return.
59        //
60        // TODO: This is a bit inefficient, as we're copying the entire frame here. We could potentially avoid this by
61        // adding some specialized trait methods to `ReadIoBuffer` that could let us, potentially, implement equivalent
62        // slicing that is object pool aware (i.e., somehow utilizing `FrozenBytesBuffer`, etc).
63        let frame = buf.copy_to_bytes(frame_len_with_length).slice(4..);
64
65        Ok(Some(frame))
66    }
67}
68
69const fn oversized_frame_err(frame_len: usize) -> FramingError {
70    FramingError::InvalidFrame {
71        frame_len,
72        reason: "frame length exceeds buffer capacity",
73    }
74}
75
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::*;

    /// Builds a framed payload whose delimiter matches the number of bytes that follow it, counting the optional
    /// trailing newline.
    fn get_delimited_payload(inner: &[u8], with_newline: bool) -> VecDeque<u8> {
        let payload_len = inner.len() + usize::from(with_newline);

        get_delimited_payload_with_fixed_length(inner, payload_len as u32, with_newline)
    }

    /// Builds a framed payload with a caller-chosen delimiter value, which need not match the bytes that follow it.
    fn get_delimited_payload_with_fixed_length(inner: &[u8], frame_len: u32, with_newline: bool) -> VecDeque<u8> {
        let delimiter = frame_len.to_le_bytes();

        let mut framed = VecDeque::new();
        framed.extend(&delimiter);
        framed.extend(inner);
        if with_newline {
            framed.push_back(b'\n');
        }

        framed
    }

    #[test]
    fn basic() {
        let expected = b"hello, world!";
        let mut buf = get_delimited_payload(expected, false);
        let mut framer = LengthDelimitedFramer;

        let result = framer.next_frame(&mut buf, false);
        let frame = result
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload");

        // The frame comes back without its delimiter, and nothing is left behind in the buffer.
        assert_eq!(&frame[..], expected);
        assert!(buf.is_empty());
    }

    #[test]
    fn partial_read() {
        // Starting from a full, valid frame, hand the framer truncated copies of it and make sure no frame is
        // produced until the whole buffer is supplied.
        let expected = b"hello, world!";
        let mut buf = get_delimited_payload(expected, false);
        let mut framer = LengthDelimitedFramer;

        // Three bytes is too short for even the length delimiter; seven bytes covers the delimiter but only part
        // of the data it describes. Neither is an error before EOF, and neither may consume any input.
        for truncated_len in [3, 7] {
            let mut partial = buf.clone();
            partial.truncate(truncated_len);

            let maybe_frame = framer
                .next_frame(&mut partial, false)
                .expect("should not fail to read from payload");
            assert!(maybe_frame.is_none());
            assert_eq!(partial.len(), truncated_len);
        }

        // The untouched original buffer holds the complete frame, so this read succeeds.
        let frame = framer
            .next_frame(&mut buf, false)
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload");

        assert_eq!(&frame[..], expected);
        assert!(buf.is_empty());
    }

    #[test]
    fn partial_read_eof() {
        // Same shape as `partial_read`, but at EOF: truncated copies of a valid frame must be reported as partial
        // frames, since no further data can arrive to complete them.
        let expected = b"hello, world!";
        let mut buf = get_delimited_payload(expected, false);
        let full_len = buf.len();
        let mut framer = LengthDelimitedFramer;

        // (bytes kept, bytes the framer should report as needed): first a buffer too short for the delimiter,
        // then one with the delimiter but only part of the data it describes.
        for (truncated_len, needed) in [(3, 4), (7, full_len)] {
            let mut partial = buf.clone();
            partial.truncate(truncated_len);

            let maybe_frame = framer.next_frame(&mut partial, true);
            assert_eq!(
                maybe_frame,
                Err(FramingError::PartialFrame {
                    needed,
                    remaining: truncated_len
                })
            );
            assert_eq!(partial.len(), truncated_len);
        }

        // A complete frame is still readable at EOF.
        let frame = framer
            .next_frame(&mut buf, true)
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload");

        assert_eq!(&frame[..], expected);
        assert!(buf.is_empty());
    }

    #[test]
    fn oversized_frame() {
        // Build an invalid frame whose delimiter claims ten times more data than the payload that follows it.
        let payload = b"hello, world!";
        let claimed_len = payload.len() * 10;
        let mut buf = get_delimited_payload_with_fixed_length(payload, claimed_len as u32, false);
        let original_len = buf.len();

        let mut framer = LengthDelimitedFramer;

        // The frame must be rejected as invalid, and the buffer must be left exactly as it was.
        let maybe_frame = framer.next_frame(&mut buf, false);
        assert_eq!(maybe_frame, Err(oversized_frame_err(claimed_len)));
        assert_eq!(buf.len(), original_len);
    }
}