saluki_io/net/util/retry/queue/
mod.rs1use std::collections::VecDeque;
2
3use saluki_error::{generic_error, GenericError};
4use serde::{de::DeserializeOwned, Serialize};
5use tracing::{debug, info, warn};
6
7mod persisted;
8use self::persisted::PersistedQueue;
9pub use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverImpl, PersistedQueueArgs};
10
/// Default minimum fraction of `max_in_memory_bytes` moved to disk in one go whenever a push
/// overflows the in-memory budget while disk persistence is enabled.
///
/// The actual amount moved is the larger of this fraction and the bytes strictly required to fit
/// the new entry (see `flush_to_disk_bytes` and `RetryQueue::push`).
const DEFAULT_FLUSH_TO_DISK_MEM_RATIO: f64 = 0.5;
12
/// A value that holds one or more events.
///
/// `PushResult::track_dropped_item` uses these counts to report how many events and data points
/// were lost when a container is dropped from a retry queue.
pub trait EventContainer {
    /// Returns the number of events held by this container.
    fn event_count(&self) -> u64;

    /// Returns the number of data points held by this container.
    ///
    /// Defaults to `0` for containers that do not track data points.
    fn data_point_count(&self) -> u64 {
        0
    }
}
27
/// An item that can be buffered in a `RetryQueue`.
///
/// The serde bounds allow entries to be handed to the on-disk queue when disk persistence is
/// enabled; `size_bytes` drives the queue's in-memory byte accounting.
pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
    /// Returns the size of this item, in bytes, as counted against the queue's in-memory limit.
    fn size_bytes(&self) -> u64;
}
33
34impl EventContainer for String {
35 fn event_count(&self) -> u64 {
36 1
37 }
38}
39
40impl Retryable for String {
41 fn size_bytes(&self) -> u64 {
42 self.len() as u64
43 }
44}
45
/// Summary of what was lost while pushing into (or flushing) a retry queue.
///
/// All counters start at zero via `Default`. They are accumulated with `PushResult::merge` and
/// `PushResult::track_dropped_item`. `Debug`/`Clone`/`PartialEq` are derived so results can be
/// logged with `{:?}` and compared directly in assertions.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    /// Number of items (queue entries) that were dropped.
    pub items_dropped: u64,

    /// Total number of events contained in the dropped items.
    pub events_dropped: u64,

    /// Total number of data points contained in the dropped items.
    pub data_points_dropped: u64,
}
62
63impl PushResult {
64 pub fn had_drops(&self) -> bool {
66 self.items_dropped > 0
67 }
68
69 pub fn merge(&mut self, other: Self) {
71 self.items_dropped += other.items_dropped;
72 self.events_dropped += other.events_dropped;
73 self.data_points_dropped += other.data_points_dropped;
74 }
75
76 pub fn track_dropped_item(&mut self, item: &dyn EventContainer) {
78 self.items_dropped += 1;
79 self.events_dropped += item.event_count();
80 self.data_points_dropped += item.data_point_count();
81 }
82}
83
84pub struct RetryQueue<T> {
86 queue_name: String,
87 pending: VecDeque<T>,
88 persisted_pending: Option<PersistedQueue<T>>,
89 total_in_memory_bytes: u64,
90 max_in_memory_bytes: u64,
91 flush_to_disk_mem_ratio: f64,
92}
93
94impl<T> RetryQueue<T>
95where
96 T: Retryable,
97{
98 pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
104 Self {
105 queue_name,
106 pending: VecDeque::new(),
107 persisted_pending: None,
108 total_in_memory_bytes: 0,
109 max_in_memory_bytes,
110 flush_to_disk_mem_ratio: DEFAULT_FLUSH_TO_DISK_MEM_RATIO,
111 }
112 }
113
114 pub fn with_flush_to_disk_mem_ratio(mut self, flush_to_disk_mem_ratio: f64) -> Self {
121 self.flush_to_disk_mem_ratio = flush_to_disk_mem_ratio;
122 self
123 }
124
125 pub async fn with_disk_persistence(mut self, mut args: PersistedQueueArgs) -> Result<Self, GenericError> {
141 if args.root_path.as_os_str().is_empty() {
144 return Err(generic_error!("Storage path cannot be empty."));
145 }
146
147 args.root_path = args.root_path.join(&self.queue_name);
148 let mut persisted_pending = PersistedQueue::from_root_path(args).await?;
149 match persisted_pending.remove_stale_files().await {
150 Ok(removed) if removed > 0 => {
151 info!(count = removed, "Removed outdated retry files from disk.");
152 }
153 Ok(_) => {}
154 Err(e) => warn!(error = %e, "Failed to remove stale retry files."),
155 }
156 self.persisted_pending = Some(persisted_pending);
157 Ok(self)
158 }
159
160 pub fn is_empty(&self) -> bool {
164 self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
165 }
166
167 pub fn len(&self) -> usize {
171 self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
172 }
173
174 pub const fn max_in_memory_bytes(&self) -> u64 {
176 self.max_in_memory_bytes
177 }
178
179 pub const fn available_in_memory_capacity_bytes(&self) -> u64 {
181 self.max_in_memory_bytes.saturating_sub(self.total_in_memory_bytes)
182 }
183
184 pub async fn available_on_disk_capacity_bytes(&self) -> Result<u64, GenericError> {
193 match &self.persisted_pending {
194 Some(persisted_pending) => persisted_pending.available_capacity_bytes().await,
195 None => Ok(0),
196 }
197 }
198
199 pub fn take_persisted_entries_dropped(&mut self) -> u64 {
204 self.persisted_pending.as_mut().map_or(0, |p| p.take_entries_dropped())
205 }
206
207 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
220 let mut push_result = PushResult::default();
221
222 let current_entry_size = entry.size_bytes();
224 if current_entry_size > self.max_in_memory_bytes {
225 return Err(generic_error!(
226 "Entry too large to fit into retry queue. ({} > {})",
227 current_entry_size,
228 self.max_in_memory_bytes
229 ));
230 }
231
232 let required_bytes = self
235 .total_in_memory_bytes
236 .saturating_add(current_entry_size)
237 .saturating_sub(self.max_in_memory_bytes);
238 let using_disk = self.persisted_pending.is_some();
239 let bytes_to_remove = if using_disk && required_bytes > 0 {
240 required_bytes.max(flush_to_disk_bytes(
241 self.max_in_memory_bytes,
242 self.flush_to_disk_mem_ratio,
243 ))
244 } else {
245 required_bytes
246 };
247 let mut bytes_removed = 0;
248
249 while !self.pending.is_empty() && bytes_removed < bytes_to_remove {
250 let oldest_entry = self.pending.pop_front().expect("queue is not empty");
251 let oldest_entry_size = oldest_entry.size_bytes();
252
253 if using_disk {
254 let oldest_entry_events = oldest_entry.event_count();
257 let oldest_entry_data_points = oldest_entry.data_point_count();
258 let persisted_pending = self.persisted_pending.as_mut().expect("disk persistence is enabled");
259 match persisted_pending.push(oldest_entry).await {
260 Ok(persist_result) => {
261 push_result.merge(persist_result);
262 debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
263 }
264 Err(e) => {
265 warn!(
269 error = %e,
270 entry.len = oldest_entry_size,
271 "Failed to persist in-memory entry to disk; dropping entry to make room."
272 );
273 push_result.items_dropped += 1;
274 push_result.events_dropped += oldest_entry_events;
275 push_result.data_points_dropped += oldest_entry_data_points;
276 }
277 }
278 } else {
279 debug!(
280 entry.len = oldest_entry_size,
281 "Dropped in-memory entry to increase available capacity."
282 );
283
284 push_result.track_dropped_item(&oldest_entry);
285 }
286
287 self.total_in_memory_bytes -= oldest_entry_size;
288 bytes_removed += oldest_entry_size;
289 }
290
291 self.pending.push_back(entry);
292 self.total_in_memory_bytes += current_entry_size;
293 debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");
294
295 Ok(push_result)
296 }
297
298 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
309 if let Some(entry) = self.pending.pop_front() {
311 self.total_in_memory_bytes -= entry.size_bytes();
312 debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");
313
314 return Ok(Some(entry));
315 }
316
317 if let Some(persisted_pending) = &mut self.persisted_pending {
319 if let Some(entry) = persisted_pending.pop().await? {
320 return Ok(Some(entry));
321 }
322 }
323
324 Ok(None)
325 }
326
327 pub async fn flush(mut self) -> Result<PushResult, GenericError> {
337 let mut push_result = PushResult::default();
338
339 while let Some(entry) = self.pending.pop_front() {
340 let entry_size = entry.size_bytes();
341
342 if let Some(persisted_pending) = &mut self.persisted_pending {
343 let persist_result = persisted_pending.push(entry).await?;
344 push_result.merge(persist_result);
345
346 debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
347 } else {
348 debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");
349
350 push_result.track_dropped_item(&entry);
351 }
352 }
353
354 Ok(push_result)
355 }
356}
357
/// Converts a flush-to-disk ratio into a byte count relative to `max_in_memory_bytes`.
///
/// - NaN and non-positive ratios (including negative infinity) yield `0`.
/// - Positive infinity yields `u64::MAX`.
/// - Any other ratio scales the budget; the fractional part is truncated and results beyond the
///   `u64` range saturate.
fn flush_to_disk_bytes(max_in_memory_bytes: u64, flush_to_disk_mem_ratio: f64) -> u64 {
    match flush_to_disk_mem_ratio {
        ratio if ratio.is_nan() || ratio <= 0.0 => 0,
        ratio if ratio.is_infinite() => u64::MAX,
        ratio => (max_in_memory_bytes as f64 * ratio) as u64,
    }
}
368
#[cfg(test)]
mod tests {
    use std::{path::Path, sync::Arc};

