1use std::{borrow::Cow, time::Instant};
2
3use saluki_common::collections::FastHashMap;
4use saluki_error::{generic_error, GenericError};
5use saluki_metrics::static_metrics;
6use tokio::sync::mpsc;
7
8use super::Dispatchable;
9use crate::{components::ComponentContext, topology::OutputName};
10
// Telemetry emitted by the dispatcher, declared through the `static_metrics!` macro.
//
// The declaration names the generated type `DispatcherMetrics`, uses the `component` prefix, and labels every
// metric with the owning component's ID, the component's type, and the output the metric refers to. Three metrics
// are declared: a counter of events sent, a `trace_histogram` of send latency in seconds, and a counter of events
// discarded.
static_metrics!(
    name => DispatcherMetrics,
    prefix => component,
    labels => [component_id: String, component_type: &'static str, output: String],
    metrics => [
        counter(events_sent_total),
        trace_histogram(send_latency_seconds),
        counter(events_discarded_total),
    ],
);
23
24impl DispatcherMetrics {
25 fn default_output(context: ComponentContext) -> Self {
26 Self::with_output_name(context, "_default")
27 }
28
29 fn named_output(context: ComponentContext, output_name: &str) -> Self {
30 Self::with_output_name(context, output_name)
31 }
32
33 fn with_output_name(context: ComponentContext, output_name: &str) -> Self {
34 Self::new(
35 context.component_id().to_string(),
36 context.component_type().as_str(),
37 output_name.to_string(),
38 )
39 }
40}
41
/// A dispatchable container that individual items can be accumulated into before being sent as a whole.
///
/// Implementors must also be [`Dispatchable`], so a filled buffer can be sent to an output, and [`Default`], so a
/// fresh buffer can be created on demand when one is needed.
pub trait DispatchBuffer: Dispatchable + Default {
    /// Type of the individual items that are pushed into the buffer.
    type Item;

    /// Returns the number of items currently held in the buffer.
    fn len(&self) -> usize;

    /// Returns `true` if the buffer cannot accept any more items.
    fn is_full(&self) -> bool;

    /// Attempts to push `item` into the buffer.
    ///
    /// Returns `None` when the item was stored. When the buffer could not accept the item, the item is handed back
    /// to the caller as `Some(item)`.
    fn try_push(&mut self, item: Self::Item) -> Option<Self::Item>;
}
58
/// A single output of a component: the senders attached to it, plus the metrics tracked for it.
struct DispatchTarget<T> {
    /// Telemetry handles labeled for this specific output.
    metrics: DispatcherMetrics,
    /// Channels attached to this output. Every dispatched item is delivered to each of them.
    senders: Vec<mpsc::Sender<T>>,
}
63
64impl<T> DispatchTarget<T>
65where
66 T: Dispatchable,
67{
68 fn default_output(context: ComponentContext) -> Self {
69 Self {
70 metrics: DispatcherMetrics::default_output(context),
71 senders: Vec::new(),
72 }
73 }
74
75 fn named_output(context: ComponentContext, output_name: &str) -> Self {
76 Self {
77 metrics: DispatcherMetrics::named_output(context, output_name),
78 senders: Vec::new(),
79 }
80 }
81
82 fn add_sender(&mut self, sender: mpsc::Sender<T>) {
83 self.senders.push(sender);
84 }
85
86 async fn send(&self, item: T) -> Result<(), GenericError> {
87 if self.senders.is_empty() {
88 let item_count = item.item_count() as u64;
90 self.metrics.events_discarded_total().increment(item_count);
91 return Ok(());
92 }
93
94 let start = Instant::now();
95 let item_count = item.item_count();
96
97 let cloned_sends = self.senders.len() - 1;
99 for sender in &self.senders[0..cloned_sends] {
100 sender
101 .send(item.clone())
102 .await
103 .map_err(|_| generic_error!("Failed to send to output."))?;
104 }
105
106 let last_sender = &self.senders[cloned_sends];
108 last_sender
109 .send(item)
110 .await
111 .map_err(|_| generic_error!("Failed to send to output."))?;
112
113 let elapsed = start.elapsed();
114
115 self.metrics.send_latency_seconds().record(elapsed);
118
119 let total_events_sent = (self.senders.len() * item_count) as u64;
120 self.metrics.events_sent_total().increment(total_events_sent);
121
122 Ok(())
123 }
124}
125
126pub struct BufferedDispatcher<'a, T> {
133 metrics: &'a DispatcherMetrics,
134 flushed_len: usize,
135 buffer: Option<T>,
136 target: &'a DispatchTarget<T>,
137}
138
139impl<'a, T> BufferedDispatcher<'a, T> {
140 fn new(target: &'a DispatchTarget<T>) -> Self {
141 Self {
142 metrics: &target.metrics,
143 flushed_len: 0,
144 buffer: None,
145 target,
146 }
147 }
148}
149
150impl<T> BufferedDispatcher<'_, T>
151where
152 T: DispatchBuffer,
153{
154 async fn try_flush_buffer(&self, buffer: T) -> Result<(), GenericError> {
155 let buffer_len = buffer.len();
156 if buffer_len > 0 {
157 self.target.send(buffer).await
158 } else {
159 Ok(())
160 }
161 }
162
163 pub async fn push(&mut self, item: T::Item) -> Result<(), GenericError> {
170 if let Some(old_buffer) = self.buffer.take_if(|b| b.is_full()) {
172 self.try_flush_buffer(old_buffer).await?;
173 }
174
175 let buffer = self.buffer.get_or_insert_default();
180 if buffer.try_push(item).is_some() {
181 return Err(generic_error!("Dispatch buffer already full after acquisition."));
182 }
183
184 self.flushed_len += 1;
185
186 Ok(())
187 }
188
189 pub async fn send_all<I>(mut self, items: I) -> Result<usize, GenericError>
198 where
199 I: IntoIterator<Item = T::Item>,
200 {
201 for item in items {
202 self.push(item).await?;
203 }
204
205 self.flush().await
206 }
207
208 pub async fn flush(mut self) -> Result<usize, GenericError> {
217 if let Some(old_buffer) = self.buffer.take() {
218 self.try_flush_buffer(old_buffer).await?;
219 }
220
221 self.metrics.events_sent_total().increment(self.flushed_len as u64);
224
225 Ok(self.flushed_len)
226 }
227}
228
/// Dispatches items from a component to its declared outputs.
///
/// A dispatcher has at most one default output and any number of named outputs. Outputs must be declared before
/// senders can be attached to them or items dispatched to them.
pub struct Dispatcher<T>
where
    T: Dispatchable,
{
    /// Context of the owning component; cloned into the metrics of every output that gets declared.
    context: ComponentContext,
    /// The default output, if one has been declared.
    default: Option<DispatchTarget<T>>,
    /// Named outputs, keyed by output name.
    targets: FastHashMap<Cow<'static, str>, DispatchTarget<T>>,
}
242
243impl<T> Dispatcher<T>
244where
245 T: Dispatchable,
246{
247 pub fn new(context: ComponentContext) -> Self {
249 Self {
250 context,
251 default: None,
252 targets: FastHashMap::default(),
253 }
254 }
255
256 pub fn add_output(&mut self, output_name: OutputName) -> Result<(), GenericError> {
262 match output_name {
263 OutputName::Default => {
264 if self.default.is_some() {
265 return Err(generic_error!("Default output already exists."));
266 }
267
268 self.default = Some(DispatchTarget::default_output(self.context.clone()));
269 }
270 OutputName::Given(name) => {
271 if self.targets.contains_key(&name) {
272 return Err(generic_error!("Output '{}' already exists.", name));
273 }
274 let target = DispatchTarget::named_output(self.context.clone(), &name);
275 self.targets.insert(name, target);
276 }
277 }
278
279 Ok(())
280 }
281
282 pub fn attach_sender_to_output(
288 &mut self, output_name: &OutputName, sender: mpsc::Sender<T>,
289 ) -> Result<(), GenericError> {
290 let target = match output_name {
291 OutputName::Default => self
292 .default
293 .as_mut()
294 .ok_or_else(|| generic_error!("No default output declared."))?,
295 OutputName::Given(name) => self
296 .targets
297 .get_mut(name)
298 .ok_or_else(|| generic_error!("Output '{}' does not exist.", name))?,
299 };
300 target.add_sender(sender);
301
302 Ok(())
303 }
304
305 fn get_default_output(&self) -> Result<&DispatchTarget<T>, GenericError> {
306 self.default
307 .as_ref()
308 .ok_or_else(|| generic_error!("No default output declared."))
309 }
310
311 fn get_named_output(&self, name: &str) -> Result<&DispatchTarget<T>, GenericError> {
312 self.targets
313 .get(name)
314 .ok_or_else(|| generic_error!("No output named '{}' declared.", name))
315 }
316
317 pub fn is_default_output_connected(&self) -> bool {
319 self.default.as_ref().is_some_and(|target| !target.senders.is_empty())
320 }
321
322 pub fn is_named_output_connected(&self, name: &str) -> bool {
324 self.targets.get(name).is_some_and(|target| !target.senders.is_empty())
325 }
326
327 pub async fn dispatch(&self, item: T) -> Result<(), GenericError> {
333 self.dispatch_inner(None, item).await
334 }
335
336 pub async fn dispatch_named<N>(&self, output_name: N, item: T) -> Result<(), GenericError>
342 where
343 N: AsRef<str>,
344 {
345 self.dispatch_inner(Some(output_name.as_ref()), item).await
346 }
347
348 async fn dispatch_inner(&self, output_name: Option<&str>, item: T) -> Result<(), GenericError> {
349 let target = match output_name {
350 None => self.get_default_output()?,
351 Some(name) => self.get_named_output(name)?,
352 };
353
354 target.send(item).await?;
355
356 Ok(())
357 }
358}
359
360impl<T> Dispatcher<T>
361where
362 T: DispatchBuffer,
363{
364 pub fn buffered(&self) -> Result<BufferedDispatcher<'_, T>, GenericError> {
374 self.get_default_output().map(BufferedDispatcher::new)
375 }
376
377 pub fn buffered_named<N>(&self, output_name: N) -> Result<BufferedDispatcher<'_, T>, GenericError>
387 where
388 N: AsRef<str>,
389 {
390 self.get_named_output(output_name.as_ref()).map(BufferedDispatcher::new)
391 }
392}
393
#[cfg(test)]
mod tests {
    use std::ops::Deref;

