//! saluki_io/net/util/retry/queue/mod.rs
1use std::{collections::VecDeque, path::PathBuf, sync::Arc};
2
3use saluki_error::{generic_error, GenericError};
4use serde::{de::DeserializeOwned, Serialize};
5use tracing::debug;
6
7mod persisted;
8pub use self::persisted::DiskUsageRetrieverImpl;
9use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};
10
/// A container that holds events.
///
/// This trait is used as an incredibly generic way to expose the number of events within a "container", which we
/// loosely define to be anything that is holding events in some form. This is primarily used to track the number of
/// events dropped by `RetryQueue` (and `PersistedQueue`) when entries have to be dropped due to size limits, so that
/// the drops can be reported via `PushResult`.
pub trait EventContainer {
    /// Returns the number of events represented by this container.
    fn event_count(&self) -> u64;
}
20
/// A value that can be retried.
///
/// The `DeserializeOwned`/`Serialize` bounds exist so entries can be written to and read back from disk when
/// persistence is enabled (see `PersistedQueue`). `size_bytes` feeds the in-memory capacity accounting in
/// `RetryQueue::push`.
pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
    /// Returns the in-memory size of this value, in bytes.
    fn size_bytes(&self) -> u64;
}
26
27impl EventContainer for String {
28    fn event_count(&self) -> u64 {
29        1
30    }
31}
32
33impl Retryable for String {
34    fn size_bytes(&self) -> u64 {
35        self.len() as u64
36    }
37}
38
/// Result of a push operation.
///
/// As pushing items to `RetryQueue` may result in dropping older items to make room for new ones, this struct tracks
/// the total number of items dropped, and the number of events represented by those items.
// `Debug`/`Clone`/`Copy` are derivable for free (two `u64` fields) and make the result easier to log and pass around.
#[derive(Clone, Copy, Debug, Default)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    /// Total number of items dropped.
    pub items_dropped: u64,

    /// Total number of events represented by the dropped items.
    pub events_dropped: u64,
}
52
53impl PushResult {
54    /// Returns `true` if any items were dropped.
55    pub fn had_drops(&self) -> bool {
56        self.items_dropped > 0
57    }
58
59    /// Merges `other` into `Self`.
60    pub fn merge(&mut self, other: Self) {
61        self.items_dropped += other.items_dropped;
62        self.events_dropped += other.events_dropped;
63    }
64
65    /// Tracks a single dropped item.
66    pub fn track_dropped_item(&mut self, event_count: u64) {
67        self.items_dropped += 1;
68        self.events_dropped += event_count;
69    }
70}
71
/// A queue for storing requests to be retried.
///
/// Entries are held in memory up to a configurable byte budget. Optionally, overflow can spill to disk via a
/// `PersistedQueue` (see `with_disk_persistence`).
pub struct RetryQueue<T> {
    // Name of the queue; also used as the on-disk subdirectory name when persistence is enabled.
    queue_name: String,
    // In-memory entries, oldest at the front.
    pending: VecDeque<T>,
    // Optional disk-backed overflow queue; `None` when disk persistence is not configured.
    persisted_pending: Option<PersistedQueue<T>>,
    // Running sum of `size_bytes()` for all in-memory entries.
    total_in_memory_bytes: u64,
    // Maximum allowed total size for in-memory entries, in bytes.
    max_in_memory_bytes: u64,
}
80
81impl<T> RetryQueue<T>
82where
83    T: Retryable,
84{
85    /// Creates a new `RetryQueue` instance with the given name and maximum size.
86    ///
87    /// The queue will only hold as many entries as can fit within the given maximum size. If the queue is full, the
88    /// oldest entries will be removed (or potentially persisted to disk, see
89    /// [`with_disk_persistence`][Self::with_disk_persistence]) to make room for new entries.
90    pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
91        Self {
92            queue_name,
93            pending: VecDeque::new(),
94            persisted_pending: None,
95            total_in_memory_bytes: 0,
96            max_in_memory_bytes,
97        }
98    }
99
100    /// Configures the queue to persist pending entries to disk.
101    ///
102    /// Disk persistence is used as a fallback to in-memory storage when the queue is full. When attempting to add a new
103    /// entry to the queue, and the queue cannot fit the entry in-memory, in-memory entries will be persisted to disk,
104    /// oldest first.
105    ///
106    /// When reading entries from the queue, in-memory entries are read first, followed by persisted entries. This
107    /// provides priority to the most recent entries added to the queue, but allows for bursting over the configured
108    /// in-memory size limit without having to immediately discard entries.
109    ///
110    /// Files are stored in a subdirectory, with the same name as the given queue name, within the given `root_path`.
111    ///
112    /// # Errors
113    ///
114    /// If there is an error initializing the disk persistence layer, an error is returned.
115    pub async fn with_disk_persistence(
116        mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
117        disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
118    ) -> Result<Self, GenericError> {
119        // Make sure the root storage path is non-empty, as otherwise we can't generate a valid path
120        // for the persisted entries in this retry queue.
121        if root_path.as_os_str().is_empty() {
122            return Err(generic_error!("Storage path cannot be empty."));
123        }
124
125        let queue_root_path = root_path.join(&self.queue_name);
126        let persisted_pending = PersistedQueue::from_root_path(
127            queue_root_path,
128            max_disk_size_bytes,
129            storage_max_disk_ratio,
130            DiskUsageRetrieverWrapper::new(disk_usage_retriever),
131        )
132        .await?;
133        self.persisted_pending = Some(persisted_pending);
134        Ok(self)
135    }
136
137    /// Returns `true` if the queue is empty.
138    ///
139    /// This includes both in-memory and persisted entries.
140    pub fn is_empty(&self) -> bool {
141        self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
142    }
143
144    /// Returns the number of entries in the queue
145    ///
146    /// This includes both in-memory and persisted entries.
147    pub fn len(&self) -> usize {
148        self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
149    }
150
151    /// Returns the number of persisted entries that have been permanently dropped due to errors since the last call
152    /// to this method, resetting the counter.
153    ///
154    /// Always returns 0 if disk persistence is not enabled.
155    pub fn take_persisted_entries_dropped(&mut self) -> u64 {
156        self.persisted_pending.as_mut().map_or(0, |p| p.take_entries_dropped())
157    }
158
159    /// Enqueues an entry.
160    ///
161    /// If the queue is full and the entry cannot be enqueue in-memory, and disk persistence is enabled, in-memory
162    /// entries will be moved to disk (oldest first) until enough capacity is available to enqueue the new entry
163    /// in-memory.
164    ///
165    /// # Errors
166    ///
167    /// If the entry is too large to fit into the queue, or if there is an error when persisting entries to disk, an
168    /// error is returned.
169    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
170        let mut push_result = PushResult::default();
171
172        // Make sure the entry, by itself, isn't too big to ever fit into the queue.
173        let current_entry_size = entry.size_bytes();
174        if current_entry_size > self.max_in_memory_bytes {
175            return Err(generic_error!(
176                "Entry too large to fit into retry queue. ({} > {})",
177                current_entry_size,
178                self.max_in_memory_bytes
179            ));
180        }
181
182        // Make sure we have enough room for this incoming entry, either by persisting older entries to disk or by
183        // simply dropping them.
184        while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
185            let oldest_entry = self.pending.pop_front().expect("queue is not empty");
186            let oldest_entry_size = oldest_entry.size_bytes();
187
188            if let Some(persisted_pending) = &mut self.persisted_pending {
189                let persist_result = persisted_pending.push(oldest_entry).await?;
190                push_result.merge(persist_result);
191
192                debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
193            } else {
194                debug!(
195                    entry.len = oldest_entry_size,
196                    "Dropped in-memory entry to increase available capacity."
197                );
198
199                push_result.track_dropped_item(oldest_entry.event_count());
200            }
201
202            self.total_in_memory_bytes -= oldest_entry_size;
203        }
204
205        self.pending.push_back(entry);
206        self.total_in_memory_bytes += current_entry_size;
207        debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");
208
209        Ok(push_result)
210    }
211
212    /// Consumes an entry.
213    ///
214    /// In-memory entries are consumed first, followed by persisted entries if disk persistence is enabled.
215    ///
216    /// If no entries are available, `None` is returned.
217    ///
218    /// # Errors
219    ///
220    /// If there is an error when consuming an entry from disk, whether due to reading or deserializing the entry, an
221    /// error is returned.
222    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
223        // Pull from in-memory first to prioritize the most recent entries.
224        if let Some(entry) = self.pending.pop_front() {
225            self.total_in_memory_bytes -= entry.size_bytes();
226            debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");
227
228            return Ok(Some(entry));
229        }
230
231        // If we have disk persistence enabled, pull from disk next.
232        if let Some(persisted_pending) = &mut self.persisted_pending {
233            if let Some(entry) = persisted_pending.pop().await? {
234                return Ok(Some(entry));
235            }
236        }
237
238        Ok(None)
239    }
240
241    /// Flushes all entries, potentially persisting them to disk.
242    ///
243    /// When disk persistence is configured, this will flush all in-memory entries to disk. Flushing to disk still obeys
244    /// the the normal limiting behavior in terms of maximum on-disk size. When disk persistence is not enabled, all
245    /// in-memory entries will be dropped.
246    ///
247    /// # Errors
248    ///
249    /// If an error occurs while persisting an entry to disk, an error is returned.
250    pub async fn flush(mut self) -> Result<PushResult, GenericError> {
251        let mut push_result = PushResult::default();
252
253        while let Some(entry) = self.pending.pop_front() {
254            let entry_size = entry.size_bytes();
255
256            if let Some(persisted_pending) = &mut self.persisted_pending {
257                let persist_result = persisted_pending.push(entry).await?;
258                push_result.merge(persist_result);
259
260                debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
261            } else {
262                debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");
263
264                push_result.track_dropped_item(entry.event_count());
265            }
266        }
267
268        Ok(push_result)
269    }
270}
271
#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::Rng as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    // Minimal serializable payload used to exercise the queue in tests.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        /// Generates a `FakeData` with a random 8-character alphanumeric name and a random value in `0..100`.
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            // Approximate in-memory size: heap bytes of `name`, plus the `String` header, plus 4 bytes for `value`.
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    /// Recursively counts regular files under `path`, panicking on any I/O error.
    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        let mut count = 0;
        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
        for maybe_entry in entries {
            let entry = maybe_entry.expect("should not fail to read directory entry");
            if entry.file_type().expect("should not fail to get file type").is_file() {
                count += 1;
            } else if entry.file_type().expect("should not fail to get file type").is_dir() {
                count += file_count_recursive(entry.path());
            }
        }
        count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        // Push our data to the queue.
        let push_result = retry_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(retry_queue.push(data).await.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it is sized to only fit one entry at a time.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);

        // Push our data to the queue.
        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed, indicating the first item was
        // removed from the queue to make room.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should drop all entries as we have no disk persistence layer configured.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items, and enable disk persistence.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Just a sanity check to ensure our temp directory is empty.
        assert_eq!(0, file_count_recursive(&root_path));

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
            .with_disk_persistence(
                root_path.clone(),
                u64::MAX,
                1.0,
                Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
            )
            .await
            .expect("should not fail to create retry queue with disk persistence");

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should push all entries to disk.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        // We should now have two files on disk after flushing.
        assert_eq!(2, file_count_recursive(&root_path));
    }
}