    use rand::RngExt as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    /// Small serializable payload used as the queue item in these tests.
    ///
    /// On 64-bit targets `size_bytes` is always 36: an 8-byte name, a 24-byte `String` header and
    /// a 4-byte `u32`. The in-memory limits used below are chosen relative to that size.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    /// Counts regular files beneath `path`, recursing into subdirectories.
    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        std::fs::read_dir(path)
            .expect("should not fail to read directory")
            .map(|maybe_entry| {
                let entry = maybe_entry.expect("should not fail to read directory entry");
                // Look the file type up once per entry rather than once per check.
                let file_type = entry.file_type().expect("should not fail to get file type");
                if file_type.is_file() {
                    1
                } else if file_type.is_dir() {
                    file_count_recursive(entry.path())
                } else {
                    0
                }
            })
            .sum()
    }

    /// Builds a retry queue named `"test"` with disk persistence rooted at `root_path`.
    async fn disk_backed_queue(
        root_path: &Path, max_in_memory_bytes: u64, max_on_disk_bytes: u64, flush_to_disk_mem_ratio: f64,
    ) -> RetryQueue<FakeData> {
        RetryQueue::<FakeData>::new("test".to_string(), max_in_memory_bytes)
            .with_flush_to_disk_mem_ratio(flush_to_disk_mem_ratio)
            .with_disk_persistence(PersistedQueueArgs {
                root_path: root_path.to_path_buf(),
                max_on_disk_bytes,
                storage_max_disk_ratio: 1.0,
                disk_usage_retriever: Arc::new(DiskUsageRetrieverImpl::new(root_path.to_path_buf())),
                max_age_days: 10,
            })
            .await
            .expect("should not fail to create retry queue with disk persistence")
    }

    /// Pushes `data` and asserts that nothing was dropped.
    async fn push_without_drops(retry_queue: &mut RetryQueue<FakeData>, data: FakeData) {
        let push_result = retry_queue.push(data).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);
    }

    /// Pops the next entry, panicking if the pop fails or the queue is empty.
    async fn pop_entry(retry_queue: &mut RetryQueue<FakeData>) -> FakeData {
        retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty")
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        push_without_drops(&mut retry_queue, data.clone()).await;
        assert_eq!(1, retry_queue.len());

        assert_eq!(data, pop_entry(&mut retry_queue).await);
        assert!(retry_queue.is_empty());
    }

    #[tokio::test]
    async fn capacity_accessors_report_memory_and_disk_capacity() {
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let mut retry_queue = disk_backed_queue(temp_dir.path(), 36, 1024, DEFAULT_FLUSH_TO_DISK_MEM_RATIO).await;

        assert_eq!(retry_queue.max_in_memory_bytes(), 36);
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 36);
        assert_eq!(
            retry_queue
                .available_on_disk_capacity_bytes()
                .await
                .expect("should not fail to calculate disk capacity"),
            1024
        );

        let push_result = retry_queue
            .push(FakeData::random())
            .await
            .expect("first push should succeed");
        assert!(!push_result.had_drops());
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 0);

        let push_result = retry_queue
            .push(FakeData::random())
            .await
            .expect("second push should persist the oldest entry");
        assert!(!push_result.had_drops());
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 0);

        assert!(
            retry_queue
                .available_on_disk_capacity_bytes()
                .await
                .expect("should not fail to calculate disk capacity")
                < 1024
        );

        let _ = retry_queue.pop().await.expect("pop should succeed");
        assert_eq!(retry_queue.available_in_memory_capacity_bytes(), 36);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        assert!(retry_queue.push(FakeData::random()).await.is_err());
        assert!(retry_queue.is_empty());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Memory holds exactly one 36-byte entry, so the second push must evict the first.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);

        push_without_drops(&mut retry_queue, data1).await;

        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);
        assert_eq!(1, retry_queue.len());

        assert_eq!(data2, pop_entry(&mut retry_queue).await);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        push_without_drops(&mut retry_queue, FakeData::random()).await;
        push_without_drops(&mut retry_queue, FakeData::random()).await;

        // Without disk persistence, flushing drops (and counts) every in-memory entry.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path();
        assert_eq!(0, file_count_recursive(root_path));

        let mut retry_queue = disk_backed_queue(root_path, u64::MAX, u64::MAX, DEFAULT_FLUSH_TO_DISK_MEM_RATIO).await;

        push_without_drops(&mut retry_queue, FakeData::random()).await;
        push_without_drops(&mut retry_queue, FakeData::random()).await;

        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        assert_eq!(2, file_count_recursive(root_path));
    }

    #[tokio::test]
    async fn disk_overflow_flushes_configured_memory_ratio() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();
        let data3 = FakeData::random();
        let data4 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path();
        let mut retry_queue = disk_backed_queue(root_path, 120, u64::MAX, 0.5).await;

        // Three 36-byte entries fit in 120 bytes. The fourth overflows by only 24 bytes, but the 0.5
        // ratio forces at least 60 bytes to be spilled, so exactly the two oldest entries go to disk.
        for data in [&data1, &data2, &data3, &data4] {
            push_without_drops(&mut retry_queue, data.clone()).await;
        }
        assert_eq!(2, file_count_recursive(root_path));

        // In-memory entries are served before persisted ones.
        for expected in [data3, data4, data1, data2] {
            assert_eq!(expected, pop_entry(&mut retry_queue).await);
        }
    }

    #[tokio::test]
    async fn zero_disk_flush_ratio_persists_required_entries() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();
        let data3 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path();
        let mut retry_queue = disk_backed_queue(root_path, 72, u64::MAX, 0.0).await;

        // Two entries fill memory exactly. With a zero ratio, the third push spills only the single
        // oldest entry needed to make room.
        for data in [&data1, &data2, &data3] {
            push_without_drops(&mut retry_queue, data.clone()).await;
        }
        assert_eq!(1, file_count_recursive(root_path));

        for expected in [data2, data3, data1] {
            assert_eq!(expected, pop_entry(&mut retry_queue).await);
        }
    }
}