1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4 allocator::{AllocationGroupToken, Tracked},
5 MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use tokio::{
10 runtime::Handle,
11 sync::mpsc,
12 task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
/// Selects which Tokio runtime handle a topology passes to its topology context as the worker pool when spawned.
pub enum WorkerPoolConfiguration {
    /// Use the Tokio runtime that is current at the time the topology is spawned.
    Ambient,

    /// Build a dedicated multi-threaded Tokio runtime, kept alive on its own OS thread.
    Dedicated,

    /// Use the runtime behind the given handle.
    Explicit(Handle),
}
39
40use super::{
41 graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
42 EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
43};
44use crate::health::HealthRegistry;
45use crate::{
46 components::{
47 decoders::{Decoder, DecoderContext},
48 destinations::{Destination, DestinationContext},
49 encoders::{Encoder, EncoderContext},
50 forwarders::{Forwarder, ForwarderContext},
51 relays::{Relay, RelayContext},
52 sources::{Source, SourceContext},
53 transforms::{Transform, TransformContext},
54 ComponentContext, ComponentType,
55 },
56 topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
57};
58
/// A topology whose components have been built and which is ready to be spawned.
pub struct BuiltTopology {
    /// Topology name; used to derive the `topology.<name>` prefix for health registry entries.
    name: String,
    /// Component graph from which the interconnects between components are generated.
    graph: Graph,
    /// Built source components, keyed by component ID.
    sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
    /// Built relay components, keyed by component ID.
    relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
    /// Built decoder components, keyed by component ID.
    decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
    /// Built transform components, keyed by component ID.
    transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
    /// Built destination components, keyed by component ID.
    destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
    /// Built encoder components, keyed by component ID.
    encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
    /// Built forwarder components, keyed by component ID.
    forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
    /// Allocation group token entered for the duration of `spawn`.
    component_token: AllocationGroupToken,
    /// Capacity of each channel created between an upstream and a downstream component.
    interconnect_capacity: NonZeroUsize,
    /// Which runtime the components' worker pool handle is resolved from.
    worker_pool_config: WorkerPoolConfiguration,
}
79
80impl BuiltTopology {
81 #[allow(clippy::too_many_arguments)]
82 pub(crate) fn from_parts(
83 name: String, graph: Graph,
84 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
85 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
86 decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
87 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
88 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
89 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
90 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
91 component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
92 ) -> Self {
93 Self {
94 name,
95 graph,
96 sources,
97 relays,
98 decoders,
99 transforms,
100 destinations,
101 encoders,
102 forwarders,
103 component_token,
104 interconnect_capacity,
105 worker_pool_config: WorkerPoolConfiguration::Dedicated,
106 }
107 }
108
109 pub fn with_ambient_worker_pool(mut self) -> Self {
115 self.worker_pool_config = WorkerPoolConfiguration::Ambient;
116 self
117 }
118
119 pub fn with_explicit_worker_pool(mut self, handle: Handle) -> Self {
123 self.worker_pool_config = WorkerPoolConfiguration::Explicit(handle);
124 self
125 }
126
127 fn resolve_worker_pool_handle(&self) -> Result<Handle, GenericError> {
128 match &self.worker_pool_config {
129 WorkerPoolConfiguration::Ambient => Ok(Handle::current()),
130 WorkerPoolConfiguration::Dedicated => {
131 let thread_pool = tokio::runtime::Builder::new_multi_thread()
132 .worker_threads(8)
133 .enable_all()
134 .build()
135 .error_context("Failed to build asynchronous thread pool runtime.")?;
136 let handle = thread_pool.handle().clone();
137
138 std::thread::spawn(move || {
139 thread_pool.block_on(std::future::pending::<()>());
140 });
141
142 Ok(handle)
143 }
144 WorkerPoolConfiguration::Explicit(handle) => Ok(handle.clone()),
145 }
146 }
147
    /// Spawns every component in the topology and returns a handle to the running topology.
    ///
    /// A worker pool handle is resolved according to the configured [`WorkerPoolConfiguration`], the interconnects
    /// between components are generated from the topology graph, and each component is then spawned as its own
    /// task, in this order: sources, relays, decoders, transforms, destinations, encoders, forwarders.
    ///
    /// Every component is registered with `health_registry` under the name
    /// `topology.<name>.<component kind>.<component ID>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the worker pool handle cannot be resolved, if the interconnects cannot be built, or if
    /// a component has no matching dispatcher/consumer in the generated interconnects.
    ///
    /// # Panics
    ///
    /// Panics if registering a component with the health registry fails (the `expect` messages attribute this to
    /// a duplicate component ID).
    pub async fn spawn(
        self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
    ) -> Result<RunningTopology, GenericError> {
        // Prefix shared by every health registry entry of this topology.
        let root_component_name = format!("topology.{}", self.name);

        // Enter the topology's allocation group for the rest of this call.
        // NOTE(review): presumably so that allocations made while wiring things up are attributed to it -- confirm.
        let _guard = self.component_token.enter();

        let thread_pool_handle = self.resolve_worker_pool_handle()?;
        let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);

        // All component tasks live in one join set; the map lets a task ID be traced back to its component.
        let mut component_tasks = JoinSet::new();
        let mut component_task_map = HashMap::new();

        // Build all dispatchers and consumers up front; each component takes its own out below.
        let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
            .error_context("Failed to build component interconnects.")?;

        let mut shutdown_coordinator = ComponentShutdownCoordinator::default();

        // Spawn our sources. Sources receive a shutdown handle in addition to their dispatcher.
        for (component_id, source) in self.sources {
            let (source, component_registry) = source.into_parts();

            let dispatcher = interconnects
                .take_source_dispatcher(&component_id)
                .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;

            let shutdown_handle = shutdown_coordinator.register();
            let health_handle = health_registry
                .register_component(format!("{}.sources.{}", root_component_name, component_id))
                .expect("duplicate source component ID in health registry");

            let component_context = ComponentContext::source(component_id.clone());
            let context = SourceContext::new(
                &topology_context,
                &component_context,
                component_registry,
                shutdown_handle,
                health_handle,
                dispatcher,
            );

            let (alloc_group, source) = source.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                source.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our relays. Like sources, relays receive a shutdown handle.
        for (component_id, relay) in self.relays {
            let (relay, component_registry) = relay.into_parts();

            let dispatcher = interconnects
                .take_relay_dispatcher(&component_id)
                .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;

            let shutdown_handle = shutdown_coordinator.register();
            let health_handle = health_registry
                .register_component(format!("{}.relays.{}", root_component_name, component_id))
                .expect("duplicate relay component ID in health registry");

            let component_context = ComponentContext::relay(component_id.clone());
            let context = RelayContext::new(
                &topology_context,
                &component_context,
                component_registry,
                shutdown_handle,
                health_handle,
                dispatcher,
            );

            let (alloc_group, relay) = relay.into_parts();
            let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our decoders: they consume payloads and dispatch events.
        for (component_id, decoder) in self.decoders {
            let (decoder, component_registry) = decoder.into_parts();

            let dispatcher = interconnects
                .take_decoder_dispatcher(&component_id)
                .ok_or_else(|| generic_error!("No events dispatcher found for decoder component '{}'", component_id))?;

            let consumer = interconnects
                .take_decoder_consumer(&component_id)
                .ok_or_else(|| generic_error!("No payloads consumer found for decoder component '{}'", component_id))?;

            let health_handle = health_registry
                .register_component(format!("{}.decoders.{}", root_component_name, component_id))
                .expect("duplicate decoder component ID in health registry");

            let component_context = ComponentContext::decoder(component_id.clone());
            let context = DecoderContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                dispatcher,
                consumer,
            );

            let (alloc_group, decoder) = decoder.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                decoder.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our transforms: they consume events and dispatch events.
        for (component_id, transform) in self.transforms {
            let (transform, component_registry) = transform.into_parts();

            let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
                generic_error!("No events dispatcher found for transform component '{}'", component_id)
            })?;

            let consumer = interconnects
                .take_transform_consumer(&component_id)
                .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;

            let health_handle = health_registry
                .register_component(format!("{}.transforms.{}", root_component_name, component_id))
                .expect("duplicate transform component ID in health registry");

            let component_context = ComponentContext::transform(component_id.clone());
            let context = TransformContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                dispatcher,
                consumer,
            );

            let (alloc_group, transform) = transform.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                transform.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our destinations: they only consume events.
        for (component_id, destination) in self.destinations {
            let (destination, component_registry) = destination.into_parts();

            let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
                generic_error!("No events consumer found for destination component '{}'", component_id)
            })?;

            let health_handle = health_registry
                .register_component(format!("{}.destinations.{}", root_component_name, component_id))
                .expect("duplicate destination component ID in health registry");

            let component_context = ComponentContext::destination(component_id.clone());
            let context = DestinationContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                consumer,
            );

            let (alloc_group, destination) = destination.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                destination.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our encoders: they consume events and dispatch payloads.
        for (component_id, encoder) in self.encoders {
            let (encoder, component_registry) = encoder.into_parts();

            let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
                generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
            })?;

            let consumer = interconnects
                .take_encoder_consumer(&component_id)
                .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;

            let health_handle = health_registry
                .register_component(format!("{}.encoders.{}", root_component_name, component_id))
                .expect("duplicate encoder component ID in health registry");

            let component_context = ComponentContext::encoder(component_id.clone());
            let context = EncoderContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                dispatcher,
                consumer,
            );

            let (alloc_group, encoder) = encoder.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                encoder.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our forwarders: they only consume payloads.
        for (component_id, forwarder) in self.forwarders {
            let (forwarder, component_registry) = forwarder.into_parts();

            let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
                generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
            })?;

            let health_handle = health_registry
                .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
                .expect("duplicate forwarder component ID in health registry");

            let component_context = ComponentContext::forwarder(component_id.clone());
            let context = ForwarderContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                consumer,
            );

            let (alloc_group, forwarder) = forwarder.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                forwarder.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Hand the shutdown coordinator, the task set and the task-to-component map to the running topology.
        Ok(RunningTopology::from_parts(
            shutdown_coordinator,
            component_tasks,
            component_task_map,
        ))
    }
