saluki_io/buf/
vec.rs

1use std::{mem::ManuallyDrop, sync::Arc};
2
3use bytes::{buf::UninitSlice, Buf, BufMut};
4use saluki_core::pooling::{helpers::pooled_newtype, Clearable, ReclaimStrategy};
5use triomphe::{Arc as TriompheArc, UniqueArc};
6
7use super::{ClearableIoBuffer, CollapsibleReadWriteIoBuffer, ReadIoBuffer};
8
9/// A fixed-size bytes buffer.
10///
11/// This is a simple wrapper around a `BytesMut` that provides fixed-size semantics by disallowing writes that extend
12/// beyond the initial capacity. `FixedSizeVec` cannot be used directly, and must be interacted with via the
13/// [`Buf`] and [`BufMut`] traits.
14///
15/// Additionally, it is designed for use in object pools (implements [`Clearable`]).
16pub struct FixedSizeVec {
17    data: UniqueArc<Vec<u8>>,
18    read_idx: usize,
19}
20
21impl FixedSizeVec {
22    /// Creates a new `FixedSizeVec` with the given capacity.
23    ///
24    /// The vector will not grow once all available capacity has been consumed, and must be cleared to be reused.
25    pub fn with_capacity(capacity: usize) -> Self {
26        Self {
27            data: UniqueArc::new(Vec::with_capacity(capacity)),
28            read_idx: 0,
29        }
30    }
31
32    fn freeze(self) -> FrozenFixedSizeVec {
33        FrozenFixedSizeVec {
34            data: self.data.shareable(),
35            read_idx: self.read_idx,
36        }
37    }
38}
39
40impl Clearable for FixedSizeVec {
41    fn clear(&mut self) {
42        self.data.clear();
43        self.read_idx = 0;
44    }
45}
46
47struct FrozenFixedSizeVec {
48    data: TriompheArc<Vec<u8>>,
49    read_idx: usize,
50}
51
52impl FrozenFixedSizeVec {
53    fn into_unique(self) -> Option<FixedSizeVec> {
54        TriompheArc::into_unique(self.data).map(|data| FixedSizeVec { data, read_idx: 0 })
55    }
56}
57
58impl Clone for FrozenFixedSizeVec {
59    fn clone(&self) -> Self {
60        Self {
61            data: self.data.clone(),
62            read_idx: 0,
63        }
64    }
65}
66
67pooled_newtype! {
68    outer => BytesBuffer,
69    inner => FixedSizeVec,
70}
71
72impl BytesBuffer {
73    /// Consumes this buffer and returns a read-only version of it.
74    pub fn freeze(mut self) -> FrozenBytesBuffer {
75        let data = self.data.take().unwrap().freeze();
76
77        FrozenBytesBuffer {
78            strategy_ref: Arc::clone(&self.strategy_ref),
79            data: ManuallyDrop::new(data),
80        }
81    }
82}
83
84impl Buf for BytesBuffer {
85    fn remaining(&self) -> usize {
86        self.data().data.len() - self.data().read_idx
87    }
88
89    fn chunk(&self) -> &[u8] {
90        let data = self.data();
91        &data.data[data.read_idx..data.data.len()]
92    }
93
94    fn advance(&mut self, cnt: usize) {
95        let data = self.data_mut();
96        assert!(data.read_idx + cnt <= data.data.len());
97        data.read_idx += cnt;
98    }
99}
100
101unsafe impl BufMut for BytesBuffer {
102    fn remaining_mut(&self) -> usize {
103        let data = self.data();
104        data.data.capacity() - data.data.len()
105    }
106
107    fn chunk_mut(&mut self) -> &mut UninitSlice {
108        self.data_mut().data.spare_capacity_mut().into()
109    }
110
111    unsafe fn advance_mut(&mut self, cnt: usize) {
112        let new_len = self.data().data.len() + cnt;
113        self.data_mut().data.set_len(new_len);
114    }
115}
116
117impl ReadIoBuffer for BytesBuffer {
118    fn capacity(&self) -> usize {
119        self.data().data.capacity()
120    }
121}
122
123impl CollapsibleReadWriteIoBuffer for BytesBuffer {
124    fn collapse(&mut self) {
125        let remaining = self.remaining();
126
127        // If the buffer is empty, all we have to do is reset the buffer to its initial state.
128        if remaining == 0 {
129            let inner = self.data_mut();
130            inner.read_idx = 0;
131            inner.data.clear();
132            return;
133        }
134
135        // Otherwise, we have to actually shift the remaining data to the front of the buffer and then also update our
136        // buffer state.
137        let inner = self.data_mut();
138
139        let src_start = inner.read_idx;
140        let src_end = inner.data.len();
141        inner.data.copy_within(src_start..src_end, 0);
142        inner.data.truncate(remaining);
143        inner.read_idx = 0;
144    }
145}
146
147impl ClearableIoBuffer for BytesBuffer {
148    fn clear(&mut self) {
149        self.data_mut().clear();
150    }
151}
152
153/// A frozen, read-only version of [`BytesBuffer`].
154///
155/// `FrozenBytesBuffer` can be cheaply cloned, and allows for sharing an underlying [`BytesBuffer`] among multiple
156/// tasks while still maintaining all of the original buffer's object pooling semantics.
157// TODO: it's not great that we're manually emulating the internal structure of `BytesBuffer`, since the whole point is
158// that those bits are auto-generated for us and meant to be functionally transparent to using `BytesBuffer` in the
159// first place... it'd be interesting to consider if we could make this more ergonomic, perhaps by having some sort of
160// convenience helper method for converting pooled objects of type T to U where the `Poolable::Data` is identical
161// between them, almost along lines of `CoerceUnsized` where the underlying data isn't changing, just the representation
162// of it.
163#[derive(Clone)]
164pub struct FrozenBytesBuffer {
165    strategy_ref: Arc<dyn ReclaimStrategy<BytesBuffer> + Send + Sync>,
166    data: ManuallyDrop<FrozenFixedSizeVec>,
167}
168
169impl FrozenBytesBuffer {
170    /// Returns `true` if the buffer is empty.
171    pub fn is_empty(&self) -> bool {
172        self.data.read_idx == self.data.data.len()
173    }
174
175    /// Returns the number of bytes remaining in the buffer.
176    pub fn len(&self) -> usize {
177        self.data.data.len() - self.data.read_idx
178    }
179
180    /// Returns the total capacity of the buffer.
181    pub fn capacity(&self) -> usize {
182        self.data.data.capacity()
183    }
184}
185
186impl Buf for FrozenBytesBuffer {
187    fn remaining(&self) -> usize {
188        self.data.data.len() - self.data.read_idx
189    }
190
191    fn chunk(&self) -> &[u8] {
192        &self.data.data[self.data.read_idx..]
193    }
194
195    fn advance(&mut self, cnt: usize) {
196        assert!(self.data.read_idx + cnt <= self.data.data.len());
197        self.data.read_idx += cnt;
198    }
199}
200
201impl Drop for FrozenBytesBuffer {
202    fn drop(&mut self) {
203        // If we're the last reference to the buffer, we need to reconstitute it back to a `FixedSizeVec`, and reclaim
204        // it to the object pool.
205        //
206        // SAFETY: Nothing else can be using `self.data` since we're dropping.
207        let data = unsafe { ManuallyDrop::take(&mut self.data) };
208        if let Some(data) = data.into_unique() {
209            self.strategy_ref.reclaim(data);
210        }
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use saluki_core::pooling::helpers::get_pooled_object_via_builder;
217
218    use super::*;
219
220    #[test]
221    fn basic() {
222        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(13));
223
224        let first_write = b"hello";
225        let second_write = b", worl";
226        let third_write = b"d!";
227
228        // We start out empty:
229        assert_eq!(buf.remaining(), 0);
230        assert_eq!(buf.remaining_mut(), 13);
231
232        // Write the first chunk:
233        buf.put_slice(first_write);
234        assert_eq!(buf.remaining(), 5);
235        assert_eq!(buf.remaining_mut(), 8);
236
237        // Write the second chunk:
238        buf.put_slice(second_write);
239        assert_eq!(buf.remaining(), 11);
240        assert_eq!(buf.remaining_mut(), 2);
241
242        // Read 7 bytes worth:
243        let first_chunk = buf.chunk();
244        assert_eq!(first_chunk.len(), 11);
245        assert_eq!(first_chunk, b"hello, worl");
246
247        buf.advance(7);
248        assert_eq!(buf.remaining(), 4);
249        assert_eq!(buf.remaining_mut(), 2);
250
251        // Write the third chunk:
252        buf.put_slice(third_write);
253        assert_eq!(buf.remaining(), 6);
254        assert_eq!(buf.remaining_mut(), 0);
255
256        // Read the rest:
257        let second_chunk = buf.chunk();
258        assert_eq!(second_chunk.len(), 6);
259        assert_eq!(second_chunk, b"world!");
260
261        buf.advance(6);
262        assert_eq!(buf.remaining(), 0);
263        assert_eq!(buf.remaining_mut(), 0);
264
265        // Clear the buffer:
266        buf.data_mut().clear();
267        assert_eq!(buf.remaining(), 0);
268        assert_eq!(buf.remaining_mut(), 13);
269    }
270
271    #[test]
272    fn collapsible_empty() {
273        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(13));
274
275        // Buffer is empty.
276        assert_eq!(buf.remaining(), 0);
277        assert_eq!(buf.remaining_mut(), 13);
278
279        buf.collapse();
280
281        // Buffer is still empty.
282        assert_eq!(buf.remaining(), 0);
283        assert_eq!(buf.remaining_mut(), 13);
284    }
285
286    #[test]
287    fn collapsible_remaining_already_collapsed() {
288        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(24));
289
290        // Write a simple string to the buffer.
291        buf.put_slice(b"hello, world!");
292        assert_eq!(buf.remaining(), 13);
293        assert_eq!(buf.remaining_mut(), 11);
294
295        buf.collapse();
296
297        // Buffer is still the same since we never read anything from the buffer.
298        assert_eq!(buf.remaining(), 13);
299        assert_eq!(buf.remaining_mut(), 11);
300    }
301
302    #[test]
303    fn collapsible_remaining_not_collapsed_no_overlap() {
304        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(24));
305
306        // Write a simple string to the buffer.
307        buf.put_slice(b"hello, world!");
308        assert_eq!(buf.remaining(), 13);
309        assert_eq!(buf.remaining_mut(), 11);
310        assert_eq!(buf.chunk(), b"hello, world!");
311
312        // Write another simple string to the buffer.
313        buf.put_slice(b"huzzah!");
314        assert_eq!(buf.remaining(), 20);
315        assert_eq!(buf.remaining_mut(), 4);
316        assert_eq!(buf.chunk(), b"hello, world!huzzah!");
317
318        // Simulate reading the first string from the buffer, which will end up leaving a hole in the buffer, prior to
319        // the second string, that is big enough to fit the second string entirely.
320        buf.advance(13);
321        assert_eq!(buf.remaining(), 7);
322        assert_eq!(buf.remaining_mut(), 4);
323        assert_eq!(buf.chunk(), b"huzzah!");
324
325        buf.collapse();
326
327        // Buffer should now be collapsed, with the second string at the beginning of the buffer.
328        assert_eq!(buf.remaining(), 7);
329        assert_eq!(buf.remaining_mut(), 17);
330        assert_eq!(buf.chunk(), b"huzzah!");
331    }
332
333    #[test]
334    fn collapsible_remaining_not_collapsed_with_overlap() {
335        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(24));
336
337        // Write a simple string to the buffer.
338        buf.put_slice(b"huzzah!");
339        assert_eq!(buf.remaining(), 7);
340        assert_eq!(buf.remaining_mut(), 17);
341        assert_eq!(buf.chunk(), b"huzzah!");
342
343        // Write another simple string to the buffer.
344        buf.put_slice(b"hello, world!");
345        assert_eq!(buf.remaining(), 20);
346        assert_eq!(buf.remaining_mut(), 4);
347        assert_eq!(buf.chunk(), b"huzzah!hello, world!");
348
349        // Simulate reading the first string from the buffer, which will end up leaving a hole in the buffer, prior to
350        // the second string, that isn't big enough to fit the second string entirely.
351        buf.advance(7);
352        assert_eq!(buf.remaining(), 13);
353        assert_eq!(buf.remaining_mut(), 4);
354        assert_eq!(buf.chunk(), b"hello, world!");
355
356        buf.collapse();
357
358        // Buffer should now be collapsed, with the second string at the beginning of the buffer.
359        assert_eq!(buf.remaining(), 13);
360        assert_eq!(buf.remaining_mut(), 11);
361        assert_eq!(buf.chunk(), b"hello, world!");
362    }
363}