saluki_io/net/util/retry/queue/
mod.rs

use std::{collections::VecDeque, path::PathBuf, sync::Arc};

use saluki_error::{generic_error, GenericError};
use serde::{de::DeserializeOwned, Serialize};
use tracing::debug;

mod persisted;
pub use self::persisted::DiskUsageRetrieverImpl;
use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};

/// A container that holds events.
///
/// This trait is used as an incredibly generic way to expose the number of events within a "container", which we
/// loosely define to be anything that is holding events in some form. This is primarily used to track the number of
/// events dropped by `RetryQueue` (and `PersistedQueue`) when entries have to be dropped due to size limits.
pub trait EventContainer {
    /// Returns the number of events represented by this container.
    fn event_count(&self) -> u64;
}

/// A value that can be retried.
pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
    /// Returns the in-memory size of this value, in bytes.
    fn size_bytes(&self) -> u64;
}

impl EventContainer for String {
    fn event_count(&self) -> u64 {
        1
    }
}

impl Retryable for String {
    fn size_bytes(&self) -> u64 {
        self.len() as u64
    }
}

/// Result of a push operation.
///
/// As pushing items to `RetryQueue` may result in dropping older items to make room for new ones, this struct tracks
/// the total number of items dropped, and the number of events represented by those items.
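///
/// # Example
///
/// A minimal sketch of aggregating multiple `PushResult` values across a batch of pushes; `retry_queue` and
/// `entries` are assumed to exist in the caller's context:
///
/// ```ignore
/// let mut total = PushResult::default();
/// for entry in entries {
///     // Fold each push's drop counts into the running total.
///     total.merge(retry_queue.push(entry).await?);
/// }
/// if total.had_drops() {
///     // `total.items_dropped` and `total.events_dropped` now cover the whole batch.
/// }
/// ```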
#[derive(Default)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    /// Total number of items dropped.
    pub items_dropped: u64,

    /// Total number of events represented by the dropped items.
    pub events_dropped: u64,
}

impl PushResult {
    /// Returns `true` if any items were dropped.
    pub fn had_drops(&self) -> bool {
        self.items_dropped > 0
    }

    /// Merges `other` into `Self`.
    pub fn merge(&mut self, other: Self) {
        self.items_dropped += other.items_dropped;
        self.events_dropped += other.events_dropped;
    }

    /// Tracks a single dropped item.
    pub fn track_dropped_item(&mut self, event_count: u64) {
        self.items_dropped += 1;
        self.events_dropped += event_count;
    }
}

/// A queue for storing requests to be retried.
pub struct RetryQueue<T> {
    queue_name: String,
    pending: VecDeque<T>,
    persisted_pending: Option<PersistedQueue<T>>,
    total_in_memory_bytes: u64,
    max_in_memory_bytes: u64,
}

impl<T> RetryQueue<T>
where
    T: Retryable,
{
    /// Creates a new `RetryQueue` instance with the given name and maximum size.
    ///
    /// The queue will only hold as many entries as can fit within the given maximum size. If the queue is full, the
    /// oldest entries will be removed (or potentially persisted to disk, see
    /// [`with_disk_persistence`][Self::with_disk_persistence]) to make room for new entries.
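    ///
    /// # Example
    ///
    /// A minimal sketch using the `String` implementation of `Retryable` provided by this module:
    ///
    /// ```ignore
    /// // Hold up to 1 MiB of pending entries in memory for the "example" queue.
    /// let queue = RetryQueue::<String>::new("example".to_string(), 1024 * 1024);
    /// assert!(queue.is_empty());
    /// ```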
    pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
        Self {
            queue_name,
            pending: VecDeque::new(),
            persisted_pending: None,
            total_in_memory_bytes: 0,
            max_in_memory_bytes,
        }
    }

    /// Configures the queue to persist pending entries to disk.
    ///
    /// Disk persistence is used as a fallback to in-memory storage when the queue is full. When a new entry is added
    /// and it cannot fit in memory, existing in-memory entries are persisted to disk, oldest first.
    ///
    /// When reading entries from the queue, in-memory entries are read first, followed by persisted entries. This
    /// gives priority to the most recent entries added to the queue, while still allowing bursts over the configured
    /// in-memory size limit without immediately discarding entries.
    ///
    /// Files are stored in a subdirectory, with the same name as the given queue name, within the given `root_path`.
    ///
    /// # Errors
    ///
    /// If there is an error initializing the disk persistence layer, an error is returned.
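    ///
    /// # Example
    ///
    /// A minimal sketch; the imports (`PathBuf`, `Arc`, `DiskUsageRetrieverImpl`) and the chosen limits are
    /// assumptions for illustration:
    ///
    /// ```ignore
    /// let root_path = PathBuf::from("/var/lib/agent/retry");
    /// let queue = RetryQueue::<String>::new("example".to_string(), 1024 * 1024)
    ///     .with_disk_persistence(
    ///         root_path.clone(),
    ///         64 * 1024 * 1024,                                 // maximum bytes on disk
    ///         0.8,                                              // use at most 80% of the underlying disk
    ///         Arc::new(DiskUsageRetrieverImpl::new(root_path)), // queries actual disk usage
    ///     )
    ///     .await?;
    /// ```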
    pub async fn with_disk_persistence(
        mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
    ) -> Result<Self, GenericError> {
        let queue_root_path = root_path.join(&self.queue_name);
        let persisted_pending = PersistedQueue::from_root_path(
            queue_root_path,
            max_disk_size_bytes,
            storage_max_disk_ratio,
            DiskUsageRetrieverWrapper::new(disk_usage_retriever),
        )
        .await?;
        self.persisted_pending = Some(persisted_pending);
        Ok(self)
    }

    /// Returns `true` if the queue is empty.
    ///
    /// This includes both in-memory and persisted entries.
    pub fn is_empty(&self) -> bool {
        self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
    }

    /// Returns the number of entries in the queue.
    ///
    /// This includes both in-memory and persisted entries.
    pub fn len(&self) -> usize {
        self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
    }

    /// Enqueues an entry.
    ///
    /// If the queue is full and the entry cannot be enqueued in memory, and disk persistence is enabled, in-memory
    /// entries will be moved to disk (oldest first) until enough capacity is available to enqueue the new entry
    /// in memory.
    ///
    /// # Errors
    ///
    /// If the entry is too large to fit into the queue, or if there is an error when persisting entries to disk, an
    /// error is returned.
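    ///
    /// # Example
    ///
    /// A minimal sketch of pushing an entry and handling the returned [`PushResult`]; `warn!` from `tracing` is
    /// assumed to be in scope:
    ///
    /// ```ignore
    /// let result = queue.push("payload to retry".to_string()).await?;
    /// if result.had_drops() {
    ///     // Older entries were evicted to make room for the new one.
    ///     warn!(
    ///         items = result.items_dropped,
    ///         events = result.events_dropped,
    ///         "Dropped entries from retry queue."
    ///     );
    /// }
    /// ```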
    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        // Make sure the entry, by itself, isn't too big to ever fit into the queue.
        let current_entry_size = entry.size_bytes();
        if current_entry_size > self.max_in_memory_bytes {
            return Err(generic_error!(
                "Entry too large to fit into retry queue. ({} > {})",
                current_entry_size,
                self.max_in_memory_bytes
            ));
        }

        // Make sure we have enough room for this incoming entry, either by persisting older entries to disk or by
        // simply dropping them.
        while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
            let oldest_entry = self.pending.pop_front().expect("queue is not empty");
            let oldest_entry_size = oldest_entry.size_bytes();

            if let Some(persisted_pending) = &mut self.persisted_pending {
                let persist_result = persisted_pending.push(oldest_entry).await?;
                push_result.merge(persist_result);

                debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
            } else {
                debug!(
                    entry.len = oldest_entry_size,
                    "Dropped in-memory entry to increase available capacity."
                );

                push_result.track_dropped_item(oldest_entry.event_count());
            }

            self.total_in_memory_bytes -= oldest_entry_size;
        }

        self.pending.push_back(entry);
        self.total_in_memory_bytes += current_entry_size;
        debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");

        Ok(push_result)
    }

    /// Consumes an entry.
    ///
    /// In-memory entries are consumed first, followed by persisted entries if disk persistence is enabled.
    ///
    /// If no entries are available, `None` is returned.
    ///
    /// # Errors
    ///
    /// If there is an error when consuming an entry from disk, whether due to reading or deserializing the entry, an
    /// error is returned.
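    ///
    /// # Example
    ///
    /// A minimal sketch of draining the queue; `resend` is a hypothetical caller-supplied function:
    ///
    /// ```ignore
    /// // Newest in-memory entries come out first, followed by entries persisted to disk.
    /// while let Some(entry) = queue.pop().await? {
    ///     resend(entry).await;
    /// }
    /// ```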
    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
        // Pull from in-memory first to prioritize the most recent entries.
        if let Some(entry) = self.pending.pop_front() {
            self.total_in_memory_bytes -= entry.size_bytes();
            debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");

            return Ok(Some(entry));
        }

        // If we have disk persistence enabled, pull from disk next.
        if let Some(persisted_pending) = &mut self.persisted_pending {
            if let Some(entry) = persisted_pending.pop().await? {
                return Ok(Some(entry));
            }
        }

        Ok(None)
    }

    /// Flushes all entries, potentially persisting them to disk.
    ///
    /// When disk persistence is configured, this will flush all in-memory entries to disk. Flushing to disk still obeys
    /// the normal limiting behavior in terms of maximum on-disk size. When disk persistence is not enabled, all
    /// in-memory entries will be dropped.
    ///
    /// # Errors
    ///
    /// If an error occurs while persisting an entry to disk, an error is returned.
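    ///
    /// # Example
    ///
    /// A minimal sketch of flushing during shutdown; note that `flush` consumes the queue, and `warn!` from
    /// `tracing` is assumed to be in scope:
    ///
    /// ```ignore
    /// let flush_result = queue.flush().await?;
    /// if flush_result.had_drops() {
    ///     // Without disk persistence configured, every remaining entry is dropped here.
    ///     warn!(events = flush_result.events_dropped, "Dropped entries during flush.");
    /// }
    /// ```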
    pub async fn flush(mut self) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        while let Some(entry) = self.pending.pop_front() {
            let entry_size = entry.size_bytes();

            if let Some(persisted_pending) = &mut self.persisted_pending {
                let persist_result = persisted_pending.push(entry).await?;
                push_result.merge(persist_result);

                debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
            } else {
                debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");

                push_result.track_dropped_item(entry.event_count());
            }
        }

        Ok(push_result)
    }
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::Rng as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        let mut count = 0;
        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
        for maybe_entry in entries {
            let entry = maybe_entry.expect("should not fail to read directory entry");
            if entry.file_type().expect("should not fail to get file type").is_file() {
                count += 1;
            } else if entry.file_type().expect("should not fail to get file type").is_dir() {
                count += file_count_recursive(entry.path());
            }
        }
        count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        // Push our data to the queue.
        let push_result = retry_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(retry_queue.push(data).await.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it is sized to only fit one entry at a time.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);

        // Push our data to the queue.
        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed, indicating the first item was
        // removed from the queue to make room.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should drop all entries as we have no disk persistence layer configured.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items, and enable disk persistence.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Just a sanity check to ensure our temp directory is empty.
        assert_eq!(0, file_count_recursive(&root_path));

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
            .with_disk_persistence(
                root_path.clone(),
                u64::MAX,
                1.0,
                Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
            )
            .await
            .expect("should not fail to create retry queue with disk persistence");

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should push all entries to disk.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        // We should now have two files on disk after flushing.
        assert_eq!(2, file_count_recursive(&root_path));
    }
}
433}