1use bytes::Bytes;
2use snafu::Snafu;
3use tracing::trace;
4
5use crate::buf::ReadIoBuffer;
6
7mod length_delimited;
8pub use self::length_delimited::LengthDelimitedFramer;
9
10mod newline;
11pub use self::newline::NewlineFramer;
12
13#[derive(Debug, Snafu, Eq, PartialEq)]
15#[snafu(context(suffix(false)))]
16pub enum FramingError {
17 #[snafu(display("invalid frame (frame length: {}, {})", frame_len, reason))]
25 InvalidFrame { frame_len: usize, reason: &'static str },
26
27 #[snafu(display(
32 "partial frame after EOF (needed {} bytes, but only {} bytes remaining)",
33 needed,
34 remaining
35 ))]
36 PartialFrame { needed: usize, remaining: usize },
37}
38
39pub trait Framer {
41 fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError>;
53}
54
55pub struct NestedFramer<Inner, Outer> {
61 inner: Inner,
62 outer: Outer,
63 current_outer_frame: Option<Bytes>,
64}
65
66impl<Inner, Outer> NestedFramer<Inner, Outer> {
67 pub fn new(inner: Inner, outer: Outer) -> Self {
69 Self {
70 inner,
71 outer,
72 current_outer_frame: None,
73 }
74 }
75}
76
77impl<Inner, Outer> Framer for NestedFramer<Inner, Outer>
78where
79 Inner: Framer,
80 Outer: Framer,
81{
82 fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
83 loop {
84 let outer_frame = match self.current_outer_frame.as_mut() {
86 Some(frame) => {
87 trace!(
88 buf_len = buf.remaining(),
89 frame_len = frame.len(),
90 "Using existing outer frame."
91 );
92
93 frame
94 }
95 None => {
96 trace!(buf_len = buf.remaining(), "No existing outer frame.");
97
98 match self.outer.next_frame(buf, is_eof)? {
99 Some(frame) => {
100 trace!(
101 buf_len = buf.remaining(),
102 frame_len = frame.len(),
103 ?frame,
104 "Extracted outer frame."
105 );
106
107 self.current_outer_frame.get_or_insert(frame)
108 }
109
110 None => return Ok(None),
112 }
113 }
114 };
115
116 match self.inner.next_frame(outer_frame, true)? {
118 Some(frame) => {
119 trace!(
120 buf_len = buf.remaining(),
121 outer_frame_len = outer_frame.len(),
122 inner_frame_len = frame.len(),
123 "Extracted inner frame."
124 );
125
126 return Ok(Some(frame));
127 }
128 None => {
129 trace!(
132 buf_len = buf.remaining(),
133 outer_frame_len = outer_frame.len(),
134 "Couldn't extract inner frame from existing outer frame."
135 );
136
137 if outer_frame.is_empty() && buf.remaining() != 0 {
138 self.current_outer_frame = None;
139 continue;
140 } else {
141 return Ok(None);
142 }
143 }
144 }
145 }
146 }
147}
148
/// An iterator adapter that yields frames by driving a [`Framer`] over a buffer.
///
/// Both the framer and the buffer are mutably borrowed for the lifetime `'a`.
pub struct Framed<'a, F, B> {
    /// Framer used to extract each frame.
    framer: &'a mut F,

    /// Buffer the frames are read from.
    buffer: &'a mut B,

    /// EOF flag passed unchanged to the framer on every call.
    is_eof: bool,
}
155
156impl<F, B> Iterator for Framed<'_, F, B>
157where
158 F: Framer,
159 B: ReadIoBuffer,
160{
161 type Item = Result<Bytes, FramingError>;
162
163 fn next(&mut self) -> Option<Self::Item> {
164 self.framer.next_frame(self.buffer, self.is_eof).transpose()
165 }
166}
167
168pub trait FramerExt {
170 fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self>
175 where
176 Self: ReadIoBuffer + Sized,
177 F: Framer;
178}
179
180impl<B> FramerExt for B
181where
182 B: ReadIoBuffer,
183{
184 fn framed<'a, F>(&'a mut self, framer: &'a mut F, is_eof: bool) -> Framed<'a, F, Self> {
185 Framed {
186 framer,
187 buffer: self,
188 is_eof,
189 }
190 }
191}
192
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::{Framer as _, LengthDelimitedFramer, NestedFramer, NewlineFramer};

    /// Concatenates `frames`, terminating each one with a newline.
    fn newline_terminated(frames: &[&[u8]]) -> Vec<u8> {
        let mut payload = Vec::new();
        for frame in frames {
            payload.extend_from_slice(&frame[..]);
            payload.push(b'\n');
        }
        payload
    }

    /// Appends `payload` to `buf` as one outer frame: a little-endian `u32` length, then the bytes.
    fn push_length_delimited(buf: &mut VecDeque<u8>, payload: Vec<u8>) {
        buf.extend(&(payload.len() as u32).to_le_bytes());
        buf.extend(payload);
    }

    /// Asserts that `framer` yields exactly `expected` from `buf`, in order, then reports no
    /// further frame and leaves `buf` fully consumed.
    fn assert_yields_exactly<F: super::Framer>(framer: &mut F, buf: &mut VecDeque<u8>, expected: &[&[u8]]) {
        for expected_frame in expected {
            let frame = framer
                .next_frame(&mut *buf, false)
                .expect("should not fail to read from payload")
                .expect("should not fail to extract frame from payload");
            assert_eq!(&frame[..], &expected_frame[..]);
        }

        let maybe_frame = framer
            .next_frame(&mut *buf, false)
            .expect("should not fail to read from payload");
        assert!(maybe_frame.is_none());

        assert!(buf.is_empty());
    }

    #[test]
    fn nested_framer_single_outer_multiple_inner() {
        let input_frames: &[&[u8]] = &[b"frame1", b"frame2", b"frame3"];

        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // One outer frame carrying every inner frame.
        let mut buf = VecDeque::new();
        push_length_delimited(&mut buf, newline_terminated(input_frames));

        assert_yields_exactly(&mut framer, &mut buf, input_frames);
    }

    #[test]
    fn nested_framer_multiple_outer_single_inner() {
        let input_frames: &[&[u8]] = &[b"frame1", b"frame2", b"frame3"];

        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // One outer frame per inner frame.
        let mut buf = VecDeque::new();
        for single in input_frames.chunks(1) {
            push_length_delimited(&mut buf, newline_terminated(single));
        }

        assert_yields_exactly(&mut framer, &mut buf, input_frames);
    }

    #[test]
    fn nested_framer_multiple_outer_multiple_inner() {
        let input_frames: &[&[u8]] = &[b"frame1", b"frame2", b"frame3", b"frame4", b"frame5", b"frame6"];

        let mut framer = NestedFramer::new(NewlineFramer::default(), LengthDelimitedFramer);

        // Three outer frames, each carrying two inner frames.
        let mut buf = VecDeque::new();
        for pair in input_frames.chunks(2) {
            push_length_delimited(&mut buf, newline_terminated(pair));
        }

        assert_yields_exactly(&mut framer, &mut buf, input_frames);
    }
}