// saluki_io/net/util/retry/queue/persisted.rs
use std::{
    collections::VecDeque,
    io,
    marker::PhantomData,
    path::{Path, PathBuf},
    sync::Arc,
};

use chrono::{DateTime, NaiveDateTime, Utc};
use fs4::{available_space, total_space};
use rand::Rng;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use serde::{de::DeserializeOwned, Serialize};
use tracing::{debug, warn};

use super::{EventContainer, PushResult};

/// Bookkeeping for a single entry that has been persisted to disk.
struct PersistedEntry {
    /// Location of the serialized entry on disk.
    path: PathBuf,

    /// Creation time of the entry in nanoseconds since the Unix epoch, as encoded in its filename.
    timestamp: u128,

    /// Size of the serialized entry, in bytes.
    size_bytes: u64,
}

26impl PersistedEntry {
27 fn try_from_path(path: PathBuf, size_bytes: u64) -> Option<Self> {
31 let timestamp = decode_timestamped_filename(&path)?;
32 Some(Self {
33 path,
34 timestamp,
35 size_bytes,
36 })
37 }
38
39 fn from_parts(path: PathBuf, timestamp: u128, size_bytes: u64) -> Self {
40 Self {
41 path,
42 timestamp,
43 size_bytes,
44 }
45 }
46}
47
48pub trait DiskUsageRetriever {
49 fn total_space(&self) -> Result<u64, GenericError>;
50 fn available_space(&self) -> Result<u64, GenericError>;
51}
52
/// Disk usage retriever that reports figures for the real filesystem containing `root_path`.
pub struct DiskUsageRetrieverImpl {
    root_path: PathBuf,
}

impl DiskUsageRetrieverImpl {
    /// Creates a retriever that reports usage for the filesystem containing `root_path`.
    pub fn new(root_path: PathBuf) -> Self {
        DiskUsageRetrieverImpl { root_path }
    }
}

