1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use resource_accounting::{MemoryLimiter, ResourceGroupToken, Tracked};
4use saluki_common::task::JoinSetExt as _;
5use saluki_error::{generic_error, ErrorContext as _, GenericError};
6use tokio::{
7 runtime::Handle,
8 sync::mpsc,
9 task::{AbortHandle, JoinSet},
10};
11use tracing::{debug, error_span};
12
13pub enum WorkerPoolConfiguration {
19 Ambient,
25
26 Dedicated,
30
31 Explicit(Handle),
35}
36
37use super::{
38 graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
39 EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
40};
41use crate::health::HealthRegistry;
42use crate::{
43 components::{
44 decoders::{Decoder, DecoderContext},
45 destinations::{Destination, DestinationContext},
46 encoders::{Encoder, EncoderContext},
47 forwarders::{Forwarder, ForwarderContext},
48 relays::{Relay, RelayContext},
49 sources::{Source, SourceContext},
50 transforms::{Transform, TransformContext},
51 ComponentContext, ComponentType,
52 },
53 topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
54};
55
56pub struct BuiltTopology {
63 name: String,
64 graph: Graph,
65 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
66 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
67 decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
68 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
69 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
70 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
71 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
72 component_token: ResourceGroupToken,
73 interconnect_capacity: NonZeroUsize,
74 worker_pool_config: WorkerPoolConfiguration,
75}
76
77impl BuiltTopology {
78 #[allow(clippy::too_many_arguments)]
79 pub(crate) fn from_parts(
80 name: String, graph: Graph,
81 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
82 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
83 decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
84 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
85 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
86 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
87 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
88 component_token: ResourceGroupToken, interconnect_capacity: NonZeroUsize,
89 ) -> Self {
90 Self {
91 name,
92 graph,
93 sources,
94 relays,
95 decoders,
96 transforms,
97 destinations,
98 encoders,
99 forwarders,
100 component_token,
101 interconnect_capacity,
102 worker_pool_config: WorkerPoolConfiguration::Dedicated,
103 }
104 }
105
106 pub fn with_ambient_worker_pool(mut self) -> Self {
112 self.worker_pool_config = WorkerPoolConfiguration::Ambient;
113 self
114 }
115
116 pub fn with_explicit_worker_pool(mut self, handle: Handle) -> Self {
120 self.worker_pool_config = WorkerPoolConfiguration::Explicit(handle);
121 self
122 }
123
124 fn resolve_worker_pool_handle(&self) -> Result<Handle, GenericError> {
125 match &self.worker_pool_config {
126 WorkerPoolConfiguration::Ambient => Ok(Handle::current()),
127 WorkerPoolConfiguration::Dedicated => {
128 let thread_pool = tokio::runtime::Builder::new_multi_thread()
129 .worker_threads(8)
130 .enable_all()
131 .build()
132 .error_context("Failed to build asynchronous thread pool runtime.")?;
133 let handle = thread_pool.handle().clone();
134
135 std::thread::spawn(move || {
136 thread_pool.block_on(std::future::pending::<()>());
137 });
138
139 Ok(handle)
140 }
141 WorkerPoolConfiguration::Explicit(handle) => Ok(handle.clone()),
142 }
143 }
144
145 pub async fn spawn(
159 self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
160 ) -> Result<RunningTopology, GenericError> {
161 let root_component_name = format!("topology.{}", self.name);
162
163 let _guard = self.component_token.enter();
164
165 let thread_pool_handle = self.resolve_worker_pool_handle()?;
166 let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
167
168 let mut component_tasks = JoinSet::new();
169 let mut component_task_map = HashMap::new();
170
171 let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
173 .error_context("Failed to build component interconnects.")?;
174
175 let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
176
177 for (component_id, source) in self.sources {
179 let (source, component_registry) = source.into_parts();
180
181 let dispatcher = interconnects
182 .take_source_dispatcher(&component_id)
183 .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
184
185 let shutdown_handle = shutdown_coordinator.register();
186 let health_handle = health_registry
187 .register_component(format!("{}.sources.{}", root_component_name, component_id))
188 .expect("duplicate source component ID in health registry");
189
190 let component_context = ComponentContext::source(component_id.clone());
191 let context = SourceContext::new(
192 &topology_context,
193 &component_context,
194 component_registry,
195 shutdown_handle,
196 health_handle,
197 dispatcher,
198 );
199
200 let (alloc_group, source) = source.into_parts();
201 let task_handle = spawn_component(
202 &mut component_tasks,
203 component_context,
204 alloc_group,
205 source.run(context),
206 );
207 component_task_map.insert(task_handle.id(), component_id);
208 }
209
210 for (component_id, relay) in self.relays {
212 let (relay, component_registry) = relay.into_parts();
213
214 let dispatcher = interconnects
215 .take_relay_dispatcher(&component_id)
216 .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
217
218 let shutdown_handle = shutdown_coordinator.register();
219 let health_handle = health_registry
220 .register_component(format!("{}.relays.{}", root_component_name, component_id))
221 .expect("duplicate relay component ID in health registry");
222
223 let component_context = ComponentContext::relay(component_id.clone());
224 let context = RelayContext::new(
225 &topology_context,
226 &component_context,
227 component_registry,
228 shutdown_handle,
229 health_handle,
230 dispatcher,
231 );
232
233 let (alloc_group, relay) = relay.into_parts();
234 let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
235 component_task_map.insert(task_handle.id(), component_id);
236 }
237
238 for (component_id, decoder) in self.decoders {
240 let (decoder, component_registry) = decoder.into_parts();
241
242 let dispatcher = interconnects
243 .take_decoder_dispatcher(&component_id)
244 .ok_or_else(|| generic_error!("No events dispatcher found for decoder component '{}'", component_id))?;
245
246 let consumer = interconnects
247 .take_decoder_consumer(&component_id)
248 .ok_or_else(|| generic_error!("No payloads consumer found for decoder component '{}'", component_id))?;
249
250 let health_handle = health_registry
251 .register_component(format!("{}.decoders.{}", root_component_name, component_id))
252 .expect("duplicate decoder component ID in health registry");
253
254 let component_context = ComponentContext::decoder(component_id.clone());
255 let context = DecoderContext::new(
256 &topology_context,
257 &component_context,
258 component_registry,
259 health_handle,
260 dispatcher,
261 consumer,
262 );
263
264 let (alloc_group, decoder) = decoder.into_parts();
265 let task_handle = spawn_component(
266 &mut component_tasks,
267 component_context,
268 alloc_group,
269 decoder.run(context),
270 );
271 component_task_map.insert(task_handle.id(), component_id);
272 }
273
274 for (component_id, transform) in self.transforms {
276 let (transform, component_registry) = transform.into_parts();
277
278 let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
279 generic_error!("No events dispatcher found for transform component '{}'", component_id)
280 })?;
281
282 let consumer = interconnects
283 .take_transform_consumer(&component_id)
284 .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
285
286 let health_handle = health_registry
287 .register_component(format!("{}.transforms.{}", root_component_name, component_id))
288 .expect("duplicate transform component ID in health registry");
289
290 let component_context = ComponentContext::transform(component_id.clone());
291 let context = TransformContext::new(
292 &topology_context,
293 &component_context,
294 component_registry,
295 health_handle,
296 dispatcher,
297 consumer,
298 );
299
300 let (alloc_group, transform) = transform.into_parts();
301 let task_handle = spawn_component(
302 &mut component_tasks,
303 component_context,
304 alloc_group,
305 transform.run(context),
306 );
307 component_task_map.insert(task_handle.id(), component_id);
308 }
309
310 for (component_id, destination) in self.destinations {
312 let (destination, component_registry) = destination.into_parts();
313
314 let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
315 generic_error!("No events consumer found for destination component '{}'", component_id)
316 })?;
317
318 let health_handle = health_registry
319 .register_component(format!("{}.destinations.{}", root_component_name, component_id))
320 .expect("duplicate destination component ID in health registry");
321
322 let component_context = ComponentContext::destination(component_id.clone());
323 let context = DestinationContext::new(
324 &topology_context,
325 &component_context,
326 component_registry,
327 health_handle,
328 consumer,
329 );
330
331 let (alloc_group, destination) = destination.into_parts();
332 let task_handle = spawn_component(
333 &mut component_tasks,
334 component_context,
335 alloc_group,
336 destination.run(context),
337 );
338 component_task_map.insert(task_handle.id(), component_id);
339 }
340
341 for (component_id, encoder) in self.encoders {
343 let (encoder, component_registry) = encoder.into_parts();
344
345 let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
346 generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
347 })?;
348
349 let consumer = interconnects
350 .take_encoder_consumer(&component_id)
351 .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
352
353 let health_handle = health_registry
354 .register_component(format!("{}.encoders.{}", root_component_name, component_id))
355 .expect("duplicate encoder component ID in health registry");
356
357 let component_context = ComponentContext::encoder(component_id.clone());
358 let context = EncoderContext::new(
359 &topology_context,
360 &component_context,
361 component_registry,
362 health_handle,
363 dispatcher,
364 consumer,
365 );
366
367 let (alloc_group, encoder) = encoder.into_parts();
368 let task_handle = spawn_component(
369 &mut component_tasks,
370 component_context,
371 alloc_group,
372 encoder.run(context),
373 );
374 component_task_map.insert(task_handle.id(), component_id);
375 }
376
377 for (component_id, forwarder) in self.forwarders {
379 let (forwarder, component_registry) = forwarder.into_parts();
380
381 let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
382 generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
383 })?;
384
385 let health_handle = health_registry
386 .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
387 .expect("duplicate forwarder component ID in health registry");
388
389 let component_context = ComponentContext::forwarder(component_id.clone());
390 let context = ForwarderContext::new(
391 &topology_context,
392 &component_context,
393 component_registry,
394 health_handle,
395 consumer,
396 );
397
398 let (alloc_group, forwarder) = forwarder.into_parts();
399 let task_handle = spawn_component(
400 &mut component_tasks,
401 component_context,
402 alloc_group,
403 forwarder.run(context),
404 );
405 component_task_map.insert(task_handle.id(), component_id);
406 }
407
408 Ok(RunningTopology::from_parts(
409 shutdown_coordinator,
410 component_tasks,
411 component_task_map,
412 ))
413 }
414}
415
416struct ComponentInterconnects {
417 interconnect_capacity: NonZeroUsize,
418 source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
419 relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
420 decoder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
421 decoder_dispatchers: HashMap<ComponentId, EventsDispatcher>,
422 transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
423 transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
424 destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
425 encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
426 encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
427 forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
428}
429
430impl ComponentInterconnects {
431 fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
432 let mut interconnects = Self {
433 interconnect_capacity,
434 source_dispatchers: HashMap::new(),
435 relay_dispatchers: HashMap::new(),
436 decoder_consumers: HashMap::new(),
437 decoder_dispatchers: HashMap::new(),
438 transform_consumers: HashMap::new(),
439 transform_dispatchers: HashMap::new(),
440 destination_consumers: HashMap::new(),
441 encoder_consumers: HashMap::new(),
442 encoder_dispatchers: HashMap::new(),
443 forwarder_consumers: HashMap::new(),
444 };
445
446 interconnects.generate_interconnects(graph)?;
447 Ok(interconnects)
448 }
449
450 fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
451 self.source_dispatchers.remove(component_id)
452 }
453
454 fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
455 self.relay_dispatchers.remove(component_id)
456 }
457
458 fn take_decoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
459 self.decoder_dispatchers.remove(component_id)
460 }
461
462 fn take_decoder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
463 self.decoder_consumers
464 .remove(component_id)
465 .map(|(_, consumer)| consumer)
466 }
467
468 fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
469 self.transform_dispatchers.remove(component_id)
470 }
471
472 fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
473 self.encoder_dispatchers.remove(component_id)
474 }
475
476 fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
477 self.transform_consumers
478 .remove(component_id)
479 .map(|(_, consumer)| consumer)
480 }
481
482 fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
483 self.destination_consumers
484 .remove(component_id)
485 .map(|(_, consumer)| consumer)
486 }
487
488 fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
489 self.encoder_consumers
490 .remove(component_id)
491 .map(|(_, consumer)| consumer)
492 }
493
494 fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
495 self.forwarder_consumers
496 .remove(component_id)
497 .map(|(_, consumer)| consumer)
498 }
499
500 fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
501 let outbound_edges = graph.get_outbound_directed_edges();
506 for (upstream_id, output_map) in outbound_edges {
507 match upstream_id.component_type() {
508 ComponentType::Source | ComponentType::Decoder | ComponentType::Transform => {
509 self.generate_event_interconnect(upstream_id, output_map)?;
510 }
511 ComponentType::Relay | ComponentType::Encoder => {
512 self.generate_payload_interconnect(upstream_id, output_map)?
513 }
514 _ => panic!(
515 "Only sources, decoders, transforms, relays, and encoders can dispatch events/payloads to downstream components."
516 ),
517 }
518 }
519
520 Ok(())
521 }
522
523 fn generate_event_interconnect(
524 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
525 ) -> Result<(), GenericError> {
526 for (upstream_output_id, downstream_ids) in output_map {
527 let mut senders = Vec::new();
528 for downstream_id in downstream_ids {
529 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
530 let sender = self.get_or_create_events_sender(downstream_id);
531 senders.push(sender);
532 }
533
534 let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
535 dispatcher.add_output(upstream_output_id.clone())?;
536
537 for sender in senders {
538 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
539 }
540 }
541
542 Ok(())
543 }
544
545 fn generate_payload_interconnect(
546 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
547 ) -> Result<(), GenericError> {
548 for (upstream_output_id, downstream_ids) in output_map {
549 let mut senders = Vec::new();
550 for downstream_id in downstream_ids {
551 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
552 let sender = self.get_or_create_payloads_sender(downstream_id);
553 senders.push(sender);
554 }
555
556 let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
557 dispatcher.add_output(upstream_output_id.clone())?;
558
559 for sender in senders {
560 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
561 }
562 }
563
564 Ok(())
565 }
566
567 fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
568 let (component_id, component_type, component_context) = component_id.into_parts();
569
570 match component_type {
571 ComponentType::Source => self
572 .source_dispatchers
573 .entry(component_id)
574 .or_insert_with(|| EventsDispatcher::new(component_context)),
575 ComponentType::Decoder => self
576 .decoder_dispatchers
577 .entry(component_id)
578 .or_insert_with(|| EventsDispatcher::new(component_context)),
579 ComponentType::Transform => self
580 .transform_dispatchers
581 .entry(component_id)
582 .or_insert_with(|| EventsDispatcher::new(component_context)),
583 _ => {
584 panic!("Only sources, decoders, and transforms can dispatch events to downstream components.")
585 }
586 }
587 }
588
589 fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
590 let (component_id, component_type, component_context) = component_id.into_parts();
591 let interconnect_capacity = self.interconnect_capacity;
592
593 let (sender, _) = match component_type {
594 ComponentType::Transform => self
595 .transform_consumers
596 .entry(component_id)
597 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
598 ComponentType::Destination => self
599 .destination_consumers
600 .entry(component_id)
601 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
602 ComponentType::Encoder => self
603 .encoder_consumers
604 .entry(component_id)
605 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
606 _ => panic!("Only transforms, destinations, and encoders can consume events."),
607 };
608
609 sender.clone()
610 }
611
612 fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
613 let (component_id, component_type, component_context) = component_id.into_parts();
614
615 match component_type {
616 ComponentType::Relay => self
617 .relay_dispatchers
618 .entry(component_id)
619 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
620 ComponentType::Encoder => self
621 .encoder_dispatchers
622 .entry(component_id)
623 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
624 _ => {
625 panic!("Only relays and encoders can dispatch payloads to downstream components.")
626 }
627 }
628 }
629
630 fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
631 let (component_id, component_type, component_context) = component_id.into_parts();
632 let interconnect_capacity = self.interconnect_capacity;
633
634 let (sender, _) = match component_type {
635 ComponentType::Decoder => self
636 .decoder_consumers
637 .entry(component_id)
638 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
639 ComponentType::Forwarder => self
640 .forwarder_consumers
641 .entry(component_id)
642 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
643 _ => panic!("Only decoders and forwarders can consume payloads."),
644 };
645
646 sender.clone()
647 }
648}
649
650fn build_events_consumer_pair(
651 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
652) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
653 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
654 let consumer = EventsConsumer::new(component_context, receiver);
655 (sender, consumer)
656}
657
658fn build_payloads_consumer_pair(
659 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
660) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
661 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
662 let consumer = PayloadsConsumer::new(component_context, receiver);
663 (sender, consumer)
664}
665
666fn spawn_component<F>(
667 join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
668 resource_group_token: ResourceGroupToken, component_future: F,
669) -> AbortHandle
670where
671 F: Future<Output = Result<(), GenericError>> + Send + 'static,
672{
673 let component_span = error_span!(
674 "component",
675 "type" = context.component_type().as_str(),
676 id = %context.component_id(),
677 );
678
679 let _span = component_span.enter();
680 let _guard = resource_group_token.enter();
681
682 let component_task_name = format!(
683 "topology-{}-{}",
684 context.component_type().as_str(),
685 context.component_id()
686 );
687 join_set.spawn_traced_named(component_task_name, component_future)
688}
689
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::{event::EventType, payload::PayloadType};
    use crate::topology::graph::Graph;

    /// Building interconnects for a graph with fan-out and mixed event/payload edges must succeed,
    /// which requires each dispatcher output to exist before senders are attached to it.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let interconnect_capacity = NonZeroUsize::new(10).unwrap();

        // source1 -> transform1 -> { encoder1 -> forwarder1, dest1 }
        let mut graph = Graph::default();
        graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        let built = ComponentInterconnects::from_graph(interconnect_capacity, &graph);
        let _ = built.expect("should build interconnects successfully");
    }
}