saluki_io/deser/framing/
newline.rs1use bytes::Bytes;
2use tracing::trace;
3
4use super::{Framer, FramingError};
5use crate::buf::ReadIoBuffer;
6
/// Frames incoming data by splitting it on newline (`\n`, `0x0A`) bytes.
///
/// Only the line feed byte is treated as a delimiter: a carriage return (`\r`) that precedes it is left in the
/// frame. The delimiter itself is consumed from the buffer but is never part of the returned frame.
///
/// Use [`NewlineFramer::required_on_eof`] to choose what happens to trailing data that is not newline-terminated
/// once EOF has been reached.
#[derive(Clone, Debug, Default)]
pub struct NewlineFramer {
    /// When `true`, data left at EOF without a newline is reported as an error instead of being returned as a
    /// final frame. Defaults to `false`.
    required_on_eof: bool,
}
15
16impl NewlineFramer {
17 pub fn required_on_eof(mut self, require: bool) -> Self {
27 self.required_on_eof = require;
28 self
29 }
30}
31
32impl Framer for NewlineFramer {
33 fn next_frame<'a, B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
34 trace!(buf_len = buf.remaining(), "Processing buffer.");
35
36 let chunk = buf.chunk();
37 if chunk.is_empty() {
38 return Ok(None);
39 }
40
41 trace!(chunk_len = chunk.len(), "Processing chunk.");
42
43 match find_newline(chunk) {
45 Some(idx) => {
46 let frame = buf.copy_to_bytes(idx);
48
49 buf.advance(1);
51
52 Ok(Some(frame))
53 }
54 None => {
55 if !is_eof {
57 return Ok(None);
58 }
59
60 if self.required_on_eof {
62 return Err(missing_delimiter_err(chunk.len()));
63 }
64
65 Ok(Some(buf.copy_to_bytes(chunk.len())))
70 }
71 }
72 }
73}
74
75const fn missing_delimiter_err(len: usize) -> FramingError {
76 FramingError::InvalidFrame {
77 frame_len: len,
78 reason: "reached EOF without finding newline delimiter",
79 }
80}
81
82fn find_newline(haystack: &[u8]) -> Option<usize> {
83 memchr::memchr(b'\n', haystack)
84}
85
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::*;

    /// Payload shared by every test case; it contains no newline of its own.
    const PAYLOAD: &[u8] = b"hello, world!";

    /// Builds a buffer holding `inner`, optionally followed by a single newline delimiter.
    fn get_delimited_payload(inner: &[u8], with_newline: bool) -> VecDeque<u8> {
        let mut bytes = inner.to_vec();
        if with_newline {
            bytes.push(b'\n');
        }

        VecDeque::from(bytes)
    }

    /// A delimited payload yields its frame before EOF, and the delimiter is consumed.
    #[test]
    fn newline_no_eof() {
        let mut input = get_delimited_payload(PAYLOAD, true);
        let mut framer = NewlineFramer::default();

        let frame = framer
            .next_frame(&mut input, false)
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload");

        assert_eq!(&frame[..], PAYLOAD);
        assert!(input.is_empty());
    }

    /// Without a delimiter and before EOF, no frame is produced and the buffer is left as it was.
    #[test]
    fn no_newline_no_eof() {
        let mut input = get_delimited_payload(PAYLOAD, false);
        let original_len = input.len();
        let mut framer = NewlineFramer::default();

        let maybe_frame = framer
            .next_frame(&mut input, false)
            .expect("should not fail to read from payload");

        assert_eq!(maybe_frame, None);
        assert_eq!(input.len(), original_len);
    }

    /// A delimited payload yields its frame at EOF as well.
    #[test]
    fn newline_eof() {
        let mut input = get_delimited_payload(PAYLOAD, true);
        let mut framer = NewlineFramer::default();

        let frame = framer
            .next_frame(&mut input, true)
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload");

        assert_eq!(&frame[..], PAYLOAD);
        assert!(input.is_empty());
    }

    /// At EOF, an undelimited payload is returned whole when the delimiter is not required.
    #[test]
    fn no_newline_eof_not_required_on_eof() {
        let mut input = get_delimited_payload(PAYLOAD, false);
        let mut framer = NewlineFramer::default();

        let frame = framer
            .next_frame(&mut input, true)
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload");

        assert_eq!(&frame[..], PAYLOAD);
        assert!(input.is_empty());
    }

    /// At EOF, an undelimited payload is rejected when the delimiter is required, and nothing is consumed.
    #[test]
    fn no_newline_eof_required_on_eof() {
        let mut input = get_delimited_payload(PAYLOAD, false);
        let original_len = input.len();
        let mut framer = NewlineFramer::default().required_on_eof(true);

        let result = framer.next_frame(&mut input, true);

        assert_eq!(result, Err(missing_delimiter_err(original_len)));
        assert_eq!(input.len(), original_len);
    }
}