Skip to main content

saluki_io/net/util/retry/queue/
persisted.rs

1use std::{
2    io,
3    marker::PhantomData,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use chrono::{DateTime, NaiveDateTime, Utc};
9use fs4::{available_space, total_space};
10use rand::Rng;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use serde::{de::DeserializeOwned, Serialize};
13use tracing::{debug, info, warn};
14
15use super::{EventContainer, PushResult};
16
/// A persisted entry.
///
/// Represents the high-level metadata of a persisted entry, including the path to and size of the entry.
struct PersistedEntry {
    // Full path to the entry file on disk.
    path: PathBuf,
    // Creation time encoded in the filename, as nanoseconds since the Unix epoch; used for oldest-first ordering.
    timestamp: u128,
    // Size of the serialized entry on disk, in bytes.
    size_bytes: u64,
}
25
26impl PersistedEntry {
27    /// Attempts to create a `PersistedEntry` from the given path.
28    ///
29    /// If the given path is not recognized as the path to a valid persisted entry, `None` is returned.
30    fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31        let timestamp = decode_timestamped_filename(&path)?;
32        Some(Self {
33            path,
34            timestamp,
35            size_bytes,
36        })
37    }
38
39    fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40        Self {
41            path,
42            timestamp,
43            size_bytes,
44        }
45    }
46}
47
/// Retrieves disk usage statistics for an underlying storage volume.
pub trait DiskUsageRetriever {
    /// Returns the total size of the underlying volume, in bytes.
    fn total_space(&self) -> Result<u64, GenericError>;
    /// Returns the available (free) space of the underlying volume, in bytes.
    fn available_space(&self) -> Result<u64, GenericError>;
}
52
/// A `DiskUsageRetriever` that reports usage for the volume containing a given root path.
pub struct DiskUsageRetrieverImpl {
    // Path used to identify which volume to query.
    root_path: PathBuf,
}

