Skip to main content

saluki_io/net/util/retry/queue/
mod.rs

1use std::{collections::VecDeque, path::PathBuf, sync::Arc};
2
3use saluki_error::{generic_error, GenericError};
4use serde::{de::DeserializeOwned, Serialize};
5use tracing::debug;
6
7mod persisted;
8pub use self::persisted::DiskUsageRetrieverImpl;
9use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};
10
/// Something that holds events.
///
/// This is a deliberately generic way to ask "how many events are in here?" of any value that carries events in some
/// form. Its main use is accounting for the events lost when `RetryQueue` (or `PersistedQueue`) has to drop entries
/// to stay within its size limits.
pub trait EventContainer {
    /// Returns how many events this container represents.
    fn event_count(&self) -> u64;

    /// Returns how many metric data points this container represents.
    ///
    /// The default implementation reports zero.
    fn data_point_count(&self) -> u64 {
        0
    }
}
25
26/// A value that can be retried.
27pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
28    /// Returns the in-memory size of this value, in bytes.
29    fn size_bytes(&self) -> u64;
30}
31
32impl EventContainer for String {
33    fn event_count(&self) -> u64 {
34        1
35    }
36}
37
38impl Retryable for String {
39    fn size_bytes(&self) -> u64 {
40        self.len() as u64
41    }
42}
43
/// Outcome of a push operation.
///
/// Pushing an item into a `RetryQueue` may evict older items to make room for it. This struct reports how many items
/// were dropped as a result, together with the number of events and metric data points those items represented.
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
#[derive(Default)]
pub struct PushResult {
    /// Number of items that were dropped.
    pub items_dropped: u64,

    /// Number of events carried by the dropped items.
    pub events_dropped: u64,