417}
418
/// Holds the dispatchers and consumers that connect the components of a topology, keyed by component ID.
///
/// Consumer maps store the channel sender next to the consumer: the sender is cloned for every upstream output
/// attached to that component, and the consumer itself is handed out by the matching `take_*_consumer` method.
struct ComponentInterconnects {
    /// Capacity used for every channel created between components.
    interconnect_capacity: NonZeroUsize,
    /// Events dispatchers for sources.
    source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Payloads dispatchers for relays.
    relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Payloads sender/consumer pairs for decoders.
    decoder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
    /// Events dispatchers for decoders.
    decoder_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Events sender/consumer pairs for transforms.
    transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events dispatchers for transforms.
    transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Events sender/consumer pairs for destinations.
    destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events sender/consumer pairs for encoders.
    encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Payloads dispatchers for encoders.
    encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Payloads sender/consumer pairs for forwarders.
    forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
}
432
433impl ComponentInterconnects {
434 fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
435 let mut interconnects = Self {
436 interconnect_capacity,
437 source_dispatchers: HashMap::new(),
438 relay_dispatchers: HashMap::new(),
439 decoder_consumers: HashMap::new(),
440 decoder_dispatchers: HashMap::new(),
441 transform_consumers: HashMap::new(),
442 transform_dispatchers: HashMap::new(),
443 destination_consumers: HashMap::new(),
444 encoder_consumers: HashMap::new(),
445 encoder_dispatchers: HashMap::new(),
446 forwarder_consumers: HashMap::new(),
447 };
448
449 interconnects.generate_interconnects(graph)?;
450 Ok(interconnects)
451 }
452
    /// Removes and returns the events dispatcher for the given source, if one exists.
    fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
        self.source_dispatchers.remove(component_id)
    }
