Skip to main content

saluki_io/net/util/retry/queue/
persisted.rs

1use std::{
2    io,
3    marker::PhantomData,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use chrono::{DateTime, NaiveDateTime, Utc};
9use fs4::{available_space, total_space};
10use rand::RngExt as _;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use serde::{de::DeserializeOwned, Serialize};
13use tracing::{debug, info, warn};
14
15use super::{EventContainer, PushResult};
16
17/// A persisted entry.
18///
19/// Represents the high-level metadata of a persisted entry, including the path to and size of the entry.
20struct PersistedEntry {
21    path: PathBuf,
22    timestamp: u128,
23    size_bytes: u64,
24}
25
26impl PersistedEntry {
27    /// Attempts to create a `PersistedEntry` from the given path.
28    ///
29    /// If the given path isn't recognized as the path to a valid persisted entry, `None` is returned.
30    fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31        let timestamp = decode_timestamped_filename(&path)?;
32        Some(Self {
33            path,
34            timestamp,
35            size_bytes,
36        })
37    }
38
39    fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40        Self {
41            path,
42            timestamp,
43            size_bytes,
44        }
45    }
46}
47
48pub trait DiskUsageRetriever {
49    fn total_space(&self) -> Result<u64, GenericError>;
50    fn available_space(&self) -> Result<u64, GenericError>;
51}
52
/// [`DiskUsageRetriever`] implementation that queries the filesystem containing a given path.
pub struct DiskUsageRetrieverImpl {
    /// Path whose underlying volume is queried.
    root_path: PathBuf,
}

