1use std::{
2 io,
3 marker::PhantomData,
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use chrono::{DateTime, NaiveDateTime, Utc};
9use fs4::{available_space, total_space};
10use rand::RngExt as _;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use serde::{de::DeserializeOwned, Serialize};
13use tracing::{debug, info, warn};
14
15use super::{EventContainer, PushResult};
16
/// In-memory record of a single queue entry that is backed by a file on disk.
struct PersistedEntry {
    /// Location of the file that holds the serialized entry.
    path: PathBuf,
    /// Ordering key for the entry: nanoseconds since the Unix epoch, taken from the timestamp that is also
    /// embedded in the file name.
    timestamp: u128,
    /// Size of the entry in bytes, as known when the entry was written or discovered on disk.
    size_bytes: u64,
}
25
26impl PersistedEntry {
27 fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31 let timestamp = decode_timestamped_filename(&path)?;
32 Some(Self {
33 path,
34 timestamp,
35 size_bytes,
36 })
37 }
38
39 fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40 Self {
41 path,
42 timestamp,
43 size_bytes,
44 }
45 }
46}
47
/// Source of disk capacity figures, used when computing how many bytes the queue may keep on disk.
pub trait DiskUsageRetriever {
    /// Returns the total space in bytes, or an error if it could not be determined.
    fn total_space(&self) -> Result<u64, GenericError>;
    /// Returns the currently available space in bytes, or an error if it could not be determined.
    fn available_space(&self) -> Result<u64, GenericError>;
}
52
/// [`DiskUsageRetriever`] that asks the operating system (via `fs4`) about a fixed path.
pub struct DiskUsageRetrieverImpl {
    /// Path handed to the `fs4` space queries.
    root_path: PathBuf,
}
56
impl DiskUsageRetrieverImpl {
    /// Creates a retriever whose space queries are issued against `root_path`.
    pub fn new(root_path: PathBuf) -> Self {
        Self { root_path }
    }
}
62
63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64 fn total_space(&self) -> Result<u64, GenericError> {
65 total_space(&self.root_path)
66 .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67 }
68
69 fn available_space(&self) -> Result<u64, GenericError> {
70 available_space(&self.root_path)
71 .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72 }
73}
74
/// Cheaply cloneable handle around a shared, type-erased [`DiskUsageRetriever`].
#[derive(Clone)]
pub struct DiskUsageRetrieverWrapper {
    /// The shared retriever; cloning the wrapper only clones the `Arc`.
    inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
}
79
80impl DiskUsageRetrieverWrapper {
81 pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82 Self {
83 inner: disk_usage_retriever,
84 }
85 }
86}
87
/// A queue of `T` values whose entries are stored as individual files in a directory.
pub struct PersistedQueue<T> {
    /// Directory in which entry files are written and discovered.
    root_path: PathBuf,
    /// Entries currently tracked, oldest first: sorted by timestamp when refreshed from disk, with newly pushed
    /// entries appended at the end.
    entries: Vec<PersistedEntry>,
    /// Sum of the sizes of the tracked entries, in bytes.
    total_on_disk_bytes: u64,
    /// Upper bound on the bytes the queue may hold; a single entry larger than this is rejected on push.
    max_on_disk_bytes: u64,
    /// Ratio applied to the total disk space when computing the effective byte limit; the remaining
    /// `1.0 - ratio` share of the disk is treated as reserved.
    storage_max_disk_ratio: f64,
    /// Provider of total/available disk space figures.
    disk_usage_retriever: DiskUsageRetrieverWrapper,
    /// Number of entries dropped because they could not be consumed, since the counter was last taken.
    entries_dropped: u64,
    /// Ties the queue to its entry type without storing a value of it.
    _entry: PhantomData<T>,
}
98
99impl<T> PersistedQueue<T>
100where
101 T: EventContainer + DeserializeOwned + Serialize,
102{
103 pub async fn from_root_path(
113 root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
114 disk_usage_retriever: DiskUsageRetrieverWrapper,
115 ) -> Result<Self, GenericError> {
116 create_directory_recursive(root_path.clone())
118 .await
119 .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;
120
121 let mut persisted_requests = Self {
122 root_path: root_path.clone(),
123 entries: Vec::new(),
124 total_on_disk_bytes: 0,
125 max_on_disk_bytes,
126 storage_max_disk_ratio,
127 disk_usage_retriever,
128 entries_dropped: 0,
129 _entry: PhantomData,
130 };
131
132 persisted_requests.refresh_entry_state().await?;
133
134 info!(
135 "Persisted retry queue initialized. Transactions will be stored in '{}'.",
136 root_path.display()
137 );
138
139 Ok(persisted_requests)
140 }
141
142 pub fn is_empty(&self) -> bool {
144 self.entries.is_empty()
145 }
146
147 pub fn len(&self) -> usize {
149 self.entries.len()
150 }
151
152 pub fn take_entries_dropped(&mut self) -> u64 {
155 std::mem::take(&mut self.entries_dropped)
156 }
157
158 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
165 let (filename, timestamp) = generate_timestamped_filename();
167 let entry_path = self.root_path.join(filename);
168 let serialized = serde_json::to_vec(&entry)
169 .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;
170
171 if serialized.len() as u64 > self.max_on_disk_bytes {
172 return Err(generic_error!("Entry is too large to persist."));
173 }
174
175 let push_result = self
177 .remove_until_available_space(serialized.len() as u64)
178 .await
179 .error_context(
180 "Failed to remove older persisted entries to make space for the incoming persisted entry.",
181 )?;
182
183 tokio::fs::write(&entry_path, &serialized)
185 .await
186 .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;
187
188 self.entries.push(PersistedEntry::from_parts(
190 entry_path,
191 timestamp,
192 serialized.len() as u64,
193 ));
194 self.total_on_disk_bytes += serialized.len() as u64;
195
196 debug!(entry.len = serialized.len(), "Enqueued persisted entry.");
197
198 Ok(push_result)
199 }
200
201 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
207 loop {
208 if self.entries.is_empty() {
209 return Ok(None);
210 }
211
212 let entry = self.entries.remove(0);
213 match try_deserialize_entry(&entry).await {
214 Ok(Some(deserialized)) => {
215 self.total_on_disk_bytes -= entry.size_bytes;
217 debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");
218
219 return Ok(Some(deserialized));
220 }
221 Ok(None) => {
222 self.refresh_entry_state().await?;
225 continue;
226 }
227 Err(e) => {
228 warn!(
231 entry.path = %entry.path.display(),
232 entry.len = entry.size_bytes,
233 error = %e,
234 "Permanently dropping persisted entry that could not be consumed.",
235 );
236
237 self.total_on_disk_bytes -= entry.size_bytes;
238 self.entries_dropped += 1;
239 continue;
240 }
241 }
242 }
243 }
244
245 async fn refresh_entry_state(&mut self) -> io::Result<()> {
246 let mut entries = Vec::new();
248
249 let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
250 while let Some(entry) = dir_reader.next_entry().await? {
251 let metadata = entry.metadata().await?;
252 if metadata.is_file() {
253 match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
254 Some(entry) => entries.push(entry),
255 None => {
256 warn!(
257 file_size = metadata.len(),
258 "Ignoring unrecognized file '{}' in retry directory.",
259 entry.path().display()
260 );
261 continue;
262 }
263 }
264 }
265 }
266
267 entries.sort_by_key(|entry| entry.timestamp);
269 self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
270 self.entries = entries;
271
272 Ok(())
273 }
274
275 async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
282 let mut push_result = PushResult::default();
283
284 let disk_usage_retriever = self.disk_usage_retriever.clone();
285 let storage_max_disk_ratio = self.storage_max_disk_ratio;
286 let max_on_disk_bytes = self.max_on_disk_bytes;
287
288 let limit = tokio::task::spawn_blocking(move || {
295 on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
296 })
297 .await
298 .error_context("Failed to run disk size limit check to completion.")??;
299
300 while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
301 let entry = self.entries.remove(0);
302
303 let event_count = match try_deserialize_entry::<T>(&entry).await {
305 Ok(Some(deserialized)) => deserialized.event_count(),
306 Ok(None) => {
307 warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
308 continue;
309 }
310 Err(e) => {
311 warn!(
314 entry.path = %entry.path.display(),
315 entry.len = entry.size_bytes,
316 error = %e,
317 "Permanently dropping persisted entry that could not be consumed during eviction.",
318 );
319
320 self.total_on_disk_bytes -= entry.size_bytes;
321 self.entries_dropped += 1;
322 continue;
323 }
324 };
325
326 self.total_on_disk_bytes -= entry.size_bytes;
328 push_result.track_dropped_item(event_count);
329
330 warn!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
331 }
332
333 Ok(push_result)
334 }
335}
336
337fn on_disk_bytes_limit(
345 disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
346) -> Result<u64, GenericError> {
347 let total_space = disk_usage_retriever.inner.total_space()? as f64;
348 let available_space = disk_usage_retriever.inner.available_space()? as f64;
349 let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
350 let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
351 Ok(max_on_disk_bytes.min(available_disk_usage))
352}
353
354async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
355 let serialized = match tokio::fs::read(&entry.path).await {
356 Ok(serialized) => serialized,
357 Err(e) => match e.kind() {
358 io::ErrorKind::NotFound => {
359 return Ok(None);
365 }
366 _ => {
367 return Err(e)
368 .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
369 }
370 },
371 };
372
373 let deserialized = match serde_json::from_slice(&serialized) {
374 Ok(deserialized) => deserialized,
375 Err(e) => {
376 if let Err(remove_err) = tokio::fs::remove_file(&entry.path).await {
379 warn!(
380 entry.path = %entry.path.display(),
381 error = %remove_err,
382 "Failed to remove corrupt persisted entry from disk.",
383 );
384 }
385
386 return Err(e)
387 .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()));
388 }
389 };
390
391 tokio::fs::remove_file(&entry.path)
393 .await
394 .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;
395
396 debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
397 Ok(Some(deserialized))
398}
399
400fn generate_timestamped_filename() -> (PathBuf, u128) {
401 let now = Utc::now();
402 let now_ts = datetime_to_timestamp(now);
403 let nonce = rand::rng().random_range(100000000..999999999);
404
405 let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
406
407 (filename, now_ts)
408}
409
410fn decode_timestamped_filename(path: &Path) -> Option<u128> {
411 let filename = path.file_stem()?.to_str()?;
412 let mut filename_parts = filename.split('-');
413
414 let prefix = filename_parts.next()?;
415 let timestamp_str = filename_parts.next()?;
416 let nonce = filename_parts.next()?;
417
418 if prefix != "retry" || nonce.parse::<u64>().is_err() {
420 return None;
421 }
422
423 NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
425 .map(|dt| datetime_to_timestamp(dt.and_utc()))
426 .ok()
427}
428
429fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
430 let secs = (dt.timestamp() as u128) * 1_000_000_000;
431 let ns = dt.timestamp_subsec_nanos() as u128;
432
433 secs + ns
434}
435
436async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
437 let mut dir_builder = std::fs::DirBuilder::new();
438 dir_builder.recursive(true);
439
440 #[cfg(unix)]
443 {
444 use std::os::unix::fs::DirBuilderExt;
445 dir_builder.mode(0o700);
446 }
447
448 tokio::task::spawn_blocking(move || {
449 dir_builder
450 .create(&path)
451 .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
452 })
453 .await
454 .error_context("Failed to spawn directory creation blocking task.")?
455}
456
#[cfg(test)]
mod tests {
    use rand::RngExt as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    /// Small serializable payload used to exercise the queue.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        /// Generates a payload with an 8-character alphanumeric name and a value in `0..100`.
        fn random() -> Self {
            let name: String = rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect();
            let value: u32 = rand::rng().random_range(0..100);
            Self { name, value }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    /// Disk usage retriever that always reports 100 bytes total and 100 bytes available.
    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    /// Creates a temporary directory and returns its guard together with its path.
    ///
    /// The guard must be kept alive for as long as the directory is needed.
    fn temp_root() -> (tempfile::TempDir, PathBuf) {
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();
        (temp_dir, root_path)
    }

    /// Wrapper around the real, filesystem-backed retriever for `root_path`.
    fn real_disk_usage(root_path: &Path) -> DiskUsageRetrieverWrapper {
        DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.to_path_buf())))
    }

    /// Wrapper around the fixed-value mock retriever.
    fn mock_disk_usage() -> DiskUsageRetrieverWrapper {
        DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {}))
    }

    /// Builds a queue of `FakeData` rooted at `root_path` with the given limits.
    async fn new_queue(
        root_path: &Path, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: DiskUsageRetrieverWrapper,
    ) -> PersistedQueue<FakeData> {
        PersistedQueue::<FakeData>::from_root_path(
            root_path.to_path_buf(),
            max_on_disk_bytes,
            storage_max_disk_ratio,
            disk_usage_retriever,
        )
        .await
        .expect("should not fail to create persisted queue")
    }

    /// Counts the regular files directly inside `path`.
    async fn files_in_dir(path: &Path) -> usize {
        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
        let mut file_count = 0;
        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
            file_count += usize::from(entry.metadata().await.unwrap().is_file());
        }
        file_count
    }

    /// Writes a file with a valid, very old retry file name but contents that are not JSON.
    async fn write_corrupt_entry(dir: &Path) -> PathBuf {
        let path = dir.join("retry-20000101000000000000-100000000.json");
        tokio::fs::write(&path, b"this is not valid json").await.unwrap();
        path
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();
        let (_temp_dir, root_path) = temp_root();
        let mut persisted_queue = new_queue(&root_path, 1024, 0.8, real_disk_usage(&root_path)).await;

        // Nothing on disk to begin with.
        assert_eq!(0, files_in_dir(&root_path).await);

        // Pushing creates exactly one file and evicts nothing.
        let push_result = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // Popping returns the same data and removes the file.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();
        let (_temp_dir, root_path) = temp_root();

        // A one-byte budget cannot hold any serialized entry.
        let mut persisted_queue = new_queue(&root_path, 1, 0.8, real_disk_usage(&root_path)).await;

        assert_eq!(0, files_in_dir(&root_path).await);

        assert!(persisted_queue.push(data).await.is_err());

        // The rejected entry must not have been written.
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();
        let (_temp_dir, root_path) = temp_root();

        // The budget fits one serialized entry but not two.
        let mut persisted_queue = new_queue(&root_path, 32, 0.8, real_disk_usage(&root_path)).await;

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // The second push has to evict the first entry to make room.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Only the second entry is left.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();
        let (_temp_dir, root_path) = temp_root();

        // With the mock (100 total / 100 available) and a 0.35 ratio, the disk-derived limit is 35 bytes, which is
        // tighter than the 80-byte budget and fits one entry but not two.
        let mut persisted_queue = new_queue(&root_path, 80, 0.35, mock_disk_usage()).await;

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        // The second push has to evict the first entry to stay under the limit.
        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_is_skipped_on_pop() {
        let data = FakeData::random();
        let (_temp_dir, root_path) = temp_root();
        let mut persisted_queue = new_queue(&root_path, 1024, 0.8, mock_disk_usage()).await;

        // The corrupt file carries an old timestamp, so it sorts ahead of the valid entry.
        let corrupt_path = write_corrupt_entry(&root_path).await;

        let _ = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");

        // Make the queue notice the file that was written behind its back.
        persisted_queue.refresh_entry_state().await.unwrap();

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data, actual);

        // The corrupt file was removed and accounted for as dropped.
        assert!(!corrupt_path.exists());

        assert_eq!(1, persisted_queue.take_entries_dropped());

        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_does_not_block_queue() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();
        let (_temp_dir, root_path) = temp_root();
        let mut persisted_queue = new_queue(&root_path, 1024, 0.8, mock_disk_usage()).await;

        let _ = persisted_queue.push(data1).await.expect("should not fail to push data");
        let _ = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(2, persisted_queue.entries.len());

        // Overwrite the oldest entry's file with garbage.
        let oldest_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&oldest_path, b"corrupted").await.unwrap();

        // The pop skips the corrupt entry and yields the next one.
        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data2, actual);

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn pop_returns_none_when_all_entries_corrupt() {
        let (_temp_dir, root_path) = temp_root();
        let mut persisted_queue = new_queue(&root_path, 1024, 0.8, mock_disk_usage()).await;

        // The only entry in the queue is a corrupt one.
        write_corrupt_entry(&root_path).await;
        persisted_queue.refresh_entry_state().await.unwrap();

        let result = persisted_queue.pop().await.expect("should not fail to pop data");
        assert!(result.is_none());

        assert_eq!(1, persisted_queue.take_entries_dropped());
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn corrupt_entry_dropped_during_eviction() {
        let data = FakeData::random();
        let (_temp_dir, root_path) = temp_root();

        // The budget fits one serialized entry but not two, so the second push must evict.
        let mut persisted_queue = new_queue(&root_path, 32, 0.8, mock_disk_usage()).await;

        let _ = persisted_queue
            .push(FakeData::random())
            .await
            .expect("should not fail to push data");
        let first_path = persisted_queue.entries[0].path.clone();
        tokio::fs::write(&first_path, b"corrupted").await.unwrap();

        // Evicting the corrupted first entry must not fail the push.
        let _ = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");

        assert_eq!(1, persisted_queue.take_entries_dropped());

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should have a valid entry");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }
}