saluki_io/net/util/retry/queue/mod.rs

use std::{collections::VecDeque, path::PathBuf, sync::Arc};

use saluki_error::{generic_error, GenericError};
use serde::{de::DeserializeOwned, Serialize};
use tracing::debug;

mod persisted;
pub use self::persisted::DiskUsageRetrieverImpl;
use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};

/// A container that holds events.
///
/// This trait is used as an incredibly generic way to expose the number of events within a "container", which we
/// loosely define to be anything that is holding events in some form. This is primarily used to track the number of
/// events dropped by `RetryQueue` (and `PersistedQueue`) when entries have to be dropped due to size limits.
pub trait EventContainer {
    /// Returns the number of events represented by this container.
    fn event_count(&self) -> u64;
}

/// A value that can be retried.
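///
/// # Examples
///
/// A minimal sketch of implementing this trait (and its `EventContainer` supertrait) for a custom
/// payload type; the `Payload` type is illustrative and import paths are omitted:
///
/// ```ignore
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Deserialize, Serialize)]
/// struct Payload {
///     body: Vec<u8>,
/// }
///
/// impl EventContainer for Payload {
///     fn event_count(&self) -> u64 {
///         // Each payload represents a single event in this sketch.
///         1
///     }
/// }
///
/// impl Retryable for Payload {
///     fn size_bytes(&self) -> u64 {
///         self.body.len() as u64
///     }
/// }
/// ```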
pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
    /// Returns the in-memory size of this value, in bytes.
    fn size_bytes(&self) -> u64;
}

impl EventContainer for String {
    fn event_count(&self) -> u64 {
        1
    }
}

impl Retryable for String {
    fn size_bytes(&self) -> u64 {
        self.len() as u64
    }
}

/// Result of a push operation.
///
/// As pushing items to `RetryQueue` may result in dropping older items to make room for new ones, this struct tracks
/// the total number of items dropped, and the number of events represented by those items.
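///
/// # Examples
///
/// A small sketch of accumulating drop information across multiple operations (the counts here are
/// illustrative):
///
/// ```ignore
/// let mut total = PushResult::default();
///
/// // Record a dropped item that represented three events.
/// let mut partial = PushResult::default();
/// partial.track_dropped_item(3);
///
/// // Fold the partial result into the running total.
/// total.merge(partial);
///
/// if total.had_drops() {
///     // Surface `total.items_dropped` / `total.events_dropped` via logs or metrics.
/// }
/// ```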
#[derive(Default)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    /// Total number of items dropped.
    pub items_dropped: u64,

    /// Total number of events represented by the dropped items.
    pub events_dropped: u64,
}

impl PushResult {
    /// Returns `true` if any items were dropped.
    pub fn had_drops(&self) -> bool {
        self.items_dropped > 0
    }

    /// Merges `other` into `self`.
    pub fn merge(&mut self, other: Self) {
        self.items_dropped += other.items_dropped;
        self.events_dropped += other.events_dropped;
    }

    /// Tracks a single dropped item.
    pub fn track_dropped_item(&mut self, event_count: u64) {
        self.items_dropped += 1;
        self.events_dropped += event_count;
    }
}

/// A queue for storing requests to be retried.
pub struct RetryQueue<T> {
    queue_name: String,
    pending: VecDeque<T>,
    persisted_pending: Option<PersistedQueue<T>>,
    total_in_memory_bytes: u64,
    max_in_memory_bytes: u64,
}

impl<T> RetryQueue<T>
where
    T: Retryable,
{
    /// Creates a new `RetryQueue` instance with the given name and maximum size.
    ///
    /// The queue will only hold as many entries as can fit within the given maximum size. If the queue is full, the
    /// oldest entries will be removed (or potentially persisted to disk, see
    /// [`with_disk_persistence`][Self::with_disk_persistence]) to make room for new entries.
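    ///
    /// # Examples
    ///
    /// A minimal in-memory usage sketch (`String` implements `Retryable` in this module; the import
    /// path is assumed and the example is not compiled as a doctest):
    ///
    /// ```ignore
    /// use saluki_io::net::util::retry::queue::RetryQueue;
    ///
    /// async fn example() {
    ///     // Hold up to 1 MiB of pending entries in memory.
    ///     let mut queue = RetryQueue::<String>::new("my-endpoint".to_string(), 1024 * 1024);
    ///
    ///     let result = queue.push("payload".to_string()).await.unwrap();
    ///     assert!(!result.had_drops());
    /// }
    /// ```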
    pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
        Self {
            queue_name,
            pending: VecDeque::new(),
            persisted_pending: None,
            total_in_memory_bytes: 0,
            max_in_memory_bytes,
        }
    }

    /// Configures the queue to persist pending entries to disk.
    ///
    /// Disk persistence is used as a fallback to in-memory storage when the queue is full. When a new entry cannot
    /// fit in memory, existing in-memory entries are persisted to disk, oldest first, until there is room for it.
    ///
    /// When reading entries from the queue, in-memory entries are read first, followed by persisted entries. This
    /// provides priority to the most recent entries added to the queue, but allows for bursting over the configured
    /// in-memory size limit without having to immediately discard entries.
    ///
    /// Files are stored in a subdirectory, with the same name as the given queue name, within the given `root_path`.
    ///
    /// # Errors
    ///
    /// If there is an error initializing the disk persistence layer, an error is returned.
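    ///
    /// # Examples
    ///
    /// A sketch of enabling disk persistence (the path, size limit, and disk ratio are illustrative;
    /// import paths are assumed):
    ///
    /// ```ignore
    /// use std::{path::PathBuf, sync::Arc};
    ///
    /// async fn example() -> Result<(), GenericError> {
    ///     let root_path = PathBuf::from("/var/lib/my-app/retry");
    ///     let queue = RetryQueue::<String>::new("my-endpoint".to_string(), 1024 * 1024)
    ///         .with_disk_persistence(
    ///             root_path.clone(),
    ///             64 * 1024 * 1024, // maximum bytes to keep on disk
    ///             0.95,             // maximum ratio of the underlying disk we're allowed to use
    ///             Arc::new(DiskUsageRetrieverImpl::new(root_path)),
    ///         )
    ///         .await?;
    ///     Ok(())
    /// }
    /// ```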
    pub async fn with_disk_persistence(
        mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
    ) -> Result<Self, GenericError> {
        // Make sure the root storage path is non-empty, as otherwise we can't generate a valid path
        // for the persisted entries in this retry queue.
        if root_path.as_os_str().is_empty() {
            return Err(generic_error!("Storage path cannot be empty."));
        }

        let queue_root_path = root_path.join(&self.queue_name);
        let persisted_pending = PersistedQueue::from_root_path(
            queue_root_path,
            max_disk_size_bytes,
            storage_max_disk_ratio,
            DiskUsageRetrieverWrapper::new(disk_usage_retriever),
        )
        .await?;
        self.persisted_pending = Some(persisted_pending);
        Ok(self)
    }

    /// Returns `true` if the queue is empty.
    ///
    /// This includes both in-memory and persisted entries.
    pub fn is_empty(&self) -> bool {
        self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
    }

    /// Returns the number of entries in the queue.
    ///
    /// This includes both in-memory and persisted entries.
    pub fn len(&self) -> usize {
        self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
    }

    /// Enqueues an entry.
    ///
    /// If the queue is full and the entry cannot be enqueued in-memory, and disk persistence is enabled, in-memory
    /// entries will be moved to disk (oldest first) until enough capacity is available to enqueue the new entry
    /// in-memory.
    ///
    /// # Errors
    ///
    /// If the entry is too large to fit into the queue, or if there is an error when persisting entries to disk, an
    /// error is returned.
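    ///
    /// # Examples
    ///
    /// A sketch of pushing an entry and reacting to any resulting drops (the surrounding setup is
    /// illustrative):
    ///
    /// ```ignore
    /// async fn example(queue: &mut RetryQueue<String>) -> Result<(), GenericError> {
    ///     let result = queue.push("payload".to_string()).await?;
    ///     if result.had_drops() {
    ///         // Older entries were evicted to make room; surface this somewhere visible.
    ///         tracing::warn!(
    ///             items = result.items_dropped,
    ///             events = result.events_dropped,
    ///             "Dropped entries while enqueueing."
    ///         );
    ///     }
    ///     Ok(())
    /// }
    /// ```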
    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        // Make sure the entry, by itself, isn't too big to ever fit into the queue.
        let current_entry_size = entry.size_bytes();
        if current_entry_size > self.max_in_memory_bytes {
            return Err(generic_error!(
                "Entry too large to fit into retry queue. ({} > {})",
                current_entry_size,
                self.max_in_memory_bytes
            ));
        }

        // Make sure we have enough room for this incoming entry, either by persisting older entries to disk or by
        // simply dropping them.
        while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
            let oldest_entry = self.pending.pop_front().expect("queue is not empty");
            let oldest_entry_size = oldest_entry.size_bytes();

            if let Some(persisted_pending) = &mut self.persisted_pending {
                let persist_result = persisted_pending.push(oldest_entry).await?;
                push_result.merge(persist_result);

                debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
            } else {
                debug!(
                    entry.len = oldest_entry_size,
                    "Dropped in-memory entry to increase available capacity."
                );

                push_result.track_dropped_item(oldest_entry.event_count());
            }

            self.total_in_memory_bytes -= oldest_entry_size;
        }

        self.pending.push_back(entry);
        self.total_in_memory_bytes += current_entry_size;
        debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");

        Ok(push_result)
    }

    /// Consumes an entry.
    ///
    /// In-memory entries are consumed first, followed by persisted entries if disk persistence is enabled.
    ///
    /// If no entries are available, `None` is returned.
    ///
    /// # Errors
    ///
    /// If there is an error when consuming an entry from disk, whether due to reading or deserializing the entry, an
    /// error is returned.
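    ///
    /// # Examples
    ///
    /// A sketch of draining the queue until no entries remain (the retry logic itself is elided):
    ///
    /// ```ignore
    /// async fn example(queue: &mut RetryQueue<String>) -> Result<(), GenericError> {
    ///     while let Some(entry) = queue.pop().await? {
    ///         // Retry the request represented by `entry` here.
    ///     }
    ///     Ok(())
    /// }
    /// ```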
    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
        // Pull from in-memory first to prioritize the most recent entries.
        if let Some(entry) = self.pending.pop_front() {
            self.total_in_memory_bytes -= entry.size_bytes();
            debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");

            return Ok(Some(entry));
        }

        // If we have disk persistence enabled, pull from disk next.
        if let Some(persisted_pending) = &mut self.persisted_pending {
            if let Some(entry) = persisted_pending.pop().await? {
                return Ok(Some(entry));
            }
        }

        Ok(None)
    }

    /// Flushes all entries, potentially persisting them to disk.
    ///
    /// When disk persistence is configured, this will flush all in-memory entries to disk. Flushing to disk still obeys
    /// the normal limiting behavior in terms of maximum on-disk size. When disk persistence is not enabled, all
    /// in-memory entries will be dropped.
    ///
    /// # Errors
    ///
    /// If an error occurs while persisting an entry to disk, an error is returned.
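    ///
    /// # Examples
    ///
    /// A sketch of flushing during shutdown; note that `flush` consumes the queue (the shutdown
    /// context is illustrative):
    ///
    /// ```ignore
    /// async fn shutdown(queue: RetryQueue<String>) -> Result<(), GenericError> {
    ///     let result = queue.flush().await?;
    ///     if result.had_drops() {
    ///         // Without disk persistence, every in-memory entry is dropped here.
    ///         tracing::warn!(items = result.items_dropped, "Dropped entries during flush.");
    ///     }
    ///     Ok(())
    /// }
    /// ```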
    pub async fn flush(mut self) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        while let Some(entry) = self.pending.pop_front() {
            let entry_size = entry.size_bytes();

            if let Some(persisted_pending) = &mut self.persisted_pending {
                let persist_result = persisted_pending.push(entry).await?;
                push_result.merge(persist_result);

                debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
            } else {
                debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");

                push_result.track_dropped_item(entry.event_count());
            }
        }

        Ok(push_result)
    }
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::Rng as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        let mut count = 0;
        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
        for maybe_entry in entries {
            let entry = maybe_entry.expect("should not fail to read directory entry");
            if entry.file_type().expect("should not fail to get file type").is_file() {
                count += 1;
            } else if entry.file_type().expect("should not fail to get file type").is_dir() {
                count += file_count_recursive(entry.path());
            }
        }
        count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        // Push our data to the queue.
        let push_result = retry_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(retry_queue.push(data).await.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it is sized to only fit one entry at a time.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);

        // Push our data to the queue.
        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed, indicating the first item was
        // removed from the queue to make room.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should drop all entries as we have no disk persistence layer configured.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items, and enable disk persistence.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Just a sanity check to ensure our temp directory is empty.
        assert_eq!(0, file_count_recursive(&root_path));

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
            .with_disk_persistence(
                root_path.clone(),
                u64::MAX,
                1.0,
                Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
            )
            .await
            .expect("should not fail to create retry queue with disk persistence");

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should push all entries to disk.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        // We should now have two files on disk after flushing.
        assert_eq!(2, file_count_recursive(&root_path));
    }
}