1use std::{mem::ManuallyDrop, sync::Arc};
2
3use bytes::{buf::UninitSlice, Buf, BufMut};
4use saluki_core::pooling::{helpers::pooled_newtype, Clearable, ReclaimStrategy};
5use triomphe::{Arc as TriompheArc, UniqueArc};
6
7use super::{ClearableIoBuffer, CollapsibleReadWriteIoBuffer, ReadIoBuffer};
8
9pub struct FixedSizeVec {
17 data: UniqueArc<Vec<u8>>,
18 read_idx: usize,
19}
20
21impl FixedSizeVec {
22 pub fn with_capacity(capacity: usize) -> Self {
26 Self {
27 data: UniqueArc::new(Vec::with_capacity(capacity)),
28 read_idx: 0,
29 }
30 }
31
32 fn freeze(self) -> FrozenFixedSizeVec {
33 FrozenFixedSizeVec {
34 data: self.data.shareable(),
35 read_idx: self.read_idx,
36 }
37 }
38}
39
40impl Clearable for FixedSizeVec {
41 fn clear(&mut self) {
42 self.data.clear();
43 self.read_idx = 0;
44 }
45}
46
47struct FrozenFixedSizeVec {
48 data: TriompheArc<Vec<u8>>,
49 read_idx: usize,
50}
51
52impl FrozenFixedSizeVec {
53 fn into_unique(self) -> Option<FixedSizeVec> {
54 TriompheArc::into_unique(self.data).map(|data| FixedSizeVec { data, read_idx: 0 })
55 }
56}
57
58impl Clone for FrozenFixedSizeVec {
59 fn clone(&self) -> Self {
60 Self {
61 data: self.data.clone(),
62 read_idx: 0,
63 }
64 }
65}
66
// Declares `BytesBuffer` as the pooled wrapper ("outer") type around `FixedSizeVec` ("inner").
// NOTE(review): the `data()` / `data_mut()` accessors and the `data` / `strategy_ref` fields used on
// `BytesBuffer` throughout this file are presumably generated by this macro -- confirm against the
// `pooled_newtype!` definition in `saluki_core::pooling::helpers`.
pooled_newtype! {
    outer => BytesBuffer,
    inner => FixedSizeVec,
}
71
72impl BytesBuffer {
73 pub fn freeze(mut self) -> FrozenBytesBuffer {
75 let data = self.data.take().unwrap().freeze();
76
77 FrozenBytesBuffer {
78 strategy_ref: Arc::clone(&self.strategy_ref),
79 data: ManuallyDrop::new(data),
80 }
81 }
82}
83
84impl Buf for BytesBuffer {
85 fn remaining(&self) -> usize {
86 self.data().data.len() - self.data().read_idx
87 }
88
89 fn chunk(&self) -> &[u8] {
90 let data = self.data();
91 &data.data[data.read_idx..data.data.len()]
92 }
93
94 fn advance(&mut self, cnt: usize) {
95 let data = self.data_mut();
96 assert!(data.read_idx + cnt <= data.data.len());
97 data.read_idx += cnt;
98 }
99}
100
101unsafe impl BufMut for BytesBuffer {
102 fn remaining_mut(&self) -> usize {
103 let data = self.data();
104 data.data.capacity() - data.data.len()
105 }
106
107 fn chunk_mut(&mut self) -> &mut UninitSlice {
108 self.data_mut().data.spare_capacity_mut().into()
109 }
110
111 unsafe fn advance_mut(&mut self, cnt: usize) {
112 let new_len = self.data().data.len() + cnt;
113 self.data_mut().data.set_len(new_len);
114 }
115}
116
117impl ReadIoBuffer for BytesBuffer {
118 fn capacity(&self) -> usize {
119 self.data().data.capacity()
120 }
121}
122
123impl CollapsibleReadWriteIoBuffer for BytesBuffer {
124 fn collapse(&mut self) {
125 let remaining = self.remaining();
126
127 if remaining == 0 {
129 let inner = self.data_mut();
130 inner.read_idx = 0;
131 inner.data.clear();
132 return;
133 }
134
135 let inner = self.data_mut();
138
139 let src_start = inner.read_idx;
140 let src_end = inner.data.len();
141 inner.data.copy_within(src_start..src_end, 0);
142 inner.data.truncate(remaining);
143 inner.read_idx = 0;
144 }
145}
146
147impl ClearableIoBuffer for BytesBuffer {
148 fn clear(&mut self) {
149 self.data_mut().clear();
150 }
151}
152
153#[derive(Clone)]
164pub struct FrozenBytesBuffer {
165 strategy_ref: Arc<dyn ReclaimStrategy<BytesBuffer> + Send + Sync>,
166 data: ManuallyDrop<FrozenFixedSizeVec>,
167}
168
169impl FrozenBytesBuffer {
170 pub fn is_empty(&self) -> bool {
172 self.data.read_idx == self.data.data.len()
173 }
174
175 pub fn len(&self) -> usize {
177 self.data.data.len() - self.data.read_idx
178 }
179
180 pub fn capacity(&self) -> usize {
182 self.data.data.capacity()
183 }
184}
185
186impl Buf for FrozenBytesBuffer {
187 fn remaining(&self) -> usize {
188 self.data.data.len() - self.data.read_idx
189 }
190
191 fn chunk(&self) -> &[u8] {
192 &self.data.data[self.data.read_idx..]
193 }
194
195 fn advance(&mut self, cnt: usize) {
196 assert!(self.data.read_idx + cnt <= self.data.data.len());
197 self.data.read_idx += cnt;
198 }
199}
200
201impl Drop for FrozenBytesBuffer {
202 fn drop(&mut self) {
203 let data = unsafe { ManuallyDrop::take(&mut self.data) };
208 if let Some(data) = data.into_unique() {
209 self.strategy_ref.reclaim(data);
210 }
211 }
212}
213
#[cfg(test)]
mod tests {
    use saluki_core::pooling::helpers::get_pooled_object_via_builder;

