saluki_io/deser/framing/
length_delimited.rs1use bytes::Bytes;
2use tracing::trace;
3
4use super::{Framer, FramingError};
5use crate::buf::ReadIoBuffer;
6
/// Frames incoming data by splitting it on a fixed-size length prefix.
///
/// Every frame is expected to start with a 4-byte unsigned integer, in little-endian order, giving the number of
/// payload bytes that follow. The prefix is stripped from the frames handed back to the caller.
///
/// A frame whose prefix plus payload would not fit within the capacity of the buffer being read from is treated as
/// invalid rather than waited for.
#[derive(Debug, Default)]
pub struct LengthDelimitedFramer;
14
15impl Framer for LengthDelimitedFramer {
16 fn next_frame<B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
17 trace!(buf_len = buf.remaining(), "Processing buffer.");
18
19 let chunk = buf.chunk();
20 if chunk.is_empty() {
21 return Ok(None);
22 }
23
24 trace!(chunk_len = chunk.len(), "Processing chunk.");
25
26 if chunk.len() < 4 {
28 return if is_eof {
29 Err(FramingError::PartialFrame {
30 needed: 4,
31 remaining: chunk.len(),
32 })
33 } else {
34 Ok(None)
35 };
36 }
37
38 let frame_len = u32::from_le_bytes(chunk[0..4].try_into().unwrap()) as usize;
40 let frame_len_with_length = frame_len.saturating_add(4);
41 if frame_len_with_length > buf.capacity() {
42 return Err(oversized_frame_err(frame_len));
43 }
44
45 if chunk.len() < frame_len_with_length {
46 return if is_eof {
47 Err(FramingError::PartialFrame {
49 needed: frame_len_with_length,
50 remaining: chunk.len(),
51 })
52 } else {
53 Ok(None)
54 };
55 }
56
57 let frame = buf.copy_to_bytes(frame_len_with_length).slice(4..);
64
65 Ok(Some(frame))
66 }
67}
68
69const fn oversized_frame_err(frame_len: usize) -> FramingError {
70 FramingError::InvalidFrame {
71 frame_len,
72 reason: "frame length exceeds buffer capacity",
73 }
74}
75
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::*;

    /// Payload carried by every frame built in these tests.
    const PAYLOAD: &[u8; 13] = b"hello, world!";

    /// Builds a buffer holding `inner` behind a length prefix that matches the bytes written after it.
    ///
    /// When `with_newline` is set, a trailing `\n` is appended and counted in the prefix.
    fn get_delimited_payload(inner: &[u8], with_newline: bool) -> VecDeque<u8> {
        let declared_len = if with_newline { inner.len() + 1 } else { inner.len() };

        get_delimited_payload_with_fixed_length(inner, declared_len as u32, with_newline)
    }

    /// Builds a buffer holding `inner` behind an arbitrary little-endian length prefix of `frame_len`.
    ///
    /// The prefix is written as given, whether or not it matches the number of bytes that follow.
    fn get_delimited_payload_with_fixed_length(inner: &[u8], frame_len: u32, with_newline: bool) -> VecDeque<u8> {
        let mut framed = VecDeque::new();
        framed.extend(&frame_len.to_le_bytes());
        framed.extend(inner);
        if with_newline {
            framed.push_back(b'\n');
        }

        framed
    }

    /// Returns a copy of `source` cut down to its first `len` bytes.
    fn truncated_copy(source: &VecDeque<u8>, len: usize) -> VecDeque<u8> {
        let mut copy = source.clone();
        copy.truncate(len);
        copy
    }

    /// Runs the framer once and unwraps the frame, panicking if reading fails or no frame is produced.
    fn expect_frame(framer: &mut LengthDelimitedFramer, buf: &mut VecDeque<u8>, is_eof: bool) -> Bytes {
        framer
            .next_frame(buf, is_eof)
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload")
    }

    #[test]
    fn basic() {
        let mut buf = get_delimited_payload(PAYLOAD, false);
        let mut framer = LengthDelimitedFramer;

        let frame = expect_frame(&mut framer, &mut buf, false);

        assert_eq!(&frame[..], PAYLOAD);
        assert!(buf.is_empty());
    }

    #[test]
    fn partial_read() {
        let mut full_buf = get_delimited_payload(PAYLOAD, false);
        let mut framer = LengthDelimitedFramer;

        // 3 bytes: not even a complete length prefix.
        // 7 bytes: a complete length prefix, but only part of the payload.
        // Either way the framer must ask for more data and leave the buffer untouched.
        for cut in [3usize, 7] {
            let mut short_buf = truncated_copy(&full_buf, cut);

            let maybe_frame = framer
                .next_frame(&mut short_buf, false)
                .expect("should not fail to read from payload");
            assert!(maybe_frame.is_none());
            assert_eq!(short_buf.len(), cut);
        }

        // With the whole frame present, it is extracted and the buffer is drained.
        let frame = expect_frame(&mut framer, &mut full_buf, false);

        assert_eq!(&frame[..], PAYLOAD);
        assert!(full_buf.is_empty());
    }

    #[test]
    fn partial_read_eof() {
        let mut full_buf = get_delimited_payload(PAYLOAD, false);
        // Length of the complete frame, prefix included.
        let total_len = full_buf.len();

        let mut framer = LengthDelimitedFramer;

        // (bytes available, bytes the framer should report as needed)
        // 3 bytes: incomplete prefix, so the prefix size is needed.
        // 7 bytes: complete prefix with a partial payload, so the whole frame is needed.
        for (cut, needed) in [(3usize, 4usize), (7, total_len)] {
            let mut short_buf = truncated_copy(&full_buf, cut);

            let maybe_frame = framer.next_frame(&mut short_buf, true);
            assert_eq!(
                maybe_frame,
                Err(FramingError::PartialFrame {
                    needed,
                    remaining: cut
                })
            );
            assert_eq!(short_buf.len(), cut);
        }

        // A complete frame is still returned normally at EOF.
        let frame = expect_frame(&mut framer, &mut full_buf, true);

        assert_eq!(&frame[..], PAYLOAD);
        assert!(full_buf.is_empty());
    }

    #[test]
    fn oversized_frame() {
        // Declare a payload ten times larger than what is actually written.
        let declared_len = PAYLOAD.len() * 10;
        let mut buf = get_delimited_payload_with_fixed_length(PAYLOAD, declared_len as u32, false);
        let len_before = buf.len();

        let mut framer = LengthDelimitedFramer;

        let maybe_frame = framer.next_frame(&mut buf, false);

        // The frame is rejected and nothing is consumed from the buffer.
        assert_eq!(maybe_frame, Err(oversized_frame_err(declared_len)));
        assert_eq!(buf.len(), len_before);
    }
}