saluki_io/net/util/retry/queue/persisted.rs

use std::{
    io,
    marker::PhantomData,
    path::{Path, PathBuf},
    sync::Arc,
};

use chrono::{DateTime, NaiveDateTime, Utc};
use fs4::{available_space, total_space};
use rand::Rng;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use serde::{de::DeserializeOwned, Serialize};
use tracing::{debug, warn};

use super::{EventContainer, PushResult};

/// A persisted entry.
///
/// Represents the high-level metadata of a persisted entry, including the path to and size of the entry.
struct PersistedEntry {
    path: PathBuf,
    timestamp: u128,
    size_bytes: u64,
}

impl PersistedEntry {
    /// Attempts to create a `PersistedEntry` from the given path.
    ///
    /// If the given path is not recognized as the path to a valid persisted entry, `None` is returned.
    fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
        let timestamp = decode_timestamped_filename(&path)?;
        Some(Self {
            path,
            timestamp,
            size_bytes,
        })
    }

    fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
        Self {
            path,
            timestamp,
            size_bytes,
        }
    }
}

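/// A source of disk usage information for the volume backing the retry directory.
///
/// Implementations report the total and currently available space of the underlying volume, in bytes, which the
/// queue uses to enforce `storage_max_disk_ratio`.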
pub trait DiskUsageRetriever {
    fn total_space(&self) -> Result<u64, GenericError>;
    fn available_space(&self) -> Result<u64, GenericError>;
}

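/// A `DiskUsageRetriever` implementation backed by `fs4`, reporting on the volume that contains the given root path.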
pub struct DiskUsageRetrieverImpl {
    root_path: PathBuf,
}

impl DiskUsageRetrieverImpl {
    pub fn new(root_path: PathBuf) -> Self {
        Self { root_path }
    }
}

impl DiskUsageRetriever for DiskUsageRetrieverImpl {
    fn total_space(&self) -> Result<u64, GenericError> {
        total_space(&self.root_path)
            .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
    }

    fn available_space(&self) -> Result<u64, GenericError> {
        available_space(&self.root_path)
            .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
    }
}

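/// A cloneable wrapper around a shared `DiskUsageRetriever` trait object.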
#[derive(Clone)]
pub struct DiskUsageRetrieverWrapper {
    inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
}

impl DiskUsageRetrieverWrapper {
    pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
        Self {
            inner: disk_usage_retriever,
        }
    }
}

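/// A disk-backed queue of serialized entries, replayed oldest first.
///
/// Entries are written as timestamped JSON files under the root path. Total on-disk usage is bounded by
/// `max_on_disk_bytes` as well as by `storage_max_disk_ratio`, the maximum fraction of the underlying volume that
/// persisted entries are allowed to consume.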
pub struct PersistedQueue<T> {
    root_path: PathBuf,
    entries: Vec<PersistedEntry>,
    total_on_disk_bytes: u64,
    max_on_disk_bytes: u64,
    storage_max_disk_ratio: f64,
    disk_usage_retriever: DiskUsageRetrieverWrapper,
    _entry: PhantomData<T>,
}

impl<T> PersistedQueue<T>
where
    T: EventContainer + DeserializeOwned + Serialize,
{
    /// Creates a new `PersistedQueue` instance from the given root path and maximum size.
    ///
    /// The root path is created if it does not already exist, and is scanned for existing persisted entries. Entries
    /// are removed (oldest first) until the total size of all scanned entries is within the given maximum size.
    ///
    /// # Errors
    ///
    /// If there is an error creating the root directory, or scanning it for existing entries, or deleting entries to
    /// shrink the directory to fit the given maximum size, an error is returned.
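    ///
    /// # Example
    ///
    /// A minimal sketch of constructing a queue. The path and limits are illustrative, and `MyPayload` is a
    /// placeholder for any type implementing `EventContainer + Serialize + DeserializeOwned`:
    ///
    /// ```ignore
    /// let root_path = PathBuf::from("/tmp/retry");
    /// let retriever = DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())));
    /// let mut queue =
    ///     PersistedQueue::<MyPayload>::from_root_path(root_path, 64 * 1024 * 1024, 0.8, retriever).await?;
    /// ```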
    pub async fn from_root_path(
        root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: DiskUsageRetrieverWrapper,
    ) -> Result<Self, GenericError> {
        // Make sure the directory exists first.
        create_directory_recursive(root_path.clone())
            .await
            .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;

        let mut persisted_requests = Self {
            root_path: root_path.clone(),
            entries: Vec::new(),
            total_on_disk_bytes: 0,
            max_on_disk_bytes,
            storage_max_disk_ratio,
            disk_usage_retriever,
            _entry: PhantomData,
        };

        persisted_requests.refresh_entry_state().await?;

        Ok(persisted_requests)
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Returns the number of entries in the queue.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Enqueues an entry and persists it to disk.
    ///
    /// # Errors
    ///
    /// If there is an error serializing the entry, or writing it to disk, or removing older entries to make space for
    /// the new entry, an error is returned.
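    ///
    /// # Example
    ///
    /// A hedged sketch of typical usage; `queue` and `payload` are placeholders for a queue built via
    /// `from_root_path` and a value satisfying the queue's trait bounds:
    ///
    /// ```ignore
    /// let push_result = queue.push(payload).await?;
    /// if push_result.items_dropped > 0 {
    ///     // Older persisted entries were evicted to make room for the new one.
    /// }
    /// ```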
    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
        // Serialize the entry and determine the path it will be persisted to.
        let (filename, timestamp) = generate_timestamped_filename();
        let entry_path = self.root_path.join(filename);
        let serialized = serde_json::to_vec(&entry)
            .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;

        if serialized.len() as u64 > self.max_on_disk_bytes {
            return Err(generic_error!("Entry is too large to persist."));
        }

        // Make sure we have enough space to persist the entry.
        let push_result = self
            .remove_until_available_space(serialized.len() as u64)
            .await
            .error_context(
                "Failed to remove older persisted entries to make space for the incoming persisted entry.",
            )?;

        // Actually persist it.
        tokio::fs::write(&entry_path, &serialized)
            .await
            .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;

        // Add a new persisted entry to our state.
        self.entries.push(PersistedEntry::from_parts(
            entry_path,
            timestamp,
            serialized.len() as u64,
        ));
        self.total_on_disk_bytes += serialized.len() as u64;

        debug!(entry.len = serialized.len(), "Enqueued persisted entry.");

        Ok(push_result)
    }

    /// Consumes the oldest persisted entry on disk, if one exists.
    ///
    /// # Errors
    ///
    /// If there is an error reading or deserializing the entry, an error is returned.
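    ///
    /// # Example
    ///
    /// A hedged sketch of draining the queue, using the same placeholder `queue` as above:
    ///
    /// ```ignore
    /// while let Some(entry) = queue.pop().await? {
    ///     // `entry` has already been removed from disk; hand it back to whatever will retry it.
    /// }
    /// ```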
    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
        loop {
            // Check for emptiness on every iteration: refreshing our entry state below may leave us with no entries
            // at all, and removing from an empty list would panic.
            if self.entries.is_empty() {
                return Ok(None);
            }

            let entry = self.entries.remove(0);
            match try_deserialize_entry(&entry).await {
                Ok(Some(deserialized)) => {
                    // We got the deserialized entry, so remove it from our state and return it.
                    self.total_on_disk_bytes -= entry.size_bytes;
                    debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");

                    return Ok(Some(deserialized));
                }
                Ok(None) => {
                    // We couldn't read the entry from disk, which points to us potentially having invalid state about
                    // what entries _are_ on disk, so we'll refresh our entry state and try again.
                    self.refresh_entry_state().await?;
                    continue;
                }
                Err(e) => {
                    // We couldn't read the file, so add it back to our entries list and return the error.
                    self.entries.insert(0, entry);
                    return Err(e);
                }
            }
        }
    }

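    /// Rebuilds the in-memory entry state by scanning the root path for recognizable persisted entries, ignoring any
    /// other files, and recomputing the total on-disk size.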
    async fn refresh_entry_state(&mut self) -> io::Result<()> {
        // Scan the root path for persisted entries.
        let mut entries = Vec::new();

        let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
        while let Some(entry) = dir_reader.next_entry().await? {
            let metadata = entry.metadata().await?;
            if metadata.is_file() {
                match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
                    Some(entry) => entries.push(entry),
                    None => {
                        warn!(
                            file_size = metadata.len(),
                            "Ignoring unrecognized file '{}' in retry directory.",
                            entry.path().display()
                        );
                        continue;
                    }
                }
            }
        }

        // Sort the entries by their inherent timestamp.
        entries.sort_by_key(|entry| entry.timestamp);
        self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
        self.entries = entries;

        Ok(())
    }

    /// Removes persisted entries (oldest first) until there are at least `required_bytes` of headroom below the
    /// effective on-disk limit (the lesser of `max_on_disk_bytes` and the limit implied by `storage_max_disk_ratio`).
    ///
    /// # Errors
    ///
    /// If there is an error while deleting persisted entries, an error is returned.
    async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        let disk_usage_retriever = self.disk_usage_retriever.clone();
        let storage_max_disk_ratio = self.storage_max_disk_ratio;
        let max_on_disk_bytes = self.max_on_disk_bytes;

        let limit = tokio::task::spawn_blocking(move || {
            on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
        })
        .await??;

        while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
            let entry = self.entries.remove(0);

            // Deserialize the entry, which gives us back the original event and removes the file from disk.
            let event_count = match try_deserialize_entry::<T>(&entry).await {
                Ok(Some(deserialized)) => deserialized.event_count(),
                Ok(None) => {
                    warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
                    continue;
                }
                Err(e) => {
                    // We didn't delete the file, so add it back to our entries list and return the error.
                    self.entries.insert(0, entry);
                    return Err(e);
                }
            };

            // Update our statistics.
            self.total_on_disk_bytes -= entry.size_bytes;
            push_result.track_dropped_item(event_count);

            debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
        }

        Ok(push_result)
    }
}

/// Determines the total number of bytes that can be written to disk without causing the underlying volume's used
/// space to exceed `storage_max_disk_ratio` of its total capacity. The minimum of `max_on_disk_bytes` and the result
/// of this calculation is returned.
///
/// # Errors
///
/// If there is an error while retrieving the total or available space of the underlying volume, an error is returned.
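///
/// For example, with `total_space` of 100, `available_space` of 100, `storage_max_disk_ratio` of 0.35, and
/// `max_on_disk_bytes` of 80: the reserved portion is `100 * (1.0 - 0.35) = 65`, the allowed usage is
/// `100 - 65 = 35`, and the returned limit is `min(80, 35) = 35`.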
fn on_disk_bytes_limit(
    disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
) -> Result<u64, GenericError> {
    let total_space = disk_usage_retriever.inner.total_space()? as f64;
    let available_space = disk_usage_retriever.inner.available_space()? as f64;
    let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
    let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
    Ok(max_on_disk_bytes.min(available_disk_usage))
}

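/// Reads and deserializes the given persisted entry, deleting it from disk on success.
///
/// Returns `Ok(None)` if the entry no longer exists on disk, which indicates that the caller's entry state is stale
/// and should be refreshed.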
async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
    let serialized = match tokio::fs::read(&entry.path).await {
        Ok(serialized) => serialized,
        Err(e) => match e.kind() {
            io::ErrorKind::NotFound => {
                // We tried to read an entry that no longer exists on disk, which means our internal entry state
                // is corrupted for some reason.
                //
                // Tell the caller that we couldn't find the entry on disk, so they know to refresh the entry state
                // and make sure it's up-to-date before trying again.
                return Ok(None);
            }
            _ => {
                return Err(e)
                    .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
            }
        },
    };

    let deserialized = serde_json::from_slice(&serialized)
        .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()))?;

    // Delete the entry from disk before returning, so that we don't risk sending duplicates.
    tokio::fs::remove_file(&entry.path)
        .await
        .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;

    debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
    Ok(Some(deserialized))
}

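/// Generates a unique `retry-<timestamp>-<nonce>.json` filename, returning it along with the encoded timestamp
/// (nanoseconds since the Unix epoch).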
fn generate_timestamped_filename() -> (PathBuf, u128) {
    let now = Utc::now();
    let now_ts = datetime_to_timestamp(now);
    let nonce = rand::rng().random_range(100000000..999999999);

    let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();

    (filename, now_ts)
}

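/// Attempts to recover the encoded timestamp from a persisted entry filename, returning `None` if the filename does
/// not match the expected `retry-<timestamp>-<nonce>.json` format.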
fn decode_timestamped_filename(path: &Path) -> Option<u128> {
    let filename = path.file_stem()?.to_str()?;
    let mut filename_parts = filename.split('-');

    let prefix = filename_parts.next()?;
    let timestamp_str = filename_parts.next()?;
    let nonce = filename_parts.next()?;

    // Make sure the filename matches our expected format by first checking the prefix and nonce portions.
    if prefix != "retry" || nonce.parse::<u64>().is_err() {
        return None;
    }

    // Try and decode the timestamp portion.
    NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
        .map(|dt| datetime_to_timestamp(dt.and_utc()))
        .ok()
}

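/// Converts a UTC datetime to nanoseconds since the Unix epoch.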
fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
    let secs = (dt.timestamp() as u128) * 1_000_000_000;
    let ns = dt.timestamp_subsec_nanos() as u128;

    secs + ns
}

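/// Recursively creates the given directory, restricting it to owner-only permissions (0700) on Unix, and running the
/// blocking filesystem work on a dedicated blocking task.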
async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
    let mut dir_builder = std::fs::DirBuilder::new();
    dir_builder.recursive(true);

    // When on Unix platforms, adjust the permissions of the directory to be RWX for the owner only, and nothing for
    // group/world.
    #[cfg(unix)]
    {
        use std::os::unix::fs::DirBuilderExt;
        dir_builder.mode(0o700);
    }

    tokio::task::spawn_blocking(move || {
        dir_builder
            .create(&path)
            .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
    })
    .await
    .error_context("Failed to spawn directory creation blocking task.")?
}

#[cfg(test)]
mod tests {
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    async fn files_in_dir(path: &Path) -> usize {
        let mut file_count = 0;
        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
            if entry.metadata().await.unwrap().is_file() {
                file_count += 1;
            }
        }
        file_count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Now pop the data back out and ensure it matches what we pushed, and that the file has been removed from disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Attempt to push our data into the queue, which should fail because it's too large.
        assert!(persisted_queue.push(data).await.is_err());

        // Ensure the directory is (still) empty.
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that only one entry can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            32,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Push our data to the queue and ensure it persisted it to disk.
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Push a second data entry, which should cause the first entry to be removed.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Create our temporary directory and point our persisted queue at it.
        //
        // Our queue is sized such that two entries can be persisted at a time.
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            80,
            0.35,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        // Ensure the directory is empty.
        assert_eq!(0, files_in_dir(&root_path).await);

        // The `storage_max_disk_ratio` is 0.35, and our `MockDiskUsageRetriever` returns 100 for both `total_space` and
        // `available_space`, so `on_disk_bytes_limit()` returns min(80, 35) = 35.
        //
        // First entry: total_on_disk_bytes(0) + required_bytes(30) < on_disk_bytes_limit(35)
        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Second entry: total_on_disk_bytes(30) + required_bytes(30) > on_disk_bytes_limit(35) so the first entry is dropped.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Now pop the data back out and ensure it matches the second item we pushed -- indicating the first item was
        // removed -- and that we've consumed it, leaving no files on disk.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }
}