Skip to main content

saluki_core/topology/interconnect/
dispatcher.rs

1use std::{borrow::Cow, time::Instant};
2
3use saluki_common::collections::FastHashMap;
4use saluki_error::{generic_error, GenericError};
5use saluki_metrics::static_metrics;
6use tokio::sync::mpsc;
7
8use super::Dispatchable;
9use crate::{components::ComponentContext, topology::OutputName};
10
11// TODO: When we have support for additional static labels on a per-metric basis, add `discard_reason` to
12// `events_discarded_total` metric to indicate that it's due to the destination component being disconnected.
13static_metrics!(
14    name => DispatcherMetrics,
15    prefix => component,
16    labels => [component_id: String, component_type: &'static str, output: String],
17    metrics => [
18        counter(events_sent_total),
19        trace_histogram(send_latency_seconds),
20        counter(events_discarded_total),
21    ],
22);
23
24impl DispatcherMetrics {
25    fn default_output(context: ComponentContext) -> Self {
26        Self::with_output_name(context, "_default")
27    }
28
29    fn named_output(context: ComponentContext, output_name: &str) -> Self {
30        Self::with_output_name(context, output_name)
31    }
32
33    fn with_output_name(context: ComponentContext, output_name: &str) -> Self {
34        Self::new(
35            context.component_id().to_string(),
36            context.component_type().as_str(),
37            output_name.to_string(),
38        )
39    }
40}
41
42/// A type that can be used as a buffer for dispatching items.
43pub trait DispatchBuffer: Dispatchable + Default {
44    /// Type of item that can be pushed into the buffer.
45    type Item;
46
47    /// Returns the number of items currently in the buffer.
48    fn len(&self) -> usize;
49
50    /// Returns `true` if the buffer is full.
51    fn is_full(&self) -> bool;
52
53    /// Attempts to push an item into the buffer.
54    ///
55    /// Returns `Some(item)` if the buffer is full and the item couldn't be pushed.
56    fn try_push(&mut self, item: Self::Item) -> Option<Self::Item>;
57}
58
59struct DispatchTarget<T> {
60    metrics: DispatcherMetrics,
61    senders: Vec<mpsc::Sender<T>>,
62}
63
64impl<T> DispatchTarget<T>
65where
66    T: Dispatchable,
67{
68    fn default_output(context: ComponentContext) -> Self {
69        Self {
70            metrics: DispatcherMetrics::default_output(context),
71            senders: Vec::new(),
72        }
73    }
74
75    fn named_output(context: ComponentContext, output_name: &str) -> Self {
76        Self {
77            metrics: DispatcherMetrics::named_output(context, output_name),
78            senders: Vec::new(),
79        }
80    }
81
82    fn add_sender(&mut self, sender: mpsc::Sender<T>) {
83        self.senders.push(sender);
84    }
85
86    async fn send(&self, item: T) -> Result<(), GenericError> {
87        if self.senders.is_empty() {
88            // Track discarded events when no senders are attached to this output
89            let item_count = item.item_count() as u64;
90            self.metrics.events_discarded_total().increment(item_count);
91            return Ok(());
92        }
93
94        let start = Instant::now();
95        let item_count = item.item_count();
96
97        // Send the item to all senders except the last one by cloning the item.
98        let cloned_sends = self.senders.len() - 1;
99        for sender in &self.senders[0..cloned_sends] {
100            sender
101                .send(item.clone())
102                .await
103                .map_err(|_| generic_error!("Failed to send to output."))?;
104        }
105
106        // Send the item to the last sender without cloning.
107        let last_sender = &self.senders[cloned_sends];
108        last_sender
109            .send(item)
110            .await
111            .map_err(|_| generic_error!("Failed to send to output."))?;
112
113        let elapsed = start.elapsed();
114
115        // TODO: We should consider splitting this out per-sender somehow. We would need to carry around the
116        // destination component's ID, though, to properly associate it.
117        self.metrics.send_latency_seconds().record(elapsed);
118
119        let total_events_sent = (self.senders.len() * item_count) as u64;
120        self.metrics.events_sent_total().increment(total_events_sent);
121
122        Ok(())
123    }
124}
125
126/// A buffered dispatcher.
127///
128/// `BufferedDispatcher` provides an efficient and ergonomic interface to `Dispatcher` that allows for writing events
129/// one-by-one into batches, which are then dispatched to the configured output as needed. This allows callers to focus
130/// on the logic around what items to send, without needing to worry about the details of event buffer sizing or
131/// flushing.
132pub struct BufferedDispatcher<'a, T> {
133    metrics: &'a DispatcherMetrics,
134    flushed_len: usize,
135    buffer: Option<T>,
136    target: &'a DispatchTarget<T>,
137}
138
139impl<'a, T> BufferedDispatcher<'a, T> {
140    fn new(target: &'a DispatchTarget<T>) -> Self {
141        Self {
142            metrics: &target.metrics,
143            flushed_len: 0,
144            buffer: None,
145            target,
146        }
147    }
148}
149
150impl<T> BufferedDispatcher<'_, T>
151where
152    T: DispatchBuffer,
153{
154    async fn try_flush_buffer(&self, buffer: T) -> Result<(), GenericError> {
155        let buffer_len = buffer.len();
156        if buffer_len > 0 {
157            self.target.send(buffer).await
158        } else {
159            Ok(())
160        }
161    }
162
163    /// Pushes an item into the buffered dispatcher.
164    ///
165    /// # Errors
166    ///
167    /// If there is an error flushing items to the output, or if there is an error acquiring a new buffer, an error
168    /// is returned.
169    pub async fn push(&mut self, item: T::Item) -> Result<(), GenericError> {
170        // If our current buffer is full, flush it before acquiring a new one.
171        if let Some(old_buffer) = self.buffer.take_if(|b| b.is_full()) {
172            self.try_flush_buffer(old_buffer).await?;
173        }
174
175        // Add the item to our current buffer.
176        //
177        // If our current buffer is empty, create a new one first. If the current buffer is full, return an error
178        // because it should be impossible to get a new buffer that is full.
179        let buffer = self.buffer.get_or_insert_default();
180        if buffer.try_push(item).is_some() {
181            return Err(generic_error!("Dispatch buffer already full after acquisition."));
182        }
183
184        self.flushed_len += 1;
185
186        Ok(())
187    }
188
189    /// Consumes this buffered dispatcher and sends/flushes all input items to the underlying output.
190    ///
191    /// If flushing is successful, `Ok(flushed)` is returned, where `flushed` is the total number of items that
192    /// have been flushed through this buffered dispatcher.
193    ///
194    /// # Errors
195    ///
196    /// If there is an error sending items to the output, an error is returned.
197    pub async fn send_all<I>(mut self, items: I) -> Result<usize, GenericError>
198    where
199        I: IntoIterator<Item = T::Item>,
200    {
201        for item in items {
202            self.push(item).await?;
203        }
204
205        self.flush().await
206    }
207
208    /// Consumes this buffered dispatcher, flushing any buffered items to the underlying output.
209    ///
210    /// If flushing is successful, `Ok(flushed)` is returned, where `flushed` is the total number of items that have
211    /// been flushed through this buffered dispatcher.
212    ///
213    /// # Errors
214    ///
215    /// If there is an error sending items to the output, an error is returned.
216    pub async fn flush(mut self) -> Result<usize, GenericError> {
217        if let Some(old_buffer) = self.buffer.take() {
218            self.try_flush_buffer(old_buffer).await?;
219        }
220
221        // We increment the "events sent" metric here because we want to count the number of buffered items, vs doing it in
222        // `DispatchTarget::send` where all it knows is that it sent one item.
223        self.metrics.events_sent_total().increment(self.flushed_len as u64);
224
225        Ok(self.flushed_len)
226    }
227}
228
229/// Dispatches items from one component to another.
230///
231/// [`Dispatcher`] provides an ergonomic interface for sending items to a downstream component. It has support for
232/// multiple outputs (a default output, and additional "named" outputs) and provides telemetry around the number of
233/// dispatched items as well as the latency of sending them.
234pub struct Dispatcher<T>
235where
236    T: Dispatchable,
237{
238    context: ComponentContext,
239    default: Option<DispatchTarget<T>>,
240    targets: FastHashMap<Cow<'static, str>, DispatchTarget<T>>,
241}
242
243impl<T> Dispatcher<T>
244where
245    T: Dispatchable,
246{
247    /// Create a new `Dispatcher` for the given component context.
248    pub fn new(context: ComponentContext) -> Self {
249        Self {
250            context,
251            default: None,
252            targets: FastHashMap::default(),
253        }
254    }
255
256    /// Adds an output to the dispatcher.
257    ///
258    /// # Errors
259    ///
260    /// If the output already exists, an error is returned.
261    pub fn add_output(&mut self, output_name: OutputName) -> Result<(), GenericError> {
262        match output_name {
263            OutputName::Default => {
264                if self.default.is_some() {
265                    return Err(generic_error!("Default output already exists."));
266                }
267
268                self.default = Some(DispatchTarget::default_output(self.context.clone()));
269            }
270            OutputName::Given(name) => {
271                if self.targets.contains_key(&name) {
272                    return Err(generic_error!("Output '{}' already exists.", name));
273                }
274                let target = DispatchTarget::named_output(self.context.clone(), &name);
275                self.targets.insert(name, target);
276            }
277        }
278
279        Ok(())
280    }
281
282    /// Attaches a sender to the given output.
283    ///
284    /// # Errors
285    ///
286    /// If the output doesn't exist, an error is returned.
287    pub fn attach_sender_to_output(
288        &mut self, output_name: &OutputName, sender: mpsc::Sender<T>,
289    ) -> Result<(), GenericError> {
290        let target = match output_name {
291            OutputName::Default => self
292                .default
293                .as_mut()
294                .ok_or_else(|| generic_error!("No default output declared."))?,
295            OutputName::Given(name) => self
296                .targets
297                .get_mut(name)
298                .ok_or_else(|| generic_error!("Output '{}' does not exist.", name))?,
299        };
300        target.add_sender(sender);
301
302        Ok(())
303    }
304
305    fn get_default_output(&self) -> Result<&DispatchTarget<T>, GenericError> {
306        self.default
307            .as_ref()
308            .ok_or_else(|| generic_error!("No default output declared."))
309    }
310
311    fn get_named_output(&self, name: &str) -> Result<&DispatchTarget<T>, GenericError> {
312        self.targets
313            .get(name)
314            .ok_or_else(|| generic_error!("No output named '{}' declared.", name))
315    }
316
317    /// Returns `true` if the default output is connected to downstream components.
318    pub fn is_default_output_connected(&self) -> bool {
319        self.default.as_ref().is_some_and(|target| !target.senders.is_empty())
320    }
321
322    /// Returns `true` if the named output is connected to downstream components.
323    pub fn is_named_output_connected(&self, name: &str) -> bool {
324        self.targets.get(name).is_some_and(|target| !target.senders.is_empty())
325    }
326
327    /// Dispatches the given item to the default output.
328    ///
329    /// # Errors
330    ///
331    /// If the default output isn't set, or there is an error sending to the default output, an error is returned.
332    pub async fn dispatch(&self, item: T) -> Result<(), GenericError> {
333        self.dispatch_inner(None, item).await
334    }
335
336    /// Dispatches the given items to the given named output.
337    ///
338    /// # Errors
339    ///
340    /// If a output of the given name isn't set, or there is an error sending to the output, an error is returned.
341    pub async fn dispatch_named<N>(&self, output_name: N, item: T) -> Result<(), GenericError>
342    where
343        N: AsRef<str>,
344    {
345        self.dispatch_inner(Some(output_name.as_ref()), item).await
346    }
347
348    async fn dispatch_inner(&self, output_name: Option<&str>, item: T) -> Result<(), GenericError> {
349        let target = match output_name {
350            None => self.get_default_output()?,
351            Some(name) => self.get_named_output(name)?,
352        };
353
354        target.send(item).await?;
355
356        Ok(())
357    }
358}
359
360impl<T> Dispatcher<T>
361where
362    T: DispatchBuffer,
363{
364    /// Creates a buffered dispatcher for the default output.
365    ///
366    /// This should generally be used if the items being dispatched aren't already collected in a container, or exposed
367    /// via an iterable type. It allows for efficiently buffering items one-by-one before dispatching them to the
368    /// underlying output.
369    ///
370    /// # Errors
371    ///
372    /// If the default output hasn't been configured, an error will be returned.
373    pub fn buffered(&self) -> Result<BufferedDispatcher<'_, T>, GenericError> {
374        self.get_default_output().map(BufferedDispatcher::new)
375    }
376
377    /// Creates a buffered dispatcher for the given named output.
378    ///
379    /// This should generally be used if the items being dispatched aren't already collected in a container, or exposed
380    /// via an iterable type. It allows for efficiently buffering items one-by-one before dispatching them to the
381    /// underlying output.
382    ///
383    /// # Errors
384    ///
385    /// If the given named output hasn't been configured, an error will be returned.
386    pub fn buffered_named<N>(&self, output_name: N) -> Result<BufferedDispatcher<'_, T>, GenericError>
387    where
388        N: AsRef<str>,
389    {
390        self.get_named_output(output_name.as_ref()).map(BufferedDispatcher::new)
391    }
392
393    /// Dispatches a single item to the default output.
394    ///
395    /// # Errors
396    ///
397    /// If the default output isn't set, or there is an error sending to the default output, an error is returned.
398    pub async fn dispatch_one(&self, item: T::Item) -> Result<(), GenericError> {
399        self.dispatch_one_inner(None, item).await
400    }
401
402    /// Dispatches a single item to the given named output.
403    ///
404    /// # Errors
405    ///
406    /// If an output of the given name isn't set, or there is an error sending to the output, an error is returned.
407    pub async fn dispatch_one_named<N>(&self, output_name: N, item: T::Item) -> Result<(), GenericError>
408    where
409        N: AsRef<str>,
410    {
411        self.dispatch_one_inner(Some(output_name.as_ref()), item).await
412    }
413
414    async fn dispatch_one_inner(&self, output_name: Option<&str>, item: T::Item) -> Result<(), GenericError> {
415        let target = match output_name {
416            None => self.get_default_output()?,
417            Some(name) => self.get_named_output(name)?,
418        };
419
420        let mut buffer = T::default();
421        if buffer.try_push(item).is_some() {
422            return Err(generic_error!("Default-constructed buffer rejected a single item."));
423        }
424        target.send(buffer).await
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    // TODO: Tests asserting we emit metrics, and the right metrics.
431
432    use std::ops::Deref;
433
434    use metrics::{Key, Label};
435    use metrics_util::{
436        debugging::{DebugValue, DebuggingRecorder, Snapshotter},
437        CompositeKey, MetricKind,
438    };
439    use ordered_float::OrderedFloat;
440
441    use super::*;
442
443    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
444    struct SingleEvent<T>(T);
445
446    impl<T: Clone + Copy> Dispatchable for SingleEvent<T> {
447        fn item_count(&self) -> usize {
448            1
449        }
450    }
451
452    impl<T: Clone + Copy> From<T> for SingleEvent<T> {
453        fn from(value: T) -> Self {
454            Self(value)
455        }
456    }
457
458    #[derive(Clone, Debug, Eq, PartialEq)]
459    struct FixedUsizeVec<const N: usize> {
460        data: [usize; N],
461        len: usize,
462    }
463
464    impl<const N: usize> Default for FixedUsizeVec<N> {
465        fn default() -> Self {
466            Self { data: [0; N], len: 0 }
467        }
468    }
469
470    impl<const N: usize> Deref for FixedUsizeVec<N> {
471        type Target = [usize];
472
473        fn deref(&self) -> &Self::Target {
474            &self.data
475        }
476    }
477
478    impl<const N: usize> Dispatchable for FixedUsizeVec<N> {
479        fn item_count(&self) -> usize {
480            self.len
481        }
482    }
483
484    impl<const N: usize> DispatchBuffer for FixedUsizeVec<N> {
485        type Item = usize;
486
487        fn len(&self) -> usize {
488            self.len
489        }
490
491        fn is_full(&self) -> bool {
492            self.len == N
493        }
494
495        fn try_push(&mut self, item: Self::Item) -> Option<Self::Item> {
496            if self.is_full() {
497                Some(item)
498            } else {
499                self.data[self.len] = item;
500                self.len += 1;
501                None
502            }
503        }
504    }
505
506    fn unbuffered_dispatcher<T: Dispatchable>() -> Dispatcher<T> {
507        let component_context = ComponentContext::test_source("dispatcher_test");
508        Dispatcher::new(component_context)
509    }
510
511    fn buffered_dispatcher<T: DispatchBuffer>() -> Dispatcher<T> {
512        unbuffered_dispatcher()
513    }
514
515    fn add_dispatcher_default_output<T: Dispatchable, const N: usize>(
516        dispatcher: &mut Dispatcher<T>, senders: [mpsc::Sender<T>; N],
517    ) {
518        dispatcher
519            .add_output(OutputName::Default)
520            .expect("default output should not be added yet");
521        for sender in senders {
522            dispatcher
523                .attach_sender_to_output(&OutputName::Default, sender)
524                .expect("default output should be added");
525        }
526    }
527
528    fn add_dispatcher_named_output<T: Dispatchable, const N: usize>(
529        dispatcher: &mut Dispatcher<T>, output_name: &'static str, senders: [mpsc::Sender<T>; N],
530    ) {
531        dispatcher
532            .add_output(OutputName::Given(output_name.into()))
533            .expect("named output should not be added yet");
534        for sender in senders {
535            dispatcher
536                .attach_sender_to_output(&OutputName::Given(output_name.into()), sender)
537                .expect("named output should be added");
538        }
539    }
540
541    fn get_dispatcher_metric_ckey(
542        kind: MetricKind, name: &'static str, output_name: &'static str, tags: &[(&'static str, &'static str)],
543    ) -> CompositeKey {
544        let mut labels = vec![
545            Label::from_static_parts("component_id", "dispatcher_test"),
546            Label::from_static_parts("component_type", "source"),
547            Label::from_static_parts("output", output_name),
548        ];
549
550        for tag in tags {
551            labels.push(Label::from_static_parts(tag.0, tag.1));
552        }
553
554        let key = Key::from_parts(name, labels);
555        CompositeKey::new(kind, key)
556    }
557
558    fn get_output_metrics(snapshotter: &Snapshotter, output_name: &'static str) -> (u64, u64, Vec<OrderedFloat<f64>>) {
559        let events_sent_key = get_dispatcher_metric_ckey(
560            MetricKind::Counter,
561            DispatcherMetrics::events_sent_total_name(),
562            output_name,
563            &[],
564        );
565        let events_discarded_key = get_dispatcher_metric_ckey(
566            MetricKind::Counter,
567            DispatcherMetrics::events_discarded_total_name(),
568            output_name,
569            &[],
570        );
571        let send_latency_key = get_dispatcher_metric_ckey(
572            MetricKind::Histogram,
573            DispatcherMetrics::send_latency_seconds_name(),
574            output_name,
575            &[],
576        );
577
578        // TODO: This API for querying the metrics really sucks... and we need something better.
579        let current_metrics = snapshotter.snapshot().into_hashmap();
580        let (_, _, events_sent) = current_metrics
581            .get(&events_sent_key)
582            .expect("should have events sent metric");
583        let (_, _, events_discarded) = current_metrics
584            .get(&events_discarded_key)
585            .expect("should have events discarded metric");
586        let (_, _, send_latency) = current_metrics
587            .get(&send_latency_key)
588            .expect("should have send latency metric");
589
590        let events_sent = match events_sent {
591            DebugValue::Counter(value) => *value,
592            _ => panic!("unexpected metric type for events sent"),
593        };
594
595        let events_discarded = match events_discarded {
596            DebugValue::Counter(value) => *value,
597            _ => panic!("unexpected metric type for events discarded"),
598        };
599
600        let send_latency = match send_latency {
601            DebugValue::Histogram(value) => value.clone(),
602            _ => panic!("unexpected metric type for send latency"),
603        };
604
605        (events_sent, events_discarded, send_latency)
606    }
607
608    #[tokio::test]
609    async fn default_output() {
610        // Create the dispatcher and wire up a sender to the default output.
611        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
612
613        let (tx, mut rx) = mpsc::channel(1);
614        add_dispatcher_default_output(&mut dispatcher, [tx]);
615
616        // Create an item and roundtrip it through the dispatcher.
617        let input_item = 42.into();
618
619        dispatcher.dispatch(input_item).await.unwrap();
620
621        let output_item = rx.try_recv().expect("input item should have been dispatched");
622        assert_eq!(output_item, input_item);
623    }
624
625    #[tokio::test]
626    async fn named_output() {
627        // Create the dispatcher and wire up a sender to a named output.
628        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
629
630        let output_name = "special";
631        let (tx, mut rx) = mpsc::channel(1);
632        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
633
634        // Create an item and roundtrip it through the dispatcher.
635        let input_item = 42.into();
636
637        dispatcher.dispatch_named(output_name, input_item).await.unwrap();
638
639        let output_item = rx.try_recv().expect("input item should have been dispatched");
640        assert_eq!(output_item, input_item);
641    }
642
643    #[tokio::test]
644    async fn default_output_multiple_senders() {
645        // Create the dispatcher and wire up two senders to the default output.
646        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
647
648        let (tx1, mut rx1) = mpsc::channel(1);
649        let (tx2, mut rx2) = mpsc::channel(1);
650        add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);
651
652        // Create an item and roundtrip it through the dispatcher.
653        let input_item = 42.into();
654
655        dispatcher.dispatch(input_item).await.unwrap();
656
657        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
658        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
659        assert_eq!(output_item1, input_item);
660        assert_eq!(output_item2, input_item);
661    }
662
663    #[tokio::test]
664    async fn named_output_multiple_senders() {
665        // Create the dispatcher and wire up two senders to a named output.
666        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
667
668        let output_name = "special";
669        let (tx1, mut rx1) = mpsc::channel(1);
670        let (tx2, mut rx2) = mpsc::channel(1);
671        add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);
672
673        // Create an item and roundtrip it through the dispatcher.
674        let input_item = 42.into();
675
676        dispatcher.dispatch_named(output_name, input_item).await.unwrap();
677
678        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
679        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
680        assert_eq!(output_item1, input_item);
681        assert_eq!(output_item2, input_item);
682    }
683
684    #[tokio::test]
685    async fn default_output_not_set() {
686        // Create the dispatcher and try to dispatch an item without setting up a default output.
687        let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();
688
689        let result = dispatcher.dispatch(().into()).await;
690        assert!(result.is_err());
691    }
692
693    #[tokio::test]
694    async fn named_output_not_set() {
695        // Create the dispatcher and try to dispatch an event without setting up a named output.
696        let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();
697
698        let result = dispatcher.dispatch_named("non_existent", ().into()).await;
699        assert!(result.is_err());
700    }
701
702    #[tokio::test]
703    async fn default_output_buffered_partial() {
704        // Create the dispatcher and wire up a sender to the default output, using a bufferable type.
705        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
706
707        let (tx, mut rx) = mpsc::channel(1);
708        add_dispatcher_default_output(&mut dispatcher, [tx]);
709
710        // Create an item and roundtrip it through the dispatcher.
711        let input_item = 42;
712
713        let mut buffered = dispatcher.buffered().unwrap();
714        buffered.push(input_item).await.unwrap();
715
716        let flushed_len = buffered.flush().await.unwrap();
717        assert_eq!(flushed_len, 1);
718
719        let output_item = rx.try_recv().expect("input item should have been dispatched");
720        assert_eq!(output_item.len(), 1);
721        assert_eq!(output_item[0], input_item);
722    }
723
724    #[tokio::test]
725    async fn named_output_buffered_partial() {
726        // Create the dispatcher and wire up a sender to a named output, using a bufferable type.
727        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
728
729        let output_name = "buffered_partial";
730        let (tx, mut rx) = mpsc::channel(1);
731        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
732
733        // Create an item and roundtrip it through the dispatcher.
734        let input_item = 42;
735
736        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
737        buffered.push(input_item).await.unwrap();
738
739        let flushed_len = buffered.flush().await.unwrap();
740        assert_eq!(flushed_len, 1);
741
742        let output_item = rx.try_recv().expect("input item should have been dispatched");
743        assert_eq!(output_item.len(), 1);
744        assert_eq!(output_item[0], input_item);
745    }
746
747    #[tokio::test]
748    async fn default_output_buffered_overflow() {
749        // Create the dispatcher and wire up a sender to the default output, using a bufferable type.
750        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
751
752        let (tx, mut rx) = mpsc::channel(2);
753        add_dispatcher_default_output(&mut dispatcher, [tx]);
754
755        // Create multiple items and roundtrip them through the dispatcher.
756        //
757        // We explicitly create more items than a single buffer can hold to exercise full buffers
758        // being flushed during push.
759        let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];
760
761        let mut buffered = dispatcher.buffered().unwrap();
762
763        for item in &input_items {
764            buffered.push(*item).await.unwrap();
765        }
766
767        let flushed_len = buffered.flush().await.unwrap();
768        assert_eq!(flushed_len, input_items.len());
769
770        let output_item1 = rx.try_recv().expect("input item should have been dispatched");
771        assert_eq!(output_item1.len(), 4);
772        assert_eq!(output_item1[0..4], input_items[0..4]);
773
774        let output_item2 = rx.try_recv().expect("input item should have been dispatched");
775        assert_eq!(output_item2.len(), 2);
776        assert_eq!(output_item2[0..2], input_items[4..6]);
777    }
778
779    #[tokio::test]
780    async fn named_output_buffered_overflow() {
781        // Create the dispatcher and wire up a sender to a named output, using a bufferable type.
782        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
783
784        let output_name = "buffered_overflow";
785        let (tx, mut rx) = mpsc::channel(2);
786        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
787
788        // Create multiple items and roundtrip them through the dispatcher.
789        //
790        // We explicitly create more items than a single buffer can hold to exercise full buffers
791        // being flushed during push.
792        let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];
793
794        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
795
796        for item in &input_items {
797            buffered.push(*item).await.unwrap();
798        }
799
800        let flushed_len = buffered.flush().await.unwrap();
801        assert_eq!(flushed_len, input_items.len());
802
803        let output_item1 = rx.try_recv().expect("input item should have been dispatched");
804        assert_eq!(output_item1.len(), 4);
805        assert_eq!(output_item1[0..4], input_items[0..4]);
806
807        let output_item2 = rx.try_recv().expect("input item should have been dispatched");
808        assert_eq!(output_item2.len(), 2);
809        assert_eq!(output_item2[0..2], input_items[4..6]);
810    }
811
812    #[tokio::test]
813    async fn default_output_buffered_partial_multiple_senders() {
814        // Create the dispatcher and wire up two senders to the default output, using a bufferable type.
815        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
816
817        let (tx1, mut rx1) = mpsc::channel(1);
818        let (tx2, mut rx2) = mpsc::channel(1);
819        add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);
820
821        // Create an item and roundtrip it through the dispatcher.
822        let input_item = 42;
823
824        let mut buffered = dispatcher.buffered().unwrap();
825        buffered.push(input_item).await.unwrap();
826
827        let flushed_len = buffered.flush().await.unwrap();
828        assert_eq!(flushed_len, 1);
829
830        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
831        assert_eq!(output_item1.len(), 1);
832        assert_eq!(output_item1[0], input_item);
833
834        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
835        assert_eq!(output_item2.len(), 1);
836        assert_eq!(output_item2[0], input_item);
837    }
838
839    #[tokio::test]
840    async fn named_output_buffered_partial_multiple_senders() {
841        // Create the dispatcher and wire up two senders to a named output, using a bufferable type.
842        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
843
844        let output_name = "buffered_partial";
845        let (tx1, mut rx1) = mpsc::channel(1);
846        let (tx2, mut rx2) = mpsc::channel(1);
847        add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);
848
849        // Create an item and roundtrip it through the dispatcher.
850        let input_item = 42;
851
852        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
853        buffered.push(input_item).await.unwrap();
854
855        let flushed_len = buffered.flush().await.unwrap();
856        assert_eq!(flushed_len, 1);
857
858        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
859        assert_eq!(output_item1.len(), 1);
860        assert_eq!(output_item1[0], input_item);
861
862        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
863        assert_eq!(output_item2.len(), 1);
864        assert_eq!(output_item2[0], input_item);
865    }
866
867    #[tokio::test]
868    async fn default_output_no_senders() {
869        // Test that we can add a default output and dispatch to it even with no senders attached
870        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
871
872        // Add default output but don't attach any senders
873        dispatcher
874            .add_output(OutputName::Default)
875            .expect("should be able to add default output");
876
877        // Should not panic when dispatching to output with no senders
878        let test_event = 42.into();
879        let result = dispatcher.dispatch(test_event).await;
880        assert!(
881            result.is_ok(),
882            "dispatch to default output with no senders should succeed"
883        );
884    }
885
886    #[tokio::test]
887    async fn named_output_no_senders() {
888        // Test that we can add a named output and dispatch to it even with no senders attached
889        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
890
891        // Add named output but don't attach any senders
892        dispatcher
893            .add_output(OutputName::Given("errors".into()))
894            .expect("should be able to add named output");
895
896        // Should not panic when dispatching to output with no senders
897        let test_event = 42.into();
898        let result = dispatcher.dispatch_named("errors", test_event).await;
899        assert!(
900            result.is_ok(),
901            "dispatch to named output with no senders should succeed"
902        );
903    }
904
905    #[tokio::test]
906    async fn metrics_default_output_disconnected() {
907        let recorder = DebuggingRecorder::new();
908        let snapshotter = recorder.snapshotter();
909        let dispatcher = metrics::with_local_recorder(&recorder, || {
910            let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
911            dispatcher
912                .add_output(OutputName::Default)
913                .expect("should not fail to add default output");
914            dispatcher
915        });
916
917        // Send an item with an item count of 1, and make sure we can receive it, and that we update our metrics accordingly:
918        let mut single_item = FixedUsizeVec::<4>::default();
919        assert_eq!(None, single_item.try_push(42));
920        let single_item_item_count = single_item.item_count() as u64;
921
922        dispatcher
923            .dispatch(single_item.clone())
924            .await
925            .expect("should not fail to dispatch");
926
927        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
928        assert_eq!(events_sent, 0);
929        assert_eq!(events_discarded, single_item_item_count);
930        assert!(send_latencies.is_empty());
931
932        // Now send an item with an item count of 3, and make sure we can receive it, and that we update our metrics accordingly:
933        let mut multiple_items = FixedUsizeVec::<4>::default();
934        assert_eq!(None, multiple_items.try_push(42));
935        assert_eq!(None, multiple_items.try_push(12345));
936        assert_eq!(None, multiple_items.try_push(1337));
937        let multiple_items_item_count = multiple_items.item_count() as u64;
938
939        dispatcher
940            .dispatch(multiple_items.clone())
941            .await
942            .expect("should not fail to dispatch");
943
944        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
945        assert_eq!(events_sent, 0);
946        assert_eq!(events_discarded, multiple_items_item_count);
947        assert!(send_latencies.is_empty());
948    }
949
950    #[tokio::test]
951    async fn metrics_named_output_disconnected() {
952        let output_name = "some_output";
953
954        let recorder = DebuggingRecorder::new();
955        let snapshotter = recorder.snapshotter();
956        let dispatcher = metrics::with_local_recorder(&recorder, || {
957            let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
958            dispatcher
959                .add_output(OutputName::Given(output_name.into()))
960                .expect("should not fail to add named output");
961            dispatcher
962        });
963
964        // Send an item with an item count of 1, and make sure we can receive it, and that we update our metrics accordingly:
965        let mut single_item = FixedUsizeVec::<4>::default();
966        assert_eq!(None, single_item.try_push(42));
967        let single_item_item_count = single_item.item_count() as u64;
968
969        dispatcher
970            .dispatch_named(output_name, single_item.clone())
971            .await
972            .expect("should not fail to dispatch");
973
974        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
975        assert_eq!(events_sent, 0);
976        assert_eq!(events_discarded, single_item_item_count);
977        assert!(send_latencies.is_empty());
978
979        // Now send an item with an item count of 3, and make sure we can receive it, and that we update our metrics accordingly:
980        let mut multiple_items = FixedUsizeVec::<4>::default();
981        assert_eq!(None, multiple_items.try_push(42));
982        assert_eq!(None, multiple_items.try_push(12345));
983        assert_eq!(None, multiple_items.try_push(1337));
984        let multiple_items_item_count = multiple_items.item_count() as u64;
985
986        dispatcher
987            .dispatch_named(output_name, multiple_items.clone())
988            .await
989            .expect("should not fail to dispatch");
990
991        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
992        assert_eq!(events_sent, 0);
993        assert_eq!(events_discarded, multiple_items_item_count);
994        assert!(send_latencies.is_empty());
995    }
996
997    #[tokio::test]
998    async fn is_default_output_connected_behavior() {
999        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
1000
1001        // Initially, no default output exists - should return false
1002        assert!(
1003            !dispatcher.is_default_output_connected(),
1004            "should return false when no default output exists"
1005        );
1006
1007        // Add default output but no senders - should return false
1008        dispatcher
1009            .add_output(OutputName::Default)
1010            .expect("should be able to add default output");
1011        assert!(
1012            !dispatcher.is_default_output_connected(),
1013            "should return false when default output exists but has no senders"
1014        );
1015
1016        // Add a sender to the default output - should return true
1017        let (tx, _rx) = mpsc::channel(1);
1018        dispatcher
1019            .attach_sender_to_output(&OutputName::Default, tx)
1020            .expect("should be able to attach sender");
1021        assert!(
1022            dispatcher.is_default_output_connected(),
1023            "should return true when default output has senders attached"
1024        );
1025    }
1026
1027    #[tokio::test]
1028    async fn is_named_output_connected_behavior() {
1029        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
1030        let output_name = "test_output";
1031
1032        // Initially, no named output exists - should return false
1033        assert!(
1034            !dispatcher.is_named_output_connected(output_name),
1035            "should return false when named output doesn't exist"
1036        );
1037
1038        // Add named output but no senders - should return false
1039        dispatcher
1040            .add_output(OutputName::Given(output_name.into()))
1041            .expect("should be able to add named output");
1042        assert!(
1043            !dispatcher.is_named_output_connected(output_name),
1044            "should return false when named output exists but has no senders"
1045        );
1046
1047        // Add a sender to the named output - should return true
1048        let (tx, _rx) = mpsc::channel(1);
1049        dispatcher
1050            .attach_sender_to_output(&OutputName::Given(output_name.into()), tx)
1051            .expect("should be able to attach sender");
1052        assert!(
1053            dispatcher.is_named_output_connected(output_name),
1054            "should return true when named output has senders attached"
1055        );
1056
1057        // Test with a different output name that doesn't exist - should return false
1058        assert!(
1059            !dispatcher.is_named_output_connected("nonexistent_output"),
1060            "should return false for nonexistent output"
1061        );
1062    }
1063
1064    #[tokio::test]
1065    async fn default_output_dispatch_one() {
1066        // Dispatch a single item to the default output and confirm it arrives wrapped in a one-element buffer.
1067        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1068
1069        let (tx, mut rx) = mpsc::channel(1);
1070        add_dispatcher_default_output(&mut dispatcher, [tx]);
1071
1072        let input_item = 42;
1073
1074        dispatcher.dispatch_one(input_item).await.unwrap();
1075
1076        let output_item = rx.try_recv().expect("input item should have been dispatched");
1077        assert_eq!(output_item.len(), 1);
1078        assert_eq!(output_item[0], input_item);
1079    }
1080
1081    #[tokio::test]
1082    async fn named_output_dispatch_one() {
1083        // Same as above but for a named output.
1084        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1085
1086        let output_name = "single";
1087        let (tx, mut rx) = mpsc::channel(1);
1088        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
1089
1090        let input_item = 42;
1091
1092        dispatcher.dispatch_one_named(output_name, input_item).await.unwrap();
1093
1094        let output_item = rx.try_recv().expect("input item should have been dispatched");
1095        assert_eq!(output_item.len(), 1);
1096        assert_eq!(output_item[0], input_item);
1097    }
1098
1099    #[tokio::test]
1100    async fn default_output_dispatch_one_not_set() {
1101        // dispatch_one without a default output configured should error.
1102        let dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1103
1104        let result = dispatcher.dispatch_one(42).await;
1105        assert!(result.is_err());
1106    }
1107
1108    #[tokio::test]
1109    async fn named_output_dispatch_one_not_set() {
1110        // dispatch_one_named on an unknown output should error.
1111        let dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1112
1113        let result = dispatcher.dispatch_one_named("nonexistent", 42).await;
1114        assert!(result.is_err());
1115    }
1116}