    use metrics::{Key, Label};
    use metrics_util::{
        debugging::{DebugValue, DebuggingRecorder, Snapshotter},
        CompositeKey, MetricKind,
    };
    use ordered_float::OrderedFloat;

    use super::*;

    /// Minimal dispatchable payload that always counts as exactly one event.
    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
    struct SingleEvent<T>(T);

    impl<T: Clone + Copy> Dispatchable for SingleEvent<T> {
        fn item_count(&self) -> usize {
            1
        }
    }

    impl<T: Clone + Copy> From<T> for SingleEvent<T> {
        fn from(value: T) -> Self {
            Self(value)
        }
    }

    /// Fixed-capacity buffer of `usize` values, used to exercise the buffered dispatch paths.
    #[derive(Clone, Debug, Eq, PartialEq)]
    struct FixedUsizeVec<const N: usize> {
        data: [usize; N],
        len: usize,
    }

    impl<const N: usize> Default for FixedUsizeVec<N> {
        fn default() -> Self {
            Self { data: [0; N], len: 0 }
        }
    }

    impl<const N: usize> Deref for FixedUsizeVec<N> {
        type Target = [usize];

        fn deref(&self) -> &Self::Target {
            // Only expose the slots that were actually pushed, so slice access agrees with `len()` and never reveals
            // the zero-filled, unused tail of the backing array.
            &self.data[..self.len]
        }
    }

