1use std::{borrow::Cow, time::Instant};
2
3use saluki_common::collections::FastHashMap;
4use saluki_error::{generic_error, GenericError};
5use saluki_metrics::static_metrics;
6use tokio::sync::mpsc;
7
8use super::Dispatchable;
9use crate::{components::ComponentContext, topology::OutputName};
10
11static_metrics!(
14 name => DispatcherMetrics,
15 prefix => component,
16 labels => [component_id: String, component_type: &'static str, output: String],
17 metrics => [
18 counter(events_sent_total),
19 trace_histogram(send_latency_seconds),
20 counter(events_discarded_total),
21 ],
22);
23
24impl DispatcherMetrics {
25 fn default_output(context: ComponentContext) -> Self {
26 Self::with_output_name(context, "_default")
27 }
28
29 fn named_output(context: ComponentContext, output_name: &str) -> Self {
30 Self::with_output_name(context, output_name)
31 }
32
33 fn with_output_name(context: ComponentContext, output_name: &str) -> Self {
34 Self::new(
35 context.component_id().to_string(),
36 context.component_type().as_str(),
37 output_name.to_string(),
38 )
39 }
40}
41
42pub trait DispatchBuffer: Dispatchable + Default {
44 type Item;
46
47 fn len(&self) -> usize;
49
50 fn is_full(&self) -> bool;
52
53 fn try_push(&mut self, item: Self::Item) -> Option<Self::Item>;
57}
58
59struct DispatchTarget<T> {
60 metrics: DispatcherMetrics,
61 senders: Vec<mpsc::Sender<T>>,
62}
63
64impl<T> DispatchTarget<T>
65where
66 T: Dispatchable,
67{
68 fn default_output(context: ComponentContext) -> Self {
69 Self {
70 metrics: DispatcherMetrics::default_output(context),
71 senders: Vec::new(),
72 }
73 }
74
75 fn named_output(context: ComponentContext, output_name: &str) -> Self {
76 Self {
77 metrics: DispatcherMetrics::named_output(context, output_name),
78 senders: Vec::new(),
79 }
80 }
81
82 fn add_sender(&mut self, sender: mpsc::Sender<T>) {
83 self.senders.push(sender);
84 }
85
86 async fn send(&self, item: T) -> Result<(), GenericError> {
87 if self.senders.is_empty() {
88 let item_count = item.item_count() as u64;
90 self.metrics.events_discarded_total().increment(item_count);
91 return Ok(());
92 }
93
94 let start = Instant::now();
95 let item_count = item.item_count();
96
97 let cloned_sends = self.senders.len() - 1;
99 for sender in &self.senders[0..cloned_sends] {
100 sender
101 .send(item.clone())
102 .await
103 .map_err(|_| generic_error!("Failed to send to output."))?;
104 }
105
106 let last_sender = &self.senders[cloned_sends];
108 last_sender
109 .send(item)
110 .await
111 .map_err(|_| generic_error!("Failed to send to output."))?;
112
113 let elapsed = start.elapsed();
114
115 self.metrics.send_latency_seconds().record(elapsed);
118
119 let total_events_sent = (self.senders.len() * item_count) as u64;
120 self.metrics.events_sent_total().increment(total_events_sent);
121
122 Ok(())
123 }
124}
125
126pub struct BufferedDispatcher<'a, T> {
133 metrics: &'a DispatcherMetrics,
134 flushed_len: usize,
135 buffer: Option<T>,
136 target: &'a DispatchTarget<T>,
137}
138
139impl<'a, T> BufferedDispatcher<'a, T> {
140 fn new(target: &'a DispatchTarget<T>) -> Self {
141 Self {
142 metrics: &target.metrics,
143 flushed_len: 0,
144 buffer: None,
145 target,
146 }
147 }
148}
149
150impl<T> BufferedDispatcher<'_, T>
151where
152 T: DispatchBuffer,
153{
154 async fn try_flush_buffer(&self, buffer: T) -> Result<(), GenericError> {
155 let buffer_len = buffer.len();
156 if buffer_len > 0 {
157 self.target.send(buffer).await
158 } else {
159 Ok(())
160 }
161 }
162
163 pub async fn push(&mut self, item: T::Item) -> Result<(), GenericError> {
170 if let Some(old_buffer) = self.buffer.take_if(|b| b.is_full()) {
172 self.try_flush_buffer(old_buffer).await?;
173 }
174
175 let buffer = self.buffer.get_or_insert_default();
180 if buffer.try_push(item).is_some() {
181 return Err(generic_error!("Dispatch buffer already full after acquisition."));
182 }
183
184 self.flushed_len += 1;
185
186 Ok(())
187 }
188
189 pub async fn send_all<I>(mut self, items: I) -> Result<usize, GenericError>
198 where
199 I: IntoIterator<Item = T::Item>,
200 {
201 for item in items {
202 self.push(item).await?;
203 }
204
205 self.flush().await
206 }
207
208 pub async fn flush(mut self) -> Result<usize, GenericError> {
217 if let Some(old_buffer) = self.buffer.take() {
218 self.try_flush_buffer(old_buffer).await?;
219 }
220
221 self.metrics.events_sent_total().increment(self.flushed_len as u64);
224
225 Ok(self.flushed_len)
226 }
227}
228
229pub struct Dispatcher<T>
235where
236 T: Dispatchable,
237{
238 context: ComponentContext,
239 default: Option<DispatchTarget<T>>,
240 targets: FastHashMap<Cow<'static, str>, DispatchTarget<T>>,
241}
242
243impl<T> Dispatcher<T>
244where
245 T: Dispatchable,
246{
247 pub fn new(context: ComponentContext) -> Self {
249 Self {
250 context,
251 default: None,
252 targets: FastHashMap::default(),
253 }
254 }
255
256 pub fn add_output(&mut self, output_name: OutputName) -> Result<(), GenericError> {
262 match output_name {
263 OutputName::Default => {
264 if self.default.is_some() {
265 return Err(generic_error!("Default output already exists."));
266 }
267
268 self.default = Some(DispatchTarget::default_output(self.context.clone()));
269 }
270 OutputName::Given(name) => {
271 if self.targets.contains_key(&name) {
272 return Err(generic_error!("Output '{}' already exists.", name));
273 }
274 let target = DispatchTarget::named_output(self.context.clone(), &name);
275 self.targets.insert(name, target);
276 }
277 }
278
279 Ok(())
280 }
281
282 pub fn attach_sender_to_output(
288 &mut self, output_name: &OutputName, sender: mpsc::Sender<T>,
289 ) -> Result<(), GenericError> {
290 let target = match output_name {
291 OutputName::Default => self
292 .default
293 .as_mut()
294 .ok_or_else(|| generic_error!("No default output declared."))?,
295 OutputName::Given(name) => self
296 .targets
297 .get_mut(name)
298 .ok_or_else(|| generic_error!("Output '{}' does not exist.", name))?,
299 };
300 target.add_sender(sender);
301
302 Ok(())
303 }
304
305 fn get_default_output(&self) -> Result<&DispatchTarget<T>, GenericError> {
306 self.default
307 .as_ref()
308 .ok_or_else(|| generic_error!("No default output declared."))
309 }
310
311 fn get_named_output(&self, name: &str) -> Result<&DispatchTarget<T>, GenericError> {
312 self.targets
313 .get(name)
314 .ok_or_else(|| generic_error!("No output named '{}' declared.", name))
315 }
316
317 pub fn is_default_output_connected(&self) -> bool {
319 self.default.as_ref().is_some_and(|target| !target.senders.is_empty())
320 }
321
322 pub fn is_named_output_connected(&self, name: &str) -> bool {
324 self.targets.get(name).is_some_and(|target| !target.senders.is_empty())
325 }
326
327 pub async fn dispatch(&self, item: T) -> Result<(), GenericError> {
333 self.dispatch_inner(None, item).await
334 }
335
336 pub async fn dispatch_named<N>(&self, output_name: N, item: T) -> Result<(), GenericError>
342 where
343 N: AsRef<str>,
344 {
345 self.dispatch_inner(Some(output_name.as_ref()), item).await
346 }
347
348 async fn dispatch_inner(&self, output_name: Option<&str>, item: T) -> Result<(), GenericError> {
349 let target = match output_name {
350 None => self.get_default_output()?,
351 Some(name) => self.get_named_output(name)?,
352 };
353
354 target.send(item).await?;
355
356 Ok(())
357 }
358}
359
360impl<T> Dispatcher<T>
361where
362 T: DispatchBuffer,
363{
364 pub fn buffered(&self) -> Result<BufferedDispatcher<'_, T>, GenericError> {
374 self.get_default_output().map(BufferedDispatcher::new)
375 }
376
377 pub fn buffered_named<N>(&self, output_name: N) -> Result<BufferedDispatcher<'_, T>, GenericError>
387 where
388 N: AsRef<str>,
389 {
390 self.get_named_output(output_name.as_ref()).map(BufferedDispatcher::new)
391 }
392
393 pub async fn dispatch_one(&self, item: T::Item) -> Result<(), GenericError> {
399 self.dispatch_one_inner(None, item).await
400 }
401
402 pub async fn dispatch_one_named<N>(&self, output_name: N, item: T::Item) -> Result<(), GenericError>
408 where
409 N: AsRef<str>,
410 {
411 self.dispatch_one_inner(Some(output_name.as_ref()), item).await
412 }
413
414 async fn dispatch_one_inner(&self, output_name: Option<&str>, item: T::Item) -> Result<(), GenericError> {
415 let target = match output_name {
416 None => self.get_default_output()?,
417 Some(name) => self.get_named_output(name)?,
418 };
419
420 let mut buffer = T::default();
421 if buffer.try_push(item).is_some() {
422 return Err(generic_error!("Default-constructed buffer rejected a single item."));
423 }
424 target.send(buffer).await
425 }
426}
427
428#[cfg(test)]
429mod tests {
430 use std::ops::Deref;
433
434 use metrics::{Key, Label};
435 use metrics_util::{
436 debugging::{DebugValue, DebuggingRecorder, Snapshotter},
437 CompositeKey, MetricKind,
438 };
439 use ordered_float::OrderedFloat;
440
441 use super::*;
442
443 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
444 struct SingleEvent<T>(T);
445
446 impl<T: Clone + Copy> Dispatchable for SingleEvent<T> {
447 fn item_count(&self) -> usize {
448 1
449 }
450 }
451
452 impl<T: Clone + Copy> From<T> for SingleEvent<T> {
453 fn from(value: T) -> Self {
454 Self(value)
455 }
456 }
457
458 #[derive(Clone, Debug, Eq, PartialEq)]
459 struct FixedUsizeVec<const N: usize> {
460 data: [usize; N],
461 len: usize,
462 }
463
464 impl<const N: usize> Default for FixedUsizeVec<N> {
465 fn default() -> Self {
466 Self { data: [0; N], len: 0 }
467 }
468 }
469
470 impl<const N: usize> Deref for FixedUsizeVec<N> {
471 type Target = [usize];
472
473 fn deref(&self) -> &Self::Target {
474 &self.data
475 }
476 }
477
478 impl<const N: usize> Dispatchable for FixedUsizeVec<N> {
479 fn item_count(&self) -> usize {
480 self.len
481 }
482 }
483
484 impl<const N: usize> DispatchBuffer for FixedUsizeVec<N> {
485 type Item = usize;
486
487 fn len(&self) -> usize {
488 self.len
489 }
490
491 fn is_full(&self) -> bool {
492 self.len == N
493 }
494
495 fn try_push(&mut self, item: Self::Item) -> Option<Self::Item> {
496 if self.is_full() {
497 Some(item)
498 } else {
499 self.data[self.len] = item;
500 self.len += 1;
501 None
502 }
503 }
504 }
505
506 fn unbuffered_dispatcher<T: Dispatchable>() -> Dispatcher<T> {
507 let component_context = ComponentContext::test_source("dispatcher_test");
508 Dispatcher::new(component_context)
509 }
510
511 fn buffered_dispatcher<T: DispatchBuffer>() -> Dispatcher<T> {
512 unbuffered_dispatcher()
513 }
514
515 fn add_dispatcher_default_output<T: Dispatchable, const N: usize>(
516 dispatcher: &mut Dispatcher<T>, senders: [mpsc::Sender<T>; N],
517 ) {
518 dispatcher
519 .add_output(OutputName::Default)
520 .expect("default output should not be added yet");
521 for sender in senders {
522 dispatcher
523 .attach_sender_to_output(&OutputName::Default, sender)
524 .expect("default output should be added");
525 }
526 }
527
528 fn add_dispatcher_named_output<T: Dispatchable, const N: usize>(
529 dispatcher: &mut Dispatcher<T>, output_name: &'static str, senders: [mpsc::Sender<T>; N],
530 ) {
531 dispatcher
532 .add_output(OutputName::Given(output_name.into()))
533 .expect("named output should not be added yet");
534 for sender in senders {
535 dispatcher
536 .attach_sender_to_output(&OutputName::Given(output_name.into()), sender)
537 .expect("named output should be added");
538 }
539 }
540
541 fn get_dispatcher_metric_ckey(
542 kind: MetricKind, name: &'static str, output_name: &'static str, tags: &[(&'static str, &'static str)],
543 ) -> CompositeKey {
544 let mut labels = vec![
545 Label::from_static_parts("component_id", "dispatcher_test"),
546 Label::from_static_parts("component_type", "source"),
547 Label::from_static_parts("output", output_name),
548 ];
549
550 for tag in tags {
551 labels.push(Label::from_static_parts(tag.0, tag.1));
552 }
553
554 let key = Key::from_parts(name, labels);
555 CompositeKey::new(kind, key)
556 }
557
558 fn get_output_metrics(snapshotter: &Snapshotter, output_name: &'static str) -> (u64, u64, Vec<OrderedFloat<f64>>) {
559 let events_sent_key = get_dispatcher_metric_ckey(
560 MetricKind::Counter,
561 DispatcherMetrics::events_sent_total_name(),
562 output_name,
563 &[],
564 );
565 let events_discarded_key = get_dispatcher_metric_ckey(
566 MetricKind::Counter,
567 DispatcherMetrics::events_discarded_total_name(),
568 output_name,
569 &[],
570 );
571 let send_latency_key = get_dispatcher_metric_ckey(
572 MetricKind::Histogram,
573 DispatcherMetrics::send_latency_seconds_name(),
574 output_name,
575 &[],
576 );
577
578 let current_metrics = snapshotter.snapshot().into_hashmap();
580 let (_, _, events_sent) = current_metrics
581 .get(&events_sent_key)
582 .expect("should have events sent metric");
583 let (_, _, events_discarded) = current_metrics
584 .get(&events_discarded_key)
585 .expect("should have events discarded metric");
586 let (_, _, send_latency) = current_metrics
587 .get(&send_latency_key)
588 .expect("should have send latency metric");
589
590 let events_sent = match events_sent {
591 DebugValue::Counter(value) => *value,
592 _ => panic!("unexpected metric type for events sent"),
593 };
594
595 let events_discarded = match events_discarded {
596 DebugValue::Counter(value) => *value,
597 _ => panic!("unexpected metric type for events discarded"),
598 };
599
600 let send_latency = match send_latency {
601 DebugValue::Histogram(value) => value.clone(),
602 _ => panic!("unexpected metric type for send latency"),
603 };
604
605 (events_sent, events_discarded, send_latency)
606 }
607
608 #[tokio::test]
609 async fn default_output() {
610 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
612
613 let (tx, mut rx) = mpsc::channel(1);
614 add_dispatcher_default_output(&mut dispatcher, [tx]);
615
616 let input_item = 42.into();
618
619 dispatcher.dispatch(input_item).await.unwrap();
620
621 let output_item = rx.try_recv().expect("input item should have been dispatched");
622 assert_eq!(output_item, input_item);
623 }
624
625 #[tokio::test]
626 async fn named_output() {
627 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
629
630 let output_name = "special";
631 let (tx, mut rx) = mpsc::channel(1);
632 add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
633
634 let input_item = 42.into();
636
637 dispatcher.dispatch_named(output_name, input_item).await.unwrap();
638
639 let output_item = rx.try_recv().expect("input item should have been dispatched");
640 assert_eq!(output_item, input_item);
641 }
642
643 #[tokio::test]
644 async fn default_output_multiple_senders() {
645 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
647
648 let (tx1, mut rx1) = mpsc::channel(1);
649 let (tx2, mut rx2) = mpsc::channel(1);
650 add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);
651
652 let input_item = 42.into();
654
655 dispatcher.dispatch(input_item).await.unwrap();
656
657 let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
658 let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
659 assert_eq!(output_item1, input_item);
660 assert_eq!(output_item2, input_item);
661 }
662
663 #[tokio::test]
664 async fn named_output_multiple_senders() {
665 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();
667
668 let output_name = "special";
669 let (tx1, mut rx1) = mpsc::channel(1);
670 let (tx2, mut rx2) = mpsc::channel(1);
671 add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);
672
673 let input_item = 42.into();
675
676 dispatcher.dispatch_named(output_name, input_item).await.unwrap();
677
678 let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
679 let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
680 assert_eq!(output_item1, input_item);
681 assert_eq!(output_item2, input_item);
682 }
683
684 #[tokio::test]
685 async fn default_output_not_set() {
686 let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();
688
689 let result = dispatcher.dispatch(().into()).await;
690 assert!(result.is_err());
691 }
692
693 #[tokio::test]
694 async fn named_output_not_set() {
695 let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();
697
698 let result = dispatcher.dispatch_named("non_existent", ().into()).await;
699 assert!(result.is_err());
700 }
701
702 #[tokio::test]
703 async fn default_output_buffered_partial() {
704 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
706
707 let (tx, mut rx) = mpsc::channel(1);
708 add_dispatcher_default_output(&mut dispatcher, [tx]);
709
710 let input_item = 42;
712
713 let mut buffered = dispatcher.buffered().unwrap();
714 buffered.push(input_item).await.unwrap();
715
716 let flushed_len = buffered.flush().await.unwrap();
717 assert_eq!(flushed_len, 1);
718
719 let output_item = rx.try_recv().expect("input item should have been dispatched");
720 assert_eq!(output_item.len(), 1);
721 assert_eq!(output_item[0], input_item);
722 }
723
724 #[tokio::test]
725 async fn named_output_buffered_partial() {
726 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
728
729 let output_name = "buffered_partial";
730 let (tx, mut rx) = mpsc::channel(1);
731 add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
732
733 let input_item = 42;
735
736 let mut buffered = dispatcher.buffered_named(output_name).unwrap();
737 buffered.push(input_item).await.unwrap();
738
739 let flushed_len = buffered.flush().await.unwrap();
740 assert_eq!(flushed_len, 1);
741
742 let output_item = rx.try_recv().expect("input item should have been dispatched");
743 assert_eq!(output_item.len(), 1);
744 assert_eq!(output_item[0], input_item);
745 }
746
747 #[tokio::test]
748 async fn default_output_buffered_overflow() {
749 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
751
752 let (tx, mut rx) = mpsc::channel(2);
753 add_dispatcher_default_output(&mut dispatcher, [tx]);
754
755 let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];
760
761 let mut buffered = dispatcher.buffered().unwrap();
762
763 for item in &input_items {
764 buffered.push(*item).await.unwrap();
765 }
766
767 let flushed_len = buffered.flush().await.unwrap();
768 assert_eq!(flushed_len, input_items.len());
769
770 let output_item1 = rx.try_recv().expect("input item should have been dispatched");
771 assert_eq!(output_item1.len(), 4);
772 assert_eq!(output_item1[0..4], input_items[0..4]);
773
774 let output_item2 = rx.try_recv().expect("input item should have been dispatched");
775 assert_eq!(output_item2.len(), 2);
776 assert_eq!(output_item2[0..2], input_items[4..6]);
777 }
778
779 #[tokio::test]
780 async fn named_output_buffered_overflow() {
781 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
783
784 let output_name = "buffered_overflow";
785 let (tx, mut rx) = mpsc::channel(2);
786 add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
787
788 let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];
793
794 let mut buffered = dispatcher.buffered_named(output_name).unwrap();
795
796 for item in &input_items {
797 buffered.push(*item).await.unwrap();
798 }
799
800 let flushed_len = buffered.flush().await.unwrap();
801 assert_eq!(flushed_len, input_items.len());
802
803 let output_item1 = rx.try_recv().expect("input item should have been dispatched");
804 assert_eq!(output_item1.len(), 4);
805 assert_eq!(output_item1[0..4], input_items[0..4]);
806
807 let output_item2 = rx.try_recv().expect("input item should have been dispatched");
808 assert_eq!(output_item2.len(), 2);
809 assert_eq!(output_item2[0..2], input_items[4..6]);
810 }
811
812 #[tokio::test]
813 async fn default_output_buffered_partial_multiple_senders() {
814 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
816
817 let (tx1, mut rx1) = mpsc::channel(1);
818 let (tx2, mut rx2) = mpsc::channel(1);
819 add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);
820
821 let input_item = 42;
823
824 let mut buffered = dispatcher.buffered().unwrap();
825 buffered.push(input_item).await.unwrap();
826
827 let flushed_len = buffered.flush().await.unwrap();
828 assert_eq!(flushed_len, 1);
829
830 let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
831 assert_eq!(output_item1.len(), 1);
832 assert_eq!(output_item1[0], input_item);
833
834 let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
835 assert_eq!(output_item2.len(), 1);
836 assert_eq!(output_item2[0], input_item);
837 }
838
839 #[tokio::test]
840 async fn named_output_buffered_partial_multiple_senders() {
841 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
843
844 let output_name = "buffered_partial";
845 let (tx1, mut rx1) = mpsc::channel(1);
846 let (tx2, mut rx2) = mpsc::channel(1);
847 add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);
848
849 let input_item = 42;
851
852 let mut buffered = dispatcher.buffered_named(output_name).unwrap();
853 buffered.push(input_item).await.unwrap();
854
855 let flushed_len = buffered.flush().await.unwrap();
856 assert_eq!(flushed_len, 1);
857
858 let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
859 assert_eq!(output_item1.len(), 1);
860 assert_eq!(output_item1[0], input_item);
861
862 let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
863 assert_eq!(output_item2.len(), 1);
864 assert_eq!(output_item2[0], input_item);
865 }
866
867 #[tokio::test]
868 async fn default_output_no_senders() {
869 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
871
872 dispatcher
874 .add_output(OutputName::Default)
875 .expect("should be able to add default output");
876
877 let test_event = 42.into();
879 let result = dispatcher.dispatch(test_event).await;
880 assert!(
881 result.is_ok(),
882 "dispatch to default output with no senders should succeed"
883 );
884 }
885
886 #[tokio::test]
887 async fn named_output_no_senders() {
888 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
890
891 dispatcher
893 .add_output(OutputName::Given("errors".into()))
894 .expect("should be able to add named output");
895
896 let test_event = 42.into();
898 let result = dispatcher.dispatch_named("errors", test_event).await;
899 assert!(
900 result.is_ok(),
901 "dispatch to named output with no senders should succeed"
902 );
903 }
904
905 #[tokio::test]
906 async fn metrics_default_output_disconnected() {
907 let recorder = DebuggingRecorder::new();
908 let snapshotter = recorder.snapshotter();
909 let dispatcher = metrics::with_local_recorder(&recorder, || {
910 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
911 dispatcher
912 .add_output(OutputName::Default)
913 .expect("should not fail to add default output");
914 dispatcher
915 });
916
917 let mut single_item = FixedUsizeVec::<4>::default();
919 assert_eq!(None, single_item.try_push(42));
920 let single_item_item_count = single_item.item_count() as u64;
921
922 dispatcher
923 .dispatch(single_item.clone())
924 .await
925 .expect("should not fail to dispatch");
926
927 let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
928 assert_eq!(events_sent, 0);
929 assert_eq!(events_discarded, single_item_item_count);
930 assert!(send_latencies.is_empty());
931
932 let mut multiple_items = FixedUsizeVec::<4>::default();
934 assert_eq!(None, multiple_items.try_push(42));
935 assert_eq!(None, multiple_items.try_push(12345));
936 assert_eq!(None, multiple_items.try_push(1337));
937 let multiple_items_item_count = multiple_items.item_count() as u64;
938
939 dispatcher
940 .dispatch(multiple_items.clone())
941 .await
942 .expect("should not fail to dispatch");
943
944 let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
945 assert_eq!(events_sent, 0);
946 assert_eq!(events_discarded, multiple_items_item_count);
947 assert!(send_latencies.is_empty());
948 }
949
950 #[tokio::test]
951 async fn metrics_named_output_disconnected() {
952 let output_name = "some_output";
953
954 let recorder = DebuggingRecorder::new();
955 let snapshotter = recorder.snapshotter();
956 let dispatcher = metrics::with_local_recorder(&recorder, || {
957 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
958 dispatcher
959 .add_output(OutputName::Given(output_name.into()))
960 .expect("should not fail to add named output");
961 dispatcher
962 });
963
964 let mut single_item = FixedUsizeVec::<4>::default();
966 assert_eq!(None, single_item.try_push(42));
967 let single_item_item_count = single_item.item_count() as u64;
968
969 dispatcher
970 .dispatch_named(output_name, single_item.clone())
971 .await
972 .expect("should not fail to dispatch");
973
974 let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
975 assert_eq!(events_sent, 0);
976 assert_eq!(events_discarded, single_item_item_count);
977 assert!(send_latencies.is_empty());
978
979 let mut multiple_items = FixedUsizeVec::<4>::default();
981 assert_eq!(None, multiple_items.try_push(42));
982 assert_eq!(None, multiple_items.try_push(12345));
983 assert_eq!(None, multiple_items.try_push(1337));
984 let multiple_items_item_count = multiple_items.item_count() as u64;
985
986 dispatcher
987 .dispatch_named(output_name, multiple_items.clone())
988 .await
989 .expect("should not fail to dispatch");
990
991 let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
992 assert_eq!(events_sent, 0);
993 assert_eq!(events_discarded, multiple_items_item_count);
994 assert!(send_latencies.is_empty());
995 }
996
997 #[tokio::test]
998 async fn is_default_output_connected_behavior() {
999 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
1000
1001 assert!(
1003 !dispatcher.is_default_output_connected(),
1004 "should return false when no default output exists"
1005 );
1006
1007 dispatcher
1009 .add_output(OutputName::Default)
1010 .expect("should be able to add default output");
1011 assert!(
1012 !dispatcher.is_default_output_connected(),
1013 "should return false when default output exists but has no senders"
1014 );
1015
1016 let (tx, _rx) = mpsc::channel(1);
1018 dispatcher
1019 .attach_sender_to_output(&OutputName::Default, tx)
1020 .expect("should be able to attach sender");
1021 assert!(
1022 dispatcher.is_default_output_connected(),
1023 "should return true when default output has senders attached"
1024 );
1025 }
1026
1027 #[tokio::test]
1028 async fn is_named_output_connected_behavior() {
1029 let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
1030 let output_name = "test_output";
1031
1032 assert!(
1034 !dispatcher.is_named_output_connected(output_name),
1035 "should return false when named output doesn't exist"
1036 );
1037
1038 dispatcher
1040 .add_output(OutputName::Given(output_name.into()))
1041 .expect("should be able to add named output");
1042 assert!(
1043 !dispatcher.is_named_output_connected(output_name),
1044 "should return false when named output exists but has no senders"
1045 );
1046
1047 let (tx, _rx) = mpsc::channel(1);
1049 dispatcher
1050 .attach_sender_to_output(&OutputName::Given(output_name.into()), tx)
1051 .expect("should be able to attach sender");
1052 assert!(
1053 dispatcher.is_named_output_connected(output_name),
1054 "should return true when named output has senders attached"
1055 );
1056
1057 assert!(
1059 !dispatcher.is_named_output_connected("nonexistent_output"),
1060 "should return false for nonexistent output"
1061 );
1062 }
1063
1064 #[tokio::test]
1065 async fn default_output_dispatch_one() {
1066 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1068
1069 let (tx, mut rx) = mpsc::channel(1);
1070 add_dispatcher_default_output(&mut dispatcher, [tx]);
1071
1072 let input_item = 42;
1073
1074 dispatcher.dispatch_one(input_item).await.unwrap();
1075
1076 let output_item = rx.try_recv().expect("input item should have been dispatched");
1077 assert_eq!(output_item.len(), 1);
1078 assert_eq!(output_item[0], input_item);
1079 }
1080
1081 #[tokio::test]
1082 async fn named_output_dispatch_one() {
1083 let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1085
1086 let output_name = "single";
1087 let (tx, mut rx) = mpsc::channel(1);
1088 add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);
1089
1090 let input_item = 42;
1091
1092 dispatcher.dispatch_one_named(output_name, input_item).await.unwrap();
1093
1094 let output_item = rx.try_recv().expect("input item should have been dispatched");
1095 assert_eq!(output_item.len(), 1);
1096 assert_eq!(output_item[0], input_item);
1097 }
1098
1099 #[tokio::test]
1100 async fn default_output_dispatch_one_not_set() {
1101 let dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1103
1104 let result = dispatcher.dispatch_one(42).await;
1105 assert!(result.is_err());
1106 }
1107
1108 #[tokio::test]
1109 async fn named_output_dispatch_one_not_set() {
1110 let dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
1112
1113 let result = dispatcher.dispatch_one_named("nonexistent", 42).await;
1114 assert!(result.is_err());
1115 }
1116}