Skip to main content

saluki_io/net/util/retry/queue/
mod.rs

1use std::collections::VecDeque;
2
3use saluki_error::{generic_error, GenericError};
4use serde::{de::DeserializeOwned, Serialize};
5use tracing::{debug, info, warn};
6
7mod persisted;
8use self::persisted::PersistedQueue;
9pub use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverImpl, PersistedQueueArgs};
10
/// Default fraction of `max_in_memory_bytes` moved to disk at once when an in-memory queue with disk persistence
/// overflows. Override it per queue with `RetryQueue::with_flush_to_disk_mem_ratio`.
const DEFAULT_FLUSH_TO_DISK_MEM_RATIO: f64 = 0.5;
12
/// Anything that holds events.
///
/// "Container" is meant very loosely: any value carrying events in some form can implement this trait. Its main
/// purpose is accounting for how many events are lost when `RetryQueue` (or `PersistedQueue`) has to discard entries
/// to stay within its size limits.
pub trait EventContainer {
    /// Number of events this container represents.
    fn event_count(&self) -> u64;

    /// Number of metric data points this container represents.
    ///
    /// Defaults to zero, which suits containers that do not carry metric data points.
    fn data_point_count(&self) -> u64 {
        0
    }
}
27
28/// A value that can be retried.
29pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
30    /// Returns the in-memory size of this value, in bytes.
31    fn size_bytes(&self) -> u64;
32}
33
34impl EventContainer for String {
35    fn event_count(&self) -> u64 {
36        1
37    }
38}
39
40impl Retryable for String {
41    fn size_bytes(&self) -> u64 {
42        self.len() as u64
43    }
44}
45
46/// Result of a push operation.
47///
48/// As pushing items to `RetryQueue` may result in dropping older items to make room for new ones, this struct tracks
49/// the total number of items dropped, and the number of events represented by those items.
50#[derive(Default)]
51#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
52pub struct PushResult {
53    /// Total number of items dropped.
54    pub items_dropped: u64,
55
56    /// Total number of events represented by the dropped items.
57    pub events_dropped: u64,
58
59    /// Total number of metric data points represented by the dropped items.
60    pub data_points_dropped: u64,
61}
62
63impl PushResult {
64    /// Returns `true` if any items were dropped.
65    pub fn had_drops(&self) -> bool {
66        self.items_dropped > 0
67    }
68
69    /// Merges `other` into `Self`.
70    pub fn merge(&mut self, other: Self) {
71        self.items_dropped += other.items_dropped;
72        self.events_dropped += other.events_dropped;
73        self.data_points_dropped += other.data_points_dropped;
74    }
75
76    /// Tracks a single dropped item.
77    pub fn track_dropped_item(&mut self, item: &dyn EventContainer) {
78        self.items_dropped += 1;
79        self.events_dropped += item.event_count();
80        self.data_points_dropped += item.data_point_count();
81    }
82}
83
84/// A queue for storing requests to be retried.
85pub struct RetryQueue<T> {
86    queue_name: String,
87    pending: VecDeque<T>,
88    persisted_pending: Option<PersistedQueue<T>>,
89    total_in_memory_bytes: u64,
90    max_in_memory_bytes: u64,
91    flush_to_disk_mem_ratio: f64,
92}
93
94impl<T> RetryQueue<T>
95where
96    T: Retryable,
97{
98    /// Creates a new `RetryQueue` instance with the given name and maximum size.
99    ///
100    /// The queue will only hold as many entries as can fit within the given maximum size. If the queue is full, the
101    /// oldest entries will be removed (or potentially persisted to disk, see
102    /// [`with_disk_persistence`][Self::with_disk_persistence]) to make room for new entries.
103    pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
104        Self {
105            queue_name,
106            pending: VecDeque::new(),
107            persisted_pending: None,
108            total_in_memory_bytes: 0,
109            max_in_memory_bytes,
110            flush_to_disk_mem_ratio: DEFAULT_FLUSH_TO_DISK_MEM_RATIO,
111        }
112    }
113
114    /// Configures the ratio of in-memory queue bytes to flush to disk when the queue is full.
115    ///
116    /// When disk persistence is enabled and the queue does not have enough room for a new entry, this ratio controls how
117    /// much in-memory data is moved to disk. For example, a value of `0.5` moves at least half of
118    /// `max_in_memory_bytes` to disk when the queue overflows. Values less than or equal to zero disable extra batch
119    /// flushing, but entries evicted to make room are still persisted when disk persistence is enabled.
120    pub fn with_flush_to_disk_mem_ratio(mut self, flush_to_disk_mem_ratio: f64) -> Self {
121        self.flush_to_disk_mem_ratio = flush_to_disk_mem_ratio;
122        self
123    }
124
125    /// Configures the queue to persist pending entries to disk.
126    ///
127    /// Disk persistence is used as a fallback to in-memory storage when the queue is full. When attempting to add a new
128    /// entry to the queue, and the queue can't fit the entry in-memory, in-memory entries will be persisted to disk,
129    /// oldest first.
130    ///
131    /// When reading entries from the queue, in-memory entries are read first, followed by persisted entries. This
132    /// provides priority to the most recent entries added to the queue, but allows for bursting over the configured
133    /// in-memory size limit without having to immediately discard entries.
134    ///
135    /// Files are stored in a subdirectory, with the same name as the given queue name, within `args.root_path`.
136    ///
137    /// # Errors
138    ///
139    /// If there is an error initializing the disk persistence layer, an error is returned.
140    pub async fn with_disk_persistence(mut self, mut args: PersistedQueueArgs) -> Result<Self, GenericError> {
141        // Make sure the root storage path is non-empty, as otherwise we can't generate a valid path
142        // for the persisted entries in this retry queue.
143        if args.root_path.as_os_str().is_empty() {
144            return Err(generic_error!("Storage path cannot be empty."));
145        }
146
147        args.root_path = args.root_path.join(&self.queue_name);
148        let mut persisted_pending = PersistedQueue::from_root_path(args).await?;
149        match persisted_pending.remove_stale_files().await {
150            Ok(removed) if removed > 0 => {
151                info!(count = removed, "Removed outdated retry files from disk.");
152            }
153            Ok(_) => {}
154            Err(e) => warn!(error = %e, "Failed to remove stale retry files."),
155        }
156        self.persisted_pending = Some(persisted_pending);
157        Ok(self)
158    }
159
160    /// Returns `true` if the queue is empty.
161    ///
162    /// This includes both in-memory and persisted entries.
163    pub fn is_empty(&self) -> bool {
164        self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
165    }
166
167    /// Returns the number of entries in the queue
168    ///
169    /// This includes both in-memory and persisted entries.
170    pub fn len(&self) -> usize {
171        self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
172    }
173
174    /// Returns the maximum in-memory capacity, in bytes.
175    pub const fn max_in_memory_bytes(&self) -> u64 {
176        self.max_in_memory_bytes
177    }
178
179    /// Returns the available in-memory capacity, in bytes.
180    pub const fn available_in_memory_capacity_bytes(&self) -> u64 {
181        self.max_in_memory_bytes.saturating_sub(self.total_in_memory_bytes)
182    }
183
184    /// Returns the available on-disk capacity, in bytes.
185    ///
186    /// Returns `0` when disk persistence is not enabled.
187    ///
188    /// # Errors
189    ///
190    /// If disk persistence is enabled and there is an error while retrieving the underlying disk capacity, an error is
191    /// returned.
192    pub async fn available_on_disk_capacity_bytes(&self) -> Result<u64, GenericError> {
193        match &self.persisted_pending {
194            Some(persisted_pending) => persisted_pending.available_capacity_bytes().await,
195            None => Ok(0),
196        }
197    }
198
199    /// Returns the number of persisted entries that have been permanently dropped due to errors since the last call
200    /// to this method, resetting the counter.
201    ///
202    /// Always returns 0 if disk persistence isn't enabled.
203    pub fn take_persisted_entries_dropped(&mut self) -> u64 {
204        self.persisted_pending.as_mut().map_or(0, |p| p.take_entries_dropped())
205    }
206
207    /// Enqueues an entry.
208    ///
209    /// If the queue is full and the entry can't be enqueued in-memory, in-memory entries (oldest first) are evicted
210    /// until there is room for the new entry. When disk persistence is enabled, evicted entries are moved to disk. If the
211    /// flush-to-disk ratio is greater than zero, eviction moves at least
212    /// `max_in_memory_bytes * flush_to_disk_mem_ratio` bytes of in-memory data to disk before admitting the new entry. If
213    /// disk persistence is disabled, evicted entries are dropped instead. If an in-memory entry can't be persisted due to
214    /// a disk error, that entry is dropped and counted in the returned `PushResult`; the new entry is still enqueued.
215    ///
216    /// # Errors
217    ///
218    /// If the entry is too large to fit into the queue, an error is returned.
219    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
220        let mut push_result = PushResult::default();
221
222        // Make sure the entry, by itself, isn't too big to ever fit into the queue.
223        let current_entry_size = entry.size_bytes();
224        if current_entry_size > self.max_in_memory_bytes {
225            return Err(generic_error!(
226                "Entry too large to fit into retry queue. ({} > {})",
227                current_entry_size,
228                self.max_in_memory_bytes
229            ));
230        }
231
232        // Make sure we have enough room for this incoming entry, either by persisting older entries to disk or by
233        // simply dropping them.
234        let required_bytes = self
235            .total_in_memory_bytes
236            .saturating_add(current_entry_size)
237            .saturating_sub(self.max_in_memory_bytes);
238        let using_disk = self.persisted_pending.is_some();
239        let bytes_to_remove = if using_disk && required_bytes > 0 {
240            required_bytes.max(flush_to_disk_bytes(
241                self.max_in_memory_bytes,
242                self.flush_to_disk_mem_ratio,
243            ))
244        } else {
245            required_bytes
246        };
247        let mut bytes_removed = 0;
248
249        while !self.pending.is_empty() && bytes_removed < bytes_to_remove {
250            let oldest_entry = self.pending.pop_front().expect("queue is not empty");
251            let oldest_entry_size = oldest_entry.size_bytes();
252
253            if using_disk {
254                // Capture the dropped-event counts before moving `oldest_entry` into the persist call, so we can still
255                // record drop telemetry if the disk write fails.
256                let oldest_entry_events = oldest_entry.event_count();
257                let oldest_entry_data_points = oldest_entry.data_point_count();
258                let persisted_pending = self.persisted_pending.as_mut().expect("disk persistence is enabled");
259                match persisted_pending.push(oldest_entry).await {
260                    Ok(persist_result) => {
261                        push_result.merge(persist_result);
262                        debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
263                    }
264                    Err(e) => {
265                        // Match the upstream Agent: on disk persistence failure, drop this entry and continue evicting
266                        // so the new entry can still be admitted to the queue. Propagating the error here would
267                        // permanently lose the incoming transaction at the caller, which the Agent does not do.
268                        warn!(
269                            error = %e,
270                            entry.len = oldest_entry_size,
271                            "Failed to persist in-memory entry to disk; dropping entry to make room."
272                        );
273                        push_result.items_dropped += 1;
274                        push_result.events_dropped += oldest_entry_events;
275                        push_result.data_points_dropped += oldest_entry_data_points;
276                    }
277                }
278            } else {
279                debug!(
280                    entry.len = oldest_entry_size,
281                    "Dropped in-memory entry to increase available capacity."
282                );
283
284                push_result.track_dropped_item(&oldest_entry);
285            }
286
287            self.total_in_memory_bytes -= oldest_entry_size;
288            bytes_removed += oldest_entry_size;
289        }
290
291        self.pending.push_back(entry);
292        self.total_in_memory_bytes += current_entry_size;
293        debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");
294
295        Ok(push_result)
296    }
297
298    /// Consumes an entry.
299    ///
300    /// In-memory entries are consumed first, followed by persisted entries if disk persistence is enabled.
301    ///
302    /// If no entries are available, `None` is returned.
303    ///
304    /// # Errors
305    ///
306    /// If there is an error when consuming an entry from disk, whether due to reading or deserializing the entry, an
307    /// error is returned.
308    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
309        // Pull from in-memory first to prioritize the most recent entries.
310        if let Some(entry) = self.pending.pop_front() {
311            self.total_in_memory_bytes -= entry.size_bytes();
312            debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");
313
314            return Ok(Some(entry));
315        }
316
317        // If we have disk persistence enabled, pull from disk next.
318        if let Some(persisted_pending) = &mut self.persisted_pending {
319            if let Some(entry) = persisted_pending.pop().await? {
320                return Ok(Some(entry));
321            }
322        }
323
324        Ok(None)
325    }
326
327    /// Flushes all entries, potentially persisting them to disk.
328    ///
329    /// When disk persistence is configured, this will flush all in-memory entries to disk. Flushing to disk still obeys
330    /// the normal limiting behavior in terms of maximum on-disk size. When disk persistence isn't enabled, all
331    /// in-memory entries will be dropped.
332    ///
333    /// # Errors
334    ///
335    /// If an error occurs while persisting an entry to disk, an error is returned.
336    pub async fn flush(mut self) -> Result<PushResult, GenericError> {
337        let mut push_result = PushResult::default();
338
339        while let Some(entry) = self.pending.pop_front() {
340            let entry_size = entry.size_bytes();
341
342            if let Some(persisted_pending) = &mut self.persisted_pending {
343                let persist_result = persisted_pending.push(entry).await?;
344                push_result.merge(persist_result);
345
346                debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
347            } else {
348                debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");
349
350                push_result.track_dropped_item(&entry);
351            }
352        }
353
354        Ok(push_result)
355    }
356}
357
/// Converts the flush-to-disk ratio into an absolute byte count relative to `max_in_memory_bytes`.
///
/// - Zero, negative, or NaN ratios yield `0`, meaning no extra batch flushing.
/// - A positive infinite ratio yields `u64::MAX`.
/// - Any other ratio yields `max_in_memory_bytes * ratio`, truncated toward zero. A product too large for a `u64`
///   saturates to `u64::MAX` via Rust's float-to-integer `as` semantics.
fn flush_to_disk_bytes(max_in_memory_bytes: u64, flush_to_disk_mem_ratio: f64) -> u64 {
    match flush_to_disk_mem_ratio {
        ratio if ratio.is_nan() || ratio <= 0.0 => 0,
        ratio if ratio.is_infinite() => u64::MAX,
        // Truncate toward zero to match the upstream Agent's `int(maxMemSizeInBytes * flushToStorageRatio)` semantics.
        ratio => (max_in_memory_bytes as f64 * ratio) as u64,
    }
}
368
#[cfg(test)]
mod tests {
    use std::{path::Path, sync::Arc};

