saluki_io/net/util/retry/queue/
mod.rs1use std::{collections::VecDeque, path::PathBuf, sync::Arc};
2
3use saluki_error::{generic_error, GenericError};
4use serde::{de::DeserializeOwned, Serialize};
5use tracing::debug;
6
7mod persisted;
8pub use self::persisted::DiskUsageRetrieverImpl;
9use self::persisted::{DiskUsageRetriever, DiskUsageRetrieverWrapper, PersistedQueue};
10
/// A container of one or more events.
pub trait EventContainer {
    /// Returns the number of events held by this container.
    fn event_count(&self) -> u64;
}
20
/// An item that can be held in a retry queue.
///
/// Items must be serializable/deserializable so they can optionally be persisted to disk.
pub trait Retryable: EventContainer + DeserializeOwned + Serialize {
    /// Returns the in-memory size of this item, in bytes.
    fn size_bytes(&self) -> u64;
}
26
27impl EventContainer for String {
28 fn event_count(&self) -> u64 {
29 1
30 }
31}
32
33impl Retryable for String {
34 fn size_bytes(&self) -> u64 {
35 self.len() as u64
36 }
37}
38
/// Result of pushing an entry into a [`RetryQueue`].
///
/// Tracks how many items — and how many events within those items — had to be
/// dropped to make room for the pushed entry.
#[derive(Default)]
#[must_use = "`PushResult` carries information about potentially dropped items/events and should not be ignored"]
pub struct PushResult {
    /// Number of items dropped.
    pub items_dropped: u64,

    /// Number of events dropped, summed across all dropped items.
    pub events_dropped: u64,
}

impl PushResult {
    /// Returns `true` if any items or events were dropped.
    pub fn had_drops(&self) -> bool {
        // Both fields are public, so check both: a caller could have recorded
        // event drops without a corresponding item drop (or vice versa).
        self.items_dropped > 0 || self.events_dropped > 0
    }

    /// Merges the drop counts from `other` into `self`.
    pub fn merge(&mut self, other: Self) {
        self.items_dropped += other.items_dropped;
        self.events_dropped += other.events_dropped;
    }

