saluki_core/topology/interconnect/
event_buffer.rs1use std::{collections::VecDeque, fmt};
2
3use crate::{
4 data_model::event::{Event, EventType},
5 topology::interconnect::{dispatcher::DispatchBuffer, Dispatchable},
6};
7
8#[derive(Clone)]
10pub struct FixedSizeEventBuffer<const N: usize> {
11 events: VecDeque<Event>,
12 seen_event_types: EventType,
13}
14
15impl<const N: usize> FixedSizeEventBuffer<N> {
16 pub fn capacity(&self) -> usize {
18 N
19 }
20
21 pub fn len(&self) -> usize {
23 self.events.len()
24 }
25
26 pub fn is_empty(&self) -> bool {
28 self.events.is_empty()
29 }
30
31 pub fn is_full(&self) -> bool {
33 self.len() == self.capacity()
34 }
35
36 pub fn has_event_type(&self, event_type: EventType) -> bool {
38 self.seen_event_types.contains(event_type)
39 }
40
41 pub fn clear(&mut self) {
43 self.events.clear();
44 self.seen_event_types = EventType::none();
45 }
46
47 pub fn try_push(&mut self, event: Event) -> Option<Event> {
51 if self.len() == self.capacity() {
52 return Some(event);
53 }
54
55 self.seen_event_types |= event.event_type();
56 self.events.push_back(event);
57 None
58 }
59
60 pub fn extract<F>(&mut self, predicate: F) -> impl Iterator<Item = Event>
62 where
63 F: Fn(&Event) -> bool,
64 {
65 let mut indices_to_remove = Vec::new();
66 let mut removed_events = VecDeque::new();
67 let mut seen_event_types = EventType::none();
68
69 for (pos, event) in self.events.iter_mut().enumerate() {
70 if predicate(event) {
71 indices_to_remove.push(pos);
72 } else {
73 seen_event_types |= event.event_type();
74 }
75 }
76
77 for &pos in indices_to_remove.iter().rev() {
79 if pos < self.events.len() {
80 removed_events.push_back(self.events.swap_remove_back(pos).unwrap());
81 }
82 }
83
84 self.seen_event_types = seen_event_types;
86
87 removed_events.into_iter()
88 }
89
90 pub fn remove_if<F>(&mut self, predicate: F)
92 where
93 F: Fn(&mut Event) -> bool,
94 {
95 let mut seen_event_types = EventType::none();
96
97 let mut i = 0;
98 let mut end = self.events.len();
99 while i < end {
100 if predicate(&mut self.events[i]) {
101 self.events.swap_remove_back(i);
102 end -= 1;
103 } else {
104 seen_event_types |= self.events[i].event_type();
105 i += 1;
106 }
107 }
108
109 self.seen_event_types = seen_event_types;
111 }
112}
113
114impl<const N: usize> Default for FixedSizeEventBuffer<N> {
115 fn default() -> Self {
116 Self {
117 events: VecDeque::with_capacity(N),
118 seen_event_types: EventType::none(),
119 }
120 }
121}
122
123impl<const N: usize> Dispatchable for FixedSizeEventBuffer<N> {
124 fn item_count(&self) -> usize {
125 self.len()
126 }
127}
128
129impl<const N: usize> DispatchBuffer for FixedSizeEventBuffer<N> {
130 type Item = Event;
131
132 fn len(&self) -> usize {
133 self.len()
134 }
135
136 fn is_full(&self) -> bool {
137 self.is_full()
138 }
139
140 fn try_push(&mut self, item: Self::Item) -> Option<Self::Item> {
141 self.try_push(item)
142 }
143}
144
145impl<const N: usize> fmt::Debug for FixedSizeEventBuffer<N> {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 f.debug_struct("FixedSizeEventBuffer")
148 .field("cap", &N)
149 .field("len", &self.len())
150 .finish()
151 }
152}
153
154impl<const N: usize> IntoIterator for FixedSizeEventBuffer<N> {
155 type Item = Event;
156 type IntoIter = std::collections::vec_deque::IntoIter<Event>;
157
158 fn into_iter(self) -> Self::IntoIter {
159 self.events.into_iter()
160 }
161}
162
163impl<'a, const N: usize> IntoIterator for &'a FixedSizeEventBuffer<N> {
164 type Item = &'a Event;
165 type IntoIter = std::collections::vec_deque::Iter<'a, Event>;
166
167 fn into_iter(self) -> Self::IntoIter {
168 self.events.iter()
169 }
170}
171
172impl<'a, const N: usize> IntoIterator for &'a mut FixedSizeEventBuffer<N> {
173 type Item = &'a mut Event;
174 type IntoIter = std::collections::vec_deque::IterMut<'a, Event>;
175
176 fn into_iter(self) -> Self::IntoIter {
177 self.events.iter_mut()
178 }
179}
180
181#[derive(Default)]
191pub struct EventBufferManager<const N: usize> {
192 current: Option<FixedSizeEventBuffer<N>>,
193}
194
195impl<const N: usize> EventBufferManager<N> {
196 pub fn try_push(&mut self, event: Event) -> Option<FixedSizeEventBuffer<N>> {
201 let buffer = self.current.get_or_insert_default();
202
203 match buffer.try_push(event) {
204 Some(event) => {
205 let old_buffer = std::mem::take(buffer);
208
209 if buffer.try_push(event).is_some() {
210 panic!("New event buffer is unexpectedly full.")
211 }
212
213 Some(old_buffer)
214 }
215
216 None => None,
218 }
219 }
220
221 pub fn consume(&mut self) -> Option<FixedSizeEventBuffer<N>> {
223 self.current.take()
224 }
225}
226
227#[cfg(test)]
228mod tests {
229 use std::collections::VecDeque;
230
231 use super::*;
232 use crate::data_model::event::{
233 eventd::EventD,
234 metric::Metric,
235 service_check::{CheckStatus, ServiceCheck},
236 Event, EventType,
237 };
238
239 #[test]
240 fn capacity() {
241 let mut buffer = FixedSizeEventBuffer::<2>::default();
242 assert_eq!(buffer.capacity(), 2);
243
244 assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
245 assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 43.0))).is_none());
246 assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 44.0))).is_some());
247 }
248
249 #[test]
250 fn clear() {
251 let mut buffer = FixedSizeEventBuffer::<2>::default();
253 assert!(buffer.is_empty());
254 assert!(!buffer.has_event_type(EventType::Metric));
255 assert!(!buffer.has_event_type(EventType::EventD));
256 assert!(!buffer.has_event_type(EventType::ServiceCheck));
257
258 assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
260 assert!(!buffer.is_empty());
261 assert!(buffer.has_event_type(EventType::Metric));
262 assert!(!buffer.has_event_type(EventType::EventD));
263 assert!(!buffer.has_event_type(EventType::ServiceCheck));
264
265 buffer.clear();
267 assert!(buffer.is_empty());
268 assert!(!buffer.has_event_type(EventType::Metric));
269 assert!(!buffer.has_event_type(EventType::EventD));
270 assert!(!buffer.has_event_type(EventType::ServiceCheck));
271 }
272
273 #[test]
274 fn has_event_type() {
275 let mut buffer = FixedSizeEventBuffer::<2>::default();
276 assert!(!buffer.has_event_type(EventType::Metric));
277 assert!(!buffer.has_event_type(EventType::EventD));
278 assert!(!buffer.has_event_type(EventType::ServiceCheck));
279
280 assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
281 assert!(buffer.has_event_type(EventType::Metric));
282 assert!(!buffer.has_event_type(EventType::EventD));
283 assert!(!buffer.has_event_type(EventType::ServiceCheck));
284
285 assert!(buffer.try_push(Event::EventD(EventD::new("title", "text"))).is_none());
286 assert!(buffer.has_event_type(EventType::Metric));
287 assert!(buffer.has_event_type(EventType::EventD));
288 assert!(!buffer.has_event_type(EventType::ServiceCheck));
289 }
290
291 #[test]
292 fn extract() {
293 let mut buffer = FixedSizeEventBuffer::<10>::default();
294 assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
295 assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 43.0))).is_none());
296 assert!(buffer.try_push(Event::EventD(EventD::new("foo1", "bar1"))).is_none());
297 assert!(buffer.try_push(Event::EventD(EventD::new("foo2", "bar2"))).is_none());
298 assert!(buffer.try_push(Event::EventD(EventD::new("foo3", "bar3"))).is_none());
299 assert!(buffer
300 .try_push(Event::ServiceCheck(ServiceCheck::new("foo4", CheckStatus::Ok)))
301 .is_none());
302 assert!(buffer
303 .try_push(Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok)))
304 .is_none());
305
306 assert_eq!(buffer.len(), 7);
307 assert!(buffer.has_event_type(EventType::Metric));
308 assert!(buffer.has_event_type(EventType::EventD));
309 assert!(buffer.has_event_type(EventType::ServiceCheck));
310
311 let eventd_event_buffer: VecDeque<Event> = buffer.extract(Event::is_eventd).collect();
312 assert_eq!(buffer.len(), 4);
313 assert_eq!(eventd_event_buffer.len(), 3);
314 assert!(buffer.has_event_type(EventType::Metric));
315 assert!(!buffer.has_event_type(EventType::EventD));
316 assert!(buffer.has_event_type(EventType::ServiceCheck));
317
318 let service_checks_event_buffer: VecDeque<Event> = buffer.extract(Event::is_service_check).collect();
319 assert_eq!(buffer.len(), 2);
320 assert_eq!(service_checks_event_buffer.len(), 2);
321 assert!(buffer.has_event_type(EventType::Metric));
322 assert!(!buffer.has_event_type(EventType::EventD));
323 assert!(!buffer.has_event_type(EventType::ServiceCheck));
324
325 let new_buffer: VecDeque<Event> = buffer.extract(Event::is_metric).collect();
326 assert_eq!(buffer.len(), 0);
327 assert_eq!(new_buffer.len(), 2);
328 assert!(!buffer.has_event_type(EventType::Metric));
329 assert!(!buffer.has_event_type(EventType::EventD));
330 assert!(!buffer.has_event_type(EventType::ServiceCheck));
331 }
332
333 #[test]
334 fn remove_if() {
335 let event1 = Event::Metric(Metric::counter("foo", 42.0));
336 let event2 = Event::EventD(EventD::new("foo1", "bar1"));
337 let event3 = Event::EventD(EventD::new("foo2", "bar2"));
338 let event4 = Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok));
339
340 let mut buffer = FixedSizeEventBuffer::<10>::default();
341
342 assert!(buffer.try_push(event1.clone()).is_none());
344 assert!(buffer.try_push(event2.clone()).is_none());
345 assert!(buffer.try_push(event3.clone()).is_none());
346 assert!(buffer.try_push(event4.clone()).is_none());
347 assert_eq!(buffer.len(), 4);
348 assert!(buffer.has_event_type(EventType::Metric));
349 assert!(buffer.has_event_type(EventType::EventD));
350 assert!(buffer.has_event_type(EventType::ServiceCheck));
351
352 buffer.remove_if(|event| event.is_eventd());
354 assert_eq!(buffer.len(), 2);
355 assert!(buffer.has_event_type(EventType::Metric));
356 assert!(!buffer.has_event_type(EventType::EventD));
357 assert!(buffer.has_event_type(EventType::ServiceCheck));
358
359 let mut remaining_events = buffer.into_iter().collect::<Vec<_>>();
361
362 let remaining_event1 = remaining_events.remove(0);
363 assert_eq!(remaining_event1, event1);
364
365 let remaining_event2 = remaining_events.remove(0);
366 assert_eq!(remaining_event2, event4);
367 }
368
369 #[test]
370 fn event_buffer_manager_try_push_not_full() {
371 let mut manager = EventBufferManager::<4>::default();
372
373 let event1 = Event::Metric(Metric::counter("foo", 42.0));
374 let event2 = Event::EventD(EventD::new("foo1", "bar1"));
375 let event3 = Event::EventD(EventD::new("foo2", "bar2"));
376 let event4 = Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok));
377
378 assert!(manager.try_push(event1.clone()).is_none());
380 assert!(manager.try_push(event2.clone()).is_none());
381 assert!(manager.try_push(event3.clone()).is_none());
382 assert!(manager.try_push(event4.clone()).is_none());
383
384 let mut buffered_events = manager.consume().expect("manager should have buffer").into_iter();
386
387 assert_eq!(Some(event1), buffered_events.next());
388 assert_eq!(Some(event2), buffered_events.next());
389 assert_eq!(Some(event3), buffered_events.next());
390 assert_eq!(Some(event4), buffered_events.next());
391 assert_eq!(None, buffered_events.next());
392 }
393
394 #[test]
395 fn event_buffer_manager_try_push_full() {
396 let mut manager = EventBufferManager::<3>::default();
397
398 let event1 = Event::Metric(Metric::counter("foo", 42.0));
399 let event2 = Event::EventD(EventD::new("foo1", "bar1"));
400 let event3 = Event::EventD(EventD::new("foo2", "bar2"));
401 let event4 = Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok));
402
403 assert!(manager.try_push(event1.clone()).is_none());
407 assert!(manager.try_push(event2.clone()).is_none());
408 assert!(manager.try_push(event3.clone()).is_none());
409
410 let first_buffer = manager.try_push(event4.clone());
411 assert!(first_buffer.is_some());
412
413 let mut buffered_events1 = first_buffer.unwrap().into_iter();
414 assert_eq!(Some(event1), buffered_events1.next());
415 assert_eq!(Some(event2), buffered_events1.next());
416 assert_eq!(Some(event3), buffered_events1.next());
417 assert_eq!(None, buffered_events1.next());
418
419 let mut buffered_events2 = manager
421 .consume()
422 .expect("manager should have second buffer")
423 .into_iter();
424 assert_eq!(Some(event4), buffered_events2.next());
425 assert_eq!(None, buffered_events2.next());
426 }
427}