saluki_core/topology/interconnect/
event_buffer.rs

1use std::{collections::VecDeque, fmt};
2
3use crate::{
4    data_model::event::{Event, EventType},
5    topology::interconnect::{dispatcher::DispatchBuffer, Dispatchable},
6};
7
8/// A fixed-size event buffer.
9#[derive(Clone)]
10pub struct FixedSizeEventBuffer<const N: usize> {
11    events: VecDeque<Event>,
12    seen_event_types: EventType,
13}
14
15impl<const N: usize> FixedSizeEventBuffer<N> {
16    /// Returns the total number of events the event buffer can hold.
17    pub fn capacity(&self) -> usize {
18        N
19    }
20
21    /// Returns the number of events in the event buffer.
22    pub fn len(&self) -> usize {
23        self.events.len()
24    }
25
26    /// Returns `true` if the event buffer contains no events.
27    pub fn is_empty(&self) -> bool {
28        self.events.is_empty()
29    }
30
31    /// Returns `true` if the event buffer has no remaining capacity.
32    pub fn is_full(&self) -> bool {
33        self.len() == self.capacity()
34    }
35
36    /// Returns `true` if this event buffer contains one or more events of the given event type.
37    pub fn has_event_type(&self, event_type: EventType) -> bool {
38        self.seen_event_types.contains(event_type)
39    }
40
41    /// Clears the event buffer, removing all events and resetting the seen event types.
42    pub fn clear(&mut self) {
43        self.events.clear();
44        self.seen_event_types = EventType::none();
45    }
46
47    /// Attempts to append an event to the back of the event buffer.
48    ///
49    /// If the event buffer is full, `Some` is returned with the original event.
50    pub fn try_push(&mut self, event: Event) -> Option<Event> {
51        if self.len() == self.capacity() {
52            return Some(event);
53        }
54
55        self.seen_event_types |= event.event_type();
56        self.events.push_back(event);
57        None
58    }
59
60    /// Extract events from the event buffer given a predicate function.
61    pub fn extract<F>(&mut self, predicate: F) -> impl Iterator<Item = Event>
62    where
63        F: Fn(&Event) -> bool,
64    {
65        let mut indices_to_remove = Vec::new();
66        let mut removed_events = VecDeque::new();
67        let mut seen_event_types = EventType::none();
68
69        for (pos, event) in self.events.iter_mut().enumerate() {
70            if predicate(event) {
71                indices_to_remove.push(pos);
72            } else {
73                seen_event_types |= event.event_type();
74            }
75        }
76
77        // Remove elements from back to front to avoid index shifting issues
78        for &pos in indices_to_remove.iter().rev() {
79            if pos < self.events.len() {
80                removed_events.push_back(self.events.swap_remove_back(pos).unwrap());
81            }
82        }
83
84        // Update the seen event types to reflect what's left over.
85        self.seen_event_types = seen_event_types;
86
87        removed_events.into_iter()
88    }
89
90    /// Removes events from the event buffer if they match the given predicate.
91    pub fn remove_if<F>(&mut self, predicate: F)
92    where
93        F: Fn(&mut Event) -> bool,
94    {
95        let mut seen_event_types = EventType::none();
96
97        let mut i = 0;
98        let mut end = self.events.len();
99        while i < end {
100            if predicate(&mut self.events[i]) {
101                self.events.swap_remove_back(i);
102                end -= 1;
103            } else {
104                seen_event_types |= self.events[i].event_type();
105                i += 1;
106            }
107        }
108
109        // Update the seen event types to reflect what's left over.
110        self.seen_event_types = seen_event_types;
111    }
112}
113
114impl<const N: usize> Default for FixedSizeEventBuffer<N> {
115    fn default() -> Self {
116        Self {
117            events: VecDeque::with_capacity(N),
118            seen_event_types: EventType::none(),
119        }
120    }
121}
122
123impl<const N: usize> Dispatchable for FixedSizeEventBuffer<N> {
124    fn item_count(&self) -> usize {
125        self.len()
126    }
127}
128
129impl<const N: usize> DispatchBuffer for FixedSizeEventBuffer<N> {
130    type Item = Event;
131
132    fn len(&self) -> usize {
133        self.len()
134    }
135
136    fn is_full(&self) -> bool {
137        self.is_full()
138    }
139
140    fn try_push(&mut self, item: Self::Item) -> Option<Self::Item> {
141        self.try_push(item)
142    }
143}
144
145impl<const N: usize> fmt::Debug for FixedSizeEventBuffer<N> {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        f.debug_struct("FixedSizeEventBuffer")
148            .field("cap", &N)
149            .field("len", &self.len())
150            .finish()
151    }
152}
153
154impl<const N: usize> IntoIterator for FixedSizeEventBuffer<N> {
155    type Item = Event;
156    type IntoIter = std::collections::vec_deque::IntoIter<Event>;
157
158    fn into_iter(self) -> Self::IntoIter {
159        self.events.into_iter()
160    }
161}
162
163impl<'a, const N: usize> IntoIterator for &'a FixedSizeEventBuffer<N> {
164    type Item = &'a Event;
165    type IntoIter = std::collections::vec_deque::Iter<'a, Event>;
166
167    fn into_iter(self) -> Self::IntoIter {
168        self.events.iter()
169    }
170}
171
172impl<'a, const N: usize> IntoIterator for &'a mut FixedSizeEventBuffer<N> {
173    type Item = &'a mut Event;
174    type IntoIter = std::collections::vec_deque::IterMut<'a, Event>;
175
176    fn into_iter(self) -> Self::IntoIter {
177        self.events.iter_mut()
178    }
179}
180
181/// An ergonomic wrapper over fallibly writing to fixed-size event buffers.
182///
183/// As `FixedSizeEventBuffer` has a fixed capacity, callers have to handle the scenario where they attempt to push an
184/// event into the event buffer but the buffer has no more capacity. This generally involves having to swap it with a
185/// new buffer, as well as holding the event around until they acquire the new buffer.
186///
187/// `EventBufferManager` provides a simple, ergonomic wrapper over a basic pattern of treating the current buffer as an
188/// optional value, and handling the logic of ensuring we have a buffer to write into only when actually attempting a
189/// write, rather than always holding on to one.
190#[derive(Default)]
191pub struct EventBufferManager<const N: usize> {
192    current: Option<FixedSizeEventBuffer<N>>,
193}
194
195impl<const N: usize> EventBufferManager<N> {
196    /// Attempts to push an event into the current event buffer.
197    ///
198    /// If the event buffer is full, it is replaced with a new event buffer before pushing the event, and `Some(buffer)`
199    /// is returned containing the old event buffer. Otherwise, `None` is returned.
200    pub fn try_push(&mut self, event: Event) -> Option<FixedSizeEventBuffer<N>> {
201        let buffer = self.current.get_or_insert_default();
202
203        match buffer.try_push(event) {
204            Some(event) => {
205                // Our current buffer is full, so replace it with a new buffer before trying to write the event
206                // into it again, and return the old buffer to the caller.
207                let old_buffer = std::mem::take(buffer);
208
209                if buffer.try_push(event).is_some() {
210                    panic!("New event buffer is unexpectedly full.")
211                }
212
213                Some(old_buffer)
214            }
215
216            // We were able to push the event into the current buffer, so we're done.
217            None => None,
218        }
219    }
220
221    /// Consumes the current event buffer, if one exists.
222    pub fn consume(&mut self) -> Option<FixedSizeEventBuffer<N>> {
223        self.current.take()
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use std::collections::VecDeque;
230
231    use super::*;
232    use crate::data_model::event::{
233        eventd::EventD,
234        metric::Metric,
235        service_check::{CheckStatus, ServiceCheck},
236        Event, EventType,
237    };
238
239    #[test]
240    fn capacity() {
241        let mut buffer = FixedSizeEventBuffer::<2>::default();
242        assert_eq!(buffer.capacity(), 2);
243
244        assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
245        assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 43.0))).is_none());
246        assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 44.0))).is_some());
247    }
248
249    #[test]
250    fn clear() {
251        // Create an empty event buffer and assert that it's empty and has no seen data types:
252        let mut buffer = FixedSizeEventBuffer::<2>::default();
253        assert!(buffer.is_empty());
254        assert!(!buffer.has_event_type(EventType::Metric));
255        assert!(!buffer.has_event_type(EventType::EventD));
256        assert!(!buffer.has_event_type(EventType::ServiceCheck));
257
258        // Now write a metric, and make sure that's reflected:
259        assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
260        assert!(!buffer.is_empty());
261        assert!(buffer.has_event_type(EventType::Metric));
262        assert!(!buffer.has_event_type(EventType::EventD));
263        assert!(!buffer.has_event_type(EventType::ServiceCheck));
264
265        // Finally, clear the buffer and ensure it's empty and has no seen data types:
266        buffer.clear();
267        assert!(buffer.is_empty());
268        assert!(!buffer.has_event_type(EventType::Metric));
269        assert!(!buffer.has_event_type(EventType::EventD));
270        assert!(!buffer.has_event_type(EventType::ServiceCheck));
271    }
272
273    #[test]
274    fn has_event_type() {
275        let mut buffer = FixedSizeEventBuffer::<2>::default();
276        assert!(!buffer.has_event_type(EventType::Metric));
277        assert!(!buffer.has_event_type(EventType::EventD));
278        assert!(!buffer.has_event_type(EventType::ServiceCheck));
279
280        assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
281        assert!(buffer.has_event_type(EventType::Metric));
282        assert!(!buffer.has_event_type(EventType::EventD));
283        assert!(!buffer.has_event_type(EventType::ServiceCheck));
284
285        assert!(buffer.try_push(Event::EventD(EventD::new("title", "text"))).is_none());
286        assert!(buffer.has_event_type(EventType::Metric));
287        assert!(buffer.has_event_type(EventType::EventD));
288        assert!(!buffer.has_event_type(EventType::ServiceCheck));
289    }
290
291    #[test]
292    fn extract() {
293        let mut buffer = FixedSizeEventBuffer::<10>::default();
294        assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 42.0))).is_none());
295        assert!(buffer.try_push(Event::Metric(Metric::counter("foo", 43.0))).is_none());
296        assert!(buffer.try_push(Event::EventD(EventD::new("foo1", "bar1"))).is_none());
297        assert!(buffer.try_push(Event::EventD(EventD::new("foo2", "bar2"))).is_none());
298        assert!(buffer.try_push(Event::EventD(EventD::new("foo3", "bar3"))).is_none());
299        assert!(buffer
300            .try_push(Event::ServiceCheck(ServiceCheck::new("foo4", CheckStatus::Ok)))
301            .is_none());
302        assert!(buffer
303            .try_push(Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok)))
304            .is_none());
305
306        assert_eq!(buffer.len(), 7);
307        assert!(buffer.has_event_type(EventType::Metric));
308        assert!(buffer.has_event_type(EventType::EventD));
309        assert!(buffer.has_event_type(EventType::ServiceCheck));
310
311        let eventd_event_buffer: VecDeque<Event> = buffer.extract(Event::is_eventd).collect();
312        assert_eq!(buffer.len(), 4);
313        assert_eq!(eventd_event_buffer.len(), 3);
314        assert!(buffer.has_event_type(EventType::Metric));
315        assert!(!buffer.has_event_type(EventType::EventD));
316        assert!(buffer.has_event_type(EventType::ServiceCheck));
317
318        let service_checks_event_buffer: VecDeque<Event> = buffer.extract(Event::is_service_check).collect();
319        assert_eq!(buffer.len(), 2);
320        assert_eq!(service_checks_event_buffer.len(), 2);
321        assert!(buffer.has_event_type(EventType::Metric));
322        assert!(!buffer.has_event_type(EventType::EventD));
323        assert!(!buffer.has_event_type(EventType::ServiceCheck));
324
325        let new_buffer: VecDeque<Event> = buffer.extract(Event::is_metric).collect();
326        assert_eq!(buffer.len(), 0);
327        assert_eq!(new_buffer.len(), 2);
328        assert!(!buffer.has_event_type(EventType::Metric));
329        assert!(!buffer.has_event_type(EventType::EventD));
330        assert!(!buffer.has_event_type(EventType::ServiceCheck));
331    }
332
333    #[test]
334    fn remove_if() {
335        let event1 = Event::Metric(Metric::counter("foo", 42.0));
336        let event2 = Event::EventD(EventD::new("foo1", "bar1"));
337        let event3 = Event::EventD(EventD::new("foo2", "bar2"));
338        let event4 = Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok));
339
340        let mut buffer = FixedSizeEventBuffer::<10>::default();
341
342        // Add the four events.
343        assert!(buffer.try_push(event1.clone()).is_none());
344        assert!(buffer.try_push(event2.clone()).is_none());
345        assert!(buffer.try_push(event3.clone()).is_none());
346        assert!(buffer.try_push(event4.clone()).is_none());
347        assert_eq!(buffer.len(), 4);
348        assert!(buffer.has_event_type(EventType::Metric));
349        assert!(buffer.has_event_type(EventType::EventD));
350        assert!(buffer.has_event_type(EventType::ServiceCheck));
351
352        // Remove events that are EventD.
353        buffer.remove_if(|event| event.is_eventd());
354        assert_eq!(buffer.len(), 2);
355        assert!(buffer.has_event_type(EventType::Metric));
356        assert!(!buffer.has_event_type(EventType::EventD));
357        assert!(buffer.has_event_type(EventType::ServiceCheck));
358
359        // Make sure we have the expected events left.
360        let mut remaining_events = buffer.into_iter().collect::<Vec<_>>();
361
362        let remaining_event1 = remaining_events.remove(0);
363        assert_eq!(remaining_event1, event1);
364
365        let remaining_event2 = remaining_events.remove(0);
366        assert_eq!(remaining_event2, event4);
367    }
368
369    #[test]
370    fn event_buffer_manager_try_push_not_full() {
371        let mut manager = EventBufferManager::<4>::default();
372
373        let event1 = Event::Metric(Metric::counter("foo", 42.0));
374        let event2 = Event::EventD(EventD::new("foo1", "bar1"));
375        let event3 = Event::EventD(EventD::new("foo2", "bar2"));
376        let event4 = Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok));
377
378        // Add the four events.
379        assert!(manager.try_push(event1.clone()).is_none());
380        assert!(manager.try_push(event2.clone()).is_none());
381        assert!(manager.try_push(event3.clone()).is_none());
382        assert!(manager.try_push(event4.clone()).is_none());
383
384        // Consume the current buffer and ensure all of the events match the expected events.
385        let mut buffered_events = manager.consume().expect("manager should have buffer").into_iter();
386
387        assert_eq!(Some(event1), buffered_events.next());
388        assert_eq!(Some(event2), buffered_events.next());
389        assert_eq!(Some(event3), buffered_events.next());
390        assert_eq!(Some(event4), buffered_events.next());
391        assert_eq!(None, buffered_events.next());
392    }
393
394    #[test]
395    fn event_buffer_manager_try_push_full() {
396        let mut manager = EventBufferManager::<3>::default();
397
398        let event1 = Event::Metric(Metric::counter("foo", 42.0));
399        let event2 = Event::EventD(EventD::new("foo1", "bar1"));
400        let event3 = Event::EventD(EventD::new("foo2", "bar2"));
401        let event4 = Event::ServiceCheck(ServiceCheck::new("foo5", CheckStatus::Ok));
402
403        // Add the four events.
404        //
405        // We should only be able to add three events before getting a buffer back, since we have a capacity of three.
406        assert!(manager.try_push(event1.clone()).is_none());
407        assert!(manager.try_push(event2.clone()).is_none());
408        assert!(manager.try_push(event3.clone()).is_none());
409
410        let first_buffer = manager.try_push(event4.clone());
411        assert!(first_buffer.is_some());
412
413        let mut buffered_events1 = first_buffer.unwrap().into_iter();
414        assert_eq!(Some(event1), buffered_events1.next());
415        assert_eq!(Some(event2), buffered_events1.next());
416        assert_eq!(Some(event3), buffered_events1.next());
417        assert_eq!(None, buffered_events1.next());
418
419        // Consume the buffer from the manager, which should have the fourth event.
420        let mut buffered_events2 = manager
421            .consume()
422            .expect("manager should have second buffer")
423            .into_iter();
424        assert_eq!(Some(event4), buffered_events2.next());
425        assert_eq!(None, buffered_events2.next());
426    }
427}