saluki_core/topology/
built.rs

1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4    allocator::{AllocationGroupToken, Tracked},
5    MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use saluki_health::HealthRegistry;
10use tokio::{
11    sync::mpsc,
12    task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
16use super::{
17    graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
18    EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
19};
20use crate::{
21    components::{
22        decoders::{Decoder, DecoderContext},
23        destinations::{Destination, DestinationContext},
24        encoders::{Encoder, EncoderContext},
25        forwarders::{Forwarder, ForwarderContext},
26        relays::{Relay, RelayContext},
27        sources::{Source, SourceContext},
28        transforms::{Transform, TransformContext},
29        ComponentContext, ComponentType,
30    },
31    topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
32};
33
/// A built topology.
///
/// Built topologies represent a topology blueprint where each configured component, along with their associated
/// connections to other components, was validated and built successfully.
///
/// A built topology must be spawned via [`spawn`][Self::spawn].
pub struct BuiltTopology {
    // Topology name; `spawn` uses it to form the `topology.<name>` prefix of health registry entries.
    name: String,
    // Component graph; its outbound edges drive interconnect construction in `spawn`.
    graph: Graph,
    // Built components of each kind, keyed by component ID. Each map is drained by `spawn`, which starts one task per
    // entry.
    sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
    relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
    decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
    transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
    destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
    encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
    forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
    // Allocation group token that `spawn` enters before doing any of its setup work.
    component_token: AllocationGroupToken,
    // Capacity passed to `mpsc::channel` for every channel created between components.
    interconnect_capacity: NonZeroUsize,
}
53
54impl BuiltTopology {
55    #[allow(clippy::too_many_arguments)]
56    pub(crate) fn from_parts(
57        name: String, graph: Graph,
58        sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
59        relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
60        decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
61        transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
62        destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
63        encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
64        forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
65        component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
66    ) -> Self {
67        Self {
68            name,
69            graph,
70            sources,
71            relays,
72            decoders,
73            transforms,
74            destinations,
75            encoders,
76            forwarders,
77            component_token,
78            interconnect_capacity,
79        }
80    }
81
82    /// Spawns the topology.
83    ///
84    /// A handle is returned that can be used to trigger the topology to shutdown.
85    ///
86    /// ## Errors
87    ///
88    /// If an error occurs while spawning the topology, an error is returned.
89    pub async fn spawn(
90        self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
91    ) -> Result<RunningTopology, GenericError> {
92        let root_component_name = format!("topology.{}", self.name);
93
94        let _guard = self.component_token.enter();
95
96        let thread_pool = tokio::runtime::Builder::new_multi_thread()
97            .worker_threads(8)
98            .enable_all()
99            .build()
100            .error_context("Failed to build asynchronous thread pool runtime.")?;
101        let thread_pool_handle = thread_pool.handle().clone();
102
103        std::thread::spawn(move || {
104            thread_pool.block_on(std::future::pending::<()>());
105        });
106
107        let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
108
109        let mut component_tasks = JoinSet::new();
110        let mut component_task_map = HashMap::new();
111
112        // Build our interconnects, which we'll grab from piecemeal as we spawn our components.
113        let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
114            .error_context("Failed to build component interconnects.")?;
115
116        let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
117
118        // Spawn our sources.
119        for (component_id, source) in self.sources {
120            let (source, component_registry) = source.into_parts();
121
122            let dispatcher = interconnects
123                .take_source_dispatcher(&component_id)
124                .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
125
126            let shutdown_handle = shutdown_coordinator.register();
127            let health_handle = health_registry
128                .register_component(format!("{}.sources.{}", root_component_name, component_id))
129                .expect("duplicate source component ID in health registry");
130
131            let component_context = ComponentContext::source(component_id.clone());
132            let context = SourceContext::new(
133                &topology_context,
134                &component_context,
135                component_registry,
136                shutdown_handle,
137                health_handle,
138                dispatcher,
139            );
140
141            let (alloc_group, source) = source.into_parts();
142            let task_handle = spawn_component(
143                &mut component_tasks,
144                component_context,
145                alloc_group,
146                source.run(context),
147            );
148            component_task_map.insert(task_handle.id(), component_id);
149        }
150
151        // Spawn our relays.
152        for (component_id, relay) in self.relays {
153            let (relay, component_registry) = relay.into_parts();
154
155            let dispatcher = interconnects
156                .take_relay_dispatcher(&component_id)
157                .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
158
159            let shutdown_handle = shutdown_coordinator.register();
160            let health_handle = health_registry
161                .register_component(format!("{}.relays.{}", root_component_name, component_id))
162                .expect("duplicate relay component ID in health registry");
163
164            let component_context = ComponentContext::relay(component_id.clone());
165            let context = RelayContext::new(
166                &topology_context,
167                &component_context,
168                component_registry,
169                shutdown_handle,
170                health_handle,
171                dispatcher,
172            );
173
174            let (alloc_group, relay) = relay.into_parts();
175            let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
176            component_task_map.insert(task_handle.id(), component_id);
177        }
178
179        // Spawn our decoders.
180        for (component_id, decoder) in self.decoders {
181            let (decoder, component_registry) = decoder.into_parts();
182
183            let dispatcher = interconnects
184                .take_decoder_dispatcher(&component_id)
185                .ok_or_else(|| generic_error!("No events dispatcher found for decoder component '{}'", component_id))?;
186
187            let consumer = interconnects
188                .take_decoder_consumer(&component_id)
189                .ok_or_else(|| generic_error!("No payloads consumer found for decoder component '{}'", component_id))?;
190
191            let health_handle = health_registry
192                .register_component(format!("{}.decoders.{}", root_component_name, component_id))
193                .expect("duplicate decoder component ID in health registry");
194
195            let component_context = ComponentContext::decoder(component_id.clone());
196            let context = DecoderContext::new(
197                &topology_context,
198                &component_context,
199                component_registry,
200                health_handle,
201                dispatcher,
202                consumer,
203            );
204
205            let (alloc_group, decoder) = decoder.into_parts();
206            let task_handle = spawn_component(
207                &mut component_tasks,
208                component_context,
209                alloc_group,
210                decoder.run(context),
211            );
212            component_task_map.insert(task_handle.id(), component_id);
213        }
214
215        // Spawn our transforms.
216        for (component_id, transform) in self.transforms {
217            let (transform, component_registry) = transform.into_parts();
218
219            let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
220                generic_error!("No events dispatcher found for transform component '{}'", component_id)
221            })?;
222
223            let consumer = interconnects
224                .take_transform_consumer(&component_id)
225                .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
226
227            let health_handle = health_registry
228                .register_component(format!("{}.transforms.{}", root_component_name, component_id))
229                .expect("duplicate transform component ID in health registry");
230
231            let component_context = ComponentContext::transform(component_id.clone());
232            let context = TransformContext::new(
233                &topology_context,
234                &component_context,
235                component_registry,
236                health_handle,
237                dispatcher,
238                consumer,
239            );
240
241            let (alloc_group, transform) = transform.into_parts();
242            let task_handle = spawn_component(
243                &mut component_tasks,
244                component_context,
245                alloc_group,
246                transform.run(context),
247            );
248            component_task_map.insert(task_handle.id(), component_id);
249        }
250
251        // Spawn our destinations.
252        for (component_id, destination) in self.destinations {
253            let (destination, component_registry) = destination.into_parts();
254
255            let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
256                generic_error!("No events consumer found for destination component '{}'", component_id)
257            })?;
258
259            let health_handle = health_registry
260                .register_component(format!("{}.destinations.{}", root_component_name, component_id))
261                .expect("duplicate destination component ID in health registry");
262
263            let component_context = ComponentContext::destination(component_id.clone());
264            let context = DestinationContext::new(
265                &topology_context,
266                &component_context,
267                component_registry,
268                health_handle,
269                consumer,
270            );
271
272            let (alloc_group, destination) = destination.into_parts();
273            let task_handle = spawn_component(
274                &mut component_tasks,
275                component_context,
276                alloc_group,
277                destination.run(context),
278            );
279            component_task_map.insert(task_handle.id(), component_id);
280        }
281
282        // Spawn our encoders.
283        for (component_id, encoder) in self.encoders {
284            let (encoder, component_registry) = encoder.into_parts();
285
286            let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
287                generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
288            })?;
289
290            let consumer = interconnects
291                .take_encoder_consumer(&component_id)
292                .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
293
294            let health_handle = health_registry
295                .register_component(format!("{}.encoders.{}", root_component_name, component_id))
296                .expect("duplicate encoder component ID in health registry");
297
298            let component_context = ComponentContext::encoder(component_id.clone());
299            let context = EncoderContext::new(
300                &topology_context,
301                &component_context,
302                component_registry,
303                health_handle,
304                dispatcher,
305                consumer,
306            );
307
308            let (alloc_group, encoder) = encoder.into_parts();
309            let task_handle = spawn_component(
310                &mut component_tasks,
311                component_context,
312                alloc_group,
313                encoder.run(context),
314            );
315            component_task_map.insert(task_handle.id(), component_id);
316        }
317
318        // Spawn our forwarders.
319        for (component_id, forwarder) in self.forwarders {
320            let (forwarder, component_registry) = forwarder.into_parts();
321
322            let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
323                generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
324            })?;
325
326            let health_handle = health_registry
327                .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
328                .expect("duplicate forwarder component ID in health registry");
329
330            let component_context = ComponentContext::forwarder(component_id.clone());
331            let context = ForwarderContext::new(
332                &topology_context,
333                &component_context,
334                component_registry,
335                health_handle,
336                consumer,
337            );
338
339            let (alloc_group, forwarder) = forwarder.into_parts();
340            let task_handle = spawn_component(
341                &mut component_tasks,
342                component_context,
343                alloc_group,
344                forwarder.run(context),
345            );
346            component_task_map.insert(task_handle.id(), component_id);
347        }
348
349        Ok(RunningTopology::from_parts(
350            shutdown_coordinator,
351            component_tasks,
352            component_task_map,
353        ))
354    }
355}
356
// Holds every dispatcher and consumer generated from the topology graph until the owning component is spawned and
// takes its piece via one of the `take_*` methods.
struct ComponentInterconnects {
    // Capacity passed to `mpsc::channel` for each consumer channel that gets created.
    interconnect_capacity: NonZeroUsize,
    // Dispatchers are keyed by the upstream component that sends through them.
    source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    // Consumer entries keep the channel's sender next to the consumer so the sender can be cloned for every upstream
    // output that feeds the same downstream component.
    decoder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
    decoder_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
}
370
371impl ComponentInterconnects {
372    fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
373        let mut interconnects = Self {
374            interconnect_capacity,
375            source_dispatchers: HashMap::new(),
376            relay_dispatchers: HashMap::new(),
377            decoder_consumers: HashMap::new(),
378            decoder_dispatchers: HashMap::new(),
379            transform_consumers: HashMap::new(),
380            transform_dispatchers: HashMap::new(),
381            destination_consumers: HashMap::new(),
382            encoder_consumers: HashMap::new(),
383            encoder_dispatchers: HashMap::new(),
384            forwarder_consumers: HashMap::new(),
385        };
386
387        interconnects.generate_interconnects(graph)?;
388        Ok(interconnects)
389    }
390
391    fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
392        self.source_dispatchers.remove(component_id)
393    }
394
395    fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
396        self.relay_dispatchers.remove(component_id)
397    }
398
399    fn take_decoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
400        self.decoder_dispatchers.remove(component_id)
401    }
402
403    fn take_decoder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
404        self.decoder_consumers
405            .remove(component_id)
406            .map(|(_, consumer)| consumer)
407    }
408
409    fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
410        self.transform_dispatchers.remove(component_id)
411    }
412
413    fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
414        self.encoder_dispatchers.remove(component_id)
415    }
416
417    fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
418        self.transform_consumers
419            .remove(component_id)
420            .map(|(_, consumer)| consumer)
421    }
422
423    fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
424        self.destination_consumers
425            .remove(component_id)
426            .map(|(_, consumer)| consumer)
427    }
428
429    fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
430        self.encoder_consumers
431            .remove(component_id)
432            .map(|(_, consumer)| consumer)
433    }
434
435    fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
436        self.forwarder_consumers
437            .remove(component_id)
438            .map(|(_, consumer)| consumer)
439    }
440
441    fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
442        // Collect and iterate over each outbound edge in the topology graph.
443        //
444        // For each upstream component ("from" side of the edge), we attach each downstream component ("to" side of the edge) to it,
445        // creating the relevant dispatcher or consumer if necessary.
446        let outbound_edges = graph.get_outbound_directed_edges();
447        for (upstream_id, output_map) in outbound_edges {
448            match upstream_id.component_type() {
449                ComponentType::Source | ComponentType::Decoder | ComponentType::Transform => {
450                    self.generate_event_interconnect(upstream_id, output_map)?;
451                }
452                ComponentType::Relay | ComponentType::Encoder => {
453                    self.generate_payload_interconnect(upstream_id, output_map)?
454                }
455                _ => panic!(
456                    "Only sources, decoders, transforms, relays, and encoders can dispatch events/payloads to downstream components."
457                ),
458            }
459        }
460
461        Ok(())
462    }
463
464    fn generate_event_interconnect(
465        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
466    ) -> Result<(), GenericError> {
467        for (upstream_output_id, downstream_ids) in output_map {
468            let mut senders = Vec::new();
469            for downstream_id in downstream_ids {
470                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
471                let sender = self.get_or_create_events_sender(downstream_id);
472                senders.push(sender);
473            }
474
475            let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
476            dispatcher.add_output(upstream_output_id.clone())?;
477
478            for sender in senders {
479                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
480            }
481        }
482
483        Ok(())
484    }
485
486    fn generate_payload_interconnect(
487        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
488    ) -> Result<(), GenericError> {
489        for (upstream_output_id, downstream_ids) in output_map {
490            let mut senders = Vec::new();
491            for downstream_id in downstream_ids {
492                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
493                let sender = self.get_or_create_payloads_sender(downstream_id);
494                senders.push(sender);
495            }
496
497            let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
498            dispatcher.add_output(upstream_output_id.clone())?;
499
500            for sender in senders {
501                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
502            }
503        }
504
505        Ok(())
506    }
507
508    fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
509        let (component_id, component_type, component_context) = component_id.into_parts();
510
511        match component_type {
512            ComponentType::Source => self
513                .source_dispatchers
514                .entry(component_id)
515                .or_insert_with(|| EventsDispatcher::new(component_context)),
516            ComponentType::Decoder => self
517                .decoder_dispatchers
518                .entry(component_id)
519                .or_insert_with(|| EventsDispatcher::new(component_context)),
520            ComponentType::Transform => self
521                .transform_dispatchers
522                .entry(component_id)
523                .or_insert_with(|| EventsDispatcher::new(component_context)),
524            _ => {
525                panic!("Only sources, decoders, and transforms can dispatch events to downstream components.")
526            }
527        }
528    }
529
530    fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
531        let (component_id, component_type, component_context) = component_id.into_parts();
532        let interconnect_capacity = self.interconnect_capacity;
533
534        let (sender, _) = match component_type {
535            ComponentType::Transform => self
536                .transform_consumers
537                .entry(component_id)
538                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
539            ComponentType::Destination => self
540                .destination_consumers
541                .entry(component_id)
542                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
543            ComponentType::Encoder => self
544                .encoder_consumers
545                .entry(component_id)
546                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
547            _ => panic!("Only transforms, destinations, and encoders can consume events."),
548        };
549
550        sender.clone()
551    }
552
553    fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
554        let (component_id, component_type, component_context) = component_id.into_parts();
555
556        match component_type {
557            ComponentType::Relay => self
558                .relay_dispatchers
559                .entry(component_id)
560                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
561            ComponentType::Encoder => self
562                .encoder_dispatchers
563                .entry(component_id)
564                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
565            _ => {
566                panic!("Only relays and encoders can dispatch payloads to downstream components.")
567            }
568        }
569    }
570
571    fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
572        let (component_id, component_type, component_context) = component_id.into_parts();
573        let interconnect_capacity = self.interconnect_capacity;
574
575        let (sender, _) = match component_type {
576            ComponentType::Decoder => self
577                .decoder_consumers
578                .entry(component_id)
579                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
580            ComponentType::Forwarder => self
581                .forwarder_consumers
582                .entry(component_id)
583                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
584            _ => panic!("Only decoders and forwarders can consume payloads."),
585        };
586
587        sender.clone()
588    }
589}
590
591fn build_events_consumer_pair(
592    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
593) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
594    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
595    let consumer = EventsConsumer::new(component_context, receiver);
596    (sender, consumer)
597}
598
599fn build_payloads_consumer_pair(
600    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
601) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
602    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
603    let consumer = PayloadsConsumer::new(component_context, receiver);
604    (sender, consumer)
605}
606
607fn spawn_component<F>(
608    join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
609    allocation_group_token: AllocationGroupToken, component_future: F,
610) -> AbortHandle
611where
612    F: Future<Output = Result<(), GenericError>> + Send + 'static,
613{
614    let component_span = error_span!(
615        "component",
616        "type" = context.component_type().as_str(),
617        id = %context.component_id(),
618    );
619
620    let _span = component_span.enter();
621    let _guard = allocation_group_token.enter();
622
623    let component_task_name = format!(
624        "topology-{}-{}",
625        context.component_type().as_str(),
626        context.component_id()
627    );
628    join_set.spawn_traced_named(component_task_name, component_future)
629}
630
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Building interconnects must succeed for a graph with both event and payload edges, which depends on each
    /// dispatcher output being added before senders are attached to it.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let capacity = NonZeroUsize::new(10).unwrap();

        // source1 -> transform1 -> { encoder1 -> forwarder1, dest1 }
        let mut topology_graph = Graph::default();
        topology_graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        let build_result = ComponentInterconnects::from_graph(capacity, &topology_graph);
        let _ = build_result.expect("should build interconnects successfully");
    }
}