    /// Records a single dropped item containing `event_count` events.
    pub fn track_dropped_item(&mut self, event_count: u64) {
        self.items_dropped += 1;
        self.events_dropped += event_count;
    }
}
71
/// A queue of items awaiting retry, bounded in memory with optional spill-to-disk persistence.
pub struct RetryQueue<T> {
    // Name of the queue; used as the on-disk subdirectory name when persistence is enabled.
    queue_name: String,
    // In-memory entries, ordered oldest (front) to newest (back).
    pending: VecDeque<T>,
    // Optional disk-backed queue; when present, entries evicted from memory spill here
    // instead of being dropped.
    persisted_pending: Option<PersistedQueue<T>>,
    // Running sum of `size_bytes()` over all entries currently in `pending`.
    total_in_memory_bytes: u64,
    // Maximum allowed total size, in bytes, of in-memory entries.
    max_in_memory_bytes: u64,
}
80
81impl<T> RetryQueue<T>
82where
83 T: Retryable,
84{
85 pub fn new(queue_name: String, max_in_memory_bytes: u64) -> Self {
91 Self {
92 queue_name,
93 pending: VecDeque::new(),
94 persisted_pending: None,
95 total_in_memory_bytes: 0,
96 max_in_memory_bytes,
97 }
98 }
99
100 pub async fn with_disk_persistence(
116 mut self, root_path: PathBuf, max_disk_size_bytes: u64, storage_max_disk_ratio: f64,
117 disk_usage_retriever: Arc<dyn DiskUsageRetriever + Send + Sync>,
118 ) -> Result<Self, GenericError> {
119 let queue_root_path = root_path.join(&self.queue_name);
120 let persisted_pending = PersistedQueue::from_root_path(
121 queue_root_path,
122 max_disk_size_bytes,
123 storage_max_disk_ratio,
124 DiskUsageRetrieverWrapper::new(disk_usage_retriever),
125 )
126 .await?;
127 self.persisted_pending = Some(persisted_pending);
128 Ok(self)
129 }
130
131 pub fn is_empty(&self) -> bool {
135 self.pending.is_empty() && self.persisted_pending.as_ref().is_none_or(|p| p.is_empty())
136 }
137
138 pub fn len(&self) -> usize {
142 self.pending.len() + self.persisted_pending.as_ref().map_or(0, |p| p.len())
143 }
144
145 pub async fn push(&mut self, entry: T) -> Result<PushResult, GenericError> {
156 let mut push_result = PushResult::default();
157
158 let current_entry_size = entry.size_bytes();
160 if current_entry_size > self.max_in_memory_bytes {
161 return Err(generic_error!(
162 "Entry too large to fit into retry queue. ({} > {})",
163 current_entry_size,
164 self.max_in_memory_bytes
165 ));
166 }
167
168 while !self.pending.is_empty() && self.total_in_memory_bytes + current_entry_size > self.max_in_memory_bytes {
171 let oldest_entry = self.pending.pop_front().expect("queue is not empty");
172 let oldest_entry_size = oldest_entry.size_bytes();
173
174 if let Some(persisted_pending) = &mut self.persisted_pending {
175 let persist_result = persisted_pending.push(oldest_entry).await?;
176 push_result.merge(persist_result);
177
178 debug!(entry.len = oldest_entry_size, "Moved in-memory entry to disk.");
179 } else {
180 debug!(
181 entry.len = oldest_entry_size,
182 "Dropped in-memory entry to increase available capacity."
183 );
184
185 push_result.track_dropped_item(oldest_entry.event_count());
186 }
187
188 self.total_in_memory_bytes -= oldest_entry_size;
189 }
190
191 self.pending.push_back(entry);
192 self.total_in_memory_bytes += current_entry_size;
193 debug!(entry.len = current_entry_size, "Enqueued in-memory entry.");
194
195 Ok(push_result)
196 }
197
198 pub async fn pop(&mut self) -> Result<Option<T>, GenericError> {
209 if let Some(entry) = self.pending.pop_front() {
211 self.total_in_memory_bytes -= entry.size_bytes();
212 debug!(entry.len = entry.size_bytes(), "Dequeued in-memory entry.");
213
214 return Ok(Some(entry));
215 }
216
217 if let Some(persisted_pending) = &mut self.persisted_pending {
219 if let Some(entry) = persisted_pending.pop().await? {
220 return Ok(Some(entry));
221 }
222 }
223
224 Ok(None)
225 }
226
227 pub async fn flush(mut self) -> Result<PushResult, GenericError> {
237 let mut push_result = PushResult::default();
238
239 while let Some(entry) = self.pending.pop_front() {
240 let entry_size = entry.size_bytes();
241
242 if let Some(persisted_pending) = &mut self.persisted_pending {
243 let persist_result = persisted_pending.push(entry).await?;
244 push_result.merge(persist_result);
245
246 debug!(entry.len = entry_size, "Flushed in-memory entry to disk.");
247 } else {
248 debug!(entry.len = entry_size, "Dropped in-memory entry during flush.");
249
250 push_result.track_dropped_item(entry.event_count());
251 }
252 }
253
254 Ok(push_result)
255 }
256}
257
#[cfg(test)]
mod tests {
    use std::path::Path;

    use rand::Rng as _;
    use rand_distr::Alphanumeric;
    use serde::Deserialize;

    use super::*;

    // Simple payload type used to exercise the retry queue in tests.
    #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
    struct FakeData {
        name: String,
        value: u32,
    }

    impl FakeData {
        // Generates a `FakeData` with a random 8-character alphanumeric name and a
        // random value in `0..100`.
        fn random() -> Self {
            Self {
                name: rand::rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(),
                value: rand::rng().random_range(0..100),
            }
        }
    }