impl DiskUsageRetrieverImpl {
    /// Creates a new `DiskUsageRetrieverImpl` for the volume containing `root_path`.
    pub fn new(root_path: PathBuf) -> Self {
        DiskUsageRetrieverImpl { root_path }
    }
}
62
63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64    fn total_space(&self) -> Result<u64, GenericError> {
65        total_space(&self.root_path)
66            .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67    }
68
69    fn available_space(&self) -> Result<u64, GenericError> {
70        available_space(&self.root_path)
71            .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72    }
73}
74
75#[derive(Clone)]
76pub struct DiskUsageRetrieverWrapper {
77    inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
78}
79
80impl DiskUsageRetrieverWrapper {
81    pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82        Self {
83            inner: disk_usage_retriever,
84        }
85    }
86}
87
/// A size-bounded, disk-backed FIFO queue of serialized entries.
pub struct PersistedQueue<T> {
    // Directory in which entry files are stored.
    root_path: PathBuf,
    // In-memory view of the on-disk entries, ordered oldest first by encoded timestamp.
    entries: Vec<PersistedEntry>,
    // Sum of the sizes, in bytes, of all tracked entries.
    total_on_disk_bytes: u64,
    // Hard cap on the total number of bytes this queue may keep on disk.
    max_on_disk_bytes: u64,
    // Maximum fraction of the underlying volume that may be used overall.
    storage_max_disk_ratio: f64,
    // Source of total/available space figures for the underlying volume.
    disk_usage_retriever: DiskUsageRetrieverWrapper,
    // Number of entries permanently dropped due to errors since the counter was last taken.
    entries_dropped: u64,
    // Ties the queue to its entry type without storing a value of that type.
    _entry: PhantomData<T>,
}
98
impl<T> PersistedQueue<T>
where
    T: EventContainer + DeserializeOwned + Serialize,
{
    /// Creates a new `PersistedQueue` instance from the given root path and maximum size.
    ///
    /// The root path is created if it does not already exist, and is scanned for existing persisted entries. Entries
    /// are removed (oldest first) until the total size of all scanned entries is within the given maximum size.
    ///
    /// # Errors
    ///
    /// If there is an error creating the root directory, or scanning it for existing entries, or deleting entries to
    /// shrink the directory to fit the given maximum size, an error is returned.
    pub async fn from_root_path(
        root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: DiskUsageRetrieverWrapper,
    ) -> Result<Self, GenericError> {
        // Make sure the directory exists first.
        create_directory_recursive(root_path.clone())
            .await
            .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;

        let mut persisted_requests = Self {
            root_path: root_path.clone(),
            entries: Vec::new(),
            total_on_disk_bytes: 0,
            max_on_disk_bytes,
            storage_max_disk_ratio,
            disk_usage_retriever,
            entries_dropped: 0,
            _entry: PhantomData,
        };

        // Populate our in-memory state from whatever entries already exist on disk.
        persisted_requests.refresh_entry_state().await?;

        info!(
            "Persisted retry queue initialized. Transactions will be stored in '{}'.",
            root_path.display()
        );

        Ok(persisted_requests)
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Returns the number of entries in the queue.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns the number of entries that have been permanently dropped due to errors since the last call to this
    /// method, resetting the counter.
    pub fn take_entries_dropped(&mut self) -> u64 {
        std::mem::take(&mut self.entries_dropped)
    }

    /// Enqueues an entry and persists it to disk.
    ///
    /// # Errors
    ///
    /// If there is an error serializing the entry, or writing it to disk, or removing older entries to make space for
    /// the new entry, an error is returned.
    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
        // Serialize the entry to a temporary file.
        let (filename, timestamp) = generate_timestamped_filename();
        let entry_path = self.root_path.join(filename);
        let serialized = serde_json::to_vec(&entry)
            .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;

        // An entry that exceeds the overall cap could never fit, even with an empty queue, so reject it outright.
        if serialized.len() as u64 > self.max_on_disk_bytes {
            return Err(generic_error!("Entry is too large to persist."));
        }

        // Make sure we have enough space to persist the entry.
        let push_result = self
            .remove_until_available_space(serialized.len() as u64)
            .await
            .error_context(
                "Failed to remove older persisted entries to make space for the incoming persisted entry.",
            )?;

        // Actually persist it.
        tokio::fs::write(&entry_path, &serialized)
            .await
            .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;

        // Add a new persisted entry to our state.
        self.entries.push(PersistedEntry::from_parts(
            entry_path,
            timestamp,
            serialized.len() as u64,
        ));
        self.total_on_disk_bytes += serialized.len() as u64;

        debug!(entry.len = serialized.len(), "Enqueued persisted entry.");

        Ok(push_result)
    }

    /// Consumes the oldest persisted entry on disk, if one exists.
    ///
    /// # Errors
    ///
    /// If there is an error reading or deserializing the entry, an error is returned.
    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
        loop {
            if self.entries.is_empty() {
                return Ok(None);
            }

            // Entries are sorted oldest first, so index 0 is always the next entry to consume.
            let entry = self.entries.remove(0);
            match try_deserialize_entry(&entry).await {
                Ok(Some(deserialized)) => {
                    // We got the deserialized entry, so remove it from our state and return it.
                    self.total_on_disk_bytes -= entry.size_bytes;
                    debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");

                    return Ok(Some(deserialized));
                }
                Ok(None) => {
                    // We couldn't read the entry from disk, which points to us potentially having invalid state about
                    // what entries _are_ on disk, so we'll refresh our entry state and try again.
                    self.refresh_entry_state().await?;
                    continue;
                }
                Err(e) => {
                    // The entry is corrupt or unreadable. Drop it permanently to avoid a poison pill scenario
                    // where the same entry is retried indefinitely, blocking all other work.
                    warn!(
                        entry.path = %entry.path.display(),
                        entry.len = entry.size_bytes,
                        error = %e,
                        "Permanently dropping persisted entry that could not be consumed.",
                    );

                    self.total_on_disk_bytes -= entry.size_bytes;
                    self.entries_dropped += 1;
                    continue;
                }
            }
        }
    }

    /// Rescans the root directory and rebuilds the in-memory entry state.
    ///
    /// Unrecognized files are ignored (with a warning), entries are re-sorted oldest first by their encoded
    /// timestamp, and the total on-disk byte count is recalculated from scratch.
    ///
    /// # Errors
    ///
    /// If there is an error reading the root directory or the metadata of any file within it, an error is returned.
    async fn refresh_entry_state(&mut self) -> io::Result<()> {
        // Scan the root path for persisted entries.
        let mut entries = Vec::new();

        let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
        while let Some(entry) = dir_reader.next_entry().await? {
            let metadata = entry.metadata().await?;
            if metadata.is_file() {
                match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
                    Some(entry) => entries.push(entry),
                    None => {
                        warn!(
                            file_size = metadata.len(),
                            "Ignoring unrecognized file '{}' in retry directory.",
                            entry.path().display()
                        );
                        continue;
                    }
                }
            }
        }

        // Sort the entries by their inherent timestamp.
        entries.sort_by_key(|entry| entry.timestamp);
        self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
        self.entries = entries;

        Ok(())
    }

    /// Removes persisted entries (oldest first) until there is at least the required number of bytes in free space
    /// (maximum - total).
    ///
    /// # Errors
    ///
    /// If there is an error while deleting persisted entries, an error is returned.
    async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        let disk_usage_retriever = self.disk_usage_retriever.clone();
        let storage_max_disk_ratio = self.storage_max_disk_ratio;
        let max_on_disk_bytes = self.max_on_disk_bytes;

        // TODO: Evaluate the possible failures scenarios a little more thoroughly, and see if we can improve
        // how we handle them instead of just bailing out.
        //
        // Essentially, it's not clear to me if we would expect this to fail in a way where we could actually
        // still write the persistent entries to disk, and if it's worth it to do something like trying to
        // cache the last known good value we get here to use if we fail to get a new value, etc.
        //
        // Querying disk usage is blocking I/O, so it runs on the blocking thread pool.
        let limit = tokio::task::spawn_blocking(move || {
            on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
        })
        .await
        .error_context("Failed to run disk size limit check to completion.")??;

        while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
            let entry = self.entries.remove(0);

            // Deserialize the entry, which gives us back the original event and removes the file from disk.
            let event_count = match try_deserialize_entry::<T>(&entry).await {
                Ok(Some(deserialized)) => deserialized.event_count(),
                Ok(None) => {
                    warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
                    continue;
                }
                Err(e) => {
                    // The entry is corrupt or unreadable. Drop it permanently to avoid blocking subsequent
                    // entries from being evicted.
                    warn!(
                        entry.path = %entry.path.display(),
                        entry.len = entry.size_bytes,
                        error = %e,
                        "Permanently dropping persisted entry that could not be consumed during eviction.",
                    );

                    self.total_on_disk_bytes -= entry.size_bytes;
                    self.entries_dropped += 1;
                    continue;
                }
            };

            // Update our statistics.
            self.total_on_disk_bytes -= entry.size_bytes;
            push_result.track_dropped_item(event_count);

            debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
        }

        Ok(push_result)
    }
}
336
337/// Determines the total number of bytes that can be written to disk without causing the underlying volume to end up
338/// with more than `storage_max_disk_ratio` in terms of used space. The minimum of `max_on_disk_bytes` and the result
339/// of this calculation is returned.
340///
341/// # Errors
342///
343/// If there is an error while retrieving the total or available space of the underlying volume, an error is returned.
344fn on_disk_bytes_limit(
345    disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
346) -> Result<u64, GenericError> {
347    let total_space = disk_usage_retriever.inner.total_space()? as f64;
348    let available_space = disk_usage_retriever.inner.available_space()? as f64;
349    let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
350    let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
351    Ok(max_on_disk_bytes.min(available_disk_usage))
352}
353
/// Reads, deserializes, and removes the given persisted entry from disk.
///
/// Returns `Ok(Some(_))` on success, after the file has been deleted. Returns `Ok(None)` if the file no longer
/// exists on disk, which signals that the caller's in-memory entry state is stale and should be refreshed.
///
/// # Errors
///
/// If the file cannot be read (other than not existing), cannot be deserialized, or cannot be deleted after a
/// successful read, an error is returned. On deserialization failure, a best-effort attempt is made to delete
/// the corrupt file before returning the error.
async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
    let serialized = match tokio::fs::read(&entry.path).await {
        Ok(serialized) => serialized,
        Err(e) => match e.kind() {
            io::ErrorKind::NotFound => {
                // We tried to delete an entry that no longer exists on disk, which means our internal entry state
                // is corrupted for some reason.
                //
                // Tell the caller that we couldn't find the entry on disk, so that they need to refresh the entry state
                // to make sure it's up-to-date before trying again.
                return Ok(None);
            }
            _ => {
                return Err(e)
                    .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
            }
        },
    };

    let deserialized = match serde_json::from_slice(&serialized) {
        Ok(deserialized) => deserialized,
        Err(e) => {
            // Deserialization failed, which means the payload is corrupt or invalid. Attempt to clean up the
            // file from disk so it doesn't accumulate, but don't fail if we can't.
            if let Err(remove_err) = tokio::fs::remove_file(&entry.path).await {
                warn!(
                    entry.path = %entry.path.display(),
                    error = %remove_err,
                    "Failed to remove corrupt persisted entry from disk.",
                );
            }

            return Err(e)
                .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()));
        }
    };

    // Delete the entry from disk before returning, so that we don't risk sending duplicates.
    tokio::fs::remove_file(&entry.path)
        .await
        .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;

    debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
    Ok(Some(deserialized))
}
399
400fn generate_timestamped_filename() -> (PathBuf, u128) {
401    let now = Utc::now();
402    let now_ts = datetime_to_timestamp(now);
403    let nonce = rand::rng().random_range(100000000..999999999);
404
405    let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
406
407    (filename, now_ts)
408}
409
410fn decode_timestamped_filename(path: &Path) -> Option<u128> {
411    let filename = path.file_stem()?.to_str()?;
412    let mut filename_parts = filename.split('-');
413
414    let prefix = filename_parts.next()?;
415    let timestamp_str = filename_parts.next()?;
416    let nonce = filename_parts.next()?;
417
418    // Make sure the filename matches our expected format by first checking the prefix and nonce portions.
419    if prefix != "retry" || nonce.parse::<u64>().is_err() {
420        return None;
421    }
422
423    // Try and decode the timestamp portion.
424    NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
425        .map(|dt| datetime_to_timestamp(dt.and_utc()))
426        .ok()
427}
428
429fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
430    let secs = (dt.timestamp() as u128) * 1_000_000_000;
431    let ns = dt.timestamp_subsec_nanos() as u128;
432
433    secs + ns
434}
435
436async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
437    let mut dir_builder = std::fs::DirBuilder::new();
438    dir_builder.recursive(true);
439
440    // When on Unix platforms, adjust the permissions of the directory to be RWX for the owner only, and nothing for
441    // group/world.
442    #[cfg(unix)]
443    {
444        use std::os::unix::fs::DirBuilderExt;
445        dir_builder.mode(0o700);
446    }
447
448    tokio::task::spawn_blocking(move || {
449        dir_builder
450            .create(&path)
451            .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
452    })
453    .await
454    .error_context("Failed to spawn directory creation blocking task.")?
455}
456
#[cfg(test)]
mod tests {
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    /// A small serializable payload used as the queue's entry type in tests.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        /// Generates a `FakeData` with a random 8-character name and random value.
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    /// A `DiskUsageRetriever` that reports fixed total/available space of 100 bytes each.
    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    /// Counts the number of regular files directly under the given directory.
    async fn files_in_dir(path: &Path) -> usize {
        let mut file_count = 0;
        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
            if entry.metadata().await.unwrap().is_file() {
                file_count += 1;
            }
        }
        file_count
    }

    /// Pushing then popping a single entry round-trips the data and leaves the directory empty.
    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed, and that the file has been removed from disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// An entry larger than `max_on_disk_bytes` is rejected without touching disk.
    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(persisted_queue.push(data).await.is_err());

        // Ensure the directory is (still) empty.
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// When the queue is at capacity, pushing a new entry evicts the oldest one.
    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that only one entry can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            32,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// The disk usage ratio limit (not just `max_on_disk_bytes`) triggers eviction of older entries.
    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that two entries can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            80,
            0.35,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // The `storage_max_disk_ratio` is 0.35, and our `MockDiskUsageRetriever` returns 100 for both `total_space` and
        // `available_space`, so `on_disk_bytes_limit()` returns min(80, 35) = 35.
        //
        // First entry: total_on_disk_bytes(0) + required_bytes(30) < on_disk_bytes_limit(35)
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Second entry: total_on_disk_bytes(30) + required_bytes(30) > on_disk_bytes_limit(35) so the first entry is dropped.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// Writes a corrupt (non-JSON) file with a valid retry filename to the given directory, using a timestamp
    /// that sorts before any real entries (so it will be popped first).
    async fn write_corrupt_entry(dir: &Path) -> PathBuf {
        let filename = "retry-20000101000000000000-100000000.json";
        let path = dir.join(filename);
        tokio::fs::write(&path, b"this is not valid json").await.unwrap();
        path
    }

    /// A corrupt entry at the head of the queue is dropped and cleaned up, not returned.
    #[tokio::test]
    async fn corrupt_entry_is_skipped_on_pop() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Write a corrupt file before pushing valid data, so it sorts first.
        let corrupt_path = write_corrupt_entry(&root_path).await;

        // Push a valid entry.
        let _ = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");

        // Refresh state so the queue picks up the corrupt file.
        persisted_queue.refresh_entry_state().await.unwrap();

        // Pop should skip the corrupt entry and return the valid one.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data, actual);

        // The corrupt file should have been cleaned up from disk.
        assert!(!corrupt_path.exists());

        // The dropped counter should reflect the corrupt entry.
        assert_eq!(1, persisted_queue.take_entries_dropped());

        // No files should remain.
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// A corrupt entry does not prevent subsequent valid entries from being popped.
    #[tokio::test]
    async fn corrupt_entry_does_not_block_queue() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Use MockDiskUsageRetriever to avoid disk space ratio causing eviction during push.
        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Push two valid entries, then corrupt the first one on disk.
        let _ = persisted_queue.push(data1).await.expect("should not fail to push data");
        let _ = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(2, persisted_queue.entries.len());

        // Corrupt the oldest entry file on disk.
        let oldest_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&oldest_path, b"corrupted").await.unwrap();

        // Pop should skip the corrupt entry and return the second valid one.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data2, actual);

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// When every entry in the queue is corrupt, `pop` drains them and returns `None` rather than erroring.
    #[tokio::test]
    async fn pop_returns_none_when_all_entries_corrupt() {
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Write a corrupt entry and refresh state.
        write_corrupt_entry(&root_path).await;
        persisted_queue.refresh_entry_state().await.unwrap();

        // Pop should skip the corrupt entry and return None (no valid entries).
        let result = persisted_queue.pop().await.expect("should not fail to pop data");
        assert!(result.is_none());

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// A corrupt entry encountered during push-time eviction is dropped without failing the push.
    #[tokio::test]
    async fn corrupt_entry_dropped_during_eviction() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Queue sized to hold only one entry.
        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            32,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Push a valid entry, then corrupt it on disk.
        let _ = persisted_queue
            .push(FakeData::random())
            .await
            .expect("should not fail to push data");
        let first_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&first_path, b"corrupted").await.unwrap();

        // Push another entry, which needs to evict the first (corrupt) one to make space.
        // This should succeed without error -- the corrupt entry is dropped during eviction.
        let _ = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");

        // The corrupt entry was dropped during eviction, not via normal eviction tracking.
        assert_eq!(1, persisted_queue.take_entries_dropped());

        // The valid entry should be poppable.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }
}