saluki_core/topology/interconnect/
dispatcher.rs

1use std::{borrow::Cow, time::Instant};
2
3use saluki_common::collections::FastHashMap;
4use saluki_error::{generic_error, GenericError};
5use saluki_metrics::static_metrics;
6use tokio::sync::mpsc;
7
8use super::Dispatchable;
9use crate::{components::ComponentContext, topology::OutputName};
10
11// TODO: When we have support for additional static labels on a per-metric basis, add `discard_reason` to
12// `events_discarded_total` metric to indicate that it's due to the destination component being disconnected.
13static_metrics!(
14    name => DispatcherMetrics,
15    prefix => component,
16    labels => [component_id: String, component_type: &'static str, output: String],
17    metrics => [
18        counter(events_sent_total),
19        trace_histogram(send_latency_seconds),
20        counter(events_discarded_total),
21    ],
22);
23
24impl DispatcherMetrics {
25    fn default_output(context: ComponentContext) -> Self {
26        Self::with_output_name(context, "_default")
27    }
28
29    fn named_output(context: ComponentContext, output_name: &str) -> Self {
30        Self::with_output_name(context, output_name)
31    }
32
33    fn with_output_name(context: ComponentContext, output_name: &str) -> Self {
34        Self::new(
35            context.component_id().to_string(),
36            context.component_type().as_str(),
37            output_name.to_string(),
38        )
39    }
40}
41
42/// A type that can be used as a buffer for dispatching items.
43pub trait DispatchBuffer: Dispatchable + Default {
44    /// Type of item that can be pushed into the buffer.
45    type Item;
46
47    /// Returns the number of items currently in the buffer.
48    fn len(&self) -> usize;
49
50    /// Returns `true` if the buffer is full.
51    fn is_full(&self) -> bool;
52
53    /// Attempts to push an item into the buffer.
54    ///
55    /// Returns `Some(item)` if the buffer is full and the item could not be pushed.
56    fn try_push(&mut self, item: Self::Item) -> Option<Self::Item>;
57}
58
59struct DispatchTarget<T> {
60    metrics: DispatcherMetrics,
61    senders: Vec<mpsc::Sender<T>>,
62}
63
64impl<T> DispatchTarget<T>
65where
66    T: Dispatchable,
67{
68    fn default_output(context: ComponentContext) -> Self {
69        Self {
70            metrics: DispatcherMetrics::default_output(context),
71            senders: Vec::new(),
72        }
73    }
74
75    fn named_output(context: ComponentContext, output_name: &str) -> Self {
76        Self {
77            metrics: DispatcherMetrics::named_output(context, output_name),
78            senders: Vec::new(),
79        }
80    }
81
82    fn add_sender(&mut self, sender: mpsc::Sender<T>) {
83        self.senders.push(sender);
84    }
85
86    async fn send(&self, item: T) -> Result<(), GenericError> {
87        if self.senders.is_empty() {
88            // Track discarded events when no senders are attached to this output
89            let item_count = item.item_count() as u64;
90            self.metrics.events_discarded_total().increment(item_count);
91            return Ok(());
92        }
93
94        let start = Instant::now();
95        let item_count = item.item_count();
96
97        // Send the item to all senders except the last one by cloning the item.
98        let cloned_sends = self.senders.len() - 1;
99        for sender in &self.senders[0..cloned_sends] {
100            sender
101                .send(item.clone())
102                .await
103                .map_err(|_| generic_error!("Failed to send to output."))?;
104        }
105
106        // Send the item to the last sender without cloning.
107        let last_sender = &self.senders[cloned_sends];
108        last_sender
109            .send(item)
110            .await
111            .map_err(|_| generic_error!("Failed to send to output."))?;
112
113        let elapsed = start.elapsed();
114
115        // TODO: We should consider splitting this out per-sender somehow. We would need to carry around the
116        // destination component's ID, though, to properly associate it.
117        self.metrics.send_latency_seconds().record(elapsed);
118
119        let total_events_sent = (self.senders.len() * item_count) as u64;
120        self.metrics.events_sent_total().increment(total_events_sent);
121
122        Ok(())
123    }
124}
125
126/// A buffered dispatcher.
127///
128/// `BufferedDispatcher` provides an efficient and ergonomic interface to `Dispatcher` that allows for writing events
129/// one-by-one into batches, which are then dispatched to the configured output as needed. This allows callers to focus
130/// on the logic around what items to send, without needing to worry about the details of event buffer sizing or
131/// flushing.
132pub struct BufferedDispatcher<'a, T> {
133    metrics: &'a DispatcherMetrics,
134    flushed_len: usize,
135    buffer: Option<T>,
136    target: &'a DispatchTarget<T>,
137}
138
139impl<'a, T> BufferedDispatcher<'a, T> {
140    fn new(target: &'a DispatchTarget<T>) -> Self {
141        Self {
142            metrics: &target.metrics,
143            flushed_len: 0,
144            buffer: None,
145            target,
146        }
147    }
148}
149
150impl<T> BufferedDispatcher<'_, T>
151where
152    T: DispatchBuffer,
153{
154    async fn try_flush_buffer(&self, buffer: T) -> Result<(), GenericError> {
155        let buffer_len = buffer.len();
156        if buffer_len > 0 {
157            self.target.send(buffer).await
158        } else {
159            Ok(())
160        }
161    }
162
163    /// Pushes an item into the buffered dispatcher.
164    ///
165    /// # Errors
166    ///
167    /// If there is an error flushing items to the output, or if there is an error acquiring a new buffer, an error
168    /// is returned.
169    pub async fn push(&mut self, item: T::Item) -> Result<(), GenericError> {
170        // If our current buffer is full, flush it before acquiring a new one.
171        if let Some(old_buffer) = self.buffer.take_if(|b| b.is_full()) {
172            self.try_flush_buffer(old_buffer).await?;
173        }
174
175        // Add the item to our current buffer.
176        //
177        // If our current buffer is empty, create a new one first. If the current buffer is full, return an error
178        // because it should be impossible to get a new buffer that is full.
179        let buffer = self.buffer.get_or_insert_default();
180        if buffer.try_push(item).is_some() {
181            return Err(generic_error!("Dispatch buffer already full after acquisition."));
182        }
183
184        self.flushed_len += 1;
185
186        Ok(())
187    }
188
189    /// Consumes this buffered dispatcher and sends/flushes all input items to the underlying output.
190    ///
191    /// If flushing is successful, `Ok(flushed)` is returned, where `flushed` is the total number of items that
192    /// have been flushed through this buffered dispatcher.
193    ///
194    /// # Errors
195    ///
196    /// If there is an error sending items to the output, an error is returned.
197    pub async fn send_all<I>(mut self, items: I) -> Result<usize, GenericError>
198    where
199        I: IntoIterator<Item = T::Item>,
200    {
201        for item in items {
202            self.push(item).await?;
203        }
204
205        self.flush().await
206    }
207
208    /// Consumes this buffered dispatcher, flushing any buffered items to the underlying output.
209    ///
210    /// If flushing is successful, `Ok(flushed)` is returned, where `flushed` is the total number of items that have
211    /// been flushed through this buffered dispatcher.
212    ///
213    /// # Errors
214    ///
215    /// If there is an error sending items to the output, an error is returned.
216    pub async fn flush(mut self) -> Result<usize, GenericError> {
217        if let Some(old_buffer) = self.buffer.take() {
218            self.try_flush_buffer(old_buffer).await?;
219        }
220
221        // We increment the "events sent" metric here because we want to count the number of buffered items, vs doing it in
222        // `DispatchTarget::send` where all it knows is that it sent one item.
223        self.metrics.events_sent_total().increment(self.flushed_len as u64);
224
225        Ok(self.flushed_len)
226    }
227}
228
229/// Dispatches items from one component to another.
230///
231/// [`Dispatcher`] provides an ergonomic interface for sending items to a downstream component. It has support for
232/// multiple outputs (a default output, and additional "named" outputs) and provides telemetry around the number of
233/// dispatched items as well as the latency of sending them.
234pub struct Dispatcher<T>
235where
236    T: Dispatchable,
237{
238    context: ComponentContext,
239    default: Option<DispatchTarget<T>>,
240    targets: FastHashMap<Cow<'static, str>, DispatchTarget<T>>,
241}
242
243impl<T> Dispatcher<T>
244where
245    T: Dispatchable,
246{
247    /// Create a new `Dispatcher` for the given component context.
248    pub fn new(context: ComponentContext) -> Self {
249        Self {
250            context,
251            default: None,
252            targets: FastHashMap::default(),
253        }
254    }
255
256    /// Adds an output to the dispatcher.
257    ///
258    /// # Errors
259    ///
260    /// If the output already exists, an error is returned.
261    pub fn add_output(&mut self, output_name: OutputName) -> Result<(), GenericError> {
262        match output_name {
263            OutputName::Default => {
264                if self.default.is_some() {
265                    return Err(generic_error!("Default output already exists."));
266                }
267
268                self.default = Some(DispatchTarget::default_output(self.context.clone()));
269            }
270            OutputName::Given(name) => {
271                if self.targets.contains_key(&name) {
272                    return Err(generic_error!("Output '{}' already exists.", name));
273                }
274                let target = DispatchTarget::named_output(self.context.clone(), &name);
275                self.targets.insert(name, target);
276            }
277        }
278
279        Ok(())
280    }
281
282    /// Attaches a sender to the given output.
283    ///
284    /// # Errors
285    ///
286    /// If the output does not exist, an error is returned.
287    pub fn attach_sender_to_output(
288        &mut self, output_name: &OutputName, sender: mpsc::Sender<T>,
289    ) -> Result<(), GenericError> {
290        let target = match output_name {
291            OutputName::Default => self
292                .default
293                .as_mut()
294                .ok_or_else(|| generic_error!("No default output declared."))?,
295            OutputName::Given(name) => self
296                .targets
297                .get_mut(name)
298                .ok_or_else(|| generic_error!("Output '{}' does not exist.", name))?,
299        };
300        target.add_sender(sender);
301
302        Ok(())
303    }
304
305    fn get_default_output(&self) -> Result<&DispatchTarget<T>, GenericError> {
306        self.default
307            .as_ref()
308            .ok_or_else(|| generic_error!("No default output declared."))
309    }
310
311    fn get_named_output(&self, name: &str) -> Result<&DispatchTarget<T>, GenericError> {
312        self.targets
313            .get(name)
314            .ok_or_else(|| generic_error!("No output named '{}' declared.", name))
315    }
316
317    /// Returns `true` if the default output is connected to downstream components.
318    pub fn is_default_output_connected(&self) -> bool {
319        self.default.as_ref().is_some_and(|target| !target.senders.is_empty())
320    }
321
322    /// Returns `true` if the named output is connected to downstream components.
323    pub fn is_named_output_connected(&self, name: &str) -> bool {
324        self.targets.get(name).is_some_and(|target| !target.senders.is_empty())
325    }
326
327    /// Dispatches the given item to the default output.
328    ///
329    /// # Errors
330    ///
331    /// If the default output is not set, or there is an error sending to the default output, an error is returned.
332    pub async fn dispatch(&self, item: T) -> Result<(), GenericError> {
333        self.dispatch_inner(None, item).await
334    }
335
336    /// Dispatches the given items to the given named output.
337    ///
338    /// # Errors
339    ///
340    /// If a output of the given name is not set, or there is an error sending to the output, an error is returned.
341    pub async fn dispatch_named<N>(&self, output_name: N, item: T) -> Result<(), GenericError>
342    where
343        N: AsRef<str>,
344    {
345        self.dispatch_inner(Some(output_name.as_ref()), item).await
346    }
347
348    async fn dispatch_inner(&self, output_name: Option<&str>, item: T) -> Result<(), GenericError> {
349        let target = match output_name {
350            None => self.get_default_output()?,
351            Some(name) => self.get_named_output(name)?,
352        };
353
354        target.send(item).await?;
355
356        Ok(())
357    }
358}
359
360impl<T> Dispatcher<T>
361where
362    T: DispatchBuffer,
363{
364    /// Creates a buffered dispatcher for the default output.
365    ///
366    /// This should generally be used if the items being dispatched are not already collected in a container, or exposed
367    /// via an iterable type. It allows for efficiently buffering items one-by-one before dispatching them to the
368    /// underlying output.
369    ///
370    /// # Errors
371    ///
372    /// If the default output has not been configured, an error will be returned.
373    pub fn buffered(&self) -> Result<BufferedDispatcher<'_, T>, GenericError> {
374        self.get_default_output().map(BufferedDispatcher::new)
375    }
376
377    /// Creates a buffered dispatcher for the given named output.
378    ///
379    /// This should generally be used if the items being dispatched are not already collected in a container, or exposed
380    /// via an iterable type. It allows for efficiently buffering items one-by-one before dispatching them to the
381    /// underlying output.
382    ///
383    /// # Errors
384    ///
385    /// If the given named output has not been configured, an error will be returned.
386    pub fn buffered_named<N>(&self, output_name: N) -> Result<BufferedDispatcher<'_, T>, GenericError>
387    where
388        N: AsRef<str>,
389    {
390        self.get_named_output(output_name.as_ref()).map(BufferedDispatcher::new)
391    }
392}
393
394#[cfg(test)]
395mod tests {
396    // TODO: Tests asserting we emit metrics, and the right metrics.
397
398    use std::ops::Deref;
399
400    use metrics::{Key, Label};
401    use metrics_util::{
402        debugging::{DebugValue, DebuggingRecorder, Snapshotter},
403        CompositeKey, MetricKind,
404    };
405    use ordered_float::OrderedFloat;
406
407    use super::*;
408
409    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
410    struct SingleEvent<T>(T);
411
412    impl<T: Clone + Copy> Dispatchable for SingleEvent<T> {
413        fn item_count(&self) -> usize {
414            1
415        }
416    }
417
418    impl<T: Clone + Copy> From<T> for SingleEvent<T> {
419        fn from(value: T) -> Self {
420            Self(value)
421        }
422    }
423
424    #[derive(Clone, Debug, Eq, PartialEq)]
425    struct FixedUsizeVec<const N: usize> {
426        data: [usize; N],
427        len: usize,
428    }
429
430    impl<const N: usize> Default for FixedUsizeVec<N> {
431        fn default() -> Self {
432            Self { data: [0; N], len: 0 }
433        }
434    }
435
436    impl<const N: usize> Deref for FixedUsizeVec<N> {
437        type Target = [usize];
438
439        fn deref(&self) -> &Self::Target {
440            &self.data
441        }
442    }
443
444    impl<const N: usize> Dispatchable for FixedUsizeVec<N> {
445        fn item_count(&self) -> usize {
446            self.len
447        }
448    }
449
450    impl<const N: usize> DispatchBuffer for FixedUsizeVec<N> {
451        type Item = usize;
452
453        fn len(&self) -> usize {
454            self.len
455        }
456
457        fn is_full(&self) -> bool {
458            self.len == N
459        }
460
461        fn try_push(&mut self, item: Self::Item) -> Option<Self::Item> {
462            if self.is_full() {
463                Some(item)
464            } else {
465                self.data[self.len] = item;
466                self.len += 1;
467                None
468            }
469        }
470    }
471
472    fn unbuffered_dispatcher<T: Dispatchable>() -> Dispatcher<T> {
473        let component_context = ComponentContext::test_source("dispatcher_test");
474        Dispatcher::new(component_context)
475    }
476
477    fn buffered_dispatcher<T: DispatchBuffer>() -> Dispatcher<T> {
478        unbuffered_dispatcher()
479    }
480
481    fn add_dispatcher_default_output<T: Dispatchable, const N: usize>(
482        dispatcher: &mut Dispatcher<T>, senders: [mpsc::Sender<T>; N],
483    ) {
484        dispatcher
485            .add_output(OutputName::Default)
486            .expect("default output should not be added yet");
487        for sender in senders {
488            dispatcher
489                .attach_sender_to_output(&OutputName::Default, sender)
490                .expect("default output should be added");
491        }
492    }
493
494    fn add_dispatcher_named_output<T: Dispatchable, const N: usize>(
495        dispatcher: &mut Dispatcher<T>, output_name: &'static str, senders: [mpsc::Sender<T>; N],
496    ) {
497        dispatcher
498            .add_output(OutputName::Given(output_name.into()))
499            .expect("named output should not be added yet");
500        for sender in senders {
501            dispatcher
502                .attach_sender_to_output(&OutputName::Given(output_name.into()), sender)
503                .expect("named output should be added");
504        }
505    }
506
507    fn get_dispatcher_metric_ckey(
508        kind: MetricKind, name: &'static str, output_name: &'static str, tags: &[(&'static str, &'static str)],
509    ) -> CompositeKey {
510        let mut labels = vec![
511            Label::from_static_parts("component_id", "dispatcher_test"),
512            Label::from_static_parts("component_type", "source"),
513            Label::from_static_parts("output", output_name),
514        ];
515
516        for tag in tags {
517            labels.push(Label::from_static_parts(tag.0, tag.1));
518        }
519
520        let key = Key::from_parts(name, labels);
521        CompositeKey::new(kind, key)
522    }
523
524    fn get_output_metrics(snapshotter: &Snapshotter, output_name: &'static str) -> (u64, u64, Vec<OrderedFloat<f64>>) {
525        let events_sent_key = get_dispatcher_metric_ckey(
526            MetricKind::Counter,
527            DispatcherMetrics::events_sent_total_name(),
528            output_name,
529            &[],
530        );
531        let events_discarded_key = get_dispatcher_metric_ckey(
532            MetricKind::Counter,
533            DispatcherMetrics::events_discarded_total_name(),
534            output_name,
535            &[],
536        );
537        let send_latency_key = get_dispatcher_metric_ckey(
538            MetricKind::Histogram,
539            DispatcherMetrics::send_latency_seconds_name(),
540            output_name,
541            &[],
542        );
543
544        // TODO: This API for querying the metrics really sucks... and we need something better.
545        let current_metrics = snapshotter.snapshot().into_hashmap();
546        let (_, _, events_sent) = current_metrics
547            .get(&events_sent_key)
548            .expect("should have events sent metric");
549        let (_, _, events_discarded) = current_metrics
550            .get(&events_discarded_key)
551            .expect("should have events discarded metric");
552        let (_, _, send_latency) = current_metrics
553            .get(&send_latency_key)
554            .expect("should have send latency metric");
555
556        let events_sent = match events_sent {
557            DebugValue::Counter(value) => *value,
558            _ => panic!("unexpected metric type for events sent"),
559        };
560
561        let events_discarded = match events_discarded {
562            DebugValue::Counter(value) => *value,
563            _ => panic!("unexpected metric type for events discarded"),
564        };
565
566        let send_latency = match send_latency {
567            DebugValue::Histogram(value) => value.clone(),
568            _ => panic!("unexpected metric type for send latency"),
569        };
570
571        (events_sent, events_discarded, send_latency)
572    }
573
574    #[tokio::test]
575    async fn default_output() {
576        // Create the dispatcher and wire up a sender to the default output.
577        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
578
579        let (tx, mut rx) = mpsc::channel(1);
580        add_dispatcher_default_output(&mut dispatcher, [tx]);
581
582        // Create an item and roundtrip it through the dispatcher.
583        let input_item = 42.into();
584
585        dispatcher.dispatch(input_item).await.unwrap();
586
587        let output_item = rx.try_recv().expect("input item should have been dispatched");
588        assert_eq!(output_item, input_item);
589    }
590
591    #[tokio::test]
592    async fn named_output() {
593        // Create the dispatcher and wire up a sender to a named output.
594        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
595
596        let output_name = "special";
597        let (tx, mut rx) = mpsc::channel(1);
598        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
599
600        // Create an item and roundtrip it through the dispatcher.
601        let input_item = 42.into();
602
603        dispatcher.dispatch_named(output_name, input_item).await.unwrap();
604
605        let output_item = rx.try_recv().expect("input item should have been dispatched");
606        assert_eq!(output_item, input_item);
607    }
608
609    #[tokio::test]
610    async fn default_output_multiple_senders() {
611        // Create the dispatcher and wire up two senders to the default output.
612        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
613
614        let (tx1, mut rx1) = mpsc::channel(1);
615        let (tx2, mut rx2) = mpsc::channel(1);
616        add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);
617
618        // Create an item and roundtrip it through the dispatcher.
619        let input_item = 42.into();
620
621        dispatcher.dispatch(input_item).await.unwrap();
622
623        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
624        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
625        assert_eq!(output_item1, input_item);
626        assert_eq!(output_item2, input_item);
627    }
628
629    #[tokio::test]
630    async fn named_output_multiple_senders() {
631        // Create the dispatcher and wire up two senders to a named output.
632        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
633
634        let output_name = "special";
635        let (tx1, mut rx1) = mpsc::channel(1);
636        let (tx2, mut rx2) = mpsc::channel(1);
637        add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);
638
639        // Create an item and roundtrip it through the dispatcher.
640        let input_item = 42.into();
641
642        dispatcher.dispatch_named(output_name, input_item).await.unwrap();
643
644        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
645        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
646        assert_eq!(output_item1, input_item);
647        assert_eq!(output_item2, input_item);
648    }
649
650    #[tokio::test]
651    async fn default_output_not_set() {
652        // Create the dispatcher and try to dispatch an item without setting up a default output.
653        let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();
654
655        let result = dispatcher.dispatch(().into()).await;
656        assert!(result.is_err());
657    }
658
659    #[tokio::test]
660    async fn named_output_not_set() {
661        // Create the dispatcher and try to dispatch an event without setting up a named output.
662        let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();
663
664        let result = dispatcher.dispatch_named("non_existent", ().into()).await;
665        assert!(result.is_err());
666    }
667
668    #[tokio::test]
669    async fn default_output_buffered_partial() {
670        // Create the dispatcher and wire up a sender to the default output, using a bufferable type.
671        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
672
673        let (tx, mut rx) = mpsc::channel(1);
674        add_dispatcher_default_output(&mut dispatcher, [tx]);
675
676        // Create an item and roundtrip it through the dispatcher.
677        let input_item = 42;
678
679        let mut buffered = dispatcher.buffered().unwrap();
680        buffered.push(input_item).await.unwrap();
681
682        let flushed_len = buffered.flush().await.unwrap();
683        assert_eq!(flushed_len, 1);
684
685        let output_item = rx.try_recv().expect("input item should have been dispatched");
686        assert_eq!(output_item.len(), 1);
687        assert_eq!(output_item[0], input_item);
688    }
689
690    #[tokio::test]
691    async fn named_output_buffered_partial() {
692        // Create the dispatcher and wire up a sender to a named output, using a bufferable type.
693        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
694
695        let output_name = "buffered_partial";
696        let (tx, mut rx) = mpsc::channel(1);
697        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
698
699        // Create an item and roundtrip it through the dispatcher.
700        let input_item = 42;
701
702        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
703        buffered.push(input_item).await.unwrap();
704
705        let flushed_len = buffered.flush().await.unwrap();
706        assert_eq!(flushed_len, 1);
707
708        let output_item = rx.try_recv().expect("input item should have been dispatched");
709        assert_eq!(output_item.len(), 1);
710        assert_eq!(output_item[0], input_item);
711    }
712
713    #[tokio::test]
714    async fn default_output_buffered_overflow() {
715        // Create the dispatcher and wire up a sender to the default output, using a bufferable type.
716        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
717
718        let (tx, mut rx) = mpsc::channel(2);
719        add_dispatcher_default_output(&mut dispatcher, [tx]);
720
721        // Create multiple items and roundtrip them through the dispatcher.
722        //
723        // We explicitly create more items than a single buffer can hold to exercise full buffers
724        // being flushed during push.
725        let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];
726
727        let mut buffered = dispatcher.buffered().unwrap();
728
729        for item in &input_items {
730            buffered.push(*item).await.unwrap();
731        }
732
733        let flushed_len = buffered.flush().await.unwrap();
734        assert_eq!(flushed_len, input_items.len());
735
736        let output_item1 = rx.try_recv().expect("input item should have been dispatched");
737        assert_eq!(output_item1.len(), 4);
738        assert_eq!(output_item1[0..4], input_items[0..4]);
739
740        let output_item2 = rx.try_recv().expect("input item should have been dispatched");
741        assert_eq!(output_item2.len(), 2);
742        assert_eq!(output_item2[0..2], input_items[4..6]);
743    }
744
745    #[tokio::test]
746    async fn named_output_buffered_overflow() {
747        // Create the dispatcher and wire up a sender to a named output, using a bufferable type.
748        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
749
750        let output_name = "buffered_overflow";
751        let (tx, mut rx) = mpsc::channel(2);
752        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
753
754        // Create multiple items and roundtrip them through the dispatcher.
755        //
756        // We explicitly create more items than a single buffer can hold to exercise full buffers
757        // being flushed during push.
758        let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];
759
760        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
761
762        for item in &input_items {
763            buffered.push(*item).await.unwrap();
764        }
765
766        let flushed_len = buffered.flush().await.unwrap();
767        assert_eq!(flushed_len, input_items.len());
768
769        let output_item1 = rx.try_recv().expect("input item should have been dispatched");
770        assert_eq!(output_item1.len(), 4);
771        assert_eq!(output_item1[0..4], input_items[0..4]);
772
773        let output_item2 = rx.try_recv().expect("input item should have been dispatched");
774        assert_eq!(output_item2.len(), 2);
775        assert_eq!(output_item2[0..2], input_items[4..6]);
776    }
777
778    #[tokio::test]
779    async fn default_output_buffered_partial_multiple_senders() {
780        // Create the dispatcher and wire up two senders to the default output, using a bufferable type.
781        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
782
783        let (tx1, mut rx1) = mpsc::channel(1);
784        let (tx2, mut rx2) = mpsc::channel(1);
785        add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);
786
787        // Create an item and roundtrip it through the dispatcher.
788        let input_item = 42;
789
790        let mut buffered = dispatcher.buffered().unwrap();
791        buffered.push(input_item).await.unwrap();
792
793        let flushed_len = buffered.flush().await.unwrap();
794        assert_eq!(flushed_len, 1);
795
796        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
797        assert_eq!(output_item1.len(), 1);
798        assert_eq!(output_item1[0], input_item);
799
800        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
801        assert_eq!(output_item2.len(), 1);
802        assert_eq!(output_item2[0], input_item);
803    }
804
805    #[tokio::test]
806    async fn named_output_buffered_partial_multiple_senders() {
807        // Create the dispatcher and wire up two senders to a named output, using a bufferable type.
808        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
809
810        let output_name = "buffered_partial";
811        let (tx1, mut rx1) = mpsc::channel(1);
812        let (tx2, mut rx2) = mpsc::channel(1);
813        add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);
814
815        // Create an item and roundtrip it through the dispatcher.
816        let input_item = 42;
817
818        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
819        buffered.push(input_item).await.unwrap();
820
821        let flushed_len = buffered.flush().await.unwrap();
822        assert_eq!(flushed_len, 1);
823
824        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
825        assert_eq!(output_item1.len(), 1);
826        assert_eq!(output_item1[0], input_item);
827
828        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
829        assert_eq!(output_item2.len(), 1);
830        assert_eq!(output_item2[0], input_item);
831    }
832
833    #[tokio::test]
834    async fn default_output_no_senders() {
835        // Test that we can add a default output and dispatch to it even with no senders attached
836        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
837
838        // Add default output but don't attach any senders
839        dispatcher
840            .add_output(OutputName::Default)
841            .expect("should be able to add default output");
842
843        // Should not panic when dispatching to output with no senders
844        let test_event = 42.into();
845        let result = dispatcher.dispatch(test_event).await;
846        assert!(
847            result.is_ok(),
848            "dispatch to default output with no senders should succeed"
849        );
850    }
851
852    #[tokio::test]
853    async fn named_output_no_senders() {
854        // Test that we can add a named output and dispatch to it even with no senders attached
855        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
856
857        // Add named output but don't attach any senders
858        dispatcher
859            .add_output(OutputName::Given("errors".into()))
860            .expect("should be able to add named output");
861
862        // Should not panic when dispatching to output with no senders
863        let test_event = 42.into();
864        let result = dispatcher.dispatch_named("errors", test_event).await;
865        assert!(
866            result.is_ok(),
867            "dispatch to named output with no senders should succeed"
868        );
869    }
870
871    #[tokio::test]
872    async fn metrics_default_output_disconnected() {
873        let recorder = DebuggingRecorder::new();
874        let snapshotter = recorder.snapshotter();
875        let dispatcher = metrics::with_local_recorder(&recorder, || {
876            let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
877            dispatcher
878                .add_output(OutputName::Default)
879                .expect("should not fail to add default output");
880            dispatcher
881        });
882
883        // Send an item with an item count of 1, and make sure we can receive it, and that we update our metrics accordingly:
884        let mut single_item = FixedUsizeVec::<4>::default();
885        assert_eq!(None, single_item.try_push(42));
886        let single_item_item_count = single_item.item_count() as u64;
887
888        dispatcher
889            .dispatch(single_item.clone())
890            .await
891            .expect("should not fail to dispatch");
892
893        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
894        assert_eq!(events_sent, 0);
895        assert_eq!(events_discarded, single_item_item_count);
896        assert!(send_latencies.is_empty());
897
898        // Now send an item with an item count of 3, and make sure we can receive it, and that we update our metrics accordingly:
899        let mut multiple_items = FixedUsizeVec::<4>::default();
900        assert_eq!(None, multiple_items.try_push(42));
901        assert_eq!(None, multiple_items.try_push(12345));
902        assert_eq!(None, multiple_items.try_push(1337));
903        let multiple_items_item_count = multiple_items.item_count() as u64;
904
905        dispatcher
906            .dispatch(multiple_items.clone())
907            .await
908            .expect("should not fail to dispatch");
909
910        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
911        assert_eq!(events_sent, 0);
912        assert_eq!(events_discarded, multiple_items_item_count);
913        assert!(send_latencies.is_empty());
914    }
915
916    #[tokio::test]
917    async fn metrics_named_output_disconnected() {
918        let output_name = "some_output";
919
920        let recorder = DebuggingRecorder::new();
921        let snapshotter = recorder.snapshotter();
922        let dispatcher = metrics::with_local_recorder(&recorder, || {
923            let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
924            dispatcher
925                .add_output(OutputName::Given(output_name.into()))
926                .expect("should not fail to add named output");
927            dispatcher
928        });
929
930        // Send an item with an item count of 1, and make sure we can receive it, and that we update our metrics accordingly:
931        let mut single_item = FixedUsizeVec::<4>::default();
932        assert_eq!(None, single_item.try_push(42));
933        let single_item_item_count = single_item.item_count() as u64;
934
935        dispatcher
936            .dispatch_named(output_name, single_item.clone())
937            .await
938            .expect("should not fail to dispatch");
939
940        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
941        assert_eq!(events_sent, 0);
942        assert_eq!(events_discarded, single_item_item_count);
943        assert!(send_latencies.is_empty());
944
945        // Now send an item with an item count of 3, and make sure we can receive it, and that we update our metrics accordingly:
946        let mut multiple_items = FixedUsizeVec::<4>::default();
947        assert_eq!(None, multiple_items.try_push(42));
948        assert_eq!(None, multiple_items.try_push(12345));
949        assert_eq!(None, multiple_items.try_push(1337));
950        let multiple_items_item_count = multiple_items.item_count() as u64;
951
952        dispatcher
953            .dispatch_named(output_name, multiple_items.clone())
954            .await
955            .expect("should not fail to dispatch");
956
957        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
958        assert_eq!(events_sent, 0);
959        assert_eq!(events_discarded, multiple_items_item_count);
960        assert!(send_latencies.is_empty());
961    }
962
963    #[tokio::test]
964    async fn is_default_output_connected_behavior() {
965        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
966
967        // Initially, no default output exists - should return false
968        assert!(
969            !dispatcher.is_default_output_connected(),
970            "should return false when no default output exists"
971        );
972
973        // Add default output but no senders - should return false
974        dispatcher
975            .add_output(OutputName::Default)
976            .expect("should be able to add default output");
977        assert!(
978            !dispatcher.is_default_output_connected(),
979            "should return false when default output exists but has no senders"
980        );
981
982        // Add a sender to the default output - should return true
983        let (tx, _rx) = mpsc::channel(1);
984        dispatcher
985            .attach_sender_to_output(&OutputName::Default, tx)
986            .expect("should be able to attach sender");
987        assert!(
988            dispatcher.is_default_output_connected(),
989            "should return true when default output has senders attached"
990        );
991    }
992
993    #[tokio::test]
994    async fn is_named_output_connected_behavior() {
995        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
996        let output_name = "test_output";
997
998        // Initially, no named output exists - should return false
999        assert!(
1000            !dispatcher.is_named_output_connected(output_name),
1001            "should return false when named output doesn't exist"
1002        );
1003
1004        // Add named output but no senders - should return false
1005        dispatcher
1006            .add_output(OutputName::Given(output_name.into()))
1007            .expect("should be able to add named output");
1008        assert!(
1009            !dispatcher.is_named_output_connected(output_name),
1010            "should return false when named output exists but has no senders"
1011        );
1012
1013        // Add a sender to the named output - should return true
1014        let (tx, _rx) = mpsc::channel(1);
1015        dispatcher
1016            .attach_sender_to_output(&OutputName::Given(output_name.into()), tx)
1017            .expect("should be able to attach sender");
1018        assert!(
1019            dispatcher.is_named_output_connected(output_name),
1020            "should return true when named output has senders attached"
1021        );
1022
1023        // Test with a different output name that doesn't exist - should return false
1024        assert!(
1025            !dispatcher.is_named_output_connected("nonexistent_output"),
1026            "should return false for nonexistent output"
1027        );
1028    }
1029}