    /// Number of metric data points carried by the dropped items.
    pub data_points_dropped: u64,
}
60
61impl PushResult {
62    /// Returns `true` if any items were dropped.
63    pub fn had_drops(&self) -> bool {
64        self.items_dropped > 0
65    }
66
67    /// Merges `other` into `Self`.
68    pub fn merge(&mut self, other: Self) {
69        self.items_dropped += other.items_dropped;
70        self.events_dropped += other.events_dropped;
71        self.data_points_dropped += other.data_points_dropped;
72    }
73
74    /// Tracks a single dropped item.
75    pub fn track_dropped_item(&mut self, item: &dyn EventContainer) {
76        self.items_dropped += 1;
77        self.events_dropped += item.event_count();
78        self.data_points_dropped += item.data_point_count();
79    }
80}
81
82/// A queue for storing requests to be retried.
83pub struct RetryQueue<T> {
84    queue_name: String,
85    pending: VecDeque<T>,
86    persisted_pending: Option<PersistedQueue<T>>,
87    total_in_memory_bytes: u64,
88    max_in_memory_bytes: u64,
89}
90
91impl<T> RetryQueue<T>
92where
93    T: Retryable,
94{
95    /// Creates a new `RetryQueue` instance with the given name and maximum size.
96    ///
97    /// The queue will only hold as many entries as can fit within the given maximum size. If the queue is full, the
98    /// oldest entries will be removed (or potentially persisted to disk, see
99    /// [`with_disk_persistence`][Self::with_disk_persistence]) to make room for new entries.
100    pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
101        Self {
102            queue_name,
103            pending: VecDeque::new(),
104            persisted_pending: None,
105            total_in_memory_bytes: 0,
106            max_in_memory_bytes,
107        }
108    }
109
110    /// Configures the queue to persist pending entries to disk.
111    ///
112    /// Disk persistence is used as a fallback to in-memory storage when the queue is full. When attempting to add a new
113    /// entry to the queue, and the queue can't fit the entry in-memory, in-memory entries will be persisted to disk,
114    /// oldest first.
115    ///
116    /// When reading entries from the queue, in-memory entries are read first, followed by persisted entries. This
117    /// provides priority to the most recent entries added to the queue, but allows for bursting over the configured
118    /// in-memory size limit without having to immediately discard entries.
119    ///
120    /// Files are stored in a subdirectory, with the same name as the given queue name, within the given `root_path`.
121    ///
122    /// # Errors
123    ///
124    /// If there is an error initializing the disk persistence layer, an error is returned.
125    pub async fn with_disk_persistence(
126        mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
127        disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
128    ) -> Result<Self, GenericError> {
129        // Make sure the root storage path is non-empty, as otherwise we can't generate a valid path
130        // for the persisted entries in this retry queue.
131        if root_path.as_os_str().is_empty() {
132            return Err(generic_error!("Storage path cannot be empty."));
133        }
134
135        let queue_root_path = root_path.join(&self.queue_name);
136        let persisted_pending = PersistedQueue::from_root_path(
137            queue_root_path,
138            max_disk_size_bytes,
139            storage_max_disk_ratio,
140            DiskUsageRetrieverWrapper::new(disk_usage_retriever),
141        )
142        .await?;
143        self.persisted_pending = Some(persisted_pending);
144        Ok(self)
145    }
146
147    /// Returns `true` if the queue is empty.
148    ///
149    /// This includes both in-memory and persisted entries.
150    pub fn is_empty(&self) -> bool {
151        self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
152    }
153
154    /// Returns the number of entries in the queue
155    ///
156    /// This includes both in-memory and persisted entries.
157    pub fn len(&self) -> usize {
158        self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
159    }
160
161    /// Returns the number of persisted entries that have been permanently dropped due to errors since the last call
162    /// to this method, resetting the counter.
163    ///
164    /// Always returns 0 if disk persistence isn't enabled.
165    pub fn take_persisted_entries_dropped(&mut self) -> u64 {
166        self.persisted_pending.as_mut().map_or(0, |p| p.take_entries_dropped())
167    }
168
169    /// Enqueues an entry.
170    ///
171    /// If the queue is full and the entry can't be enqueue in-memory, and disk persistence is enabled, in-memory
172    /// entries will be moved to disk (oldest first) until enough capacity is available to enqueue the new entry
173    /// in-memory.
174    ///
175    /// # Errors
176    ///
177    /// If the entry is too large to fit into the queue, or if there is an error when persisting entries to disk, an
178    /// error is returned.
179    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
180        let mut push_result = PushResult::default();
181
182        // Make sure the entry, by itself, isn't too big to ever fit into the queue.
183        let current_entry_size = entry.size_bytes();
184        if current_entry_size > self.max_in_memory_bytes {
185            return Err(generic_error!(
186                "Entry too large to fit into retry queue. ({} > {})",
187                current_entry_size,
188                self.max_in_memory_bytes
189            ));
190        }
191
192        // Make sure we have enough room for this incoming entry, either by persisting older entries to disk or by
193        // simply dropping them.
194        while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
195            let oldest_entry = self.pending.pop_front().expect("queue is not empty");
196            let oldest_entry_size = oldest_entry.size_bytes();
197
198            if let Some(persisted_pending) = &mut self.persisted_pending {
199                let persist_result = persisted_pending.push(oldest_entry).await?;
200                push_result.merge(persist_result);
201
202                debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
203            } else {
204                debug!(
205                    entry.len = oldest_entry_size,
206                    "Dropped in-memory entry to increase available capacity."
207                );
208
209                push_result.track_dropped_item(&oldest_entry);
210            }
211
212            self.total_in_memory_bytes -= oldest_entry_size;
213        }
214
215        self.pending.push_back(entry);
216        self.total_in_memory_bytes += current_entry_size;
217        debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");
218
219        Ok(push_result)
220    }
221
222    /// Consumes an entry.
223    ///
224    /// In-memory entries are consumed first, followed by persisted entries if disk persistence is enabled.
225    ///
226    /// If no entries are available, `None` is returned.
227    ///
228    /// # Errors
229    ///
230    /// If there is an error when consuming an entry from disk, whether due to reading or deserializing the entry, an
231    /// error is returned.
232    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
233        // Pull from in-memory first to prioritize the most recent entries.
234        if let Some(entry) = self.pending.pop_front() {
235            self.total_in_memory_bytes -= entry.size_bytes();
236            debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");
237
238            return Ok(Some(entry));
239        }
240
241        // If we have disk persistence enabled, pull from disk next.
242        if let Some(persisted_pending) = &mut self.persisted_pending {
243            if let Some(entry) = persisted_pending.pop().await? {
244                return Ok(Some(entry));
245            }
246        }
247
248        Ok(None)
249    }
250
251    /// Flushes all entries, potentially persisting them to disk.
252    ///
253    /// When disk persistence is configured, this will flush all in-memory entries to disk. Flushing to disk still obeys
254    /// the normal limiting behavior in terms of maximum on-disk size. When disk persistence isn't enabled, all
255    /// in-memory entries will be dropped.
256    ///
257    /// # Errors
258    ///
259    /// If an error occurs while persisting an entry to disk, an error is returned.
260    pub async fn flush(mut self) -> Result<PushResult, GenericError> {
261        let mut push_result = PushResult::default();
262
263        while let Some(entry) = self.pending.pop_front() {
264            let entry_size = entry.size_bytes();
265
266            if let Some(persisted_pending) = &mut self.persisted_pending {
267                let persist_result = persisted_pending.push(entry).await?;
268                push_result.merge(persist_result);
269
270                debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
271            } else {
272                debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");
273
274                push_result.track_dropped_item(&entry);
275            }
276        }
277
278        Ok(push_result)
279    }
280}
281
282#[cfg(test)]
283mod tests {
284    use std::path::Path;
285
286    use rand::RngExt as _;
287    use rand_distr::Alphanumeric;
288    use serde::Deserialize;
289
290    use super::*;
291
292    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
293    struct FakeData {
294        name: String,
295        value: u32,
296    }
297
298    impl FakeData {
299        fn random() -> Self {
300            Self {
301                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
302                value: rand::rng().random_range(0..100),
303            }
304        }
305    }
306
307    impl EventContainer for FakeData {
308        fn event_count(&self) -> u64 {
309            1
310        }
311    }
312
313    impl Retryable for FakeData {
314        fn size_bytes(&self) -> u64 {
315            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
316        }
317    }
318
319    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
320        let mut count = 0;
321        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
322        for maybe_entry in entries {
323            let entry = maybe_entry.expect("should not fail to read directory entry");
324            if entry.file_type().expect("should not fail to get file type").is_file() {
325                count += 1;
326            } else if entry.file_type().expect("should not fail to get file type").is_dir() {
327                count += file_count_recursive(entry.path());
328            }
329        }
330        count
331    }
332
333    #[tokio::test]
334    async fn basic_push_pop() {
335        let data = FakeData::random();
336
337        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);
338
339        // Push our data to the queue.
340        let push_result = retry_queue
341            .push(data.clone())
342            .await
343            .expect("should not fail to push data");
344        assert_eq!(0, push_result.items_dropped);
345        assert_eq!(0, push_result.events_dropped);
346
347        // Now pop the data back out and ensure it matches what we pushed, and that the file has been removed from disk.
348        let actual = retry_queue
349            .pop()
350            .await
351            .expect("should not fail to pop data")
352            .expect("should not be empty");
353        assert_eq!(data, actual);
354    }
355
356    #[tokio::test]
357    async fn entry_too_large() {
358        let data = FakeData::random();
359
360        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);
361
362        // Attempt to push our data into the queue, which should fail because it's too large.
363        assert!(retry_queue.push(data).await.is_err());
364    }
365
366    #[tokio::test]
367    async fn remove_oldest_entry_on_push() {
368        let data1 = FakeData::random();
369        let data2 = FakeData::random();
370
371        // Create our retry queue such that it is sized to only fit one entry at a time.
372        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);
373
374        // Push our data to the queue.
375        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
376        assert_eq!(0, push_result.items_dropped);
377        assert_eq!(0, push_result.events_dropped);
378
379        // Push a second data entry, which should cause the first entry to be removed.
380        let push_result = retry_queue
381            .push(data2.clone())
382            .await
383            .expect("should not fail to push data");
384        assert_eq!(1, push_result.items_dropped);
385        assert_eq!(1, push_result.events_dropped);
386
387        // Now pop the data back out and ensure it matches the second item we pushed, indicating the first item was
388        // removed from the queue to make room.
389        let actual = retry_queue
390            .pop()
391            .await
392            .expect("should not fail to pop data")
393            .expect("should not be empty");
394        assert_eq!(data2, actual);
395    }
396
397    #[tokio::test]
398    async fn flush_no_disk() {
399        let data1 = FakeData::random();
400        let data2 = FakeData::random();
401
402        // Create our retry queue such that it can hold both items.
403        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);
404
405        // Push our data to the queue.
406        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
407        assert_eq!(0, push_result1.items_dropped);
408        assert_eq!(0, push_result1.events_dropped);
409        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
410        assert_eq!(0, push_result2.items_dropped);
411        assert_eq!(0, push_result2.events_dropped);
412
413        // Flush the queue, which should drop all entries as we have no disk persistence layer configured.
414        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
415        assert_eq!(2, flush_result.items_dropped);
416        assert_eq!(2, flush_result.events_dropped);
417    }
418
419    #[tokio::test]
420    async fn flush_disk() {
421        let data1 = FakeData::random();
422        let data2 = FakeData::random();
423
424        // Create our retry queue such that it can hold both items, and enable disk persistence.
425        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
426        let root_path = temp_dir.path().to_path_buf();
427
428        // Just a sanity check to ensure our temp directory is empty.
429        assert_eq!(0, file_count_recursive(&root_path));
430
431        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
432            .with_disk_persistence(
433                root_path.clone(),
434                u64::MAX,
435                1.0,
436                Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
437            )
438            .await
439            .expect("should not fail to create retry queue with disk persistence");
440
441        // Push our data to the queue.
442        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
443        assert_eq!(0, push_result1.items_dropped);
444        assert_eq!(0, push_result1.events_dropped);
445        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
446        assert_eq!(0, push_result2.items_dropped);
447        assert_eq!(0, push_result2.events_dropped);
448
449        // Flush the queue, which should push all entries to disk.
450        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
451        assert_eq!(0, flush_result.items_dropped);
452        assert_eq!(0, flush_result.events_dropped);
453
454        // We should now have two files on disk after flushing.
455        assert_eq!(2, file_count_recursive(&root_path));
456    }
457}