63impl DiskUsageRetriever for DiskUsageRetrieverImpl {
64 fn total_space(&self) -> Result<u64, GenericError> {
65 total_space(&self.root_path)
66 .with_error_context(|| format!("Failed to get total space for '{}'.", self.root_path.display()))
67 }
68
69 fn available_space(&self) -> Result<u64, GenericError> {
70 available_space(&self.root_path)
71 .with_error_context(|| format!("Failed to get available space for '{}'.", self.root_path.display()))
72 }
73}
74
75#[derive(Clone)]
76pub struct DiskUsageRetrieverWrapper {
77 inner: Arc<dyn DiskUsageRetriever + Send + Sync>,
78}
79
80impl DiskUsageRetrieverWrapper {
81 pub fn new(disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>) -> Self {
82 Self {
83 inner: disk_usage_retriever,
84 }
85 }
86}
87
88pub struct PersistedQueue<T> {
89 root_path: PathBuf,
90 entries: Vec<PersistedEntry>,
91 total_on_disk_bytes: u64,
92 max_on_disk_bytes: u64,
93 storage_max_disk_ratio: f64,
94 disk_usage_retriever: DiskUsageRetrieverWrapper,
95 _entry: PhantomData<T>,
96}
97
98impl<T> PersistedQueue<T>
99where
100 T: EventContainer + DeserializeOwned + Serialize,
101{
102 pub async fn from_root_path(
112 root_path: PathBuf, max_on_disk_bytes: u64, storage_max_disk_ratio: f64,
113 disk_usage_retriever: DiskUsageRetrieverWrapper,
114 ) -> Result<Self, GenericError> {
115 create_directory_recursive(root_path.clone())
117 .await
118 .with_error_context(|| format!("Failed to create retry directory '{}'.", root_path.display()))?;
119
120 let mut persisted_requests = Self {
121 root_path: root_path.clone(),
122 entries: Vec::new(),
123 total_on_disk_bytes: 0,
124 max_on_disk_bytes,
125 storage_max_disk_ratio,
126 disk_usage_retriever,
127 _entry: PhantomData,
128 };
129
130 persisted_requests.refresh_entry_state().await?;
131
132 Ok(persisted_requests)
133 }
134
135 pub fn is_empty(&self) -> bool {
137 self.entries.is_empty()
138 }
139
140 pub fn len(&self) -> usize {
142 self.entries.len()
143 }
144
145 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
152 let (filename, timestamp) = generate_timestamped_filename();
154 let entry_path = self.root_path.join(filename);
155 let serialized = serde_json::to_vec(&entry)
156 .with_error_context(|| format!("Failed to serialize entry for '{}'.", entry_path.display()))?;
157
158 if serialized.len() as u64 > self.max_on_disk_bytes {
159 return Err(generic_error!("Entry is too large to persist."));
160 }
161
162 let push_result = self
164 .remove_until_available_space(serialized.len() as u64)
165 .await
166 .error_context(
167 "Failed to remove older persisted entries to make space for the incoming persisted entry.",
168 )?;
169
170 tokio::fs::write(&entry_path, &serialized)
172 .await
173 .with_error_context(|| format!("Failed to write entry to '{}'.", entry_path.display()))?;
174
175 self.entries.push(PersistedEntry::from_parts(
177 entry_path,
178 timestamp,
179 serialized.len() as u64,
180 ));
181 self.total_on_disk_bytes += serialized.len() as u64;
182
183 debug!(entry.len = serialized.len(), "Enqueued persisted entry.");
184
185 Ok(push_result)
186 }
187
188 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
194 if self.entries.is_empty() {
195 return Ok(None);
196 }
197
198 loop {
199 let entry = self.entries.remove(0);
200 match try_deserialize_entry(&entry).await {
201 Ok(Some(deserialized)) => {
202 self.total_on_disk_bytes -= entry.size_bytes;
204 debug!(entry.len = entry.size_bytes, "Dequeued persisted entry.");
205
206 return Ok(Some(deserialized));
207 }
208 Ok(None) => {
209 self.refresh_entry_state().await?;
212 continue;
213 }
214 Err(e) => {
215 self.entries.insert(0, entry);
217 return Err(e);
218 }
219 }
220 }
221 }
222
223 async fn refresh_entry_state(&mut self) -> io::Result<()> {
224 let mut entries = Vec::new();
226
227 let mut dir_reader = tokio::fs::read_dir(&self.root_path).await?;
228 while let Some(entry) = dir_reader.next_entry().await? {
229 let metadata = entry.metadata().await?;
230 if metadata.is_file() {
231 match PersistedEntry::try_from_path(entry.path(), metadata.len()) {
232 Some(entry) => entries.push(entry),
233 None => {
234 warn!(
235 file_size = metadata.len(),
236 "Ignoring unrecognized file '{}' in retry directory.",
237 entry.path().display()
238 );
239 continue;
240 }
241 }
242 }
243 }
244
245 entries.sort_by_key(|entry| entry.timestamp);
247 self.total_on_disk_bytes = entries.iter().map(|entry| entry.size_bytes).sum();
248 self.entries = entries;
249
250 Ok(())
251 }
252
253 async fn remove_until_available_space(&mut self, required_bytes: u64) -> Result<PushResult, GenericError> {
260 let mut push_result = PushResult::default();
261
262 let disk_usage_retriever = self.disk_usage_retriever.clone();
263 let storage_max_disk_ratio = self.storage_max_disk_ratio;
264 let max_on_disk_bytes = self.max_on_disk_bytes;
265
266 let limit = tokio::task::spawn_blocking(move || {
267 on_disk_bytes_limit(disk_usage_retriever, storage_max_disk_ratio, max_on_disk_bytes)
268 })
269 .await??;
270
271 while !self.entries.is_empty() && self.total_on_disk_bytes + required_bytes > limit {
272 let entry = self.entries.remove(0);
273
274 let event_count = match try_deserialize_entry::<T>(&entry).await {
276 Ok(Some(deserialized)) => deserialized.event_count(),
277 Ok(None) => {
278 warn!(entry.path = %entry.path.display(), "Failed to find entry on disk. Persisted entry state may be inconsistent.");
279 continue;
280 }
281 Err(e) => {
282 self.entries.insert(0, entry);
284 return Err(e);
285 }
286 };
287
288 self.total_on_disk_bytes -= entry.size_bytes;
290 push_result.track_dropped_item(event_count);
291
292 debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Dropped persisted entry.");
293 }
294
295 Ok(push_result)
296 }
297}
298
299fn on_disk_bytes_limit(
307 disk_usage_retriever: DiskUsageRetrieverWrapper, storage_max_disk_ratio: f64, max_on_disk_bytes: u64,
308) -> Result<u64, GenericError> {
309 let total_space = disk_usage_retriever.inner.total_space()? as f64;
310 let available_space = disk_usage_retriever.inner.available_space()? as f64;
311 let disk_reserved = total_space * (1.0 - storage_max_disk_ratio);
312 let available_disk_usage = (available_space - disk_reserved).ceil() as u64;
313 Ok(max_on_disk_bytes.min(available_disk_usage))
314}
315
316async fn try_deserialize_entry<T: DeserializeOwned>(entry: &PersistedEntry) -> Result<Option<T>, GenericError> {
317 let serialized = match tokio::fs::read(&entry.path).await {
318 Ok(serialized) => serialized,
319 Err(e) => match e.kind() {
320 io::ErrorKind::NotFound => {
321 return Ok(None);
327 }
328 _ => {
329 return Err(e)
330 .with_error_context(|| format!("Failed to read persisted entry '{}'.", entry.path.display()))
331 }
332 },
333 };
334
335 let deserialized = serde_json::from_slice(&serialized)
336 .with_error_context(|| format!("Failed to deserialize persisted entry '{}'.", entry.path.display()))?;
337
338 tokio::fs::remove_file(&entry.path)
340 .await
341 .with_error_context(|| format!("Failed to delete persisted entry '{}'.", entry.path.display()))?;
342
343 debug!(entry.path = %entry.path.display(), entry.len = entry.size_bytes, "Consumed persisted entry and removed from disk.");
344 Ok(Some(deserialized))
345}
346
347fn generate_timestamped_filename() -> (PathBuf, u128) {
348 let now = Utc::now();
349 let now_ts = datetime_to_timestamp(now);
350 let nonce = rand::rng().random_range(100000000..999999999);
351
352 let filename = format!("retry-{}-{}.json", now.format("%Y%m%d%H%M%S%f"), nonce).into();
353
354 (filename, now_ts)
355}
356
357fn decode_timestamped_filename(path: &Path) -> Option<u128> {
358 let filename = path.file_stem()?.to_str()?;
359 let mut filename_parts = filename.split('-');
360
361 let prefix = filename_parts.next()?;
362 let timestamp_str = filename_parts.next()?;
363 let nonce = filename_parts.next()?;
364
365 if prefix != "retry" || nonce.parse::<u64>().is_err() {
367 return None;
368 }
369
370 NaiveDateTime::parse_from_str(timestamp_str, "%Y%m%d%H%M%S%f")
372 .map(|dt| datetime_to_timestamp(dt.and_utc()))
373 .ok()
374}
375
376fn datetime_to_timestamp(dt: DateTime<Utc>) -> u128 {
377 let secs = (dt.timestamp() as u128) * 1_000_000_000;
378 let ns = dt.timestamp_subsec_nanos() as u128;
379
380 secs + ns
381}
382
383async fn create_directory_recursive(path: PathBuf) -> Result<(), GenericError> {
384 let mut dir_builder = std::fs::DirBuilder::new();
385 dir_builder.recursive(true);
386
387 #[cfg(unix)]
390 {
391 use std::os::unix::fs::DirBuilderExt;
392 dir_builder.mode(0o700);
393 }
394
395 tokio::task::spawn_blocking(move || {
396 dir_builder
397 .create(&path)
398 .with_error_context(|| format!("Failed to create directory '{}'.", path.display()))
399 })
400 .await
401 .error_context("Failed to spawn directory creation blocking task.")?
402}
403
#[cfg(test)]
mod tests {
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    /// Reports a fixed 100-byte filesystem that is entirely free.
    struct MockDiskUsageRetriever {}