    impl<const N: usize> Dispatchable for FixedUsizeVec<N> {
        fn item_count(&self) -> usize {
            self.len
        }
    }

    impl<const N: usize> DispatchBuffer for FixedUsizeVec<N> {
        type Item = usize;

        fn len(&self) -> usize {
            self.len
        }

        fn is_full(&self) -> bool {
            self.len == N
        }

        fn try_push(&mut self, item: Self::Item) -> Option<Self::Item> {
            if self.is_full() {
                Some(item)
            } else {
                self.data[self.len] = item;
                self.len += 1;
                None
            }
        }
    }

    /// Creates a dispatcher owned by a test source component named `dispatcher_test`.
    fn unbuffered_dispatcher<T: Dispatchable>() -> Dispatcher<T> {
        let component_context = ComponentContext::test_source("dispatcher_test");
        Dispatcher::new(component_context)
    }

    /// Same as `unbuffered_dispatcher`, but constrains `T` so the buffered APIs are available.
    fn buffered_dispatcher<T: DispatchBuffer>() -> Dispatcher<T> {
        unbuffered_dispatcher()
    }

    /// Declares the default output on `dispatcher` and attaches every sender in `senders` to it.
    fn add_dispatcher_default_output<T: Dispatchable, const N: usize>(
        dispatcher: &mut Dispatcher<T>, senders: [mpsc::Sender<T>; N],
    ) {
        dispatcher
            .add_output(OutputName::Default)
            .expect("default output should not be added yet");
        for sender in senders {
            dispatcher
                .attach_sender_to_output(&OutputName::Default, sender)
                .expect("default output should be added");
        }
    }