456
    /// Removes and returns the payloads dispatcher for the given relay, if one exists.
    fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
        self.relay_dispatchers.remove(component_id)
    }
460
    /// Removes and returns the events dispatcher for the given decoder, if one exists.
    fn take_decoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
        self.decoder_dispatchers.remove(component_id)
    }
464
465 fn take_decoder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
466 self.decoder_consumers
467 .remove(component_id)
468 .map(|(_, consumer)| consumer)
469 }
470
    /// Removes and returns the events dispatcher for the given transform, if one exists.
    fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
        self.transform_dispatchers.remove(component_id)
    }
474
    /// Removes and returns the payloads dispatcher for the given encoder, if one exists.
    fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
        self.encoder_dispatchers.remove(component_id)
    }
478
479 fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
480 self.transform_consumers
481 .remove(component_id)
482 .map(|(_, consumer)| consumer)
483 }
484
485 fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
486 self.destination_consumers
487 .remove(component_id)
488 .map(|(_, consumer)| consumer)
489 }
490
491 fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
492 self.encoder_consumers
493 .remove(component_id)
494 .map(|(_, consumer)| consumer)
495 }
496
497 fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
498 self.forwarder_consumers
499 .remove(component_id)
500 .map(|(_, consumer)| consumer)
501 }
502
503 fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
504 let outbound_edges = graph.get_outbound_directed_edges();
509 for (upstream_id, output_map) in outbound_edges {
510 match upstream_id.component_type() {
511 ComponentType::Source | ComponentType::Decoder | ComponentType::Transform => {
512 self.generate_event_interconnect(upstream_id, output_map)?;
513 }
514 ComponentType::Relay | ComponentType::Encoder => {
515 self.generate_payload_interconnect(upstream_id, output_map)?
516 }
517 _ => panic!(
518 "Only sources, decoders, transforms, relays, and encoders can dispatch events/payloads to downstream components."
519 ),
520 }
521 }
522
523 Ok(())
524 }
525
526 fn generate_event_interconnect(
527 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
528 ) -> Result<(), GenericError> {
529 for (upstream_output_id, downstream_ids) in output_map {
530 let mut senders = Vec::new();
531 for downstream_id in downstream_ids {
532 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
533 let sender = self.get_or_create_events_sender(downstream_id);
534 senders.push(sender);
535 }
536
537 let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
538 dispatcher.add_output(upstream_output_id.clone())?;
539
540 for sender in senders {
541 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
542 }
543 }
544
545 Ok(())
546 }
547
548 fn generate_payload_interconnect(
549 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
550 ) -> Result<(), GenericError> {
551 for (upstream_output_id, downstream_ids) in output_map {
552 let mut senders = Vec::new();
553 for downstream_id in downstream_ids {
554 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
555 let sender = self.get_or_create_payloads_sender(downstream_id);
556 senders.push(sender);
557 }
558
559 let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
560 dispatcher.add_output(upstream_output_id.clone())?;
561
562 for sender in senders {
563 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
564 }
565 }
566
567 Ok(())
568 }
569
570 fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
571 let (component_id, component_type, component_context) = component_id.into_parts();
572
573 match component_type {
574 ComponentType::Source => self
575 .source_dispatchers
576 .entry(component_id)
577 .or_insert_with(|| EventsDispatcher::new(component_context)),
578 ComponentType::Decoder => self
579 .decoder_dispatchers
580 .entry(component_id)
581 .or_insert_with(|| EventsDispatcher::new(component_context)),
582 ComponentType::Transform => self
583 .transform_dispatchers
584 .entry(component_id)
585 .or_insert_with(|| EventsDispatcher::new(component_context)),
586 _ => {
587 panic!("Only sources, decoders, and transforms can dispatch events to downstream components.")
588 }
589 }
590 }
591
592 fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
593 let (component_id, component_type, component_context) = component_id.into_parts();
594 let interconnect_capacity = self.interconnect_capacity;
595
596 let (sender, _) = match component_type {
597 ComponentType::Transform => self
598 .transform_consumers
599 .entry(component_id)
600 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
601 ComponentType::Destination => self
602 .destination_consumers
603 .entry(component_id)
604 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
605 ComponentType::Encoder => self
606 .encoder_consumers
607 .entry(component_id)
608 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
609 _ => panic!("Only transforms, destinations, and encoders can consume events."),
610 };
611
612 sender.clone()
613 }
614
615 fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
616 let (component_id, component_type, component_context) = component_id.into_parts();
617
618 match component_type {
619 ComponentType::Relay => self
620 .relay_dispatchers
621 .entry(component_id)
622 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
623 ComponentType::Encoder => self
624 .encoder_dispatchers
625 .entry(component_id)
626 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
627 _ => {
628 panic!("Only relays and encoders can dispatch payloads to downstream components.")
629 }
630 }
631 }
632
633 fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
634 let (component_id, component_type, component_context) = component_id.into_parts();
635 let interconnect_capacity = self.interconnect_capacity;
636
637 let (sender, _) = match component_type {
638 ComponentType::Decoder => self
639 .decoder_consumers
640 .entry(component_id)
641 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
642 ComponentType::Forwarder => self
643 .forwarder_consumers
644 .entry(component_id)
645 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
646 _ => panic!("Only decoders and forwarders can consume payloads."),
647 };
648
649 sender.clone()
650 }
651}
652
653fn build_events_consumer_pair(
654 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
655) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
656 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
657 let consumer = EventsConsumer::new(component_context, receiver);
658 (sender, consumer)
659}
660
661fn build_payloads_consumer_pair(
662 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
663) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
664 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
665 let consumer = PayloadsConsumer::new(component_context, receiver);
666 (sender, consumer)
667}
668
/// Spawns a component's future onto `join_set` as a named task and returns the task's abort handle.
///
/// The task is named `topology-<component type>-<component ID>`. The spawn happens while a `component` span
/// (carrying the component type and ID) and the component's allocation group are both entered.
fn spawn_component<F>(
    join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
    allocation_group_token: AllocationGroupToken, component_future: F,
) -> AbortHandle
where
    F: Future<Output = Result<(), GenericError>> + Send + 'static,
{
    let component_span = error_span!(
        "component",
        "type" = context.component_type().as_str(),
        id = %context.component_id(),
    );

    // Both guards stay alive until the end of the function, so the task name is built and the task is spawned
    // while the span and the allocation group are entered.
    // NOTE(review): presumably `spawn_traced_named` picks up the current span/allocation group -- confirm.
    let _span = component_span.enter();
    let _guard = allocation_group_token.enter();

    let component_task_name = format!(
        "topology-{}-{}",
        context.component_type().as_str(),
        context.component_id()
    );
    join_set.spawn_traced_named(component_task_name, component_future)
}
692
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Building interconnects for a graph with both event and payload edges, including a transform that fans
    /// out to two downstream components, must succeed.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let capacity = NonZeroUsize::new(10).unwrap();

        // source1 -> transform1 -> { encoder1 -> forwarder1, dest1 }
        let mut graph = Graph::default();
        graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        let build_result = ComponentInterconnects::from_graph(capacity, &graph);
        let _ = build_result.expect("should build interconnects successfully");
    }
}