// saluki_io/net/util/retry/queue/persisted.rs
1use std::{
2    io,
3    marker::PhantomData,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use chrono::{DateTime, NaiveDateTime, Utc};
9use fs4::{available_space, total_space};
10use rand::RngExt as _;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use serde::{de::DeserializeOwned, Serialize};
13use tracing::{debug, info, warn};
14
15use super::{EventContainer, PushResult};
16
/// A persisted entry.
///
/// Represents the high-level metadata of a persisted entry, including the path to and size of the entry.
struct PersistedEntry {
    // Path to the serialized entry file on disk.
    path: PathBuf,
    // Entry creation time, as nanoseconds since the Unix epoch, encoded into (and decoded from) the filename.
    timestamp: u128,
    // Size of the serialized entry payload, in bytes.
    size_bytes: u64,
}
25
26impl PersistedEntry {
27    /// Attempts to create a `PersistedEntry` from the given path.
28    ///
29    /// If the given path is not recognized as the path to a valid persisted entry, `None` is returned.
30    fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31        let timestamp = decode_timestamped_filename(&path)?;
32        Some(Self {
33            path,
34            timestamp,
35            size_bytes,
36        })
37    }
38
39    fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40        Self {
41            path,
42            timestamp,
43            size_bytes,
44        }
45    }
46}
47
/// Retrieves disk usage statistics for an underlying storage volume.
pub trait DiskUsageRetriever {
    /// Returns the total size, in bytes, of the underlying volume.
    ///
    /// # Errors
    ///
    /// If the total space cannot be retrieved, an error is returned.
    fn total_space(&self) -> Result<u64, GenericError>;