    impl DiskUsageRetriever for MockDiskUsageRetriever {
        fn total_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
        fn available_space(&self) -> Result<u64, GenericError> {
            Ok(100)
        }
    }

    /// Disk usage retriever backed by the real filesystem containing `root_path`.
    fn real_disk_usage(root_path: &Path) -> DiskUsageRetrieverWrapper {
        DiskUsageRetrieverWrapper::new(Arc::new(DiskUsageRetrieverImpl::new(root_path.to_path_buf())))
    }

    async fn files_in_dir(path: &Path) -> usize {
        let mut file_count = 0;
        let mut dir_reader = tokio::fs::read_dir(path).await.unwrap();
        while let Some(entry) = dir_reader.next_entry().await.unwrap() {
            if entry.metadata().await.unwrap().is_file() {
                file_count += 1;
            }
        }
        file_count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue =
            PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1024, 0.8, real_disk_usage(&root_path))
                .await
                .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue =
            PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1, 0.8, real_disk_usage(&root_path))
                .await
                .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        assert!(persisted_queue.push(data).await.is_err());

        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        let mut persisted_queue =
            PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 32, 0.8, real_disk_usage(&root_path))
                .await
                .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn storage_ratio_exceeded() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // 100 bytes total and free with a 0.35 ratio leaves a 35-byte limit: room for exactly one entry.
        let mut persisted_queue = PersistedQueue::<FakeData>::from_root_path(
            root_path.clone(),
            80,
            0.35,
            DiskUsageRetrieverWrapper::new(Arc::new(MockDiskUsageRetriever {})),
        )
        .await
        .expect("should not fail to create persisted queue");

        assert_eq!(0, files_in_dir(&root_path).await);

        let push_result = persisted_queue.push(data1).await.expect("should not fail to push data");

        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let push_result = persisted_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, files_in_dir(&root_path).await);
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = persisted_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[test]
    fn generated_filename_decodes_to_its_timestamp() {
        let (filename, timestamp) = generate_timestamped_filename();
        assert_eq!(Some(timestamp), decode_timestamped_filename(&filename));
    }

    #[tokio::test]
    async fn reload_persisted_entries() {
        let data = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        {
            let mut persisted_queue =
                PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1024, 0.8, real_disk_usage(&root_path))
                    .await
                    .expect("should not fail to create persisted queue");
            persisted_queue
                .push(data.clone())
                .await
                .expect("should not fail to push data");
        }
        assert_eq!(1, files_in_dir(&root_path).await);

        // A fresh queue over the same directory must rediscover the entry written by the previous one.
        let mut reloaded =
            PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1024, 0.8, real_disk_usage(&root_path))
                .await
                .expect("should not fail to reload persisted queue");
        assert_eq!(1, reloaded.len());

        let actual = reloaded
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
        assert!(reloaded.is_empty());
        assert_eq!(0, files_in_dir(&root_path).await);
    }

    #[tokio::test]
    async fn unrecognized_files_are_ignored() {
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        tokio::fs::write(root_path.join("notes.txt"), b"not a persisted entry")
            .await
            .expect("should not fail to write unrelated file");

        let mut persisted_queue =
            PersistedQueue::<FakeData>::from_root_path(root_path.clone(), 1024, 0.8, real_disk_usage(&root_path))
                .await
                .expect("should not fail to create persisted queue");
        assert!(persisted_queue.is_empty());

        let popped = persisted_queue.pop().await.expect("should not fail to pop data");
        assert!(popped.is_none());

        // The unrelated file must be left untouched.
        assert_eq!(1, files_in_dir(&root_path).await);
    }
}