1use std::{
2 io,
3 marker::PhantomData,
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use chrono::{DateTime, NaiveDateTime, Utc};
9use fs4::{available_space, total_space};
10use rand::Rng;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use serde::{de::DeserializeOwned, Serialize};
13use tracing::{debug, info, warn};
14
15use super::{EventContainer, PushResult};
16
17struct PersistedEntry {
21 path: PathBuf,
22 timestamp: u128,
23 size_bytes: u64,
24}
25
26impl PersistedEntry {
27 fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31 let timestamp = decode_timestamped_filename(&path)?;
32 Some(Self {
33 path,
34 timestamp,
35 size_bytes,
36 })
37 }
38
39 fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40 Self {
41 path,
42 timestamp,
43 size_bytes,
44 }
45 }
46}
47
/// Source of disk capacity figures.
///
/// `on_disk_bytes_limit` combines both values with the configured byte limit, so implementations
/// are expected to report byte counts.
pub trait DiskUsageRetriever {
    /// Returns the total capacity of the underlying storage, or an error if it cannot be determined.
    fn total_space(&self) -> Result<u64, GenericError>;
    /// Returns the space currently available on the underlying storage, or an error if it cannot be
    /// determined.
    fn available_space(&self) -> Result<u64, GenericError>;
}
52
53pub struct DiskUsageRetrieverImpl {
54 root_path: PathBuf,
55}
56
57impl DiskUsageRetrieverImpl {
58 pub fn new(root_path: PathBuf) -> Self {
59 Self { root_path }
60 }
61}
62
63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64 fn total_space(&self) -> Result<u64, GenericError> {
65 total_space(&self.root_path)
66 .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67 }
68
69 fn available_space(&self) -> Result<u64, GenericError> {
70 available_space(&self.root_path)
71 .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72 }
73}
74
75#[derive(Clone)]
76pub struct DiskUsageRetrieverWrapper {
77 inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
78}
79
80impl DiskUsageRetrieverWrapper {
81 pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82 Self {
83 inner: disk_usage_retriever,
84 }
85 }
86}
87
88pub struct PersistedQueue<T> {
89 root_path: PathBuf,
90 entries: Vec<PersistedEntry>,
91 total_on_disk_bytes: u64,
92 max_on_disk_bytes: u64,
93 storage_max_disk_ratio: f64,
94 disk_usage_retriever: DiskUsageRetrieverWrapper,
95 entries_dropped: u64,
96 _entry: PhantomData<T>,
97}
98
99impl<T> PersistedQueue<T>
100where
101 T: EventContainer + DeserializeOwned + Serialize,
102{
103 pub async fn from_root_path(
113 root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
114 disk_usage_retriever: DiskUsageRetrieverWrapper,
115 ) -> Result<Self, GenericError> {
116 create_directory_recursive(root_path.clone())
118 .await
119 .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;
120
121 let mut persisted_requests = Self {
122 root_path: root_path.clone(),
123 entries: Vec::new(),
124 total_on_disk_bytes: 0,
125 max_on_disk_bytes,
126 storage_max_disk_ratio,
127 disk_usage_retriever,
128 entries_dropped: 0,
129 _entry: PhantomData,
130 };
131
132 persisted_requests.refresh_entry_state().await?;
133
134 info!(
135 "Persisted retry queue initialized. Transactions will be stored in '{}'.",
136 root_path.display()
137 );
138
139 Ok(persisted_requests)
140 }
141
142 pub fn is_empty(&self) -> bool {
144 self.entries.is_empty()
145 }
146
147 pub fn len(&self) -> usize {
149 self.entries.len()
150 }
151
152 pub fn take_entries_dropped(&mut self) -> u64 {
155 std::mem::take(&mut self.entries_dropped)
156 }
157
158 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
165 let (filename, timestamp) = generate_timestamped_filename();
167 let entry_path = self.root_path.join(filename);
168 let serialized = serde_json::to_vec(&entry)
169 .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;
170
171 if serialized.len() as u64 > self.max_on_disk_bytes {
172 return Err(generic_error!("Entry is too large to persist."));
173 }
174
175 let push_result = self
177 .remove_until_available_space(serialized.len() as u64)
178 .await
179 .error_context(
180 "Failed to remove older persisted entries to make space for the incoming persisted entry.",
181 )?;
182
183 tokio::fs::write(&entry_path, &serialized)
185 .await
186 .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;
187
188 self.entries.push(PersistedEntry::from_parts(
190 entry_path,
191 timestamp,
192 serialized.len() as u64,
193 ));
194 self.total_on_disk_bytes += serialized.len() as u64;
195
196 debug!(entry.len = serialized.len(), "Enqueued persisted entry.");
197
198 Ok(push_result)
199 }
200
201 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
207 loop {
208 if self.entries.is_empty() {
209 return Ok(None);
210 }
211
212 let entry = self.entries.remove(0);
213 match try_deserialize_entry(&entry).await {
214 Ok(Some(deserialized)) => {
215 self.total_on_disk_bytes -= entry.size_bytes;
217 debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");
218
219 return Ok(Some(deserialized));
220 }
221 Ok(None) => {
222 self.refresh_entry_state().await?;
225 continue;
226 }
227 Err(e) => {
228 warn!(
231 entry.path = %entry.path.display(),
232 entry.len = entry.size_bytes,
233 error = %e,
234 "Permanently dropping persisted entry that could not be consumed.",
235 );
236
237 self.total_on_disk_bytes -= entry.size_bytes;
238 self.entries_dropped += 1;
239 continue;
240 }
241 }
242 }
243 }
244
245 async fn refresh_entry_state(&mut self) -> io::Result<()> {
246 let mut entries = Vec::new();
248
249 let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
250 while let Some(entry) = dir_reader.next_entry().await? {
251 let metadata = entry.metadata().await?;
252 if metadata.is_file() {
253 match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
254 Some(entry) => entries.push(entry),
255 None => {
256 warn!(
257 file_size = metadata.len(),
258 "Ignoring unrecognized file '{}' in retry directory.",
259 entry.path().display()
260 );
261 continue;
262 }
263 }
264 }
265 }
266
267 entries.sort_by_key(|entry| entry.timestamp);
269 self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
270 self.entries = entries;
271
272 Ok(())
273 }
274
275 async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
282 let mut push_result = PushResult::default();
283
284 let disk_usage_retriever = self.disk_usage_retriever.clone();
285 let storage_max_disk_ratio = self.storage_max_disk_ratio;
286 let max_on_disk_bytes = self.max_on_disk_bytes;
287
288 let limit = tokio::task::spawn_blocking(move || {
295 on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
296 })
297 .await
298 .error_context("Failed to run disk size limit check to completion.")??;
299
300 while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
301 let entry = self.entries.remove(0);
302
303 let event_count = match try_deserialize_entry::<T>(&entry).await {
305 Ok(Some(deserialized)) => deserialized.event_count(),
306 Ok(None) => {
307 warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
308 continue;
309 }
310 Err(e) => {
311 warn!(
314 entry.path = %entry.path.display(),
315 entry.len = entry.size_bytes,
316 error = %e,
317 "Permanently dropping persisted entry that could not be consumed during eviction.",
318 );
319
320 self.total_on_disk_bytes -= entry.size_bytes;
321 self.entries_dropped += 1;
322 continue;
323 }
324 };
325
326 self.total_on_disk_bytes -= entry.size_bytes;
328 push_result.track_dropped_item(event_count);
329
330 debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
331 }
332
333 Ok(push_result)
334 }
335}
336
337fn on_disk_bytes_limit(
345 disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
346) -> Result<u64, GenericError> {
347 let total_space = disk_usage_retriever.inner.total_space()? as f64;
348 let available_space = disk_usage_retriever.inner.available_space()? as f64;
349 let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
350 let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
351 Ok(max_on_disk_bytes.min(available_disk_usage))
352}
353
354async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
355 let serialized = match tokio::fs::read(&entry.path).await {
356 Ok(serialized) => serialized,
357 Err(e) => match e.kind() {
358 io::ErrorKind::NotFound => {
359 return Ok(None);
365 }
366 _ => {
367 return Err(e)
368 .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
369 }
370 },
371 };
372
373 let deserialized = match serde_json::from_slice(&serialized) {
374 Ok(deserialized) => deserialized,
375 Err(e) => {
376 if let Err(remove_err) = tokio::fs::remove_file(&entry.path).await {
379 warn!(
380 entry.path = %entry.path.display(),
381 error = %remove_err,
382 "Failed to remove corrupt persisted entry from disk.",
383 );
384 }
385
386 return Err(e)
387 .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()));
388 }
389 };
390
391 tokio::fs::remove_file(&entry.path)
393 .await
394 .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;
395
396 debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
397 Ok(Some(deserialized))
398}
399
400fn generate_timestamped_filename() -> (PathBuf, u128) {
401 let now = Utc::now();
402 let now_ts = datetime_to_timestamp(now);
403 let nonce = rand::rng().random_range(100000000..999999999);
404
405 let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
406
407 (filename, now_ts)
408}
409
410fn decode_timestamped_filename(path: &Path) -> Option<u128> {
411 let filename = path.file_stem()?.to_str()?;
412 let mut filename_parts = filename.split('-');
413
414 let prefix = filename_parts.next()?;
415 let timestamp_str = filename_parts.next()?;
416 let nonce = filename_parts.next()?;
417
418 if prefix != "retry" || nonce.parse::<u64>().is_err() {
420 return None;
421 }
422
423 NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
425 .map(|dt| datetime_to_timestamp(dt.and_utc()))
426 .ok()
427}
428
429fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
430 let secs = (dt.timestamp() as u128) * 1_000_000_000;
431 let ns = dt.timestamp_subsec_nanos() as u128;
432
433 secs + ns
434}
435
436async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
437 let mut dir_builder = std::fs::DirBuilder::new();
438 dir_builder.recursive(true);
439
440 #[cfg(unix)]
443 {
444 use std::os::unix::fs::DirBuilderExt;
445 dir_builder.mode(0o700);
446 }
447
448 tokio::task::spawn_blocking(move || {
449 dir_builder
450 .create(&path)
451 .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
452 })
453 .await
454 .error_context("Failed to spawn directory creation blocking task.")?
455}
456
#[cfg(test)]
mod tests {
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    /// Minimal serializable payload used to exercise the queue.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        /// Builds a payload with an 8-character alphanumeric name and a value in `0..100`.
        fn random() -> Self {
            let name: String = rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect();
            let value: u32 = rand::rng().random_range(0..100);
            Self { name, value }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    /// Retriever reporting a fixed disk: 100 total, 100 available.
    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    /// Creates a fresh temporary directory to act as the queue root.
    fn temp_root() -> tempfile::TempDir {
        tempfile::tempdir().expect("should not fail to create temporary directory")
    }

    /// Wraps a real, `fs4`-backed retriever for `root`.
    fn real_retriever(root: &Path) -> DiskUsageRetrieverWrapper {
        DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root.to_path_buf())))
    }

    /// Wraps the fixed-size mock retriever.
    fn mock_retriever() -> DiskUsageRetrieverWrapper {
        DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {}))
    }

    /// Builds a queue rooted at `root`, panicking if construction fails.
    async fn new_queue(
        root: &Path, max_on_disk_bytes: u64, storage_max_disk_ratio: f64, retriever: DiskUsageRetrieverWrapper,
    ) -> PersistedQueue<FakeData> {
        PersistedQueue::<FakeData>::from_root_path(
            root.to_path_buf(),
            max_on_disk_bytes,
            storage_max_disk_ratio,
            retriever,
        )
        .await
        .expect("should not fail to create persisted queue")
    }

    /// Pushes `data`, panicking if the push fails.
    async fn push_ok(queue: &mut PersistedQueue<FakeData>, data: FakeData) -> PushResult {
        queue.push(data).await.expect("should not fail to push data")
    }

    /// Pops one entry, panicking with `missing_msg` if the queue yields nothing.
    async fn pop_expecting(queue: &mut PersistedQueue<FakeData>, missing_msg: &str) -> FakeData {
        queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect(missing_msg)
    }

    /// Counts the regular files directly inside `path`.
    async fn files_in_dir(path: &Path) -> usize {
        let mut reader = tokio::fs::read_dir(path).await.unwrap();
        let mut count = 0;
        while let Some(dir_entry) = reader.next_entry().await.unwrap() {
            let metadata = dir_entry.metadata().await.unwrap();
            count += usize::from(metadata.is_file());
        }
        count
    }

    /// Writes a file with a valid entry filename but non-JSON contents, returning its path.
    ///
    /// The timestamp in the name is 2000-01-01 00:00:00.
    async fn write_corrupt_entry(dir: &Path) -> PathBuf {
        let path = dir.join("retry-20000101000000000000-100000000.json");
        tokio::fs::write(&path, b"this is not valid json").await.unwrap();
        path
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        let mut persisted_queue = new_queue(root_path, 1024, 0.8, real_retriever(root_path)).await;
        assert_eq!(0, files_in_dir(root_path).await);

        // Pushing creates exactly one file and evicts nothing.
        let push_result = push_ok(&mut persisted_queue, data.clone()).await;
        assert_eq!(1, files_in_dir(root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Popping returns the same payload and removes the file.
        let actual = pop_expecting(&mut persisted_queue, "should not be empty").await;
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(root_path).await);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        // A one-byte limit can never fit a serialized payload.
        let mut persisted_queue = new_queue(root_path, 1, 0.8, real_retriever(root_path)).await;
        assert_eq!(0, files_in_dir(root_path).await);

        assert!(persisted_queue.push(data).await.is_err());

        // The rejected entry must not have been written.
        assert_eq!(0, files_in_dir(root_path).await);
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        let mut persisted_queue = new_queue(root_path, 32, 0.8, real_retriever(root_path)).await;
        assert_eq!(0, files_in_dir(root_path).await);

        let push_result = push_ok(&mut persisted_queue, data1).await;
        assert_eq!(1, files_in_dir(root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // The second push is expected to evict the first entry.
        let push_result = push_ok(&mut persisted_queue, data2.clone()).await;
        assert_eq!(1, files_in_dir(root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = pop_expecting(&mut persisted_queue, "should not be empty").await;
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(root_path).await);
    }

    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        // With the mock disk (100 total / 100 available) and a 0.35 ratio, the ratio-derived limit
        // is lower than the configured 80-byte maximum.
        let mut persisted_queue = new_queue(root_path, 80, 0.35, mock_retriever()).await;
        assert_eq!(0, files_in_dir(root_path).await);

        let push_result = push_ok(&mut persisted_queue, data1).await;
        assert_eq!(1, files_in_dir(root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // The second push is expected to evict the first entry.
        let push_result = push_ok(&mut persisted_queue, data2.clone()).await;
        assert_eq!(1, files_in_dir(root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = pop_expecting(&mut persisted_queue, "should not be empty").await;
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_is_skipped_on_pop() {
        let data = FakeData::random();

        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        let mut persisted_queue = new_queue(root_path, 1024, 0.8, mock_retriever()).await;

        let corrupt_path = write_corrupt_entry(root_path).await;

        let _ = push_ok(&mut persisted_queue, data.clone()).await;

        // Reload from disk so the queue picks up the corrupt file written behind its back.
        persisted_queue.refresh_entry_state().await.unwrap();

        let actual = pop_expecting(&mut persisted_queue, "should have a valid entry").await;
        assert_eq!(data, actual);

        assert!(!corrupt_path.exists());

        assert_eq!(1, persisted_queue.take_entries_dropped());

        assert_eq!(0, files_in_dir(root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_does_not_block_queue() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        let mut persisted_queue = new_queue(root_path, 1024, 0.8, mock_retriever()).await;

        let _ = push_ok(&mut persisted_queue, data1).await;
        let _ = push_ok(&mut persisted_queue, data2.clone()).await;
        assert_eq!(2, persisted_queue.entries.len());

        // Overwrite the oldest entry's file with invalid contents.
        let oldest_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&oldest_path, b"corrupted").await.unwrap();

        let actual = pop_expecting(&mut persisted_queue, "should have a valid entry").await;
        assert_eq!(data2, actual);

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(root_path).await);
    }

    #[tokio::test]
    async fn pop_returns_none_when_all_entries_corrupt() {
        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        let mut persisted_queue = new_queue(root_path, 1024, 0.8, mock_retriever()).await;

        write_corrupt_entry(root_path).await;
        persisted_queue.refresh_entry_state().await.unwrap();

        let result = persisted_queue.pop().await.expect("should not fail to pop data");
        assert!(result.is_none());

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_dropped_during_eviction() {
        let data = FakeData::random();

        let temp_dir = temp_root();
        let root_path = temp_dir.path();

        let mut persisted_queue = new_queue(root_path, 32, 0.8, mock_retriever()).await;

        let _ = push_ok(&mut persisted_queue, FakeData::random()).await;
        let first_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&first_path, b"corrupted").await.unwrap();

        // This push must evict the first entry, which is now unreadable.
        let _ = push_ok(&mut persisted_queue, data.clone()).await;

        assert_eq!(1, persisted_queue.take_entries_dropped());

        let actual = pop_expecting(&mut persisted_queue, "should have a valid entry").await;
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(root_path).await);
    }
}