    /// Returns the available size, in bytes, of the underlying volume.
    ///
    /// # Errors
    ///
    /// If the available space cannot be retrieved, an error is returned.
    fn available_space(&self) -> Result<u64, GenericError>;
}
52
/// A `DiskUsageRetriever` implementation that queries the volume backing a given root path.
pub struct DiskUsageRetrieverImpl {
    // Path used to resolve the underlying volume when querying disk usage.
    root_path: PathBuf,
}
56
57impl DiskUsageRetrieverImpl {
58    pub fn new(root_path: PathBuf) -> Self {
59        Self { root_path }
60    }
61}
62
63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64    fn total_space(&self) -> Result<u64, GenericError> {
65        total_space(&self.root_path)
66            .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67    }
68
69    fn available_space(&self) -> Result<u64, GenericError> {
70        available_space(&self.root_path)
71            .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72    }
73}
74
/// A cloneable, thread-safe handle to a shared `DiskUsageRetriever` implementation.
#[derive(Clone)]
pub struct DiskUsageRetrieverWrapper {
    // Shared retriever; cloning the wrapper only bumps the `Arc` reference count.
    inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
}
79
80impl DiskUsageRetrieverWrapper {
81    pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82        Self {
83            inner: disk_usage_retriever,
84        }
85    }
86}
87
/// A disk-backed queue of serialized entries.
///
/// Entries are ordered oldest-first by the timestamp encoded in their filenames, and survive process
/// restarts: the on-disk state is re-scanned when the queue is constructed.
pub struct PersistedQueue<T> {
    // Directory in which persisted entry files live.
    root_path: PathBuf,
    // In-memory view of the entries on disk, kept sorted oldest-first by filename-encoded timestamp.
    entries: Vec<PersistedEntry>,
    // Sum of the sizes of all tracked entries, in bytes.
    total_on_disk_bytes: u64,
    // Hard cap on the total size of persisted entries, in bytes.
    max_on_disk_bytes: u64,
    // Maximum fraction of the underlying volume that may be used overall (see `on_disk_bytes_limit`).
    storage_max_disk_ratio: f64,
    // Source of total/available space figures for the underlying volume.
    disk_usage_retriever: DiskUsageRetrieverWrapper,
    // Entries permanently dropped due to errors since the last call to `take_entries_dropped`.
    entries_dropped: u64,
    // Ties the queue to its entry type without storing any `T` values directly.
    _entry: PhantomData<T>,
}
98
impl<T> PersistedQueue<T>
where
    T: EventContainer + DeserializeOwned + Serialize,
{
    /// Creates a new `PersistedQueue` instance from the given root path and maximum size.
    ///
    /// The root path is created if it does not already exist, and is scanned for existing persisted entries. Entries
    /// are removed (oldest first) until the total size of all scanned entries is within the given maximum size.
    ///
    /// # Errors
    ///
    /// If there is an error creating the root directory, or scanning it for existing entries, or deleting entries to
    /// shrink the directory to fit the given maximum size, an error is returned.
    pub async fn from_root_path(
        root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: DiskUsageRetrieverWrapper,
    ) -> Result<Self, GenericError> {
        // NOTE(review): the doc comment above says entries are removed at construction time to fit the
        // maximum size, but `refresh_entry_state` below only scans and sorts; shrinking actually happens
        // lazily during `push` via `remove_until_available_space` — confirm the doc wording is intended.

        // Make sure the directory exists first.
        create_directory_recursive(root_path.clone())
            .await
            .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;

        let mut persisted_requests = Self {
            root_path: root_path.clone(),
            entries: Vec::new(),
            total_on_disk_bytes: 0,
            max_on_disk_bytes,
            storage_max_disk_ratio,
            disk_usage_retriever,
            entries_dropped: 0,
            _entry: PhantomData,
        };

        // Populate the in-memory entry state from whatever already exists on disk.
        persisted_requests.refresh_entry_state().await?;

        info!(
            "Persisted retry queue initialized. Transactions will be stored in '{}'.",
            root_path.display()
        );

        Ok(persisted_requests)
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Returns the number of entries in the queue.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns the number of entries that have been permanently dropped due to errors since the last call to this
    /// method, resetting the counter.
    pub fn take_entries_dropped(&mut self) -> u64 {
        std::mem::take(&mut self.entries_dropped)
    }

    /// Enqueues an entry and persists it to disk.
    ///
    /// # Errors
    ///
    /// If there is an error serializing the entry, or writing it to disk, or removing older entries to make space for
    /// the new entry, an error is returned.
    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
        // Serialize the entry to a temporary file.
        let (filename, timestamp) = generate_timestamped_filename();
        let entry_path = self.root_path.join(filename);
        let serialized = serde_json::to_vec(&entry)
            .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;

        // Reject entries that could never fit, even with an otherwise empty queue.
        if serialized.len() as u64 > self.max_on_disk_bytes {
            return Err(generic_error!("Entry is too large to persist."));
        }

        // Make sure we have enough space to persist the entry.
        let push_result = self
            .remove_until_available_space(serialized.len() as u64)
            .await
            .error_context(
                "Failed to remove older persisted entries to make space for the incoming persisted entry.",
            )?;

        // Actually persist it.
        tokio::fs::write(&entry_path, &serialized)
            .await
            .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;

        // Add a new persisted entry to our state.
        self.entries.push(PersistedEntry::from_parts(
            entry_path,
            timestamp,
            serialized.len() as u64,
        ));
        self.total_on_disk_bytes += serialized.len() as u64;

        debug!(entry.len = serialized.len(), "Enqueued persisted entry.");

        Ok(push_result)
    }

    /// Consumes the oldest persisted entry on disk, if one exists.
    ///
    /// Corrupt or unreadable entries are permanently dropped (and counted via `take_entries_dropped`)
    /// rather than returned as errors, so a single bad entry cannot wedge the queue.
    ///
    /// # Errors
    ///
    /// If there is an error reading or deserializing the entry, an error is returned.
    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
        loop {
            if self.entries.is_empty() {
                return Ok(None);
            }

            // Entries are kept sorted oldest-first, so index 0 is always the oldest.
            let entry = self.entries.remove(0);
            match try_deserialize_entry(&entry).await {
                Ok(Some(deserialized)) => {
                    // We got the deserialized entry, so remove it from our state and return it.
                    self.total_on_disk_bytes -= entry.size_bytes;
                    debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");

                    return Ok(Some(deserialized));
                }
                Ok(None) => {
                    // We couldn't read the entry from disk, which points to us potentially having invalid state about
                    // what entries _are_ on disk, so we'll refresh our entry state and try again.
                    self.refresh_entry_state().await?;
                    continue;
                }
                Err(e) => {
                    // The entry is corrupt or unreadable. Drop it permanently to avoid a poison pill scenario
                    // where the same entry is retried indefinitely, blocking all other work.
                    warn!(
                        entry.path = %entry.path.display(),
                        entry.len = entry.size_bytes,
                        error = %e,
                        "Permanently dropping persisted entry that could not be consumed.",
                    );

                    self.total_on_disk_bytes -= entry.size_bytes;
                    self.entries_dropped += 1;
                    continue;
                }
            }
        }
    }

    /// Rebuilds the in-memory entry state by scanning the root path for persisted entry files.
    ///
    /// Files that don't match the expected persisted entry filename format are ignored (with a warning).
    /// `total_on_disk_bytes` is recomputed from the scan.
    async fn refresh_entry_state(&mut self) -> io::Result<()> {
        // Scan the root path for persisted entries.
        let mut entries = Vec::new();

        let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
        while let Some(entry) = dir_reader.next_entry().await? {
            let metadata = entry.metadata().await?;
            if metadata.is_file() {
                match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
                    Some(entry) => entries.push(entry),
                    None => {
                        warn!(
                            file_size = metadata.len(),
                            "Ignoring unrecognized file '{}' in retry directory.",
                            entry.path().display()
                        );
                        continue;
                    }
                }
            }
        }

        // Sort the entries by their inherent timestamp.
        entries.sort_by_key(|entry| entry.timestamp);
        self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
        self.entries = entries;

        Ok(())
    }

    /// Removes persisted entries (oldest first) until there is at least the required number of bytes in free space
    /// (maximum - total).
    ///
    /// # Errors
    ///
    /// If there is an error while deleting persisted entries, an error is returned.
    async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        let disk_usage_retriever = self.disk_usage_retriever.clone();
        let storage_max_disk_ratio = self.storage_max_disk_ratio;
        let max_on_disk_bytes = self.max_on_disk_bytes;

        // TODO: Evaluate the possible failures scenarios a little more thoroughly, and see if we can improve
        // how we handle them instead of just bailing out.
        //
        // Essentially, it's not clear to me if we would expect this to fail in a way where we could actually
        // still write the persistent entries to disk, and if it's worth it to do something like trying to
        // cache the last known good value we get here to use if we fail to get a new value, etc.
        //
        // The disk usage queries are blocking filesystem calls, so run them off the async worker threads.
        let limit = tokio::task::spawn_blocking(move || {
            on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
        })
        .await
        .error_context("Failed to run disk size limit check to completion.")??;

        while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
            let entry = self.entries.remove(0);

            // Deserialize the entry, which gives us back the original event and removes the file from disk.
            let event_count = match try_deserialize_entry::<T>(&entry).await {
                Ok(Some(deserialized)) => deserialized.event_count(),
                Ok(None) => {
                    // NOTE(review): unlike `pop`, this branch neither adjusts `total_on_disk_bytes` nor
                    // refreshes the entry state, so the running total may stay overstated until the next
                    // refresh — confirm whether that's intentional.
                    warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
                    continue;
                }
                Err(e) => {
                    // The entry is corrupt or unreadable. Drop it permanently to avoid blocking subsequent
                    // entries from being evicted.
                    warn!(
                        entry.path = %entry.path.display(),
                        entry.len = entry.size_bytes,
                        error = %e,
                        "Permanently dropping persisted entry that could not be consumed during eviction.",
                    );

                    self.total_on_disk_bytes -= entry.size_bytes;
                    self.entries_dropped += 1;
                    continue;
                }
            };

            // Update our statistics.
            self.total_on_disk_bytes -= entry.size_bytes;
            push_result.track_dropped_item(event_count);

            warn!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
        }

        Ok(push_result)
    }
}
336
337/// Determines the total number of bytes that can be written to disk without causing the underlying volume to end up
338/// with more than `storage_max_disk_ratio` in terms of used space. The minimum of `max_on_disk_bytes` and the result
339/// of this calculation is returned.
340///
341/// # Errors
342///
343/// If there is an error while retrieving the total or available space of the underlying volume, an error is returned.
344fn on_disk_bytes_limit(
345    disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
346) -> Result<u64, GenericError> {
347    let total_space = disk_usage_retriever.inner.total_space()? as f64;
348    let available_space = disk_usage_retriever.inner.available_space()? as f64;
349    let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
350    let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
351    Ok(max_on_disk_bytes.min(available_disk_usage))
352}
353
354async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
355    let serialized = match tokio::fs::read(&entry.path).await {
356        Ok(serialized) => serialized,
357        Err(e) => match e.kind() {
358            io::ErrorKind::NotFound => {
359                // We tried to delete an entry that no longer exists on disk, which means our internal entry state
360                // is corrupted for some reason.
361                //
362                // Tell the caller that we couldn't find the entry on disk, so that they need to refresh the entry state
363                // to make sure it's up-to-date before trying again.
364                return Ok(None);
365            }
366            _ => {
367                return Err(e)
368                    .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
369            }
370        },
371    };
372
373    let deserialized = match serde_json::from_slice(&serialized) {
374        Ok(deserialized) => deserialized,
375        Err(e) => {
376            // Deserialization failed, which means the payload is corrupt or invalid. Attempt to clean up the
377            // file from disk so it doesn't accumulate, but don't fail if we can't.
378            if let Err(remove_err) = tokio::fs::remove_file(&entry.path).await {
379                warn!(
380                    entry.path = %entry.path.display(),
381                    error = %remove_err,
382                    "Failed to remove corrupt persisted entry from disk.",
383                );
384            }
385
386            return Err(e)
387                .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()));
388        }
389    };
390
391    // Delete the entry from disk before returning, so that we don't risk sending duplicates.
392    tokio::fs::remove_file(&entry.path)
393        .await
394        .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;
395
396    debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
397    Ok(Some(deserialized))
398}
399
400fn generate_timestamped_filename() -> (PathBuf, u128) {
401    let now = Utc::now();
402    let now_ts = datetime_to_timestamp(now);
403    let nonce = rand::rng().random_range(100000000..999999999);
404
405    let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
406
407    (filename, now_ts)
408}
409
410fn decode_timestamped_filename(path: &Path) -> Option<u128> {
411    let filename = path.file_stem()?.to_str()?;
412    let mut filename_parts = filename.split('-');
413
414    let prefix = filename_parts.next()?;
415    let timestamp_str = filename_parts.next()?;
416    let nonce = filename_parts.next()?;
417
418    // Make sure the filename matches our expected format by first checking the prefix and nonce portions.
419    if prefix != "retry" || nonce.parse::<u64>().is_err() {
420        return None;
421    }
422
423    // Try and decode the timestamp portion.
424    NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
425        .map(|dt| datetime_to_timestamp(dt.and_utc()))
426        .ok()
427}
428
429fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
430    let secs = (dt.timestamp() as u128) * 1_000_000_000;
431    let ns = dt.timestamp_subsec_nanos() as u128;
432
433    secs + ns
434}
435
436async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
437    let mut dir_builder = std::fs::DirBuilder::new();
438    dir_builder.recursive(true);
439
440    // When on Unix platforms, adjust the permissions of the directory to be RWX for the owner only, and nothing for
441    // group/world.
442    #[cfg(unix)]
443    {
444        use std::os::unix::fs::DirBuilderExt;
445        dir_builder.mode(0o700);
446    }
447
448    tokio::task::spawn_blocking(move || {
449        dir_builder
450            .create(&path)
451            .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
452    })
453    .await
454    .error_context("Failed to spawn directory creation blocking task.")?
455}
456
#[cfg(test)]
mod tests {
    use rand::RngExt as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    // Simple serializable payload used as the queue's entry type throughout these tests.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        /// Generates a payload with a random 8-character alphanumeric name and a random value in `0..100`.
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    // Disk usage stub reporting a fixed 100-byte volume that is entirely free, so tests can reason
    // about `on_disk_bytes_limit` deterministically.
    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    /// Counts the regular files directly under `path`.
    async fn files_in_dir(path: &Path) -> usize {
        let mut file_count = 0;
        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
            if entry.metadata().await.unwrap().is_file() {
                file_count += 1;
            }
        }
        file_count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed, and that the file has been removed from disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // `max_on_disk_bytes` of 1 guarantees any serialized entry exceeds the cap.
        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(persisted_queue.push(data).await.is_err());