impl DiskUsageRetrieverImpl {
    /// Creates a retriever that reports disk usage for the volume containing `root_path`.
    pub fn new(root_path: PathBuf) -> Self {
        DiskUsageRetrieverImpl { root_path }
    }
}
62
63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64    fn total_space(&self) -> Result<u64, GenericError> {
65        total_space(&self.root_path)
66            .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67    }
68
69    fn available_space(&self) -> Result<u64, GenericError> {
70        available_space(&self.root_path)
71            .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72    }
73}
74
75#[derive(Clone)]
76pub struct DiskUsageRetrieverWrapper {
77    inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
78}
79
80impl DiskUsageRetrieverWrapper {
81    pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82        Self {
83            inner: disk_usage_retriever,
84        }
85    }
86}
87
88/// Arguments for constructing a persisted retry queue.
89pub struct PersistedQueueArgs {
90    /// Root path under which the queue directory is created.
91    pub root_path: PathBuf,
92    /// Maximum total bytes the queue may occupy on disk.
93    pub max_on_disk_bytes: u64,
94    /// Maximum fraction of the disk that may be used before writes stop.
95    pub storage_max_disk_ratio: f64,
96    /// Provider for total- and available-disk-space queries.
97    pub disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
98    /// Maximum age of retry files in days; files older than this are removed on startup.
99    ///
100    /// Setting this to `0` removes all retry files (cutoff = now), matching the behavior of the
101    /// core Agent's `FileRemovalPolicy` with `outdatedFileDayCount = 0`.
102    pub max_age_days: u32,
103}
104
105pub struct PersistedQueue<T> {
106    root_path: PathBuf,
107    entries: Vec<PersistedEntry>,
108    total_on_disk_bytes: u64,
109    max_on_disk_bytes: u64,
110    storage_max_disk_ratio: f64,
111    disk_usage_retriever: DiskUsageRetrieverWrapper,
112    max_age_days: u32,
113    entries_dropped: u64,
114    _entry: PhantomData<T>,
115}
116
117impl<T> PersistedQueue<T>
118where
119    T: EventContainer + DeserializeOwned + Serialize,
120{
121    /// Creates a new `PersistedQueue` instance from the given arguments.
122    ///
123    /// The root path is created if it doesn't already exist, and is scanned for existing persisted entries. Entries
124    /// are removed (oldest first) until the total size of all scanned entries is within the given maximum size.
125    ///
126    /// To remove stale retry files on startup, call [`remove_stale_files`][Self::remove_stale_files] after construction.
127    ///
128    /// # Errors
129    ///
130    /// If there is an error creating the root directory, or scanning it for existing entries, or deleting entries to
131    /// shrink the directory to fit the given maximum size, an error is returned.
132    pub async fn from_root_path(args: PersistedQueueArgs) -> Result<Self, GenericError> {
133        let PersistedQueueArgs {
134            root_path,
135            max_on_disk_bytes,
136            storage_max_disk_ratio,
137            disk_usage_retriever,
138            max_age_days,
139        } = args;
140
141        // Make sure the directory exists first.
142        create_directory_recursive(root_path.clone())
143            .await
144            .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;
145
146        let mut persisted_requests = Self {
147            root_path: root_path.clone(),
148            entries: Vec::new(),
149            total_on_disk_bytes: 0,
150            max_on_disk_bytes,
151            storage_max_disk_ratio,
152            disk_usage_retriever: DiskUsageRetrieverWrapper::new(disk_usage_retriever),
153            max_age_days,
154            entries_dropped: 0,
155            _entry: PhantomData,
156        };
157
158        persisted_requests.refresh_entry_state().await?;
159
160        info!(
161            "Persisted retry queue initialized. Transactions will be stored in '{}'.",
162            root_path.display()
163        );
164
165        Ok(persisted_requests)
166    }
167
168    /// Returns `true` if the queue is empty.
169    pub fn is_empty(&self) -> bool {
170        self.entries.is_empty()
171    }
172
173    /// Returns the number of entries in the queue.
174    pub fn len(&self) -> usize {
175        self.entries.len()
176    }
177
178    /// Returns the available on-disk capacity, in bytes.
179    ///
180    /// This reflects the lower of the configured queue limit and disk-usage-ratio limit, minus the bytes currently used
181    /// by persisted entries.
182    ///
183    /// # Errors
184    ///
185    /// If there is an error while retrieving the total or available space of the underlying volume, an error is returned.
186    pub async fn available_capacity_bytes(&self) -> Result<u64, GenericError> {
187        let disk_usage_retriever = self.disk_usage_retriever.clone();
188        let storage_max_disk_ratio = self.storage_max_disk_ratio;
189        let max_on_disk_bytes = self.max_on_disk_bytes;
190
191        let limit = tokio::task::spawn_blocking(move || {
192            on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
193        })
194        .await
195        .error_context("Failed to run disk size limit check to completion.")??;
196
197        Ok(limit.saturating_sub(self.total_on_disk_bytes))
198    }
199
200    /// Returns the number of entries that have been permanently dropped due to errors since the last call to this
201    /// method, resetting the counter.
202    pub fn take_entries_dropped(&mut self) -> u64 {
203        std::mem::take(&mut self.entries_dropped)
204    }
205
206    /// Removes retry files older than `max_age_days` (from [`PersistedQueueArgs`]) from the queue directory and
207    /// reloads entry state.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if the queue directory cannot be opened or scanned. Individual file removal failures are
212    /// logged as warnings and do not stop the cleanup.
213    pub async fn remove_stale_files(&mut self) -> Result<u32, GenericError> {
214        let removed = remove_outdated_retry_files(&self.root_path, self.max_age_days).await?;
215        self.refresh_entry_state().await.map_err(GenericError::from)?;
216        Ok(removed)
217    }
218
219    /// Enqueues an entry and persists it to disk.
220    ///
221    /// # Errors
222    ///
223    /// If there is an error serializing the entry, or writing it to disk, or removing older entries to make space for
224    /// the new entry, an error is returned.
225    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
226        // Serialize the entry to a temporary file.
227        let (filename, timestamp) = generate_timestamped_filename();
228        let entry_path = self.root_path.join(filename);
229        let serialized = serde_json::to_vec(&entry)
230            .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;
231
232        if serialized.len() as u64 > self.max_on_disk_bytes {
233            return Err(generic_error!("Entry is too large to persist."));
234        }
235
236        // Make sure we have enough space to persist the entry.
237        let push_result = self
238            .remove_until_available_space(serialized.len() as u64)
239            .await
240            .error_context(
241                "Failed to remove older persisted entries to make space for the incoming persisted entry.",
242            )?;
243
244        // Actually persist it.
245        tokio::fs::write(&entry_path, &serialized)
246            .await
247            .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;
248
249        // Add a new persisted entry to our state.
250        self.entries.push(PersistedEntry::from_parts(
251            entry_path,
252            timestamp,
253            serialized.len() as u64,
254        ));
255        self.total_on_disk_bytes += serialized.len() as u64;
256
257        debug!(entry.len = serialized.len(), "Enqueued persisted entry.");
258
259        Ok(push_result)
260    }
261
262    /// Consumes the oldest persisted entry on disk, if one exists.
263    ///
264    /// # Errors
265    ///
266    /// If there is an error reading or deserializing the entry, an error is returned.
267    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
268        loop {
269            if self.entries.is_empty() {
270                return Ok(None);
271            }
272
273            let entry = self.entries.remove(0);
274            match try_deserialize_entry(&entry).await {
275                Ok(Some(deserialized)) => {
276                    // We got the deserialized entry, so remove it from our state and return it.
277                    self.total_on_disk_bytes -= entry.size_bytes;
278                    debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");
279
280                    return Ok(Some(deserialized));
281                }
282                Ok(None) => {
283                    // We couldn't read the entry from disk, which points to us potentially having invalid state about
284                    // what entries _are_ on disk, so we'll refresh our entry state and try again.
285                    self.refresh_entry_state().await?;
286                    continue;
287                }
288                Err(e) => {
289                    // The entry is corrupt or unreadable. Drop it permanently to avoid a poison pill scenario
290                    // where the same entry is retried indefinitely, blocking all other work.
291                    warn!(
292                        entry.path = %entry.path.display(),
293                        entry.len = entry.size_bytes,
294                        error = %e,
295                        "Permanently dropping persisted entry that could not be consumed.",
296                    );
297
298                    self.total_on_disk_bytes -= entry.size_bytes;
299                    self.entries_dropped += 1;
300                    continue;
301                }
302            }
303        }
304    }
305
306    async fn refresh_entry_state(&mut self) -> io::Result<()> {
307        // Scan the root path for persisted entries.
308        let mut entries = Vec::new();
309
310        let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
311        while let Some(entry) = dir_reader.next_entry().await? {
312            let metadata = entry.metadata().await?;
313            if metadata.is_file() {
314                match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
315                    Some(entry) => entries.push(entry),
316                    None => {
317                        warn!(
318                            file_size = metadata.len(),
319                            "Ignoring unrecognized file '{}' in retry directory.",
320                            entry.path().display()
321                        );
322                        continue;
323                    }
324                }
325            }
326        }
327
328        // Sort the entries by their inherent timestamp.
329        entries.sort_by_key(|entry| entry.timestamp);
330        self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
331        self.entries = entries;
332
333        Ok(())
334    }
335
336    /// Removes persisted entries (oldest first) until there is at least the required number of bytes in free space
337    /// (maximum - total).
338    ///
339    /// # Errors
340    ///
341    /// If there is an error while deleting persisted entries, an error is returned.
342    async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
343        let mut push_result = PushResult::default();
344
345        let disk_usage_retriever = self.disk_usage_retriever.clone();
346        let storage_max_disk_ratio = self.storage_max_disk_ratio;
347        let max_on_disk_bytes = self.max_on_disk_bytes;
348
349        // TODO: Evaluate the possible failures scenarios a little more thoroughly, and see if we can improve
350        // how we handle them instead of just bailing out.
351        //
352        // Essentially, it's not clear to me if we would expect this to fail in a way where we could actually
353        // still write the persistent entries to disk, and if it's worth it to do something like trying to
354        // cache the last known good value we get here to use if we fail to get a new value, etc.
355        let limit = tokio::task::spawn_blocking(move || {
356            on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
357        })
358        .await
359        .error_context("Failed to run disk size limit check to completion.")??;
360
361        while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
362            let entry = self.entries.remove(0);
363
364            // Deserialize the entry, which gives us back the original event and removes the file from disk.
365            let deserialized = match try_deserialize_entry::<T>(&entry).await {
366                Ok(Some(deserialized)) => deserialized,
367                Ok(None) => {
368                    warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
369                    continue;
370                }
371                Err(e) => {
372                    // The entry is corrupt or unreadable. Drop it permanently to avoid blocking subsequent
373                    // entries from being evicted.
374                    warn!(
375                        entry.path = %entry.path.display(),
376                        entry.len = entry.size_bytes,
377                        error = %e,
378                        "Permanently dropping persisted entry that could not be consumed during eviction.",
379                    );
380
381                    self.total_on_disk_bytes -= entry.size_bytes;
382                    self.entries_dropped += 1;
383                    continue;
384                }
385            };
386
387            // Update our statistics.
388            self.total_on_disk_bytes -= entry.size_bytes;
389            push_result.track_dropped_item(&deserialized);
390
391            warn!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
392        }
393
394        Ok(push_result)
395    }
396}
397
398/// Determines the total number of bytes that can be written to disk without causing the underlying volume to end up
399/// with more than `storage_max_disk_ratio` in terms of used space. The minimum of `max_on_disk_bytes` and the result
400/// of this calculation is returned.
401///
402/// # Errors
403///
404/// If there is an error while retrieving the total or available space of the underlying volume, an error is returned.
405fn on_disk_bytes_limit(
406    disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
407) -> Result<u64, GenericError> {
408    let total_space = disk_usage_retriever.inner.total_space()? as f64;
409    let available_space = disk_usage_retriever.inner.available_space()? as f64;
410    let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
411    let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
412    Ok(max_on_disk_bytes.min(available_disk_usage))
413}
414
415async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
416    let serialized = match tokio::fs::read(&entry.path).await {
417        Ok(serialized) => serialized,
418        Err(e) => match e.kind() {
419            io::ErrorKind::NotFound => {
420                // We tried to delete an entry that no longer exists on disk, which means our internal entry state
421                // is corrupted for some reason.
422                //
423                // Tell the caller that we couldn't find the entry on disk, so that they need to refresh the entry state
424                // to make sure it's up-to-date before trying again.
425                return Ok(None);
426            }
427            _ => {
428                return Err(e)
429                    .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
430            }
431        },
432    };
433
434    let deserialized = match serde_json::from_slice(&serialized) {
435        Ok(deserialized) => deserialized,
436        Err(e) => {
437            // Deserialization failed, which means the payload is corrupt or invalid. Attempt to clean up the
438            // file from disk so it doesn't accumulate, but don't fail if we can't.
439            if let Err(remove_err) = tokio::fs::remove_file(&entry.path).await {
440                warn!(
441                    entry.path = %entry.path.display(),
442                    error = %remove_err,
443                    "Failed to remove corrupt persisted entry from disk.",
444                );
445            }
446
447            return Err(e)
448                .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()));
449        }
450    };
451
452    // Delete the entry from disk before returning, so that we don't risk sending duplicates.
453    tokio::fs::remove_file(&entry.path)
454        .await
455        .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;
456
457    debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
458    Ok(Some(deserialized))
459}
460
461fn generate_timestamped_filename() -> (PathBuf, u128) {
462    let now = Utc::now();
463    let now_ts = datetime_to_timestamp(now);
464    let nonce = rand::rng().random_range(100000000..999999999);
465
466    let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
467
468    (filename, now_ts)
469}
470
471fn decode_timestamped_filename(path: &Path) -> Option<u128> {
472    let filename = path.file_stem()?.to_str()?;
473    let mut filename_parts = filename.split('-');
474
475    let prefix = filename_parts.next()?;
476    let timestamp_str = filename_parts.next()?;
477    let nonce = filename_parts.next()?;
478
479    // Make sure the filename matches our expected format by first checking the prefix and nonce portions.
480    if prefix != "retry" || nonce.parse::<u64>().is_err() {
481        return None;
482    }
483
484    // Try and decode the timestamp portion.
485    NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
486        .map(|dt| datetime_to_timestamp(dt.and_utc()))
487        .ok()
488}
489
490fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
491    let secs = (dt.timestamp() as u128) * 1_000_000_000;
492    let ns = dt.timestamp_subsec_nanos() as u128;
493
494    secs + ns
495}
496
497async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
498    let mut dir_builder = std::fs::DirBuilder::new();
499    dir_builder.recursive(true);
500
501    // When on Unix platforms, adjust the permissions of the directory to be RWX for the owner only, and nothing for
502    // group/world.
503    #[cfg(unix)]
504    {
505        use std::os::unix::fs::DirBuilderExt;
506        dir_builder.mode(0o700);
507    }
508
509    tokio::task::spawn_blocking(move || {
510        dir_builder
511            .create(&path)
512            .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
513    })
514    .await
515    .error_context("Failed to spawn directory creation blocking task.")?
516}
517
518/// Deletes files in `queue_path` whose filename-embedded creation timestamp is older than
519/// `max_age_days`. Does nothing if the directory does not exist.
520///
521/// Setting `max_age_days` to `0` deletes all retry files (cutoff = now), matching the behavior
522/// of the core Agent's `FileRemovalPolicy` with `outdatedFileDayCount = 0`.
523async fn remove_outdated_retry_files(queue_path: &Path, max_age_days: u32) -> Result<u32, GenericError> {
524    let mut dir = match tokio::fs::read_dir(queue_path).await {
525        Ok(d) => d,
526        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
527        Err(e) => {
528            return Err(e).with_error_context(|| {
529                format!(
530                    "Failed to open retry queue directory '{}' for age-based cleanup.",
531                    queue_path.display()
532                )
533            });
534        }
535    };
536    let now_ns = std::time::SystemTime::now()
537        .duration_since(std::time::UNIX_EPOCH)
538        .unwrap_or_default() // clock before epoch: treat cutoff as 0, skipping all deletions
539        .as_nanos();
540    let cutoff_ns = now_ns.saturating_sub(max_age_days as u128 * 24 * 3600 * 1_000_000_000);
541    let mut removed = 0u32;
542    loop {
543        let entry = match dir.next_entry().await {
544            Ok(Some(e)) => e,
545            Ok(None) => break,
546            Err(e) => {
547                return Err(e).with_error_context(|| "Error reading retry queue directory during age-based cleanup.");
548            }
549        };
550        let file_ts = match decode_timestamped_filename(&entry.path()) {
551            Some(ts) => ts,
552            None => continue,
553        };
554        if file_ts < cutoff_ns {
555            let name_str = entry.file_name();
556            let name = name_str.to_string_lossy();
557            match tokio::fs::remove_file(entry.path()).await {
558                Ok(()) => {
559                    debug!(file = %name, "Removed outdated retry file.");
560                    removed += 1;
561                }
562                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
563                    debug!(file = %name, "Retry file already removed by concurrent cleanup.");
564                }
565                Err(e) => {
566                    warn!(file = %name, error = %e, "Failed to remove outdated retry file.");
567                }
568            }
569        }
570    }
571    Ok(removed)
572}
573
574#[cfg(test)]
575mod tests {
576    use rand::RngExt as _;
577    use rand_distr::Alphanumeric;
578    use serde::Deserialize;
579
580    use super::*;
581
582    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
583    struct FakeData {
584        name: String,
585        value: u32,
586    }
587
588    impl FakeData {
589        fn random() -> Self {
590            Self {
591                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
592                value: rand::rng().random_range(0..100),
593            }
594        }
595    }
596
597    impl EventContainer for FakeData {
598        fn event_count(&self) -> u64 {
599            1
600        }
601    }
602
603    struct MockDiskUsageRetriever {}
604
605    impl DiskUsageRetriever for MockDiskUsageRetriever {
606        fn total_space(&self) -> Result<u64, GenericError> {
607            Ok(100)
608        }
609        fn available_space(&self) -> Result<u64, GenericError> {
610            Ok(100)
611        }
612    }
613
614    async fn files_in_dir(path: &Path) -> usize {
615        let mut file_count = 0;
616        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
617        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
618            if entry.metadata().await.unwrap().is_file() {
619                file_count += 1;
620            }
621        }
622        file_count
623    }
624
625    #[tokio::test]
626    async fn basic_push_pop() {
627        let data = FakeData::random();
628
629        // Create our temporary directory and point our persisted queue at it.
630        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
631        let root_path = temp_dir.path().to_path_buf();
632
633        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
634            root_path: root_path.clone(),
635            max_on_disk_bytes: 1024,
636            storage_max_disk_ratio: 0.8,
637            disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
638            max_age_days: 10,
639        })
640        .await
641        .expect("should not fail to create persisted queue");
642
643        // Ensure the directory is empty.
644        assert_eq!(0, files_in_dir(&root_path).await);
645
646        // Push our data to the queue and ensure it persisted it to disk.
647        let push_result = persisted_queue
648            .push(data.clone())
649            .await
650            .expect("should not fail to push data");
651        assert_eq!(1, files_in_dir(&root_path).await);
652        assert_eq!(0, push_result.items_dropped);
653        assert_eq!(0, push_result.events_dropped);
654
655        // Now pop the data back out and ensure it matches what we pushed, and that the file has been removed from disk.
656        let actual = persisted_queue
657            .pop()
658            .await
659            .expect("should not fail to pop data")
660            .expect("should not be empty");
661        assert_eq!(data, actual);
662        assert_eq!(0, files_in_dir(&root_path).await);
663    }
664
665    #[tokio::test]
666    async fn entry_too_large() {
667        let data = FakeData::random();
668
669        // Create our temporary directory and point our persisted queue at it.
670        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
671        let root_path = temp_dir.path().to_path_buf();
672
673        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
674            root_path: root_path.clone(),
675            max_on_disk_bytes: 1,
676            storage_max_disk_ratio: 0.8,
677            disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
678            max_age_days: 10,
679        })
680        .await
681        .expect("should not fail to create persisted queue");
682
683        // Ensure the directory is empty.
684        assert_eq!(0, files_in_dir(&root_path).await);
685
686        // Attempt to push our data into the queue, which should fail because it's too large.
687        assert!(persisted_queue.push(data).await.is_err());
688
689        // Ensure the directory is (still) empty.
690        assert_eq!(0, files_in_dir(&root_path).await);
691    }
692
693    #[tokio::test]
694    async fn remove_oldest_entry_on_push() {
695        let data1 = FakeData::random();
696        let data2 = FakeData::random();
697
698        // Create our temporary directory and point our persisted queue at it.
699        //
700        // Our queue is sized such that only one entry can be persisted at a time.
701        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
702        let root_path = temp_dir.path().to_path_buf();
703
704        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
705            root_path: root_path.clone(),
706            max_on_disk_bytes: 32,
707            storage_max_disk_ratio: 0.8,
708            disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
709            max_age_days: 10,
710        })
711        .await
712        .expect("should not fail to create persisted queue");
713
714        // Ensure the directory is empty.
715        assert_eq!(0, files_in_dir(&root_path).await);
716
717        // Push our data to the queue and ensure it persisted it to disk.
718        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
719        assert_eq!(1, files_in_dir(&root_path).await);
720        assert_eq!(0, push_result.items_dropped);
721        assert_eq!(0, push_result.events_dropped);
722
723        // Push a second data entry, which should cause the first entry to be removed.
724        let push_result = persisted_queue
725            .push(data2.clone())
726            .await
727            .expect("should not fail to push data");
728        assert_eq!(1, files_in_dir(&root_path).await);
729        assert_eq!(1, push_result.items_dropped);
730        assert_eq!(1, push_result.events_dropped);
731
732        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
733        // removed -- and that we've consumed it, leaving no files on disk.
734        let actual = persisted_queue
735            .pop()
736            .await
737            .expect("should not fail to pop data")
738            .expect("should not be empty");
739        assert_eq!(data2, actual);
740        assert_eq!(0, files_in_dir(&root_path).await);
741    }
742
743    #[tokio::test]
744    async fn storage_ratio_exceeded() {
745        let data1 = FakeData::random();
746        let data2 = FakeData::random();
747
748        // Create our temporary directory and point our persisted queue at it.
749        //
750        // Our queue is sized such that two entries can be persisted at a time.
751        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
752        let root_path = temp_dir.path().to_path_buf();
753
754        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
755            root_path: root_path.clone(),
756            max_on_disk_bytes: 80,
757            storage_max_disk_ratio: 0.35,
758            disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
759            max_age_days: 10,
760        })
761        .await
762        .expect("should not fail to create persisted queue");
763
764        // Ensure the directory is empty.
765        assert_eq!(0, files_in_dir(&root_path).await);
766
767        // The `storage_max_disk_ratio` is 0.35, and our `MockDiskUsageRetriever` returns 100 for both `total_space` and
768        // `available_space`, so `on_disk_bytes_limit()` returns min(80, 35) = 35.
769        //
770        // First entry: total_on_disk_bytes(0) + required_bytes(30) < on_disk_bytes_limit(35)
771        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
772
773        assert_eq!(1, files_in_dir(&root_path).await);
774        assert_eq!(0, push_result.items_dropped);
775        assert_eq!(0, push_result.events_dropped);
776
777        // Second entry: total_on_disk_bytes(30) + required_bytes(30) > on_disk_bytes_limit(35) so the first entry is dropped.
778        let push_result = persisted_queue
779            .push(data2.clone())
780            .await
781            .expect("should not fail to push data");
782        assert_eq!(1, files_in_dir(&root_path).await);
783        assert_eq!(1, push_result.items_dropped);
784        assert_eq!(1, push_result.events_dropped);
785
786        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
787        // removed -- and that we've consumed it, leaving no files on disk.
788        let actual = persisted_queue
789            .pop()
790            .await
791            .expect("should not fail to pop data")
792            .expect("should not be empty");
793        assert_eq!(data2, actual);
794        assert_eq!(0, files_in_dir(&root_path).await);
795    }
796
797    /// Writes a corrupt (non-JSON) file with a valid retry filename to the given directory, using a timestamp
798    /// that sorts before any real entries (so it will be popped first).
799    async fn write_corrupt_entry(dir: &Path) -> PathBuf {
800        let filename = "retry-20000101000000000000-100000000.json";
801        let path = dir.join(filename);
802        tokio::fs::write(&path, b"this is not valid json").await.unwrap();
803        path
804    }
805
806    #[tokio::test]
807    async fn corrupt_entry_is_skipped_on_pop() {
808        let data = FakeData::random();
809
810        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
811        let root_path = temp_dir.path().to_path_buf();
812
813        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
814            root_path: root_path.clone(),
815            max_on_disk_bytes: 1024,
816            storage_max_disk_ratio: 0.8,
817            disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
818            max_age_days: 10,
819        })
820        .await
821        .expect("should not fail to create persisted queue");
822
823        // Write a corrupt file before pushing valid data, so it sorts first.
824        let corrupt_path = write_corrupt_entry(&root_path).await;
825
826        // Push a valid entry.
827        let _ = persisted_queue
828            .push(data.clone())
829            .await
830            .expect("should not fail to push data");
831
832        // Refresh state so the queue picks up the corrupt file.
833        persisted_queue.refresh_entry_state().await.unwrap();
834
835        // Pop should skip the corrupt entry and return the valid one.
836        let actual = persisted_queue
837            .pop()
838            .await
839            .expect("should not fail to pop data")
840            .expect("should have a valid entry");
841        assert_eq!(data, actual);
842
843        // The corrupt file should have been cleaned up from disk.
844        assert!(!corrupt_path.exists());
845
846        // The dropped counter should reflect the corrupt entry.
847        assert_eq!(1, persisted_queue.take_entries_dropped());
848
849        // No files should remain.
850        assert_eq!(0, files_in_dir(&root_path).await);
851    }
852
853    #[tokio::test]
854    async fn corrupt_entry_does_not_block_queue() {
855        let data1 = FakeData::random();
856        let data2 = FakeData::random();
857
858        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
859        let root_path = temp_dir.path().to_path_buf();
860
861        // Use MockDiskUsageRetriever to avoid disk space ratio causing eviction during push.
862        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
863            root_path: root_path.clone(),
864            max_on_disk_bytes: 1024,
865            storage_max_disk_ratio: 0.8,
866            disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
867            max_age_days: 10,
868        })
869        .await
870        .expect("should not fail to create persisted queue");
871
872        // Push two valid entries, then corrupt the first one on disk.
873        let _ = persisted_queue.push(data1).await.expect("should not fail to push data");
874        let _ = persisted_queue
875            .push(data2.clone())
876            .await
877            .expect("should not fail to push data");
878        assert_eq!(2, persisted_queue.entries.len());
879
880        // Corrupt the oldest entry file on disk.
881        let oldest_path = persisted_queue.entries[0].path.clone();
882        tokio::fs::write(&oldest_path, b"corrupted").await.unwrap();
883
884        // Pop should skip the corrupt entry and return the second valid one.
885        let actual = persisted_queue
886            .pop()
887            .await
888            .expect("should not fail to pop data")
889            .expect("should have a valid entry");
890        assert_eq!(data2, actual);
891
892        assert_eq!(1, persisted_queue.take_entries_dropped());
893        assert_eq!(0, files_in_dir(&root_path).await);
894    }
895
896    #[tokio::test]
897    async fn pop_returns_none_when_all_entries_corrupt() {
898        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
899        let root_path = temp_dir.path().to_path_buf();
900
901        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
902            root_path: root_path.clone(),
903            max_on_disk_bytes: 1024,
904            storage_max_disk_ratio: 0.8,
905            disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
906            max_age_days: 10,
907        })
908        .await
909        .expect("should not fail to create persisted queue");
910
911        // Write a corrupt entry and refresh state.
912        write_corrupt_entry(&root_path).await;
913        persisted_queue.refresh_entry_state().await.unwrap();
914
915        // Pop should skip the corrupt entry and return None (no valid entries).
916        let result = persisted_queue.pop().await.expect("should not fail to pop data");
917        assert!(result.is_none());
918
919        assert_eq!(1, persisted_queue.take_entries_dropped());
920        assert_eq!(0, files_in_dir(&root_path).await);
921    }
922
923    #[tokio::test]
924    async fn corrupt_entry_dropped_during_eviction() {
925        let data = FakeData::random();
926
927        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
928        let root_path = temp_dir.path().to_path_buf();
929
930        // Queue sized to hold only one entry.
931        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
932            root_path: root_path.clone(),
933            max_on_disk_bytes: 32,
934            storage_max_disk_ratio: 0.8,
935            disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
936            max_age_days: 10,
937        })
938        .await
939        .expect("should not fail to create persisted queue");
940
941        // Push a valid entry, then corrupt it on disk.
942        let _ = persisted_queue
943            .push(FakeData::random())
944            .await
945            .expect("should not fail to push data");
946        let first_path = persisted_queue.entries[0].path.clone();
947        tokio::fs::write(&first_path, b"corrupted").await.unwrap();
948
949        // Push another entry, which needs to evict the first (corrupt) one to make space.
950        // This should succeed without error -- the corrupt entry is dropped during eviction.
951        let _ = persisted_queue
952            .push(data.clone())
953            .await
954            .expect("should not fail to push data");
955
956        // The corrupt entry was dropped during eviction, not via normal eviction tracking.
957        assert_eq!(1, persisted_queue.take_entries_dropped());
958
959        // The valid entry should be poppable.
960        let actual = persisted_queue
961            .pop()
962            .await
963            .expect("should not fail to pop data")
964            .expect("should have a valid entry");
965        assert_eq!(data, actual);
966        assert_eq!(0, files_in_dir(&root_path).await);
967    }
968
969    #[tokio::test]
970    async fn persisted_queue_removes_outdated_files_on_initialization() {
971        let data = FakeData::random();
972
973        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
974        let root_path = temp_dir.path().to_path_buf();
975
976        // Pre-seed a year-2000 retry file containing valid data that would be loaded as an entry
977        // if it were not cleaned up first.
978        let stale_content = serde_json::to_vec(&data).unwrap();
979        tokio::fs::write(
980            root_path.join("retry-20000101000000000000000-100000000.json"),
981            &stale_content,
982        )
983        .await
984        .unwrap();
985
986        assert_eq!(1, files_in_dir(&root_path).await);
987
988        // Initialize the queue and remove stale files with a 10-day age limit.
989        let mut queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
990            root_path: root_path.clone(),
991            max_on_disk_bytes: 1024 * 1024,
992            storage_max_disk_ratio: 0.8,
993            disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
994            max_age_days: 10,
995        })
996        .await
997        .expect("should not fail to create persisted queue");
998        queue
999            .remove_stale_files()
1000            .await
1001            .expect("should not fail to remove stale files");
1002
1003        assert_eq!(0, files_in_dir(&root_path).await);
1004        assert!(queue.is_empty());
1005    }
1006
1007    #[tokio::test]
1008    async fn persisted_queue_zero_age_removes_all_retry_files_on_initialization() {
1009        // max_age_days=0 sets cutoff=now, matching the core Agent's FileRemovalPolicy behavior
1010        // with outdatedFileDayCount=0 — all retry files are removed on startup.
1011        let data = FakeData::random();
1012
1013        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
1014        let root_path = temp_dir.path().to_path_buf();
1015
1016        // Seed with a freshly-written entry via a normal queue.
1017        let mut seeding_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
1018            root_path: root_path.clone(),
1019            max_on_disk_bytes: 1024 * 1024,
1020            storage_max_disk_ratio: 0.8,
1021            disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
1022            max_age_days: 10,
1023        })
1024        .await
1025        .expect("should not fail to create persisted queue");
1026        let _ = seeding_queue.push(data).await.expect("should not fail to push data");
1027        assert_eq!(1, files_in_dir(&root_path).await);
1028
1029        // Re-open and remove stale files with max_age_days=0: the just-written file must also be deleted.
1030        let mut queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
1031            root_path: root_path.clone(),
1032            max_on_disk_bytes: 1024 * 1024,
1033            storage_max_disk_ratio: 0.8,
1034            disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
1035            max_age_days: 0,
1036        })
1037        .await
1038        .expect("should not fail to create persisted queue");
1039        queue
1040            .remove_stale_files()
1041            .await
1042            .expect("should not fail to remove stale files");
1043
1044        assert_eq!(0, files_in_dir(&root_path).await);
1045        assert!(queue.is_empty());
1046    }
1047}