    /// Declares the named output `output_name` on `dispatcher` and attaches every sender in `senders` to it.
    fn add_dispatcher_named_output<T: Dispatchable, const N: usize>(
        dispatcher: &mut Dispatcher<T>, output_name: &'static str, senders: [mpsc::Sender<T>; N],
    ) {
        dispatcher
            .add_output(OutputName::Given(output_name.into()))
            .expect("named output should not be added yet");
        for sender in senders {
            dispatcher
                .attach_sender_to_output(&OutputName::Given(output_name.into()), sender)
                .expect("named output should be added");
        }
    }

    /// Builds the recorder key for a dispatcher metric of the given kind and name on the given output.
    ///
    /// The key carries the labels the test dispatcher is expected to use (`component_id`, `component_type`, `output`),
    /// followed by any additional `tags`.
    fn get_dispatcher_metric_ckey(
        kind: MetricKind, name: &'static str, output_name: &'static str, tags: &[(&'static str, &'static str)],
    ) -> CompositeKey {
        let mut labels = vec![
            Label::from_static_parts("component_id", "dispatcher_test"),
            Label::from_static_parts("component_type", "source"),
            Label::from_static_parts("output", output_name),
        ];

        for tag in tags {
            labels.push(Label::from_static_parts(tag.0, tag.1));
        }

        let key = Key::from_parts(name, labels);
        CompositeKey::new(kind, key)
    }

    /// Takes a snapshot and returns `(events_sent, events_discarded, send_latencies)` for the given output.
    ///
    /// Panics if any of the three metrics is missing from the snapshot or has an unexpected type.
    ///
    /// NOTE(review): the metrics tests below assert per-dispatch values after each call, which presumes that a
    /// snapshot only reports activity since the previous snapshot -- confirm against the `DebuggingRecorder` docs.
    fn get_output_metrics(snapshotter: &Snapshotter, output_name: &'static str) -> (u64, u64, Vec<OrderedFloat<f64>>) {
        let events_sent_key = get_dispatcher_metric_ckey(
            MetricKind::Counter,
            DispatcherMetrics::events_sent_total_name(),
            output_name,
            &[],
        );
        let events_discarded_key = get_dispatcher_metric_ckey(
            MetricKind::Counter,
            DispatcherMetrics::events_discarded_total_name(),
            output_name,
            &[],
        );
        let send_latency_key = get_dispatcher_metric_ckey(
            MetricKind::Histogram,
            DispatcherMetrics::send_latency_seconds_name(),
            output_name,
            &[],
        );

        let current_metrics = snapshotter.snapshot().into_hashmap();
        let (_, _, events_sent) = current_metrics
            .get(&events_sent_key)
            .expect("should have events sent metric");
        let (_, _, events_discarded) = current_metrics
            .get(&events_discarded_key)
            .expect("should have events discarded metric");
        let (_, _, send_latency) = current_metrics
            .get(&send_latency_key)
            .expect("should have send latency metric");

        let events_sent = match events_sent {
            DebugValue::Counter(value) => *value,
            _ => panic!("unexpected metric type for events sent"),
        };

        let events_discarded = match events_discarded {
            DebugValue::Counter(value) => *value,
            _ => panic!("unexpected metric type for events discarded"),
        };

        let send_latency = match send_latency {
            DebugValue::Histogram(value) => value.clone(),
            _ => panic!("unexpected metric type for send latency"),
        };

        (events_sent, events_discarded, send_latency)
    }

