1use std::{
2 io,
3 marker::PhantomData,
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use chrono::{DateTime, NaiveDateTime, Utc};
9use fs4::{available_space, total_space};
10use rand::RngExt as _;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use serde::{de::DeserializeOwned, Serialize};
13use tracing::{debug, info, warn};
14
15use super::{EventContainer, PushResult};
16
17struct PersistedEntry {
21 path: PathBuf,
22 timestamp: u128,
23 size_bytes: u64,
24}
25
26impl PersistedEntry {
27 fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31 let timestamp = decode_timestamped_filename(&path)?;
32 Some(Self {
33 path,
34 timestamp,
35 size_bytes,
36 })
37 }
38
39 fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40 Self {
41 path,
42 timestamp,
43 size_bytes,
44 }
45 }
46}
47
/// Source of disk capacity figures used to bound how much data may be persisted.
pub trait DiskUsageRetriever {
    /// Returns the total space, in bytes, of the underlying storage.
    ///
    /// # Errors
    ///
    /// Returns an error if the figure cannot be determined.
    fn total_space(&self) -> Result<u64, GenericError>;

    /// Returns the space, in bytes, that is currently available on the underlying storage.
    ///
    /// # Errors
    ///
    /// Returns an error if the figure cannot be determined.
    fn available_space(&self) -> Result<u64, GenericError>;
}
52
53pub struct DiskUsageRetrieverImpl {
54 root_path: PathBuf,
55}
56
57impl DiskUsageRetrieverImpl {
58 pub fn new(root_path: PathBuf) -> Self {
59 Self { root_path }
60 }
61}
62
63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64 fn total_space(&self) -> Result<u64, GenericError> {
65 total_space(&self.root_path)
66 .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67 }
68
69 fn available_space(&self) -> Result<u64, GenericError> {
70 available_space(&self.root_path)
71 .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72 }
73}
74
75#[derive(Clone)]
76pub struct DiskUsageRetrieverWrapper {
77 inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
78}
79
80impl DiskUsageRetrieverWrapper {
81 pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82 Self {
83 inner: disk_usage_retriever,
84 }
85 }
86}
87
/// Construction parameters for [`PersistedQueue`].
pub struct PersistedQueueArgs {
    /// Directory in which entries are stored; it is created (recursively) if it does not exist.
    pub root_path: PathBuf,
    /// Upper bound, in bytes, on the data the queue keeps on disk.
    pub max_on_disk_bytes: u64,
    /// Fraction of the total disk that may be in use; the remaining `1.0 - ratio` of total space is treated as
    /// reserved when computing how much the queue may write.
    pub storage_max_disk_ratio: f64,
    /// Provider of total/available disk space figures.
    pub disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
    /// Age, in days, beyond which retry files are deleted by `remove_stale_files`.
    pub max_age_days: u32,
}
104
/// A queue of `T` values persisted as individual JSON files in a directory, consumed oldest-first.
pub struct PersistedQueue<T> {
    /// Directory holding the persisted entries.
    root_path: PathBuf,
    /// In-memory index of the files on disk, sorted by ascending timestamp.
    entries: Vec<PersistedEntry>,
    /// Sum of `size_bytes` over `entries`.
    total_on_disk_bytes: u64,
    /// Configured upper bound, in bytes, on persisted data.
    max_on_disk_bytes: u64,
    /// Fraction of the total disk that may be in use (see `on_disk_bytes_limit`).
    storage_max_disk_ratio: f64,
    /// Provider of total/available disk space figures.
    disk_usage_retriever: DiskUsageRetrieverWrapper,
    /// Age, in days, beyond which retry files are considered stale.
    max_age_days: u32,
    /// Number of entries permanently dropped because they could not be consumed, since the last
    /// `take_entries_dropped` call.
    entries_dropped: u64,
    /// Ties the queue to its entry type without storing a `T`.
    _entry: PhantomData<T>,
}
116
117impl<T> PersistedQueue<T>
118where
119 T: EventContainer + DeserializeOwned + Serialize,
120{
121 pub async fn from_root_path(args: PersistedQueueArgs) -> Result<Self, GenericError> {
133 let PersistedQueueArgs {
134 root_path,
135 max_on_disk_bytes,
136 storage_max_disk_ratio,
137 disk_usage_retriever,
138 max_age_days,
139 } = args;
140
141 create_directory_recursive(root_path.clone())
143 .await
144 .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;
145
146 let mut persisted_requests = Self {
147 root_path: root_path.clone(),
148 entries: Vec::new(),
149 total_on_disk_bytes: 0,
150 max_on_disk_bytes,
151 storage_max_disk_ratio,
152 disk_usage_retriever: DiskUsageRetrieverWrapper::new(disk_usage_retriever),
153 max_age_days,
154 entries_dropped: 0,
155 _entry: PhantomData,
156 };
157
158 persisted_requests.refresh_entry_state().await?;
159
160 info!(
161 "Persisted retry queue initialized. Transactions will be stored in '{}'.",
162 root_path.display()
163 );
164
165 Ok(persisted_requests)
166 }
167
/// Returns `true` if the in-memory index currently tracks no entries.
pub fn is_empty(&self) -> bool {
    self.entries.is_empty()
}
172
/// Returns the number of entries currently tracked by the in-memory index.
pub fn len(&self) -> usize {
    self.entries.len()
}
177
178 pub async fn available_capacity_bytes(&self) -> Result<u64, GenericError> {
187 let disk_usage_retriever = self.disk_usage_retriever.clone();
188 let storage_max_disk_ratio = self.storage_max_disk_ratio;
189 let max_on_disk_bytes = self.max_on_disk_bytes;
190
191 let limit = tokio::task::spawn_blocking(move || {
192 on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
193 })
194 .await
195 .error_context("Failed to run disk size limit check to completion.")??;
196
197 Ok(limit.saturating_sub(self.total_on_disk_bytes))
198 }
199
200 pub fn take_entries_dropped(&mut self) -> u64 {
203 std::mem::take(&mut self.entries_dropped)
204 }
205
206 pub async fn remove_stale_files(&mut self) -> Result<u32, GenericError> {
214 let removed = remove_outdated_retry_files(&self.root_path, self.max_age_days).await?;
215 self.refresh_entry_state().await.map_err(GenericError::from)?;
216 Ok(removed)
217 }
218
219 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
226 let (filename, timestamp) = generate_timestamped_filename();
228 let entry_path = self.root_path.join(filename);
229 let serialized = serde_json::to_vec(&entry)
230 .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;
231
232 if serialized.len() as u64 > self.max_on_disk_bytes {
233 return Err(generic_error!("Entry is too large to persist."));
234 }
235
236 let push_result = self
238 .remove_until_available_space(serialized.len() as u64)
239 .await
240 .error_context(
241 "Failed to remove older persisted entries to make space for the incoming persisted entry.",
242 )?;
243
244 tokio::fs::write(&entry_path, &serialized)
246 .await
247 .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;
248
249 self.entries.push(PersistedEntry::from_parts(
251 entry_path,
252 timestamp,
253 serialized.len() as u64,
254 ));
255 self.total_on_disk_bytes += serialized.len() as u64;
256
257 debug!(entry.len = serialized.len(), "Enqueued persisted entry.");
258
259 Ok(push_result)
260 }
261
262 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
268 loop {
269 if self.entries.is_empty() {
270 return Ok(None);
271 }
272
273 let entry = self.entries.remove(0);
274 match try_deserialize_entry(&entry).await {
275 Ok(Some(deserialized)) => {
276 self.total_on_disk_bytes -= entry.size_bytes;
278 debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");
279
280 return Ok(Some(deserialized));
281 }
282 Ok(None) => {
283 self.refresh_entry_state().await?;
286 continue;
287 }
288 Err(e) => {
289 warn!(
292 entry.path = %entry.path.display(),
293 entry.len = entry.size_bytes,
294 error = %e,
295 "Permanently dropping persisted entry that could not be consumed.",
296 );
297
298 self.total_on_disk_bytes -= entry.size_bytes;
299 self.entries_dropped += 1;
300 continue;
301 }
302 }
303 }
304 }
305
306 async fn refresh_entry_state(&mut self) -> io::Result<()> {
307 let mut entries = Vec::new();
309
310 let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
311 while let Some(entry) = dir_reader.next_entry().await? {
312 let metadata = entry.metadata().await?;
313 if metadata.is_file() {
314 match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
315 Some(entry) => entries.push(entry),
316 None => {
317 warn!(
318 file_size = metadata.len(),
319 "Ignoring unrecognized file '{}' in retry directory.",
320 entry.path().display()
321 );
322 continue;
323 }
324 }
325 }
326 }
327
328 entries.sort_by_key(|entry| entry.timestamp);
330 self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
331 self.entries = entries;
332
333 Ok(())
334 }
335
336 async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
343 let mut push_result = PushResult::default();
344
345 let disk_usage_retriever = self.disk_usage_retriever.clone();
346 let storage_max_disk_ratio = self.storage_max_disk_ratio;
347 let max_on_disk_bytes = self.max_on_disk_bytes;
348
349 let limit = tokio::task::spawn_blocking(move || {
356 on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
357 })
358 .await
359 .error_context("Failed to run disk size limit check to completion.")??;
360
361 while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
362 let entry = self.entries.remove(0);
363
364 let deserialized = match try_deserialize_entry::<T>(&entry).await {
366 Ok(Some(deserialized)) => deserialized,
367 Ok(None) => {
368 warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
369 continue;
370 }
371 Err(e) => {
372 warn!(
375 entry.path = %entry.path.display(),
376 entry.len = entry.size_bytes,
377 error = %e,
378 "Permanently dropping persisted entry that could not be consumed during eviction.",
379 );
380
381 self.total_on_disk_bytes -= entry.size_bytes;
382 self.entries_dropped += 1;
383 continue;
384 }
385 };
386
387 self.total_on_disk_bytes -= entry.size_bytes;
389 push_result.track_dropped_item(&deserialized);
390
391 warn!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
392 }
393
394 Ok(push_result)
395 }
396}
397
398fn on_disk_bytes_limit(
406 disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
407) -> Result<u64, GenericError> {
408 let total_space = disk_usage_retriever.inner.total_space()? as f64;
409 let available_space = disk_usage_retriever.inner.available_space()? as f64;
410 let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
411 let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
412 Ok(max_on_disk_bytes.min(available_disk_usage))
413}
414
415async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
416 let serialized = match tokio::fs::read(&entry.path).await {
417 Ok(serialized) => serialized,
418 Err(e) => match e.kind() {
419 io::ErrorKind::NotFound => {
420 return Ok(None);
426 }
427 _ => {
428 return Err(e)
429 .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
430 }
431 },
432 };
433
434 let deserialized = match serde_json::from_slice(&serialized) {
435 Ok(deserialized) => deserialized,
436 Err(e) => {
437 if let Err(remove_err) = tokio::fs::remove_file(&entry.path).await {
440 warn!(
441 entry.path = %entry.path.display(),
442 error = %remove_err,
443 "Failed to remove corrupt persisted entry from disk.",
444 );
445 }
446
447 return Err(e)
448 .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()));
449 }
450 };
451
452 tokio::fs::remove_file(&entry.path)
454 .await
455 .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;
456
457 debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
458 Ok(Some(deserialized))
459}
460
461fn generate_timestamped_filename() -> (PathBuf, u128) {
462 let now = Utc::now();
463 let now_ts = datetime_to_timestamp(now);
464 let nonce = rand::rng().random_range(100000000..999999999);
465
466 let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
467
468 (filename, now_ts)
469}
470
471fn decode_timestamped_filename(path: &Path) -> Option<u128> {
472 let filename = path.file_stem()?.to_str()?;
473 let mut filename_parts = filename.split('-');
474
475 let prefix = filename_parts.next()?;
476 let timestamp_str = filename_parts.next()?;
477 let nonce = filename_parts.next()?;
478
479 if prefix != "retry" || nonce.parse::<u64>().is_err() {
481 return None;
482 }
483
484 NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
486 .map(|dt| datetime_to_timestamp(dt.and_utc()))
487 .ok()
488}
489
490fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
491 let secs = (dt.timestamp() as u128) * 1_000_000_000;
492 let ns = dt.timestamp_subsec_nanos() as u128;
493
494 secs + ns
495}
496
497async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
498 let mut dir_builder = std::fs::DirBuilder::new();
499 dir_builder.recursive(true);
500
501 #[cfg(unix)]
504 {
505 use std::os::unix::fs::DirBuilderExt;
506 dir_builder.mode(0o700);
507 }
508
509 tokio::task::spawn_blocking(move || {
510 dir_builder
511 .create(&path)
512 .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
513 })
514 .await
515 .error_context("Failed to spawn directory creation blocking task.")?
516}
517
518async fn remove_outdated_retry_files(queue_path: &Path, max_age_days: u32) -> Result<u32, GenericError> {
524 let mut dir = match tokio::fs::read_dir(queue_path).await {
525 Ok(d) => d,
526 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
527 Err(e) => {
528 return Err(e).with_error_context(|| {
529 format!(
530 "Failed to open retry queue directory '{}' for age-based cleanup.",
531 queue_path.display()
532 )
533 });
534 }
535 };
536 let now_ns = std::time::SystemTime::now()
537 .duration_since(std::time::UNIX_EPOCH)
538 .unwrap_or_default() .as_nanos();
540 let cutoff_ns = now_ns.saturating_sub(max_age_days as u128 * 24 * 3600 * 1_000_000_000);
541 let mut removed = 0u32;
542 loop {
543 let entry = match dir.next_entry().await {
544 Ok(Some(e)) => e,
545 Ok(None) => break,
546 Err(e) => {
547 return Err(e).with_error_context(|| "Error reading retry queue directory during age-based cleanup.");
548 }
549 };
550 let file_ts = match decode_timestamped_filename(&entry.path()) {
551 Some(ts) => ts,
552 None => continue,
553 };
554 if file_ts < cutoff_ns {
555 let name_str = entry.file_name();
556 let name = name_str.to_string_lossy();
557 match tokio::fs::remove_file(entry.path()).await {
558 Ok(()) => {
559 debug!(file = %name, "Removed outdated retry file.");
560 removed += 1;
561 }
562 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
563 debug!(file = %name, "Retry file already removed by concurrent cleanup.");
564 }
565 Err(e) => {
566 warn!(file = %name, error = %e, "Failed to remove outdated retry file.");
567 }
568 }
569 }
570 }
571 Ok(removed)
572}
573
574#[cfg(test)]
575mod tests {
576 use rand::RngExt as _;
577 use rand_distr::Alphanumeric;
578 use serde::Deserialize;
579
580 use super::*;
581
582 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
583 struct FakeData {
584 name: String,
585 value: u32,
586 }
587
588 impl FakeData {
589 fn random() -> Self {
590 Self {
591 name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
592 value: rand::rng().random_range(0..100),
593 }
594 }
595 }
596
597 impl EventContainer for FakeData {
598 fn event_count(&self) -> u64 {
599 1
600 }
601 }
602
603 struct MockDiskUsageRetriever {}
604
605 impl DiskUsageRetriever for MockDiskUsageRetriever {
606 fn total_space(&self) -> Result<u64, GenericError> {
607 Ok(100)
608 }
609 fn available_space(&self) -> Result<u64, GenericError> {
610 Ok(100)
611 }
612 }
613
614 async fn files_in_dir(path: &Path) -> usize {
615 let mut file_count = 0;
616 let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
617 while let Some(entry) = dir_reader.next_entry().await.unwrap() {
618 if entry.metadata().await.unwrap().is_file() {
619 file_count += 1;
620 }
621 }
622 file_count
623 }
624
/// A pushed entry is written to disk as one file, and popping it returns the same value and removes the file.
// NOTE(review): uses the real filesystem's disk figures, so this presumably relies on the temp filesystem having
// enough free space beyond the reserved 20% — confirm for constrained CI environments.
#[tokio::test]
async fn basic_push_pop() {
    let data = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1024,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    // A fresh queue directory starts out empty.
    assert_eq!(0, files_in_dir(&root_path).await);

    // Pushing creates exactly one file and evicts nothing.
    let push_result = persisted_queue
        .push(data.clone())
        .await
        .expect("should not fail to push data");
    assert_eq!(1, files_in_dir(&root_path).await);
    assert_eq!(0, push_result.items_dropped);
    assert_eq!(0, push_result.events_dropped);

    // Popping returns the original value and deletes the backing file.
    let actual = persisted_queue
        .pop()
        .await
        .expect("should not fail to pop data")
        .expect("should not be empty");
    assert_eq!(data, actual);
    assert_eq!(0, files_in_dir(&root_path).await);
}
664
/// An entry larger than `max_on_disk_bytes` is rejected and nothing is written to disk.
#[tokio::test]
async fn entry_too_large() {
    let data = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // A one-byte budget is smaller than any serialized `FakeData`.
    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    assert_eq!(0, files_in_dir(&root_path).await);

    // The push must fail...
    assert!(persisted_queue.push(data).await.is_err());

    // ...and must not leave a file behind.
    assert_eq!(0, files_in_dir(&root_path).await);
}
692
/// When a second entry does not fit within `max_on_disk_bytes`, the oldest entry is evicted to make room.
// NOTE(review): uses the real filesystem's disk figures, so this presumably relies on the temp filesystem having
// enough free space beyond the reserved 20% — confirm for constrained CI environments.
#[tokio::test]
async fn remove_oldest_entry_on_push() {
    let data1 = FakeData::random();
    let data2 = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // A serialized `FakeData` (8-character name, value below 100) is roughly 30 bytes, so a 32-byte budget
    // holds one entry but not two.
    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 32,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    assert_eq!(0, files_in_dir(&root_path).await);

    // The first entry fits without evicting anything.
    let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
    assert_eq!(1, files_in_dir(&root_path).await);
    assert_eq!(0, push_result.items_dropped);
    assert_eq!(0, push_result.events_dropped);

    // The second entry forces the first one out: still one file, one item/event reported as dropped.
    let push_result = persisted_queue
        .push(data2.clone())
        .await
        .expect("should not fail to push data");
    assert_eq!(1, files_in_dir(&root_path).await);
    assert_eq!(1, push_result.items_dropped);
    assert_eq!(1, push_result.events_dropped);

    // Only the second entry remains to be popped.
    let actual = persisted_queue
        .pop()
        .await
        .expect("should not fail to pop data")
        .expect("should not be empty");
    assert_eq!(data2, actual);
    assert_eq!(0, files_in_dir(&root_path).await);
}
742
/// When the disk ratio (rather than `max_on_disk_bytes`) is the binding limit, the oldest entry is evicted.
#[tokio::test]
async fn storage_ratio_exceeded() {
    let data1 = FakeData::random();
    let data2 = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // The mock reports 100 bytes total and available. With a ratio of 0.35, 65 bytes are reserved, leaving a
    // limit of min(80, 35) = 35 bytes: enough for one serialized `FakeData` (roughly 30 bytes) but not two.
    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 80,
        storage_max_disk_ratio: 0.35,
        disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    assert_eq!(0, files_in_dir(&root_path).await);

    // The first entry fits without evicting anything.
    let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

    assert_eq!(1, files_in_dir(&root_path).await);
    assert_eq!(0, push_result.items_dropped);
    assert_eq!(0, push_result.events_dropped);

    // The second entry exceeds the ratio-derived limit, so the first one is evicted.
    let push_result = persisted_queue
        .push(data2.clone())
        .await
        .expect("should not fail to push data");
    assert_eq!(1, files_in_dir(&root_path).await);
    assert_eq!(1, push_result.items_dropped);
    assert_eq!(1, push_result.events_dropped);

    // Only the second entry remains to be popped.
    let actual = persisted_queue
        .pop()
        .await
        .expect("should not fail to pop data")
        .expect("should not be empty");
    assert_eq!(data2, actual);
    assert_eq!(0, files_in_dir(&root_path).await);
}
796
797 async fn write_corrupt_entry(dir: &Path) -> PathBuf {
800 let filename = "retry-20000101000000000000-100000000.json";
801 let path = dir.join(filename);
802 tokio::fs::write(&path, b"this is not valid json").await.unwrap();
803 path
804 }
805
/// A corrupt file that sorts before a valid entry is dropped by `pop`, which then returns the valid entry.
#[tokio::test]
async fn corrupt_entry_is_skipped_on_pop() {
    let data = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1024,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    // The corrupt file carries a year-2000 timestamp, so it sorts ahead of anything pushed now.
    let corrupt_path = write_corrupt_entry(&root_path).await;

    let _ = persisted_queue
        .push(data.clone())
        .await
        .expect("should not fail to push data");

    // The corrupt file was written behind the queue's back; rescan so the queue tracks it.
    persisted_queue.refresh_entry_state().await.unwrap();

    // `pop` skips over the corrupt entry and yields the valid one.
    let actual = persisted_queue
        .pop()
        .await
        .expect("should not fail to pop data")
        .expect("should have a valid entry");
    assert_eq!(data, actual);

    // The corrupt file has been deleted from disk...
    assert!(!corrupt_path.exists());

    // ...and counted as a dropped entry.
    assert_eq!(1, persisted_queue.take_entries_dropped());

    assert_eq!(0, files_in_dir(&root_path).await);
}
852
/// An entry whose file is corrupted after being pushed does not prevent later entries from being popped.
#[tokio::test]
async fn corrupt_entry_does_not_block_queue() {
    let data1 = FakeData::random();
    let data2 = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1024,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    let _ = persisted_queue.push(data1).await.expect("should not fail to push data");
    let _ = persisted_queue
        .push(data2.clone())
        .await
        .expect("should not fail to push data");
    assert_eq!(2, persisted_queue.entries.len());

    // Overwrite the oldest entry's file in place with content that is not valid JSON.
    let oldest_path = persisted_queue.entries[0].path.clone();
    tokio::fs::write(&oldest_path, b"corrupted").await.unwrap();

    // `pop` drops the corrupted oldest entry and returns the second one.
    let actual = persisted_queue
        .pop()
        .await
        .expect("should not fail to pop data")
        .expect("should have a valid entry");
    assert_eq!(data2, actual);

    assert_eq!(1, persisted_queue.take_entries_dropped());
    assert_eq!(0, files_in_dir(&root_path).await);
}
895
/// When the only entry is corrupt, `pop` drops it and reports an empty queue rather than an error.
#[tokio::test]
async fn pop_returns_none_when_all_entries_corrupt() {
    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1024,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    // Write the corrupt file behind the queue's back, then rescan so the queue tracks it.
    write_corrupt_entry(&root_path).await;
    persisted_queue.refresh_entry_state().await.unwrap();

    let result = persisted_queue.pop().await.expect("should not fail to pop data");
    assert!(result.is_none());

    // The corrupt entry was counted as dropped and its file was removed.
    assert_eq!(1, persisted_queue.take_entries_dropped());
    assert_eq!(0, files_in_dir(&root_path).await);
}
922
/// A corrupt entry encountered while evicting to make room is counted as dropped, and the push still succeeds.
#[tokio::test]
async fn corrupt_entry_dropped_during_eviction() {
    let data = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // A 32-byte budget holds one serialized `FakeData` (roughly 30 bytes) but not two, so the second push
    // below has to evict the first entry.
    let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 32,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(MockDiskUsageRetriever {}),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");

    // Push an entry, then corrupt its file in place.
    let _ = persisted_queue
        .push(FakeData::random())
        .await
        .expect("should not fail to push data");
    let first_path = persisted_queue.entries[0].path.clone();
    tokio::fs::write(&first_path, b"corrupted").await.unwrap();

    // Eviction hits the corrupt entry; the push must still go through.
    let _ = persisted_queue
        .push(data.clone())
        .await
        .expect("should not fail to push data");

    // The corrupt entry is accounted for via the dropped-entries counter.
    assert_eq!(1, persisted_queue.take_entries_dropped());

    let actual = persisted_queue
        .pop()
        .await
        .expect("should not fail to pop data")
        .expect("should have a valid entry");
    assert_eq!(data, actual);
    assert_eq!(0, files_in_dir(&root_path).await);
}
968
/// A retry file older than `max_age_days` that predates the queue is deleted by `remove_stale_files`.
#[tokio::test]
async fn persisted_queue_removes_outdated_files_on_initialization() {
    let data = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // Seed the directory with a valid entry whose filename is timestamped 2000-01-01, far older than 10 days.
    let stale_content = serde_json::to_vec(&data).unwrap();
    tokio::fs::write(
        root_path.join("retry-20000101000000000000000-100000000.json"),
        &stale_content,
    )
    .await
    .unwrap();

    assert_eq!(1, files_in_dir(&root_path).await);

    let mut queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1024 * 1024,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");
    // Cleanup is an explicit step after construction.
    queue
        .remove_stale_files()
        .await
        .expect("should not fail to remove stale files");

    // The stale file is gone from disk and from the queue's index.
    assert_eq!(0, files_in_dir(&root_path).await);
    assert!(queue.is_empty());
}
1006
/// With `max_age_days` set to zero, `remove_stale_files` deletes even a retry file that was just written.
#[tokio::test]
async fn persisted_queue_zero_age_removes_all_retry_files_on_initialization() {
    let data = FakeData::random();

    let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
    let root_path = temp_dir.path().to_path_buf();

    // Use a first queue to write a freshly timestamped retry file into the directory.
    let mut seeding_queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1024 * 1024,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
        max_age_days: 10,
    })
    .await
    .expect("should not fail to create persisted queue");
    let _ = seeding_queue.push(data).await.expect("should not fail to push data");
    assert_eq!(1, files_in_dir(&root_path).await);

    // A second queue over the same directory with a zero-day age limit: the cutoff is "now", so any file
    // timestamped before the cleanup runs counts as stale.
    let mut queue = PersistedQueue::<FakeData>::from_root_path(PersistedQueueArgs {
        root_path: root_path.clone(),
        max_on_disk_bytes: 1024 * 1024,
        storage_max_disk_ratio: 0.8,
        disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
        max_age_days: 0,
    })
    .await
    .expect("should not fail to create persisted queue");
    queue
        .remove_stale_files()
        .await
        .expect("should not fail to remove stale files");

    assert_eq!(0, files_in_dir(&root_path).await);
    assert!(queue.is_empty());
}
1047}