saluki_io/net/util/retry/queue/
mod.rs1use std::{collections::VecDeque, path::PathBuf, sync::Arc};
2
3use saluki_error::{generic_error, GenericError};
4use serde::{de::DeserializeOwned, Serialize};
5use tracing::debug;
6
7mod persisted;
8pub use self::persisted::DiskUsageRetrieverImpl;
9use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};
10
/// A container holding one or more events.
///
/// [`RetryQueue`] uses this to report how many events were lost whenever it has to drop an item.
pub trait EventContainer {
    /// Returns how many events this container holds.
    fn event_count(&self) -> u64;
}

21pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
23 fn size_bytes(&self) -> u64;
25}
26
27impl EventContainer for String {
28 fn event_count(&self) -> u64 {
29 1
30 }
31}
32
33impl Retryable for String {
34 fn size_bytes(&self) -> u64 {
35 self.len() as u64
36 }
37}
38
/// Outcome of pushing items into a retry queue.
///
/// Making room for new items may force older items out. Any item that could not be retained is counted here so
/// callers can account for the resulting data loss.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    /// Number of items that were dropped.
    pub items_dropped: u64,

    /// Total number of events contained in the dropped items.
    pub events_dropped: u64,
}

53impl PushResult {
54 pub fn had_drops(&self) -> bool {
56 self.items_dropped > 0
57 }
58
59 pub fn merge(&mut self, other: Self) {
61 self.items_dropped += other.items_dropped;
62 self.events_dropped += other.events_dropped;
63 }
64
65 pub fn track_dropped_item(&mut self, event_count: u64) {
67 self.items_dropped += 1;
68 self.events_dropped += event_count;
69 }
70}
71
72pub struct RetryQueue<T> {
74 queue_name: String,
75 pending: VecDeque<T>,
76 persisted_pending: Option<PersistedQueue<T>>,
77 total_in_memory_bytes: u64,
78 max_in_memory_bytes: u64,
79}
80
81impl<T> RetryQueue<T>
82where
83 T: Retryable,
84{
85 pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
91 Self {
92 queue_name,
93 pending: VecDeque::new(),
94 persisted_pending: None,
95 total_in_memory_bytes: 0,
96 max_in_memory_bytes,
97 }
98 }
99
100 pub async fn with_disk_persistence(
116 mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
117 disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
118 ) -> Result<Self, GenericError> {
119 if root_path.as_os_str().is_empty() {
122 return Err(generic_error!("Storage path cannot be empty."));
123 }
124
125 let queue_root_path = root_path.join(&self.queue_name);
126 let persisted_pending = PersistedQueue::from_root_path(
127 queue_root_path,
128 max_disk_size_bytes,
129 storage_max_disk_ratio,
130 DiskUsageRetrieverWrapper::new(disk_usage_retriever),
131 )
132 .await?;
133 self.persisted_pending = Some(persisted_pending);
134 Ok(self)
135 }
136
137 pub fn is_empty(&self) -> bool {
141 self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
142 }
143
144 pub fn len(&self) -> usize {
148 self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
149 }
150
151 pub fn take_persisted_entries_dropped(&mut self) -> u64 {
156 self.persisted_pending.as_mut().map_or(0, |p| p.take_entries_dropped())
157 }
158
159 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
170 let mut push_result = PushResult::default();
171
172 let current_entry_size = entry.size_bytes();
174 if current_entry_size > self.max_in_memory_bytes {
175 return Err(generic_error!(
176 "Entry too large to fit into retry queue. ({} > {})",
177 current_entry_size,
178 self.max_in_memory_bytes
179 ));
180 }
181
182 while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
185 let oldest_entry = self.pending.pop_front().expect("queue is not empty");
186 let oldest_entry_size = oldest_entry.size_bytes();
187
188 if let Some(persisted_pending) = &mut self.persisted_pending {
189 let persist_result = persisted_pending.push(oldest_entry).await?;
190 push_result.merge(persist_result);
191
192 debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
193 } else {
194 debug!(
195 entry.len = oldest_entry_size,
196 "Dropped in-memory entry to increase available capacity."
197 );
198
199 push_result.track_dropped_item(oldest_entry.event_count());
200 }
201
202 self.total_in_memory_bytes -= oldest_entry_size;
203 }
204
205 self.pending.push_back(entry);
206 self.total_in_memory_bytes += current_entry_size;
207 debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");
208
209 Ok(push_result)
210 }
211
212 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
223 if let Some(entry) = self.pending.pop_front() {
225 self.total_in_memory_bytes -= entry.size_bytes();
226 debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");
227
228 return Ok(Some(entry));
229 }
230
231 if let Some(persisted_pending) = &mut self.persisted_pending {
233 if let Some(entry) = persisted_pending.pop().await? {
234 return Ok(Some(entry));
235 }
236 }
237
238 Ok(None)
239 }
240
241 pub async fn flush(mut self) -> Result<PushResult, GenericError> {
251 let mut push_result = PushResult::default();
252
253 while let Some(entry) = self.pending.pop_front() {
254 let entry_size = entry.size_bytes();
255
256 if let Some(persisted_pending) = &mut self.persisted_pending {
257 let persist_result = persisted_pending.push(entry).await?;
258 push_result.merge(persist_result);
259
260 debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
261 } else {
262 debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");
263
264 push_result.track_dropped_item(entry.event_count());
265 }
266 }
267
268 Ok(push_result)
269 }
270}
271
#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::Rng as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        /// Builds a random instance. The name is always 8 ASCII characters, so every instance reports the same
        /// `size_bytes()`.
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    /// Counts the regular files under `path`, recursing into sub-directories.
    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
        entries
            .map(|maybe_entry| {
                let entry = maybe_entry.expect("should not fail to read directory entry");
                // Query the file type once per entry rather than once per check.
                let file_type = entry.file_type().expect("should not fail to get file type");
                if file_type.is_file() {
                    1
                } else if file_type.is_dir() {
                    file_count_recursive(entry.path())
                } else {
                    0
                }
            })
            .sum()
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        // The entry fits comfortably, so nothing should be dropped.
        let push_result = retry_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert!(retry_queue.is_empty());
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        // A single entry larger than the whole in-memory limit is rejected outright.
        assert!(retry_queue.push(data).await.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Size the queue so that exactly one `FakeData` fits, independent of `size_of::<String>()` on this target.
        let max_in_memory_bytes = data1.size_bytes();
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), max_in_memory_bytes);

        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Pushing the second entry must evict (and, with no disk persistence, drop) the first.
        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);
        assert_eq!(1, retry_queue.len());

        // Only the newest entry should remain.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert!(retry_queue.is_empty());
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);
        assert_eq!(2, retry_queue.len());

        // Without disk persistence, flushing drops every in-memory entry.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // `temp_dir` must stay alive for the whole test; dropping it deletes the directory.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        assert_eq!(0, file_count_recursive(&root_path));

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
            .with_disk_persistence(
                root_path.clone(),
                u64::MAX,
                1.0,
                Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
            )
            .await
            .expect("should not fail to create retry queue with disk persistence");

        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // With disk persistence, flushing writes entries to disk instead of dropping them.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        assert_eq!(2, file_count_recursive(&root_path));
    }
}