saluki_io/net/util/retry/queue/
persisted.rs

1use std::{
2    io,
3    marker::PhantomData,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use chrono::{DateTime, NaiveDateTime, Utc};
9use fs4::{available_space, total_space};
10use rand::Rng;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use serde::{de::DeserializeOwned, Serialize};
13use tracing::{debug, info, warn};
14
15use super::{EventContainer, PushResult};
16
/// A persisted entry.
///
/// Represents the high-level metadata of a persisted entry, including the path to and size of the entry.
struct PersistedEntry {
    // Path to the serialized entry file on disk.
    path: PathBuf,
    // Creation time decoded from the filename, in nanoseconds since the Unix epoch (see `datetime_to_timestamp`).
    timestamp: u128,
    // Size of the serialized entry file, in bytes.
    size_bytes: u64,
}
25
26impl PersistedEntry {
27    /// Attempts to create a `PersistedEntry` from the given path.
28    ///
29    /// If the given path is not recognized as the path to a valid persisted entry, `None` is returned.
30    fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31        let timestamp = decode_timestamped_filename(&path)?;
32        Some(Self {
33            path,
34            timestamp,
35            size_bytes,
36        })
37    }
38
39    fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40        Self {
41            path,
42            timestamp,
43            size_bytes,
44        }
45    }
46}
47
/// Retrieves disk usage statistics for the volume backing a given path.
pub trait DiskUsageRetriever {
    /// Returns the total space of the volume, in bytes.
    fn total_space(&self) -> Result<u64, GenericError>;
    /// Returns the currently available space of the volume, in bytes.
    fn available_space(&self) -> Result<u64, GenericError>;
}
52
/// A `DiskUsageRetriever` implementation that queries the filesystem directly (via `fs4`).
pub struct DiskUsageRetrieverImpl {
    // Path whose containing volume is queried for space statistics.
    root_path: PathBuf,
}
56
impl DiskUsageRetrieverImpl {
    /// Creates a new `DiskUsageRetrieverImpl` that reports usage for the volume containing `root_path`.
    pub fn new(root_path: PathBuf) -> Self {
        Self { root_path }
    }
}
62
63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64    fn total_space(&self) -> Result<u64, GenericError> {
65        total_space(&self.root_path)
66            .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67    }
68
69    fn available_space(&self) -> Result<u64, GenericError> {
70        available_space(&self.root_path)
71            .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72    }
73}
74
/// A cheaply-cloneable wrapper around a dynamically-dispatched `DiskUsageRetriever`.
#[derive(Clone)]
pub struct DiskUsageRetrieverWrapper {
    // Shared trait object that performs the actual disk usage queries.
    inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
}
79
80impl DiskUsageRetrieverWrapper {
81    pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82        Self {
83            inner: disk_usage_retriever,
84        }
85    }
86}
87
/// A disk-backed queue of serialized entries, consumed oldest-first.
pub struct PersistedQueue<T> {
    // Directory where each entry is persisted as an individual timestamped file.
    root_path: PathBuf,
    // In-memory view of the on-disk entries, kept sorted oldest-first by timestamp.
    entries: Vec<PersistedEntry>,
    // Combined size, in bytes, of all currently-tracked entries.
    total_on_disk_bytes: u64,
    // Hard upper bound on the combined size of all persisted entries.
    max_on_disk_bytes: u64,
    // Maximum fraction of the underlying volume that may be used overall.
    storage_max_disk_ratio: f64,
    // Source of total/available space statistics for the underlying volume.
    disk_usage_retriever: DiskUsageRetrieverWrapper,
    // Marker tying the queue to its entry type; no `T` values are stored inline.
    _entry: PhantomData<T>,
}
97
98impl<T> PersistedQueue<T>
99where
100    T: EventContainer + DeserializeOwned + Serialize,
101{
102    /// Creates a new `PersistedQueue` instance from the given root path and maximum size.
103    ///
104    /// The root path is created if it does not already exist, and is scanned for existing persisted entries. Entries
105    /// are removed (oldest first) until the total size of all scanned entries is within the given maximum size.
106    ///
107    /// # Errors
108    ///
109    /// If there is an error creating the root directory, or scanning it for existing entries, or deleting entries to
110    /// shrink the directory to fit the given maximum size, an error is returned.
111    pub async fn from_root_path(
112        root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
113        disk_usage_retriever: DiskUsageRetrieverWrapper,
114    ) -> Result<Self, GenericError> {
115        // Make sure the directory exists first.
116        create_directory_recursive(root_path.clone())
117            .await
118            .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;
119
120        let mut persisted_requests = Self {
121            root_path: root_path.clone(),
122            entries: Vec::new(),
123            total_on_disk_bytes: 0,
124            max_on_disk_bytes,
125            storage_max_disk_ratio,
126            disk_usage_retriever,
127            _entry: PhantomData,
128        };
129
130        persisted_requests.refresh_entry_state().await?;
131
132        info!(
133            "Persisted retry queue initialized. Transactions will be stored in '{}'.",
134            root_path.display()
135        );
136
137        Ok(persisted_requests)
138    }
139
140    /// Returns `true` if the queue is empty.
141    pub fn is_empty(&self) -> bool {
142        self.entries.is_empty()
143    }
144
145    /// Returns the number of entries in the queue.
146    pub fn len(&self) -> usize {
147        self.entries.len()
148    }
149
150    /// Enqueues an entry and persists it to disk.
151    ///
152    /// # Errors
153    ///
154    /// If there is an error serializing the entry, or writing it to disk, or removing older entries to make space for
155    /// the new entry, an error is returned.
156    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
157        // Serialize the entry to a temporary file.
158        let (filename, timestamp) = generate_timestamped_filename();
159        let entry_path = self.root_path.join(filename);
160        let serialized = serde_json::to_vec(&entry)
161            .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;
162
163        if serialized.len() as u64 > self.max_on_disk_bytes {
164            return Err(generic_error!("Entry is too large to persist."));
165        }
166
167        // Make sure we have enough space to persist the entry.
168        let push_result = self
169            .remove_until_available_space(serialized.len() as u64)
170            .await
171            .error_context(
172                "Failed to remove older persisted entries to make space for the incoming persisted entry.",
173            )?;
174
175        // Actually persist it.
176        tokio::fs::write(&entry_path, &serialized)
177            .await
178            .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;
179
180        // Add a new persisted entry to our state.
181        self.entries.push(PersistedEntry::from_parts(
182            entry_path,
183            timestamp,
184            serialized.len() as u64,
185        ));
186        self.total_on_disk_bytes += serialized.len() as u64;
187
188        debug!(entry.len = serialized.len(), "Enqueued persisted entry.");
189
190        Ok(push_result)
191    }
192
193    /// Consumes the oldest persisted entry on disk, if one exists.
194    ///
195    /// # Errors
196    ///
197    /// If there is an error reading or deserializing the entry, an error is returned.
198    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
199        if self.entries.is_empty() {
200            return Ok(None);
201        }
202
203        loop {
204            let entry = self.entries.remove(0);
205            match try_deserialize_entry(&entry).await {
206                Ok(Some(deserialized)) => {
207                    // We got the deserialized entry, so remove it from our state and return it.
208                    self.total_on_disk_bytes -= entry.size_bytes;
209                    debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");
210
211                    return Ok(Some(deserialized));
212                }
213                Ok(None) => {
214                    // We couldn't read the entry from disk, which points to us potentially having invalid state about
215                    // what entries _are_ on disk, so we'll refresh our entry state and try again.
216                    self.refresh_entry_state().await?;
217                    continue;
218                }
219                Err(e) => {
220                    // We couldn't read the file, so add it back to our entries list and return the error.
221                    self.entries.insert(0, entry);
222                    return Err(e);
223                }
224            }
225        }
226    }
227
228    async fn refresh_entry_state(&mut self) -> io::Result<()> {
229        // Scan the root path for persisted entries.
230        let mut entries = Vec::new();
231
232        let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
233        while let Some(entry) = dir_reader.next_entry().await? {
234            let metadata = entry.metadata().await?;
235            if metadata.is_file() {
236                match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
237                    Some(entry) => entries.push(entry),
238                    None => {
239                        warn!(
240                            file_size = metadata.len(),
241                            "Ignoring unrecognized file '{}' in retry directory.",
242                            entry.path().display()
243                        );
244                        continue;
245                    }
246                }
247            }
248        }
249
250        // Sort the entries by their inherent timestamp.
251        entries.sort_by_key(|entry| entry.timestamp);
252        self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
253        self.entries = entries;
254
255        Ok(())
256    }
257
258    /// Removes persisted entries (oldest first) until there is at least the required number of bytes in free space
259    /// (maximum - total).
260    ///
261    /// # Errors
262    ///
263    /// If there is an error while deleting persisted entries, an error is returned.
264    async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
265        let mut push_result = PushResult::default();
266
267        let disk_usage_retriever = self.disk_usage_retriever.clone();
268        let storage_max_disk_ratio = self.storage_max_disk_ratio;
269        let max_on_disk_bytes = self.max_on_disk_bytes;
270
271        let limit = tokio::task::spawn_blocking(move || {
272            on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
273        })
274        .await??;
275
276        while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
277            let entry = self.entries.remove(0);
278
279            // Deserialize the entry, which gives us back the original event and removes the file from disk.
280            let event_count = match try_deserialize_entry::<T>(&entry).await {
281                Ok(Some(deserialized)) => deserialized.event_count(),
282                Ok(None) => {
283                    warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
284                    continue;
285                }
286                Err(e) => {
287                    // We didn't delete the file, so add it back to our entries list and return the error.
288                    self.entries.insert(0, entry);
289                    return Err(e);
290                }
291            };
292
293            // Update our statistics.
294            self.total_on_disk_bytes -= entry.size_bytes;
295            push_result.track_dropped_item(event_count);
296
297            debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
298        }
299
300        Ok(push_result)
301    }
302}
303
304/// Determines the total number of bytes that can be written to disk without causing the underlying volume to end up
305/// with more than `storage_max_disk_ratio` in terms of used space. The minimum of `max_on_disk_bytes` and the result
306/// of this calculation is returned.
307///
308/// # Errors
309///
310/// If there is an error while retrieving the total or available space of the underlying volume, an error is returned.
311fn on_disk_bytes_limit(
312    disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
313) -> Result<u64, GenericError> {
314    let total_space = disk_usage_retriever.inner.total_space()? as f64;
315    let available_space = disk_usage_retriever.inner.available_space()? as f64;
316    let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
317    let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
318    Ok(max_on_disk_bytes.min(available_disk_usage))
319}
320
321async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
322    let serialized = match tokio::fs::read(&entry.path).await {
323        Ok(serialized) => serialized,
324        Err(e) => match e.kind() {
325            io::ErrorKind::NotFound => {
326                // We tried to delete an entry that no longer exists on disk, which means our internal entry state
327                // is corrupted for some reason.
328                //
329                // Tell the caller that we couldn't find the entry on disk, so that they need to refresh the entry state
330                // to make sure it's up-to-date before trying again.
331                return Ok(None);
332            }
333            _ => {
334                return Err(e)
335                    .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
336            }
337        },
338    };
339
340    let deserialized = serde_json::from_slice(&serialized)
341        .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()))?;
342
343    // Delete the entry from disk before returning, so that we don't risk sending duplicates.
344    tokio::fs::remove_file(&entry.path)
345        .await
346        .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;
347
348    debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
349    Ok(Some(deserialized))
350}
351
352fn generate_timestamped_filename() -> (PathBuf, u128) {
353    let now = Utc::now();
354    let now_ts = datetime_to_timestamp(now);
355    let nonce = rand::rng().random_range(100000000..999999999);
356
357    let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
358
359    (filename, now_ts)
360}
361
362fn decode_timestamped_filename(path: &Path) -> Option<u128> {
363    let filename = path.file_stem()?.to_str()?;
364    let mut filename_parts = filename.split('-');
365
366    let prefix = filename_parts.next()?;
367    let timestamp_str = filename_parts.next()?;
368    let nonce = filename_parts.next()?;
369
370    // Make sure the filename matches our expected format by first checking the prefix and nonce portions.
371    if prefix != "retry" || nonce.parse::<u64>().is_err() {
372        return None;
373    }
374
375    // Try and decode the timestamp portion.
376    NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
377        .map(|dt| datetime_to_timestamp(dt.and_utc()))
378        .ok()
379}
380
381fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
382    let secs = (dt.timestamp() as u128) * 1_000_000_000;
383    let ns = dt.timestamp_subsec_nanos() as u128;
384
385    secs + ns
386}
387
388async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
389    let mut dir_builder = std::fs::DirBuilder::new();
390    dir_builder.recursive(true);
391
392    // When on Unix platforms, adjust the permissions of the directory to be RWX for the owner only, and nothing for
393    // group/world.
394    #[cfg(unix)]
395    {
396        use std::os::unix::fs::DirBuilderExt;
397        dir_builder.mode(0o700);
398    }
399
400    tokio::task::spawn_blocking(move || {
401        dir_builder
402            .create(&path)
403            .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
404    })
405    .await
406    .error_context("Failed to spawn directory creation blocking task.")?
407}
408
#[cfg(test)]
mod tests {
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    // Minimal serializable payload used as the queue's entry type throughout these tests.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        // Generates a payload with a random 8-character alphanumeric name and a random value.
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    // Disk usage retriever that reports a fixed 100-byte volume with 100 bytes available, making the
    // ratio-based limit in `on_disk_bytes_limit` deterministic.
    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    // Counts the regular files currently present in the given directory.
    async fn files_in_dir(path: &Path) -> usize {
        let mut file_count = 0;
        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
            if entry.metadata().await.unwrap().is_file() {
                file_count += 1;
            }
        }
        file_count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed, and that the file has been removed from disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Maximum size of 1 byte guarantees any serialized entry exceeds the cap.
        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(persisted_queue.push(data).await.is_err());

        // Ensure the directory is (still) empty.
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that only one entry can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            32,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that two entries can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            80,
            0.35,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // The `storage_max_disk_ratio` is 0.35, and our `MockDiskUsageRetriever` returns 100 for both `total_space` and
        // `available_space`, so `on_disk_bytes_limit()` returns min(80, 35) = 35.
        //
        // First entry: total_on_disk_bytes(0) + required_bytes(30) < on_disk_bytes_limit(35)
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Second entry: total_on_disk_bytes(30) + required_bytes(30) > on_disk_bytes_limit(35) so the first entry is dropped.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }
}