saluki_io/deser/framing/
newline.rs

1use bytes::Bytes;
2use tracing::trace;
3
4use super::{Framer, FramingError};
5use crate::buf::ReadIoBuffer;
6
/// A [`Framer`] that splits incoming data into frames on newline boundaries.
///
/// Only the line feed byte (`0x0A`) acts as the delimiter. Carriage return bytes (`0x0D`) receive no special
/// treatment: a payload that uses `\r\n` line endings yields frames that still contain the `\r`.
#[derive(Clone, Debug, Default)]
pub struct NewlineFramer {
    /// When `true`, data left over at EOF without a newline delimiter is reported as an invalid frame rather than
    /// being returned as the final frame.
    required_on_eof: bool,
}
15
16impl NewlineFramer {
17    /// Whether or not the delimiter is required when EOF has been reached.
18    ///
19    /// This controls whether or not the frames must always be suffixed by the delimiter character.  In some cases, a
20    /// delimiter is purely for separating multiple frames within a single payload, and when the final payload is sent,
21    /// it may not include the delimiter at the end.
22    ///
23    /// If the framing configuration requires the delimiter to always be present, then set this to `true`.
24    ///
25    /// Defaults to `false`.
26    pub fn required_on_eof(mut self, require: bool) -> Self {
27        self.required_on_eof = require;
28        self
29    }
30}
31
32impl Framer for NewlineFramer {
33    fn next_frame<'a, B: ReadIoBuffer>(&mut self, buf: &mut B, is_eof: bool) -> Result<Option<Bytes>, FramingError> {
34        trace!(buf_len = buf.remaining(), "Processing buffer.");
35
36        let chunk = buf.chunk();
37        if chunk.is_empty() {
38            return Ok(None);
39        }
40
41        trace!(chunk_len = chunk.len(), "Processing chunk.");
42
43        // Search through the buffer for our delimiter.
44        match find_newline(chunk) {
45            Some(idx) => {
46                // If we found the delimiter, then we can return the frame.
47                let frame = buf.copy_to_bytes(idx);
48
49                // Advance the buffer past the delimiter.
50                buf.advance(1);
51
52                Ok(Some(frame))
53            }
54            None => {
55                // If we're not at EOF, then we can't do anything else right now.
56                if !is_eof {
57                    return Ok(None);
58                }
59
60                // If we're at EOF and we require the delimiter, then this is an invalid frame.
61                if self.required_on_eof {
62                    return Err(missing_delimiter_err(chunk.len()));
63                }
64
65                // TODO: This is a bit inefficient, as we're copying the entire frame here. We could potentially avoid
66                // this by adding some specialized trait methods to `ReadIoBuffer` that could let us, potentially,
67                // implement equivalent slicing that is object pool aware (i.e., somehow utilizing `FrozenBytesBuffer`,
68                // etc).
69                Ok(Some(buf.copy_to_bytes(chunk.len())))
70            }
71        }
72    }
73}
74
75const fn missing_delimiter_err(len: usize) -> FramingError {
76    FramingError::InvalidFrame {
77        frame_len: len,
78        reason: "reached EOF without finding newline delimiter",
79    }
80}
81
82fn find_newline(haystack: &[u8]) -> Option<usize> {
83    memchr::memchr(b'\n', haystack)
84}
85
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use super::*;

    /// Frame contents shared by every test case.
    const PAYLOAD: &[u8] = b"hello, world!";

    /// Builds a buffer holding `inner`, optionally terminated by a single newline.
    fn get_delimited_payload(inner: &[u8], with_newline: bool) -> VecDeque<u8> {
        let terminator = if with_newline { Some(b'\n') } else { None };

        inner.iter().copied().chain(terminator).collect()
    }

    /// Runs a default-configured framer over `buf` once and returns the frame it must produce.
    fn read_frame(buf: &mut VecDeque<u8>, is_eof: bool) -> Bytes {
        let mut framer = NewlineFramer::default();

        framer
            .next_frame(buf, is_eof)
            .expect("should not fail to read from payload")
            .expect("should not fail to extract frame from payload")
    }

    #[test]
    fn newline_no_eof() {
        let mut buf = get_delimited_payload(PAYLOAD, true);

        let frame = read_frame(&mut buf, false);

        // The frame excludes the delimiter, and the delimiter is consumed from the buffer.
        assert_eq!(&frame[..], PAYLOAD);
        assert!(buf.is_empty());
    }

    #[test]
    fn no_newline_no_eof() {
        let mut buf = get_delimited_payload(PAYLOAD, false);
        let len_before = buf.len();

        let mut framer = NewlineFramer::default();
        let outcome = framer.next_frame(&mut buf, false);

        // Without a delimiter or EOF there is no frame yet, and nothing may be consumed.
        let maybe_frame = outcome.expect("should not fail to read from payload");
        assert_eq!(maybe_frame, None);
        assert_eq!(buf.len(), len_before);
    }

    #[test]
    fn newline_eof() {
        let mut buf = get_delimited_payload(PAYLOAD, true);

        let frame = read_frame(&mut buf, true);

        assert_eq!(&frame[..], PAYLOAD);
        assert!(buf.is_empty());
    }

    #[test]
    fn no_newline_eof_not_required_on_eof() {
        let mut buf = get_delimited_payload(PAYLOAD, false);

        let frame = read_frame(&mut buf, true);

        // At EOF with an optional delimiter, the remaining bytes form the final frame.
        assert_eq!(&frame[..], PAYLOAD);
        assert!(buf.is_empty());
    }

    #[test]
    fn no_newline_eof_required_on_eof() {
        let mut buf = get_delimited_payload(PAYLOAD, false);
        let len_before = buf.len();

        let mut framer = NewlineFramer::default().required_on_eof(true);
        let outcome = framer.next_frame(&mut buf, true);

        // A mandatory delimiter that never arrived is an error, and the buffer is left untouched.
        assert_eq!(outcome, Err(missing_delimiter_err(len_before)));
        assert_eq!(buf.len(), len_before);
    }
}