    use super::*;

    // Throughout these tests, the pair `(remaining(), remaining_mut())` is checked together: the first element
    // is the number of readable bytes, the second the number of bytes that can still be written.

    #[test]
    fn basic() {
        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(13));

        // Fresh buffer: nothing to read, full capacity available for writing.
        assert_eq!((buf.remaining(), buf.remaining_mut()), (0, 13));

        // Each write grows the readable region and shrinks the writable one by the same amount.
        buf.put_slice(b"hello");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (5, 8));

        buf.put_slice(b", worl");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (11, 2));

        // Everything written so far is visible as one chunk.
        {
            let readable = buf.chunk();
            assert_eq!(readable.len(), 11);
            assert_eq!(readable, b"hello, worl");
        }

        // Reading consumes bytes but does not hand any space back to the writer.
        buf.advance(7);
        assert_eq!((buf.remaining(), buf.remaining_mut()), (4, 2));

        // Fill the buffer completely.
        buf.put_slice(b"d!");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (6, 0));

        {
            let readable = buf.chunk();
            assert_eq!(readable.len(), 6);
            assert_eq!(readable, b"world!");
        }

        // Fully drained, but still full from the writer's point of view.
        buf.advance(6);
        assert_eq!((buf.remaining(), buf.remaining_mut()), (0, 0));

        // Clearing makes the whole capacity writable again.
        buf.data_mut().clear();
        assert_eq!((buf.remaining(), buf.remaining_mut()), (0, 13));
    }

    #[test]
    fn collapsible_empty() {
        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(13));
        assert_eq!((buf.remaining(), buf.remaining_mut()), (0, 13));

        // Collapsing an empty buffer changes nothing.
        buf.collapse();
        assert_eq!((buf.remaining(), buf.remaining_mut()), (0, 13));
    }

    #[test]
    fn collapsible_remaining_already_collapsed() {
        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(24));

        buf.put_slice(b"hello, world!");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (13, 11));

        // Nothing has been read, so the data already starts at the front and collapsing changes nothing.
        buf.collapse();
        assert_eq!((buf.remaining(), buf.remaining_mut()), (13, 11));
    }

    #[test]
    fn collapsible_remaining_not_collapsed_no_overlap() {
        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(24));

        buf.put_slice(b"hello, world!");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (13, 11));
        assert_eq!(buf.chunk(), b"hello, world!");

        buf.put_slice(b"huzzah!");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (20, 4));
        assert_eq!(buf.chunk(), b"hello, world!huzzah!");

        // Consume the first write; the 7 unread bytes now sit after a 13-byte consumed prefix, so their source
        // and destination ranges do not overlap when collapsing.
        buf.advance(13);
        assert_eq!((buf.remaining(), buf.remaining_mut()), (7, 4));
        assert_eq!(buf.chunk(), b"huzzah!");

        // Collapsing keeps the unread bytes and reclaims the consumed prefix for writing.
        buf.collapse();
        assert_eq!((buf.remaining(), buf.remaining_mut()), (7, 17));
        assert_eq!(buf.chunk(), b"huzzah!");
    }

    #[test]
    fn collapsible_remaining_not_collapsed_with_overlap() {
        let mut buf = get_pooled_object_via_builder::<_, BytesBuffer>(|| FixedSizeVec::with_capacity(24));

        buf.put_slice(b"huzzah!");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (7, 17));
        assert_eq!(buf.chunk(), b"huzzah!");

        buf.put_slice(b"hello, world!");
        assert_eq!((buf.remaining(), buf.remaining_mut()), (20, 4));
        assert_eq!(buf.chunk(), b"huzzah!hello, world!");

        // Consume the first write; the 13 unread bytes now sit after a 7-byte consumed prefix, so their source
        // and destination ranges overlap when collapsing.
        buf.advance(7);
        assert_eq!((buf.remaining(), buf.remaining_mut()), (13, 4));
        assert_eq!(buf.chunk(), b"hello, world!");

        // Collapsing keeps the unread bytes intact despite the overlapping move.
        buf.collapse();
        assert_eq!((buf.remaining(), buf.remaining_mut()), (13, 11));
        assert_eq!(buf.chunk(), b"hello, world!");
    }
}