    use rand::RngExt as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        // On 64-bit targets this is 8 (name) + 24 (`String` header) + 4 (`u32`) = 36 bytes. The capacity-based tests
        // below size their queues in multiples of that.
        fn size_bytes(&self) -> u64 {
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    /// Recursively counts regular files under `path`.
    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        let mut count = 0;
        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
        for maybe_entry in entries {
            let entry = maybe_entry.expect("should not fail to read directory entry");
            let file_type = entry.file_type().expect("should not fail to get file type");
            if file_type.is_file() {
                count += 1;
            } else if file_type.is_dir() {
                count += file_count_recursive(entry.path());
            }
        }
        count
    }

    #[test]
    fn push_result_tracks_and_merges_drops() {
        let mut first = PushResult::default();
        assert!(!first.had_drops());

        first.track_dropped_item(&FakeData::random());
        assert!(first.had_drops());
        assert_eq!(1, first.items_dropped);
        assert_eq!(1, first.events_dropped);
        assert_eq!(0, first.data_points_dropped);

        let mut second = PushResult::default();
        second.track_dropped_item(&FakeData::random());
        second.merge(first);
        assert_eq!(2, second.items_dropped);
        assert_eq!(2, second.events_dropped);
        assert_eq!(0, second.data_points_dropped);
    }