        // Ensure the directory is (still) empty.
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that only one entry can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            32,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that two entries can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            80,
            0.35,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // The `storage_max_disk_ratio` is 0.35, and our `MockDiskUsageRetriever` returns 100 for both `total_space` and
        // `available_space`, so `on_disk_bytes_limit()` returns min(80, 35) = 35.
        //
        // First entry: total_on_disk_bytes(0) + required_bytes(30) < on_disk_bytes_limit(35)
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Second entry: total_on_disk_bytes(30) + required_bytes(30) > on_disk_bytes_limit(35) so the first entry is dropped.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    /// Writes a corrupt (non-JSON) file with a valid retry filename to the given directory, using a timestamp
    /// that sorts before any real entries (so it will be popped first).
    async fn write_corrupt_entry(dir: &Path) -> PathBuf {
        let filename = "retry-20000101000000000000-100000000.json";
        let path = dir.join(filename);
        tokio::fs::write(&path, b"this is not valid json").await.unwrap();
        path
    }

    #[tokio::test]
    async fn corrupt_entry_is_skipped_on_pop() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Write a corrupt file before pushing valid data, so it sorts first.
        let corrupt_path = write_corrupt_entry(&root_path).await;

        // Push a valid entry.
        let _ = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");

        // Refresh state so the queue picks up the corrupt file.
        persisted_queue.refresh_entry_state().await.unwrap();

