// saluki_io/net/util/retry/queue/persisted.rs

use std::{
    io,
    marker::PhantomData,
    path::{Path, PathBuf},
    sync::Arc,
};

use chrono::{DateTime, NaiveDateTime, Utc};
use fs4::{available_space, total_space};
use rand::Rng;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use serde::{de::DeserializeOwned, Serialize};
use tracing::{debug, info, warn};

use super::{EventContainer, PushResult};

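/// A single entry persisted to disk: the path of its backing file, the timestamp encoded in its
/// filename (nanoseconds since the Unix epoch), and its serialized size in bytes.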
struct PersistedEntry {
    path: PathBuf,
    timestamp: u128,
    size_bytes: u64,
}

impl PersistedEntry {
    fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
        let timestamp = decode_timestamped_filename(&path)?;
        Some(Self {
            path,
            timestamp,
            size_bytes,
        })
    }

    fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
        Self {
            path,
            timestamp,
            size_bytes,
        }
    }
}

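/// Provides the total and available space, in bytes, of the filesystem backing the retry
/// directory.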
pub trait DiskUsageRetriever {
    fn total_space(&self) -> Result<u64, GenericError>;
    fn available_space(&self) -> Result<u64, GenericError>;
}

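/// A `DiskUsageRetriever` backed by the `fs4` crate, querying the filesystem that holds the given
/// root path.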
pub struct DiskUsageRetrieverImpl {
    root_path: PathBuf,
}

impl DiskUsageRetrieverImpl {
    pub fn new(root_path: PathBuf) -> Self {
        Self { root_path }
    }
}

impl DiskUsageRetriever for DiskUsageRetrieverImpl {
    fn total_space(&self) -> Result<u64, GenericError> {
        total_space(&self.root_path)
            .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
    }

    fn available_space(&self) -> Result<u64, GenericError> {
        available_space(&self.root_path)
            .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
    }
}

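/// A cloneable, thread-safe handle around a `DiskUsageRetriever` implementation.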
#[derive(Clone)]
pub struct DiskUsageRetrieverWrapper {
    inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
}

impl DiskUsageRetrieverWrapper {
    pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
        Self {
            inner: disk_usage_retriever,
        }
    }
}

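/// A disk-backed queue of serializable event containers.
///
/// Entries are written as individual timestamped JSON files under a root directory and consumed
/// oldest first. When a new entry would exceed the configured on-disk byte budget or the allowed
/// share of overall disk space, the oldest entries are dropped to make room.
///
/// # Example
///
/// A minimal usage sketch (not compiled here), assuming a hypothetical `MyPayload` type that
/// implements `EventContainer`, `Serialize`, and `DeserializeOwned`, with `root_path` and
/// `payload` as placeholders:
///
/// ```ignore
/// let retriever = DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())));
/// let mut queue = PersistedQueue::<MyPayload>::from_root_path(root_path, 1024 * 1024, 0.8, retriever).await?;
///
/// // Persist a payload, possibly dropping older entries to stay under the disk budget.
/// let push_result = queue.push(payload).await?;
///
/// // Read back (and remove from disk) the oldest remaining entry, if any.
/// if let Some(restored) = queue.pop().await? {
///     // ... retry `restored` ...
/// }
/// ```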
pub struct PersistedQueue<T> {
    root_path: PathBuf,
    entries: Vec<PersistedEntry>,
    total_on_disk_bytes: u64,
    max_on_disk_bytes: u64,
    storage_max_disk_ratio: f64,
    disk_usage_retriever: DiskUsageRetrieverWrapper,
    _entry: PhantomData<T>,
}

impl<T> PersistedQueue<T>
where
    T: EventContainer + DeserializeOwned + Serialize,
{
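    /// Creates a new `PersistedQueue` rooted at the given path.
    ///
    /// The directory is created if it does not exist, and any entries already present on disk are
    /// loaded (oldest first) so that previously persisted entries survive restarts.
    ///
    /// # Errors
    ///
    /// If the retry directory cannot be created, or existing entries cannot be enumerated, an
    /// error is returned.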
    pub async fn from_root_path(
        root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: DiskUsageRetrieverWrapper,
    ) -> Result<Self, GenericError> {
        create_directory_recursive(root_path.clone())
            .await
            .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;

        let mut persisted_requests = Self {
            root_path: root_path.clone(),
            entries: Vec::new(),
            total_on_disk_bytes: 0,
            max_on_disk_bytes,
            storage_max_disk_ratio,
            disk_usage_retriever,
            _entry: PhantomData,
        };

        persisted_requests.refresh_entry_state().await?;

        info!(
            "Persisted retry queue initialized. Transactions will be stored in '{}'.",
            root_path.display()
        );

        Ok(persisted_requests)
    }

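    /// Returns `true` if the queue has no entries.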
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

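    /// Returns the number of entries in the queue.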
    pub fn len(&self) -> usize {
        self.entries.len()
    }

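    /// Serializes the given entry to JSON and persists it to a new timestamped file in the retry
    /// directory, removing the oldest persisted entries first if needed to stay under the
    /// effective on-disk limit. The returned `PushResult` reports how many entries (and events)
    /// were dropped to make room.
    ///
    /// # Errors
    ///
    /// If the entry cannot be serialized, is larger than the maximum allowed on-disk size, or
    /// cannot be written to disk, an error is returned.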
    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
        let (filename, timestamp) = generate_timestamped_filename();
        let entry_path = self.root_path.join(filename);
        let serialized = serde_json::to_vec(&entry)
            .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;

        if serialized.len() as u64 > self.max_on_disk_bytes {
            return Err(generic_error!("Entry is too large to persist."));
        }

        let push_result = self
            .remove_until_available_space(serialized.len() as u64)
            .await
            .error_context(
                "Failed to remove older persisted entries to make space for the incoming persisted entry.",
            )?;

        tokio::fs::write(&entry_path, &serialized)
            .await
            .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;

        self.entries.push(PersistedEntry::from_parts(
            entry_path,
            timestamp,
            serialized.len() as u64,
        ));
        self.total_on_disk_bytes += serialized.len() as u64;

        debug!(entry.len = serialized.len(), "Enqueued persisted entry.");

        Ok(push_result)
    }

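    /// Removes the oldest entry from the queue, deserializes it, and deletes its backing file.
    ///
    /// Returns `Ok(None)` if the queue is empty. If an entry's backing file has gone missing, the
    /// on-disk state is re-scanned and the next entry, if any, is tried.
    ///
    /// # Errors
    ///
    /// If an entry cannot be read, deserialized, or deleted, an error is returned and the entry is
    /// kept at the front of the queue.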
    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
        if self.entries.is_empty() {
            return Ok(None);
        }

        loop {
            let entry = self.entries.remove(0);
            match try_deserialize_entry(&entry).await {
                Ok(Some(deserialized)) => {
                    self.total_on_disk_bytes -= entry.size_bytes;
                    debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");

                    return Ok(Some(deserialized));
                }
                Ok(None) => {
                    // The backing file disappeared out from under us, so our view of what's on
                    // disk is stale: rebuild it and try the next entry, if any remain.
                    self.refresh_entry_state().await?;
                    if self.entries.is_empty() {
                        return Ok(None);
                    }
                    continue;
                }
                Err(e) => {
                    // Put the entry back at the front so it can be retried on the next call.
                    self.entries.insert(0, entry);
                    return Err(e);
                }
            }
        }
    }

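    /// Rebuilds the in-memory entry list by scanning the retry directory.
    ///
    /// Files that don't match the expected timestamped filename format are ignored, entries are
    /// sorted oldest first, and the total on-disk size is recomputed.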
    async fn refresh_entry_state(&mut self) -> io::Result<()> {
        let mut entries = Vec::new();

        let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
        while let Some(entry) = dir_reader.next_entry().await? {
            let metadata = entry.metadata().await?;
            if metadata.is_file() {
                match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
                    Some(entry) => entries.push(entry),
                    None => {
                        warn!(
                            file_size = metadata.len(),
                            "Ignoring unrecognized file '{}' in retry directory.",
                            entry.path().display()
                        );
                        continue;
                    }
                }
            }
        }

        entries.sort_by_key(|entry| entry.timestamp);
        self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
        self.entries = entries;

        Ok(())
    }

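    /// Drops the oldest persisted entries (deleting their backing files) until `required_bytes`
    /// can be stored without exceeding the effective on-disk limit, recording the dropped entries
    /// and events in the returned `PushResult`.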
    async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        let disk_usage_retriever = self.disk_usage_retriever.clone();
        let storage_max_disk_ratio = self.storage_max_disk_ratio;
        let max_on_disk_bytes = self.max_on_disk_bytes;

        let limit = tokio::task::spawn_blocking(move || {
            on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
        })
        .await??;

        while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
            let entry = self.entries.remove(0);

            let event_count = match try_deserialize_entry::<T>(&entry).await {
                Ok(Some(deserialized)) => deserialized.event_count(),
                Ok(None) => {
                    warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
                    continue;
                }
                Err(e) => {
                    self.entries.insert(0, entry);
                    return Err(e);
                }
            };

            self.total_on_disk_bytes -= entry.size_bytes;
            push_result.track_dropped_item(event_count);

            debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
        }

        Ok(push_result)
    }
}

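/// Computes the effective on-disk byte limit: the smaller of the configured maximum and the space
/// that can be used without pushing overall disk usage past `storage_max_disk_ratio`.
///
/// For example, with 100 bytes of total space, 100 bytes available, and a ratio of 0.35, 65 bytes
/// stay reserved, so the effective limit is `min(max_on_disk_bytes, 35)`.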
fn on_disk_bytes_limit(
    disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
) -> Result<u64, GenericError> {
    let total_space = disk_usage_retriever.inner.total_space()? as f64;
    let available_space = disk_usage_retriever.inner.available_space()? as f64;
    let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
    let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
    Ok(max_on_disk_bytes.min(available_disk_usage))
}

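/// Reads and deserializes a persisted entry, deleting its backing file on success.
///
/// Returns `Ok(None)` if the file no longer exists on disk.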
async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
    let serialized = match tokio::fs::read(&entry.path).await {
        Ok(serialized) => serialized,
        Err(e) => match e.kind() {
            io::ErrorKind::NotFound => {
                return Ok(None);
            }
            _ => {
                return Err(e)
                    .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
            }
        },
    };

    let deserialized = serde_json::from_slice(&serialized)
        .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()))?;

    tokio::fs::remove_file(&entry.path)
        .await
        .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;

    debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
    Ok(Some(deserialized))
}

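/// Generates a filename of the form `retry-<UTC timestamp>-<nonce>.json`, along with the
/// timestamp it encodes (nanoseconds since the Unix epoch).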
fn generate_timestamped_filename() -> (PathBuf, u128) {
    let now = Utc::now();
    let now_ts = datetime_to_timestamp(now);
    let nonce = rand::rng().random_range(100000000..999999999);

    let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();

    (filename, now_ts)
}

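/// Decodes the timestamp (nanoseconds since the Unix epoch) from a filename produced by
/// `generate_timestamped_filename`, returning `None` if the filename doesn't match the expected
/// format.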
fn decode_timestamped_filename(path: &Path) -> Option<u128> {
    let filename = path.file_stem()?.to_str()?;
    let mut filename_parts = filename.split('-');

    let prefix = filename_parts.next()?;
    let timestamp_str = filename_parts.next()?;
    let nonce = filename_parts.next()?;

    if prefix != "retry" || nonce.parse::<u64>().is_err() {
        return None;
    }

    NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
        .map(|dt| datetime_to_timestamp(dt.and_utc()))
        .ok()
}

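/// Converts a UTC datetime to nanoseconds since the Unix epoch.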
fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
    let secs = (dt.timestamp() as u128) * 1_000_000_000;
    let ns = dt.timestamp_subsec_nanos() as u128;

    secs + ns
}

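/// Creates the given directory and any missing parents, restricting permissions to the owner
/// (mode 0o700) on Unix. The blocking filesystem work is offloaded to a blocking task.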
async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
    let mut dir_builder = std::fs::DirBuilder::new();
    dir_builder.recursive(true);

    #[cfg(unix)]
    {
        use std::os::unix::fs::DirBuilderExt;
        dir_builder.mode(0o700);
    }

    tokio::task::spawn_blocking(move || {
        dir_builder
            .create(&path)
            .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
    })
    .await
    .error_context("Failed to spawn directory creation blocking task.")?
}

#[cfg(test)]
mod tests {
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    async fn files_in_dir(path: &Path) -> usize {
        let mut file_count = 0;
        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
            if entry.metadata().await.unwrap().is_file() {
                file_count += 1;
            }
        }
        file_count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1024,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            1,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        assert!(persisted_queue.push(data).await.is_err());

        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            32,
            0.8,
            DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()))),
        )
        .await
        .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            80,
            0.35,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }
}