    #[tokio::test]
    async fn default_output() {
        // An item dispatched to the default output arrives unchanged at the single attached receiver.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();

        let (tx, mut rx) = mpsc::channel(1);
        add_dispatcher_default_output(&mut dispatcher, [tx]);

        let input_item = 42.into();

        dispatcher.dispatch(input_item).await.unwrap();

        let output_item = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item, input_item);
    }

    #[tokio::test]
    async fn named_output() {
        // An item dispatched to a named output arrives unchanged at the single attached receiver.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();

        let output_name = "special";
        let (tx, mut rx) = mpsc::channel(1);
        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);

        let input_item = 42.into();

        dispatcher.dispatch_named(output_name, input_item).await.unwrap();

        let output_item = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item, input_item);
    }

    #[tokio::test]
    async fn default_output_multiple_senders() {
        // Every sender attached to the default output receives its own copy of the item.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();

        let (tx1, mut rx1) = mpsc::channel(1);
        let (tx2, mut rx2) = mpsc::channel(1);
        add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);

        let input_item = 42.into();

        dispatcher.dispatch(input_item).await.unwrap();

        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item1, input_item);
        assert_eq!(output_item2, input_item);
    }

    #[tokio::test]
    async fn named_output_multiple_senders() {
        // Every sender attached to a named output receives its own copy of the item.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<usize>>();

        let output_name = "special";
        let (tx1, mut rx1) = mpsc::channel(1);
        let (tx2, mut rx2) = mpsc::channel(1);
        add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);

        let input_item = 42.into();

        dispatcher.dispatch_named(output_name, input_item).await.unwrap();

        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item1, input_item);
        assert_eq!(output_item2, input_item);
    }

    #[tokio::test]
    async fn default_output_not_set() {
        // Dispatching to the default output fails when it was never declared.
        let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();

        let result = dispatcher.dispatch(().into()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn named_output_not_set() {
        // Dispatching to a named output fails when it was never declared.
        let dispatcher = unbuffered_dispatcher::<SingleEvent<()>>();

        let result = dispatcher.dispatch_named("non_existent", ().into()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn default_output_buffered_partial() {
        // A partially filled buffer is sent to the default output when the buffered dispatcher is flushed.
        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();

        let (tx, mut rx) = mpsc::channel(1);
        add_dispatcher_default_output(&mut dispatcher, [tx]);

        let input_item = 42;

        let mut buffered = dispatcher.buffered().unwrap();
        buffered.push(input_item).await.unwrap();

        let flushed_len = buffered.flush().await.unwrap();
        assert_eq!(flushed_len, 1);

        let output_item = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item.len(), 1);
        assert_eq!(output_item[0], input_item);
    }

    #[tokio::test]
    async fn named_output_buffered_partial() {
        // A partially filled buffer is sent to a named output when the buffered dispatcher is flushed.
        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();

        let output_name = "buffered_partial";
        let (tx, mut rx) = mpsc::channel(1);
        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);

        let input_item = 42;

        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
        buffered.push(input_item).await.unwrap();

        let flushed_len = buffered.flush().await.unwrap();
        assert_eq!(flushed_len, 1);

        let output_item = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item.len(), 1);
        assert_eq!(output_item[0], input_item);
    }

    #[tokio::test]
    async fn default_output_buffered_overflow() {
        // Pushing more items than one buffer holds yields a full buffer followed by a partial one.
        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();

        // The channel needs room for both buffers, since nothing is received until after the flush.
        let (tx, mut rx) = mpsc::channel(2);
        add_dispatcher_default_output(&mut dispatcher, [tx]);

        let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];

        let mut buffered = dispatcher.buffered().unwrap();

        for item in &input_items {
            buffered.push(*item).await.unwrap();
        }

        let flushed_len = buffered.flush().await.unwrap();
        assert_eq!(flushed_len, input_items.len());

        let output_item1 = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item1.len(), 4);
        assert_eq!(output_item1[0..4], input_items[0..4]);

        let output_item2 = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item2.len(), 2);
        assert_eq!(output_item2[0..2], input_items[4..6]);
    }

    #[tokio::test]
    async fn named_output_buffered_overflow() {
        // Same as the default-output overflow test, but targeting a named output.
        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();

        let output_name = "buffered_overflow";
        // The channel needs room for both buffers, since nothing is received until after the flush.
        let (tx, mut rx) = mpsc::channel(2);
        add_dispatcher_named_output(&mut dispatcher, output_name, [tx]);

        let input_items: Vec<usize> = vec![1, 2, 3, 4, 5, 6];

        let mut buffered = dispatcher.buffered_named(output_name).unwrap();

        for item in &input_items {
            buffered.push(*item).await.unwrap();
        }

        let flushed_len = buffered.flush().await.unwrap();
        assert_eq!(flushed_len, input_items.len());

        let output_item1 = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item1.len(), 4);
        assert_eq!(output_item1[0..4], input_items[0..4]);

        let output_item2 = rx.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item2.len(), 2);
        assert_eq!(output_item2[0..2], input_items[4..6]);
    }

    #[tokio::test]
    async fn default_output_buffered_partial_multiple_senders() {
        // A flushed buffer is delivered to every sender attached to the default output.
        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();

        let (tx1, mut rx1) = mpsc::channel(1);
        let (tx2, mut rx2) = mpsc::channel(1);
        add_dispatcher_default_output(&mut dispatcher, [tx1, tx2]);

        let input_item = 42;

        let mut buffered = dispatcher.buffered().unwrap();
        buffered.push(input_item).await.unwrap();

        let flushed_len = buffered.flush().await.unwrap();
        assert_eq!(flushed_len, 1);

        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item1.len(), 1);
        assert_eq!(output_item1[0], input_item);

        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item2.len(), 1);
        assert_eq!(output_item2[0], input_item);
    }

    #[tokio::test]
    async fn named_output_buffered_partial_multiple_senders() {
        // A flushed buffer is delivered to every sender attached to a named output.
        let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();

        let output_name = "buffered_partial";
        let (tx1, mut rx1) = mpsc::channel(1);
        let (tx2, mut rx2) = mpsc::channel(1);
        add_dispatcher_named_output(&mut dispatcher, output_name, [tx1, tx2]);

        let input_item = 42;

        let mut buffered = dispatcher.buffered_named(output_name).unwrap();
        buffered.push(input_item).await.unwrap();

        let flushed_len = buffered.flush().await.unwrap();
        assert_eq!(flushed_len, 1);

        let output_item1 = rx1.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item1.len(), 1);
        assert_eq!(output_item1[0], input_item);

        let output_item2 = rx2.try_recv().expect("input item should have been dispatched");
        assert_eq!(output_item2.len(), 1);
        assert_eq!(output_item2[0], input_item);
    }

    #[tokio::test]
    async fn default_output_no_senders() {
        // Dispatching to a declared default output that has no senders succeeds rather than erroring.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();

        dispatcher
            .add_output(OutputName::Default)
            .expect("should be able to add default output");

        let test_event = 42.into();
        let result = dispatcher.dispatch(test_event).await;
        assert!(
            result.is_ok(),
            "dispatch to default output with no senders should succeed"
        );
    }

    #[tokio::test]
    async fn named_output_no_senders() {
        // Dispatching to a declared named output that has no senders succeeds rather than erroring.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();

        dispatcher
            .add_output(OutputName::Given("errors".into()))
            .expect("should be able to add named output");

        let test_event = 42.into();
        let result = dispatcher.dispatch_named("errors", test_event).await;
        assert!(
            result.is_ok(),
            "dispatch to named output with no senders should succeed"
        );
    }

    #[tokio::test]
    async fn metrics_default_output_disconnected() {
        // Items dispatched to a default output without senders are counted as discarded, never as sent.
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        // The dispatcher is built inside the recorder scope so that its metrics are registered with `recorder`.
        let dispatcher = metrics::with_local_recorder(&recorder, || {
            let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
            dispatcher
                .add_output(OutputName::Default)
                .expect("should not fail to add default output");
            dispatcher
        });

        let mut single_item = FixedUsizeVec::<4>::default();
        assert_eq!(None, single_item.try_push(42));
        let single_item_item_count = single_item.item_count() as u64;

        dispatcher
            .dispatch(single_item.clone())
            .await
            .expect("should not fail to dispatch");

        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
        assert_eq!(events_sent, 0);
        assert_eq!(events_discarded, single_item_item_count);
        assert!(send_latencies.is_empty());

        let mut multiple_items = FixedUsizeVec::<4>::default();
        assert_eq!(None, multiple_items.try_push(42));
        assert_eq!(None, multiple_items.try_push(12345));
        assert_eq!(None, multiple_items.try_push(1337));
        let multiple_items_item_count = multiple_items.item_count() as u64;

        dispatcher
            .dispatch(multiple_items.clone())
            .await
            .expect("should not fail to dispatch");

        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, "_default");
        assert_eq!(events_sent, 0);
        assert_eq!(events_discarded, multiple_items_item_count);
        assert!(send_latencies.is_empty());
    }

    #[tokio::test]
    async fn metrics_named_output_disconnected() {
        // Items dispatched to a named output without senders are counted as discarded, never as sent.
        let output_name = "some_output";

        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        // The dispatcher is built inside the recorder scope so that its metrics are registered with `recorder`.
        let dispatcher = metrics::with_local_recorder(&recorder, || {
            let mut dispatcher = buffered_dispatcher::<FixedUsizeVec<4>>();
            dispatcher
                .add_output(OutputName::Given(output_name.into()))
                .expect("should not fail to add named output");
            dispatcher
        });

        let mut single_item = FixedUsizeVec::<4>::default();
        assert_eq!(None, single_item.try_push(42));
        let single_item_item_count = single_item.item_count() as u64;

        dispatcher
            .dispatch_named(output_name, single_item.clone())
            .await
            .expect("should not fail to dispatch");

        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
        assert_eq!(events_sent, 0);
        assert_eq!(events_discarded, single_item_item_count);
        assert!(send_latencies.is_empty());

        let mut multiple_items = FixedUsizeVec::<4>::default();
        assert_eq!(None, multiple_items.try_push(42));
        assert_eq!(None, multiple_items.try_push(12345));
        assert_eq!(None, multiple_items.try_push(1337));
        let multiple_items_item_count = multiple_items.item_count() as u64;

        dispatcher
            .dispatch_named(output_name, multiple_items.clone())
            .await
            .expect("should not fail to dispatch");

        let (events_sent, events_discarded, send_latencies) = get_output_metrics(&snapshotter, output_name);
        assert_eq!(events_sent, 0);
        assert_eq!(events_discarded, multiple_items_item_count);
        assert!(send_latencies.is_empty());
    }

    #[tokio::test]
    async fn is_default_output_connected_behavior() {
        // The default output only counts as connected once it is declared and has a sender attached.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();

        assert!(
            !dispatcher.is_default_output_connected(),
            "should return false when no default output exists"
        );

        dispatcher
            .add_output(OutputName::Default)
            .expect("should be able to add default output");
        assert!(
            !dispatcher.is_default_output_connected(),
            "should return false when default output exists but has no senders"
        );

        let (tx, _rx) = mpsc::channel(1);
        dispatcher
            .attach_sender_to_output(&OutputName::Default, tx)
            .expect("should be able to attach sender");
        assert!(
            dispatcher.is_default_output_connected(),
            "should return true when default output has senders attached"
        );
    }

    #[tokio::test]
    async fn is_named_output_connected_behavior() {
        // A named output only counts as connected once it is declared and has a sender attached.
        let mut dispatcher = unbuffered_dispatcher::<SingleEvent<u32>>();
        let output_name = "test_output";

        assert!(
            !dispatcher.is_named_output_connected(output_name),
            "should return false when named output doesn't exist"
        );

        dispatcher
            .add_output(OutputName::Given(output_name.into()))
            .expect("should be able to add named output");
        assert!(
            !dispatcher.is_named_output_connected(output_name),
            "should return false when named output exists but has no senders"
        );

        let (tx, _rx) = mpsc::channel(1);
        dispatcher
            .attach_sender_to_output(&OutputName::Given(output_name.into()), tx)
            .expect("should be able to attach sender");
        assert!(
            dispatcher.is_named_output_connected(output_name),
            "should return true when named output has senders attached"
        );

        assert!(
            !dispatcher.is_named_output_connected("nonexistent_output"),
            "should return false for nonexistent output"
        );
    }
}