        // Pop should skip the corrupt entry and return the valid one.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data, actual);

        // The corrupt file should have been cleaned up from disk.
        assert!(!corrupt_path.exists());

        // The dropped counter should reflect the corrupt entry.
        assert_eq!(1, persisted_queue.take_entries_dropped());

        // No files should remain.
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_does_not_block_queue() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Use MockDiskUsageRetriever to avoid disk space ratio causing eviction during push.
        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Push two valid entries, then corrupt the first one on disk.
        let _ = persisted_queue.push(data1).await.expect("should not fail to push data");
        let _ = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(2, persisted_queue.entries.len());

        // Corrupt the oldest entry file on disk.
        let oldest_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&oldest_path, b"corrupted").await.unwrap();

        // Pop should skip the corrupt entry and return the second valid one.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data2, actual);

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn pop_returns_none_when_all_entries_corrupt() {
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Write a corrupt entry and refresh state.
        write_corrupt_entry(&root_path).await;
        persisted_queue.refresh_entry_state().await.unwrap();

        // Pop should skip the corrupt entry and return None (no valid entries).
        let result = persisted_queue.pop().await.expect("should not fail to pop data");
        assert!(result.is_none());

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_dropped_during_eviction() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Queue sized to hold only one entry.
        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            32,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Push a valid entry, then corrupt it on disk.
        let _ = persisted_queue
            .push(FakeData::random())
            .await
            .expect("should not fail to push data");
        let first_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&first_path, b"corrupted").await.unwrap();

        // Push another entry, which needs to evict the first (corrupt) one to make space.
        // This should succeed without error -- the corrupt entry is dropped during eviction.
        let _ = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");

        // The corrupt entry was dropped during eviction, not via normal eviction tracking.
        assert_eq!(1, persisted_queue.take_entries_dropped());

        // The valid entry should be poppable.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }
}