    impl EventContainer for FakeData {
        fn event_count(&self) -> u64 {
            1
        }
    }

    impl Retryable for FakeData {
        fn size_bytes(&self) -> u64 {
            // Approximate in-memory footprint: name contents + `String` header + 4-byte value.
            (self.name.len() + std::mem::size_of::<String>() + 4) as u64
        }
    }

    // Counts regular files under `path`, recursing into subdirectories; used to verify
    // how many entries the persisted queue has written to disk.
    fn file_count_recursive<P: AsRef<Path>>(path: P) -> u64 {
        let mut count = 0;
        let entries = std::fs::read_dir(path).expect("should not fail to read directory");
        for maybe_entry in entries {
            let entry = maybe_entry.expect("should not fail to read directory entry");
            if entry.file_type().expect("should not fail to get file type").is_file() {
                count += 1;
            } else if entry.file_type().expect("should not fail to get file type").is_dir() {
                count += file_count_recursive(entry.path());
            }
        }
        count
    }

    #[tokio::test]
    async fn basic_push_pop() {
        let data = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1024);

        // Push should succeed without any drops, and pop should return the same entry.
        let push_result = retry_queue
            .push(data.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data, actual);
    }

    #[tokio::test]
    async fn entry_too_large() {
        let data = FakeData::random();

        // A 1-byte budget is smaller than any `FakeData`, so the push must be rejected.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 1);

        assert!(retry_queue.push(data).await.is_err());
    }

    #[tokio::test]
    async fn remove_oldest_entry_on_push() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        // Budget fits exactly one entry (8-char name + String header + 4), so pushing a
        // second entry must evict — and, with no disk persistence, drop — the first.
        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), 36);

        let push_result = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result.items_dropped);
        assert_eq!(0, push_result.events_dropped);

        let push_result = retry_queue
            .push(data2.clone())
            .await
            .expect("should not fail to push data");
        assert_eq!(1, push_result.items_dropped);
        assert_eq!(1, push_result.events_dropped);

        // Only the newer entry should remain.
        let actual = retry_queue
            .pop()
            .await
            .expect("should not fail to pop data")
            .expect("should not be empty");
        assert_eq!(data2, actual);
    }

    #[tokio::test]
    async fn flush_no_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX);

        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // Without disk persistence, flushing drops every pending entry.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(2, flush_result.items_dropped);
        assert_eq!(2, flush_result.events_dropped);
    }

    #[tokio::test]
    async fn flush_disk() {
        let data1 = FakeData::random();
        let data2 = FakeData::random();

        let temp_dir = tempfile::tempdir().expect("should not fail to create temporary directory");
        let root_path = temp_dir.path().to_path_buf();

        // Sanity check: the temporary directory starts out empty.
        assert_eq!(0, file_count_recursive(&root_path));

        let mut retry_queue = RetryQueue::<FakeData>::new("test".to_string(), u64::MAX)
            .with_disk_persistence(
                root_path.clone(),
                u64::MAX,
                1.0,
                Arc::new(DiskUsageRetrieverImpl::new(root_path.clone())),
            )
            .await
            .expect("should not fail to create retry queue with disk persistence");

        let push_result1 = retry_queue.push(data1).await.expect("should not fail to push data");
        assert_eq!(0, push_result1.items_dropped);
        assert_eq!(0, push_result1.events_dropped);
        let push_result2 = retry_queue.push(data2).await.expect("should not fail to push data");
        assert_eq!(0, push_result2.items_dropped);
        assert_eq!(0, push_result2.events_dropped);

        // With disk persistence, flushing writes entries to disk instead of dropping them.
        let flush_result = retry_queue.flush().await.expect("should not fail to flush");
        assert_eq!(0, flush_result.items_dropped);
        assert_eq!(0, flush_result.events_dropped);

        // One file per flushed entry should now exist under the queue's directory.
        assert_eq!(2, file_count_recursive(&root_path));
    }
}
433}