1use bytes::Bytes;
2use snafu::Snafu;
3use tracing::trace;
4
5use crate::buf::ReadIoBuffer;
6
7mod length_delimited;
8pub use self::length_delimited::LengthDelimitedFramer;
9
10mod newline;
11pub use self::newline::NewlineFramer;
12
13#[derive(Debug, Snafu, Eq, PartialEq)]
15#[snafu(context(suffix(false)))]
16pub enum FramingError {
17 #[snafu(display("invalid frame (frame length: {}, {})", frame_len, reason))]
25 InvalidFrame { frame_len: usize, reason: &'static str },
26
27 #[snafu(display(
32 "partial frame after EOF (needed {} bytes, but only {} bytes remaining)",
33 needed,
34 remaining
35 ))]
36 PartialFrame { needed: usize, remaining: usize },
37}
38
39pub trait Framer {
41 fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError>;
53}
54
55pub struct NestedFramer<Inner, Outer> {
61 inner: Inner,
62 outer: Outer,
63 current_outer_frame: Option<Bytes>,
64 completed_outer_frames: usize,
65}
66
67impl<Inner, Outer> NestedFramer<Inner, Outer> {
68 pub fn new(inner: Inner, outer: Outer) -> Self {
70 Self {
71 inner,
72 outer,
73 current_outer_frame: None,
74 completed_outer_frames: 0,
75 }
76 }
77
78 pub fn take_completed_outer_frames(&mut self) -> usize {
80 let completed = self.completed_outer_frames;
81 self.completed_outer_frames = 0;
82 completed
83 }
84}
85
86impl<Inner, Outer> Framer for NestedFramer<Inner, Outer>
87where
88 Inner: Framer,
89 Outer: Framer,
90{
91 fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
92 loop {
93 let outer_frame = match self.current_outer_frame.as_mut() {
95 Some(frame) => {
96 trace!(
97 buf_len = buf.remaining(),
98 frame_len = frame.len(),
99 "Using existing outer frame."
100 );
101
102 frame
103 }
104 None => {
105 trace!(buf_len = buf.remaining(), "No existing outer frame.");
106
107 match self.outer.next_frame(buf, is_eof)? {
108 Some(frame) => {
109 trace!(
110 buf_len = buf.remaining(),
111 frame_len = frame.len(),
112 ?frame,
113 "Extracted outer frame."
114 );
115
116 self.current_outer_frame.get_or_insert(frame)
117 }
118
119 None => return Ok(None),
121 }
122 }
123 };
124
125 match self.inner.next_frame(outer_frame, true)? {
127 Some(frame) => {
128 let completed_outer_frame = outer_frame.is_empty();
129
130 trace!(
131 buf_len = buf.remaining(),
132 outer_frame_len = outer_frame.len(),
133 inner_frame_len = frame.len(),
134 "Extracted inner frame."
135 );
136
137 if completed_outer_frame {
138 self.current_outer_frame = None;
139 self.completed_outer_frames += 1;
140 }
141
142 return Ok(Some(frame));
143 }
144 None => {
145 trace!(
148 buf_len = buf.remaining(),
149 outer_frame_len = outer_frame.len(),
150 "Couldn't extract inner frame from existing outer frame."
151 );
152
153 if outer_frame.is_empty() {
154 self.current_outer_frame = None;
155 self.completed_outer_frames += 1;
156 if buf.remaining() != 0 {
157 continue;
158 }
159 }
160
161 return Ok(None);
162 }
163 }
164 }
165 }
166}
167
168pub struct Framed<'a, F, B> {
170 framer: &'a mut F,
171 buffer: &'a mut B,
172 is_eof: bool,
173}
174
175impl<F, B> Iterator for Framed<'_, F, B>
176where
177 F: Framer,
178 B: ReadIoBuffer,
179{
180 type Item = Result<Bytes, FramingError>;
181
182 fn next(&mut self) -> Option<Self::Item> {
183 self.framer.next_frame(self.buffer, self.is_eof).transpose()
184 }
185}
186
187pub trait FramerExt {
189 fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self>
194 where
195 Self: ReadIoBuffer + Sized,
196 F: Framer;
197}
198
199impl<B> FramerExt for B
200where
201 B: ReadIoBuffer,
202{
203 fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self> {
204 Framed {
205 framer,
206 buffer: self,
207 is_eof,
208 }
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use std::collections::VecDeque;
215
216 use super::{Framer as _, LengthDelimitedFramer, NestedFramer, NewlineFramer};
217
218 #[test]
219 fn nested_framer_single_outer_multiple_inner() {
220 let input_frames = &[b"frame1", b"frame2", b"frame3"];
221
222 let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);
225
226 let mut inner_frames = Vec::new();
228
229 for inner_frame_data in input_frames {
230 inner_frames.extend_from_slice(&inner_frame_data[..]);
231 inner_frames.push(b'\n');
232 }
233
234 let mut buf = VecDeque::new();
235 buf.extend(&(inner_frames.len() as u32).to_le_bytes());
236 buf.extend(inner_frames);
237
238 for input_frame in input_frames {
240 let frame = framer
241 .next_frame(&mut buf, false)
242 .expect("should not fail to read from payload")
243 .expect("should not fail to extract frame from payload");
244 assert_eq!(&frame[..], &input_frame[..]);
245 }
246
247 let maybe_frame = framer
248 .next_frame(&mut buf, false)
249 .expect("should not fail to read from payload");
250 assert!(maybe_frame.is_none());
251
252 assert!(buf.is_empty());
254 }
255
256 #[test]
257 fn nested_framer_multiple_outer_single_inner() {
258 let input_frames = &[b"frame1", b"frame2", b"frame3"];
259
260 let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);
263
264 let mut buf = VecDeque::new();
266
267 for inner_frame_data in input_frames {
268 let mut inner_frame = Vec::new();
269 inner_frame.extend_from_slice(&inner_frame_data[..]);
270 inner_frame.push(b'\n');
271
272 buf.extend(&(inner_frame.len() as u32).to_le_bytes());
273 buf.extend(inner_frame);
274 }
275
276 for input_frame in input_frames {
278 let frame = framer
279 .next_frame(&mut buf, false)
280 .expect("should not fail to read from payload")
281 .expect("should not fail to extract frame from payload");
282 assert_eq!(&frame[..], &input_frame[..]);
283 }
284
285 let maybe_frame = framer
286 .next_frame(&mut buf, false)
287 .expect("should not fail to read from payload");
288 assert!(maybe_frame.is_none());
289
290 assert!(buf.is_empty());
292 }
293
294 #[test]
295 fn nested_framer_tracks_completed_outer_frames() {
296 let input_frames = &[b"frame1", b"frame2", b"frame3"];
297 let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);
298 let mut buf = VecDeque::new();
299
300 for inner_frame_data in input_frames {
301 let mut inner_frame = Vec::new();
302 inner_frame.extend_from_slice(&inner_frame_data[..]);
303 inner_frame.push(b'\n');
304
305 buf.extend(&(inner_frame.len() as u32).to_le_bytes());
306 buf.extend(inner_frame);
307 }
308
309 for input_frame in input_frames {
310 let frame = framer
311 .next_frame(&mut buf, false)
312 .expect("should not fail to read from payload")
313 .expect("should extract frame from payload");
314 assert_eq!(&frame[..], &input_frame[..]);
315 assert_eq!(framer.take_completed_outer_frames(), 1);
316 }
317
318 assert!(buf.is_empty());
319 assert_eq!(framer.take_completed_outer_frames(), 0);
320 }
321
322 #[test]
323 fn nested_framer_waits_to_complete_outer_frame_until_last_inner_frame() {
324 let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);
325 let mut inner_frames = Vec::new();
326 inner_frames.extend_from_slice(b"frame1\nframe2\n");
327
328 let mut buf = VecDeque::new();
329 buf.extend(&(inner_frames.len() as u32).to_le_bytes());
330 buf.extend(inner_frames);
331
332 let first_frame = framer
333 .next_frame(&mut buf, false)
334 .expect("should not fail to read from payload")
335 .expect("should extract first frame from payload");
336 assert_eq!(&first_frame[..], b"frame1");
337 assert_eq!(framer.take_completed_outer_frames(), 0);
338
339 let second_frame = framer
340 .next_frame(&mut buf, false)
341 .expect("should not fail to read from payload")
342 .expect("should extract second frame from payload");
343 assert_eq!(&second_frame[..], b"frame2");
344 assert_eq!(framer.take_completed_outer_frames(), 1);
345 }
346
347 #[test]
348 fn nested_framer_multiple_outer_multiple_inner() {
349 let input_frames = &[b"frame1", b"frame2", b"frame3", b"frame4", b"frame5", b"frame6"];
350
351 let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);
354
355 let mut buf = VecDeque::new();
357
358 for inner_frame_data in input_frames.chunks(2) {
359 let mut inner_frames = Vec::new();
360 inner_frames.extend_from_slice(&inner_frame_data[0][..]);
361 inner_frames.push(b'\n');
362 inner_frames.extend_from_slice(&inner_frame_data[1][..]);
363 inner_frames.push(b'\n');
364
365 buf.extend(&(inner_frames.len() as u32).to_le_bytes());
366 buf.extend(inner_frames);
367 }
368
369 for input_frame in input_frames {
371 let frame = framer
372 .next_frame(&mut buf, false)
373 .expect("should not fail to read from payload")
374 .expect("should not fail to extract frame from payload");
375 assert_eq!(&frame[..], &input_frame[..]);
376 }
377
378 let maybe_frame = framer
379 .next_frame(&mut buf, false)
380 .expect("should not fail to read from payload");
381 assert!(maybe_frame.is_none());
382
383 assert!(buf.is_empty());
385 }
386}