    #[test]
    fn flush_to_disk_bytes_edge_cases() {
        assert_eq!(60, flush_to_disk_bytes(120, 0.5));
        // Truncates toward zero.
        assert_eq!(1, flush_to_disk_bytes(3, 0.5));
        assert_eq!(0, flush_to_disk_bytes(120, 0.0));
        assert_eq!(0, flush_to_disk_bytes(120, -1.0));
        assert_eq!(0, flush_to_disk_bytes(120, f64::NAN));
        assert_eq!(0, flush_to_disk_bytes(120, f64::NEG_INFINITY));
        assert_eq!(u64::MAX, flush_to_disk_bytes(120, f64::INFINITY));
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        // Push our data to the queue.
        let push_result = retry_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
    }

    #[tokio::test]
    async fn capacity_accessors_report_memory_and_disk_capacity() {
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36)
            .with_disk_persistence(PersistedQueueArgs {
                root_path: root_path.clone(),
                max_on_disk_bytes: 1024,
                storage_max_disk_ratio: 1.0,
                disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path)),
                max_age_days: 10,
            })
            .await
            .expect("should not fail to create retry queue with disk persistence");

        assert_eq!(retry_queue.max_in_memory_bytes(), 36);
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 36);
        assert_eq!(
            retry_queue
                .available_on_disk_capacity_bytes()
                .await
                .expect("should not fail to calculate disk capacity"),
            1024
        );

        let push_result = retry_queue
            .push(FakeData::random())
            .await
            .expect("first push should succeed");
        assert!(!push_result.had_drops());
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 0);
        let push_result = retry_queue
            .push(FakeData::random())
            .await
            .expect("second push should persist the oldest entry");
        assert!(!push_result.had_drops());
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 0);

        assert!(
            retry_queue
                .available_on_disk_capacity_bytes()
                .await
                .expect("should not fail to calculate disk capacity")
                < 1024
        );

        let _ = retry_queue.pop().await.expect("pop should succeed");
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 36);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(retry_queue.push(data).await.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it is sized to only fit one entry at a time.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);

        // Push our data to the queue.
        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed, indicating the first item was
        // removed from the queue to make room.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should drop all entries as we have no disk persistence layer configured.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our retry queue such that it can hold both items, and enable disk persistence.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Just a sanity check to ensure our temp directory is empty.
        assert_eq!(0, file_count_recursive(&root_path));

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
            .with_disk_persistence(PersistedQueueArgs {
                root_path: root_path.clone(),
                max_on_disk_bytes: u64::MAX,
                storage_max_disk_ratio: 1.0,
                disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
                max_age_days: 10,
            })
            .await
            .expect("should not fail to create retry queue with disk persistence");

        // Push our data to the queue.
        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Flush the queue, which should push all entries to disk.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        // We should now have two files on disk after flushing.
        assert_eq!(2, file_count_recursive(&root_path));
    }

    #[tokio::test]
    async fn disk_overflow_flushes_configured_memory_ratio() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();
        let data3 = FakeData::random();
        let data4 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 120)
            .with_flush_to_disk_mem_ratio(0.5)
            .with_disk_persistence(PersistedQueueArgs {
                root_path: root_path.clone(),
                max_on_disk_bytes: u64::MAX,
                storage_max_disk_ratio: 1.0,
                disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
                max_age_days: 10,
            })
            .await
            .expect("should not fail to create retry queue with disk persistence");

        let push_result = retry_queue
            .push(data1.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);
        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);
        let push_result = retry_queue
            .push(data3.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // The fourth push only strictly needs 24 bytes freed. With a 0.5 ratio on a 120-byte queue, at least 60 bytes
        // must move to disk, so two 36-byte entries (data1, data2) are persisted rather than one.
        let push_result = retry_queue
            .push(data4.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);
        assert_eq!(2, file_count_recursive(&root_path));

        // In-memory entries are popped first (data3, data4), followed by the entries that were flushed to disk
        // (data1, data2 in FIFO order).
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data3, actual);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data4, actual);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data1, actual);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
    }

    #[tokio::test]
    async fn zero_disk_flush_ratio_persists_required_entries() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();
        let data3 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 72)
            .with_flush_to_disk_mem_ratio(0.0)
            .with_disk_persistence(PersistedQueueArgs {
                root_path: root_path.clone(),
                max_on_disk_bytes: u64::MAX,
                storage_max_disk_ratio: 1.0,
                disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
                max_age_days: 10,
            })
            .await
            .expect("should not fail to create retry queue with disk persistence");

        let push_result = retry_queue
            .push(data1.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);
        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // With a zero ratio, only the single entry required to make room (data1) is moved to disk.
        let push_result = retry_queue
            .push(data3.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);
        assert_eq!(1, file_count_recursive(&root_path));

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data3, actual);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data1, actual);
    }
}