// saluki_io/net/util/retry/queue/mod.rs

use std::{collections::VecDeque, path::PathBuf, sync::Arc};

use saluki_error::{generic_error, GenericError};
use serde::{de::DeserializeOwned, Serialize};
use tracing::debug;

mod persisted;
pub use self::persisted::DiskUsageRetrieverImpl;
use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};

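/// A type that contains one or more events.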
pub trait EventContainer {
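    /// Returns the number of events contained within this item.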
    fn event_count(&self) -> u64;
}

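/// An event container that can be serialized to and deserialized from disk for retrying later.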
pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
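    /// Returns the in-memory size of this entry, in bytes.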
    fn size_bytes(&self) -> u64;
}

impl EventContainer for String {
    fn event_count(&self) -> u64 {
        1
    }
}

impl Retryable for String {
    fn size_bytes(&self) -> u64 {
        self.len() as u64
    }
}

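/// Result of pushing an entry into a [`RetryQueue`], describing any items/events that were dropped to make room.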
#[derive(Default)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    pub items_dropped: u64,

    pub events_dropped: u64,
}

impl PushResult {
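    /// Returns `true` if any items were dropped.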
    pub fn had_drops(&self) -> bool {
        self.items_dropped > 0
    }

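    /// Merges the drop counts from another `PushResult` into this one.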
    pub fn merge(&mut self, other: Self) {
        self.items_dropped += other.items_dropped;
        self.events_dropped += other.events_dropped;
    }

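    /// Records a single dropped item containing the given number of events.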
    pub fn track_dropped_item(&mut self, event_count: u64) {
        self.items_dropped += 1;
        self.events_dropped += event_count;
    }
}

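/// A bounded in-memory queue of retryable entries, with optional spillover to disk.
///
/// Entries are held in memory up to a configurable byte limit. When the limit would be exceeded, the oldest entries
/// are either persisted to disk (if disk persistence is enabled) or dropped.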
pub struct RetryQueue<T> {
    queue_name: String,
    pending: VecDeque<T>,
    persisted_pending: Option<PersistedQueue<T>>,
    total_in_memory_bytes: u64,
    max_in_memory_bytes: u64,
}

impl<T> RetryQueue<T>
where
    T: Retryable,
{
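    /// Creates a new `RetryQueue` with the given name and maximum in-memory size, in bytes.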
    pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
        Self {
            queue_name,
            pending: VecDeque::new(),
            persisted_pending: None,
            total_in_memory_bytes: 0,
            max_in_memory_bytes,
        }
    }

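    /// Enables disk persistence for this queue, rooted at the given path.
    ///
    /// Entries evicted from memory are written under `root_path` joined with the queue name, subject to the
    /// configured `max_disk_size_bytes` and `storage_max_disk_ratio`.
    ///
    /// # Errors
    ///
    /// If the root path is empty, or if the persisted queue cannot be initialized, an error is returned.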
    pub async fn with_disk_persistence(
        mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
        disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
    ) -> Result<Self, GenericError> {
        if root_path.as_os_str().is_empty() {
            return Err(generic_error!("Storage path cannot be empty."));
        }

        let queue_root_path = root_path.join(&self.queue_name);
        let persisted_pending = PersistedQueue::from_root_path(
            queue_root_path,
            max_disk_size_bytes,
            storage_max_disk_ratio,
            DiskUsageRetrieverWrapper::new(disk_usage_retriever),
        )
        .await?;
        self.persisted_pending = Some(persisted_pending);
        Ok(self)
    }

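    /// Returns `true` if the queue has no entries, either in memory or on disk.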
    pub fn is_empty(&self) -> bool {
        self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
    }

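    /// Returns the total number of entries in the queue, both in memory and on disk.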
    pub fn len(&self) -> usize {
        self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
    }

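    /// Pushes an entry into the queue.
    ///
    /// If adding the entry would exceed the in-memory limit, the oldest in-memory entries are moved to disk (when
    /// disk persistence is enabled) or dropped until there is room. The returned `PushResult` describes any drops
    /// that occurred.
    ///
    /// # Errors
    ///
    /// If the entry is larger than the maximum in-memory size, or if an entry cannot be persisted to disk, an error
    /// is returned.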
    pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        let current_entry_size = entry.size_bytes();
        if current_entry_size > self.max_in_memory_bytes {
            return Err(generic_error!(
                "Entry too large to fit into retry queue. ({} > {})",
                current_entry_size,
                self.max_in_memory_bytes
            ));
        }

        while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
            let oldest_entry = self.pending.pop_front().expect("queue is not empty");
            let oldest_entry_size = oldest_entry.size_bytes();

            if let Some(persisted_pending) = &mut self.persisted_pending {
                let persist_result = persisted_pending.push(oldest_entry).await?;
                push_result.merge(persist_result);

                debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
            } else {
                debug!(
                    entry.len = oldest_entry_size,
                    "Dropped in-memory entry to increase available capacity."
                );

                push_result.track_dropped_item(oldest_entry.event_count());
            }

            self.total_in_memory_bytes -= oldest_entry_size;
        }

        self.pending.push_back(entry);
        self.total_in_memory_bytes += current_entry_size;
        debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");

        Ok(push_result)
    }

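    /// Pops the next entry from the queue.
    ///
    /// In-memory entries are returned first, followed by entries persisted to disk. Returns `None` if the queue is
    /// empty.
    ///
    /// # Errors
    ///
    /// If an entry cannot be read from disk, an error is returned.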
    pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
        if let Some(entry) = self.pending.pop_front() {
            self.total_in_memory_bytes -= entry.size_bytes();
            debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");

            return Ok(Some(entry));
        }

        if let Some(persisted_pending) = &mut self.persisted_pending {
            if let Some(entry) = persisted_pending.pop().await? {
                return Ok(Some(entry));
            }
        }

        Ok(None)
    }

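    /// Consumes the queue, moving all remaining in-memory entries to disk.
    ///
    /// If disk persistence is not enabled, the remaining entries are dropped. The returned `PushResult` describes any
    /// drops that occurred.
    ///
    /// # Errors
    ///
    /// If an entry cannot be persisted to disk, an error is returned.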
    pub async fn flush(mut self) -> Result<PushResult, GenericError> {
        let mut push_result = PushResult::default();

        while let Some(entry) = self.pending.pop_front() {
            let entry_size = entry.size_bytes();

            if let Some(persisted_pending) = &mut self.persisted_pending {
                let persist_result = persisted_pending.push(entry).await?;
                push_result.merge(persist_result);

                debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
            } else {
                debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");

                push_result.track_dropped_item(entry.event_count());
            }
        }

        Ok(push_result)
    }
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::Rng as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        let mut count = 0;
        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
        for maybe_entry in entries {
            let entry = maybe_entry.expect("should not fail to read directory entry");
            if entry.file_type().expect("should not fail to get file type").is_file() {
                count += 1;
            } else if entry.file_type().expect("should not fail to get file type").is_dir() {
                count += file_count_recursive(entry.path());
            }
        }
        count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        let push_result = retry_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        assert!(retry_queue.push(data).await.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);

        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        assert_eq!(0, file_count_recursive(&root_path));

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
            .with_disk_persistence(
                root_path.clone(),
                u64::MAX,
                1.0,
                Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
            )
            .await
            .expect("should not fail to create retry queue with disk persistence");

        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        assert_eq!(2, file_count_recursive(&root_path));
    }
}