saluki_io/net/util/retry/queue/
mod.rs1use std::{collections::VecDeque, path::PathBuf, sync::Arc};
2
3use saluki_error::{generic_error, GenericError};
4use serde::{de::DeserializeOwned, Serialize};
5use tracing::debug;
6
7mod persisted;
8pub use self::persisted::DiskUsageRetrieverImpl;
9use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};
10
/// A value that carries a countable number of events (and, optionally, data points).
///
/// The counts are used by [`PushResult`] to report how much telemetry was lost when an item is dropped.
pub trait EventContainer {
    /// Returns the number of events represented by this container.
    fn event_count(&self) -> u64;

    /// Returns the number of data points represented by this container.
    ///
    /// The provided implementation reports zero; implementors that carry data points should override it.
    fn data_point_count(&self) -> u64 {
        0
    }
}
25
26pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
28 fn size_bytes(&self) -> u64;
30}
31
32impl EventContainer for String {
33 fn event_count(&self) -> u64 {
34 1
35 }
36}
37
38impl Retryable for String {
39 fn size_bytes(&self) -> u64 {
40 self.len() as u64
41 }
42}
43
/// Outcome of pushing an item into a queue: how much was dropped to make it fit.
///
/// All counters start at zero (see the `Default` implementation) and are accumulated with
/// `merge` / `track_dropped_item`.
#[derive(Debug, Default)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    /// Number of whole items that were dropped.
    pub items_dropped: u64,

    /// Total number of events contained in the dropped items.
    pub events_dropped: u64,

    /// Total number of data points contained in the dropped items.
    pub data_points_dropped: u64,
}
60
61impl PushResult {
62 pub fn had_drops(&self) -> bool {
64 self.items_dropped > 0
65 }
66
67 pub fn merge(&mut self, other: Self) {
69 self.items_dropped += other.items_dropped;
70 self.events_dropped += other.events_dropped;
71 self.data_points_dropped += other.data_points_dropped;
72 }
73
74 pub fn track_dropped_item(&mut self, item: &dyn EventContainer) {
76 self.items_dropped += 1;
77 self.events_dropped += item.event_count();
78 self.data_points_dropped += item.data_point_count();
79 }
80}
81
82pub struct RetryQueue<T> {
84 queue_name: String,
85 pending: VecDeque<T>,
86 persisted_pending: Option<PersistedQueue<T>>,
87 total_in_memory_bytes: u64,
88 max_in_memory_bytes: u64,
89}
90
91impl<T> RetryQueue<T>
92where
93 T: Retryable,
94{
95 pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
101 Self {
102 queue_name,
103 pending: VecDeque::new(),
104 persisted_pending: None,
105 total_in_memory_bytes: 0,
106 max_in_memory_bytes,
107 }
108 }
109
110 pub async fn with_disk_persistence(
126 mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
127 disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
128 ) -> Result<Self, GenericError> {
129 if root_path.as_os_str().is_empty() {
132 return Err(generic_error!("Storage path cannot be empty."));
133 }
134
135 let queue_root_path = root_path.join(&self.queue_name);
136 let persisted_pending = PersistedQueue::from_root_path(
137 queue_root_path,
138 max_disk_size_bytes,
139 storage_max_disk_ratio,
140 DiskUsageRetrieverWrapper::new(disk_usage_retriever),
141 )
142 .await?;
143 self.persisted_pending = Some(persisted_pending);
144 Ok(self)
145 }
146
147 pub fn is_empty(&self) -> bool {
151 self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
152 }
153
154 pub fn len(&self) -> usize {
158 self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
159 }
160
161 pub fn take_persisted_entries_dropped(&mut self) -> u64 {
166 self.persisted_pending.as_mut().map_or(0, |p| p.take_entries_dropped())
167 }
168
169 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
180 let mut push_result = PushResult::default();
181
182 let current_entry_size = entry.size_bytes();
184 if current_entry_size > self.max_in_memory_bytes {
185 return Err(generic_error!(
186 "Entry too large to fit into retry queue. ({} > {})",
187 current_entry_size,
188 self.max_in_memory_bytes
189 ));
190 }
191
192 while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
195 let oldest_entry = self.pending.pop_front().expect("queue is not empty");
196 let oldest_entry_size = oldest_entry.size_bytes();
197
198 if let Some(persisted_pending) = &mut self.persisted_pending {
199 let persist_result = persisted_pending.push(oldest_entry).await?;
200 push_result.merge(persist_result);
201
202 debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
203 } else {
204 debug!(
205 entry.len = oldest_entry_size,
206 "Dropped in-memory entry to increase available capacity."
207 );
208
209 push_result.track_dropped_item(&oldest_entry);
210 }
211
212 self.total_in_memory_bytes -= oldest_entry_size;
213 }
214
215 self.pending.push_back(entry);
216 self.total_in_memory_bytes += current_entry_size;
217 debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");
218
219 Ok(push_result)
220 }
221
222 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
233 if let Some(entry) = self.pending.pop_front() {
235 self.total_in_memory_bytes -= entry.size_bytes();
236 debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");
237
238 return Ok(Some(entry));
239 }
240
241 if let Some(persisted_pending) = &mut self.persisted_pending {
243 if let Some(entry) = persisted_pending.pop().await? {
244 return Ok(Some(entry));
245 }
246 }
247
248 Ok(None)
249 }
250
251 pub async fn flush(mut self) -> Result<PushResult, GenericError> {
261 let mut push_result = PushResult::default();
262
263 while let Some(entry) = self.pending.pop_front() {
264 let entry_size = entry.size_bytes();
265
266 if let Some(persisted_pending) = &mut self.persisted_pending {
267 let persist_result = persisted_pending.push(entry).await?;
268 push_result.merge(persist_result);
269
270 debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
271 } else {
272 debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");
273
274 push_result.track_dropped_item(&entry);
275 }
276 }
277
278 Ok(push_result)
279 }
280}
281
#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::RngExt as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    /// Small serializable payload used to exercise the queue.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        /// Builds a payload with an 8-character alphanumeric name and a value in `0..100`.
        fn random() -> Self {
            let name: String = rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect();
            let value = rand::rng().random_range(0..100);
            Self { name, value }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            // Name bytes, plus the `String` header, plus 4 (presumably for the `u32` value).
            let approximate_size = std::mem::size_of::<String>() + self.name.len() + 4;
            approximate_size as u64
        }
    }

    /// Counts the regular files under `path`, descending into sub-directories.
    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        std::fs::read_dir(path)
            .expect("should not fail to read directory")
            .map(|maybe_entry| {
                let entry = maybe_entry.expect("should not fail to read directory entry");
                let file_type = entry.file_type().expect("should not fail to get file type");
                if file_type.is_file() {
                    1
                } else if file_type.is_dir() {
                    file_count_recursive(entry.path())
                } else {
                    0
                }
            })
            .sum()
    }

    /// Asserts the item and event drop counters of a `PushResult`.
    #[track_caller]
    fn assert_drops(result: &PushResult, items: u64, events: u64) {
        assert_eq!(items, result.items_dropped);
        assert_eq!(events, result.events_dropped);
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let expected = FakeData::random();

        let mut queue = RetryQueue::<FakeData>::new(String::from("test"), 1024);

        // A single push into a roomy queue must not drop anything.
        let pushed = queue
            .push(expected.clone())
            .await
            .expect("should not fail to push data");
        assert_drops(&pushed, 0, 0);

        // The same entry must come back out.
        let popped = queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(expected, popped);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let oversized = FakeData::random();

        // A one-byte limit is smaller than any `FakeData`.
        let mut queue = RetryQueue::<FakeData>::new(String::from("test"), 1);

        let outcome = queue.push(oversized).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let first = FakeData::random();
        let second = FakeData::random();

        // 36 bytes is one `FakeData` where `String` is 24 bytes (8 + 24 + 4), so the queue holds a single entry.
        let mut queue = RetryQueue::<FakeData>::new(String::from("test"), 36);

        // The first push fits and drops nothing.
        let first_push = queue.push(first).await.expect("should not fail to push data");
        assert_drops(&first_push, 0, 0);

        // The second push has to evict the first entry; without persistence it is dropped.
        let second_push = queue
            .push(second.clone())
            .await
            .expect("should not fail to push data");
        assert_drops(&second_push, 1, 1);

        // Only the second entry is left.
        let popped = queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(second, popped);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let first = FakeData::random();
        let second = FakeData::random();

        let mut queue = RetryQueue::<FakeData>::new(String::from("test"), u64::MAX);

        // Both entries fit in memory.
        let first_push = queue.push(first).await.expect("should not fail to push data");
        assert_drops(&first_push, 0, 0);
        let second_push = queue.push(second).await.expect("should not fail to push data");
        assert_drops(&second_push, 0, 0);

        // Without persistence, flushing drops everything still in memory.
        let flushed = queue.flush().await.expect("should not fail to flush");
        assert_drops(&flushed, 2, 2);
    }

    #[tokio::test]
    async fn flush_disk() {
        let first = FakeData::random();
        let second = FakeData::random();

        let scratch_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = scratch_dir.path().to_path_buf();

        // Nothing is on disk before the queue is created.
        assert_eq!(0, file_count_recursive(&root_path));

        let disk_usage_retriever = Arc::new(DiskUsageRetrieverImpl::new(root_path.clone()));
        let mut queue = RetryQueue::<FakeData>::new(String::from("test"), u64::MAX)
            .with_disk_persistence(root_path.clone(), u64::MAX, 1.0, disk_usage_retriever)
            .await
            .expect("should not fail to create retry queue with disk persistence");

        // Both entries fit in memory.
        let first_push = queue.push(first).await.expect("should not fail to push data");
        assert_drops(&first_push, 0, 0);
        let second_push = queue.push(second).await.expect("should not fail to push data");
        assert_drops(&second_push, 0, 0);

        // With persistence, flushing moves the entries to disk instead of dropping them.
        let flushed = queue.flush().await.expect("should not fail to flush");
        assert_drops(&flushed, 0, 0);

        // Two files are expected on disk, one per flushed entry.
        assert_eq!(2, file_count_recursive(&root_path));
    }
}