Skip to main content

saluki_core/topology/
built.rs

1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4    allocator::{AllocationGroupToken, Tracked},
5    MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use tokio::{
10    runtime::Handle,
11    sync::mpsc,
12    task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
16/// Configuration for the worker pool used by the topology.
17///
18/// This controls where tasks spawned by components themselves are executed. When components
19/// have heavy, compute-bound tasks that they need to execute, they should generally be spawned on the worker pool (also known as the "global thread pool") to
20/// avoid scheduling contention/starvation with the core component tasks.
21pub enum WorkerPoolConfiguration {
22    /// Use the ambient Tokio runtime (that is, `Handle::current()`).
23    ///
24    /// Component subtasks are spawned on whatever runtime is currently active.
25    /// Useful when the topology is embedded in an application that already
26    /// manages its own runtime, such as a Lambda extension.
27    Ambient,
28
29    /// Create a dedicated, multi-threaded Tokio runtime with 8 worker threads.
30    ///
31    /// This is the default behavior.
32    Dedicated,
33
34    /// Use an externally provided runtime.
35    ///
36    /// Component subtasks are spawned on the runtime associated with the given handle.
37    Explicit(Handle),
38}
39
40use super::{
41    graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
42    EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
43};
44use crate::health::HealthRegistry;
45use crate::{
46    components::{
47        decoders::{Decoder, DecoderContext},
48        destinations::{Destination, DestinationContext},
49        encoders::{Encoder, EncoderContext},
50        forwarders::{Forwarder, ForwarderContext},
51        relays::{Relay, RelayContext},
52        sources::{Source, SourceContext},
53        transforms::{Transform, TransformContext},
54        ComponentContext, ComponentType,
55    },
56    topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
57};
58
/// A built topology.
///
/// Built topologies represent a topology blueprint where each configured component, along with their associated
/// connections to other components, was validated and built successfully.
///
/// A built topology must be spawned via [`spawn`][Self::spawn].
pub struct BuiltTopology {
    /// Topology name; used to derive the `topology.<name>` prefix for health registry entries.
    name: String,
    /// Component graph from which the interconnects (dispatchers/consumers) are generated at spawn time.
    graph: Graph,
    /// Built sources, keyed by component ID.
    sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
    /// Built relays, keyed by component ID.
    relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
    /// Built decoders, keyed by component ID.
    decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
    /// Built transforms, keyed by component ID.
    transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
    /// Built destinations, keyed by component ID.
    destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
    /// Built encoders, keyed by component ID.
    encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
    /// Built forwarders, keyed by component ID.
    forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
    /// Allocation group token that is entered for the duration of `spawn`.
    component_token: AllocationGroupToken,
    /// Capacity of every channel created between an upstream and a downstream component.
    interconnect_capacity: NonZeroUsize,
    /// Selects the runtime that component subtasks are spawned on.
    worker_pool_config: WorkerPoolConfiguration,
}
79
80impl BuiltTopology {
81    #[allow(clippy::too_many_arguments)]
82    pub(crate) fn from_parts(
83        name: String, graph: Graph,
84        sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
85        relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
86        decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
87        transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
88        destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
89        encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
90        forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
91        component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
92    ) -> Self {
93        Self {
94            name,
95            graph,
96            sources,
97            relays,
98            decoders,
99            transforms,
100            destinations,
101            encoders,
102            forwarders,
103            component_token,
104            interconnect_capacity,
105            worker_pool_config: WorkerPoolConfiguration::Dedicated,
106        }
107    }
108
109    /// Configures the topology to use the ambient Tokio runtime.
110    ///
111    /// Component subtasks will be spawned on whatever runtime is currently active
112    /// when `spawn` is called. This avoids creating a dedicated thread pool,
113    /// which is useful for resource-constrained environments like Lambda.
114    pub fn with_ambient_worker_pool(mut self) -> Self {
115        self.worker_pool_config = WorkerPoolConfiguration::Ambient;
116        self
117    }
118
119    /// Configures the topology to use an externally provided Tokio runtime.
120    ///
121    /// Component subtasks will be spawned on the runtime associated with the given handle.
122    pub fn with_explicit_worker_pool(mut self, handle: Handle) -> Self {
123        self.worker_pool_config = WorkerPoolConfiguration::Explicit(handle);
124        self
125    }
126
127    fn resolve_worker_pool_handle(&self) -> Result<Handle, GenericError> {
128        match &self.worker_pool_config {
129            WorkerPoolConfiguration::Ambient => Ok(Handle::current()),
130            WorkerPoolConfiguration::Dedicated => {
131                let thread_pool = tokio::runtime::Builder::new_multi_thread()
132                    .worker_threads(8)
133                    .enable_all()
134                    .build()
135                    .error_context("Failed to build asynchronous thread pool runtime.")?;
136                let handle = thread_pool.handle().clone();
137
138                std::thread::spawn(move || {
139                    thread_pool.block_on(std::future::pending::<()>());
140                });
141
142                Ok(handle)
143            }
144            WorkerPoolConfiguration::Explicit(handle) => Ok(handle.clone()),
145        }
146    }
147
148    /// Spawns the topology.
149    ///
150    /// By default, a dedicated, multi-threaded Tokio runtime (8 threads) is created for
151    /// components to spawn compute-heavy subtasks on. Use
152    /// [`with_ambient_worker_pool`][Self::with_ambient_worker_pool] or
153    /// [`with_explicit_worker_pool`][Self::with_explicit_worker_pool] to change this
154    /// before calling `spawn`.
155    ///
156    /// A handle is returned that can be used to trigger the topology to shutdown.
157    ///
158    /// ## Errors
159    ///
160    /// If an error occurs while spawning the topology, an error is returned.
161    pub async fn spawn(
162        self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
163    ) -> Result<RunningTopology, GenericError> {
164        let root_component_name = format!("topology.{}", self.name);
165
166        let _guard = self.component_token.enter();
167
168        let thread_pool_handle = self.resolve_worker_pool_handle()?;
169        let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
170
171        let mut component_tasks = JoinSet::new();
172        let mut component_task_map = HashMap::new();
173
174        // Build our interconnects, which we'll grab from piecemeal as we spawn our components.
175        let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
176            .error_context("Failed to build component interconnects.")?;
177
178        let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
179
180        // Spawn our sources.
181        for (component_id, source) in self.sources {
182            let (source, component_registry) = source.into_parts();
183
184            let dispatcher = interconnects
185                .take_source_dispatcher(&component_id)
186                .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
187
188            let shutdown_handle = shutdown_coordinator.register();
189            let health_handle = health_registry
190                .register_component(format!("{}.sources.{}", root_component_name, component_id))
191                .expect("duplicate source component ID in health registry");
192
193            let component_context = ComponentContext::source(component_id.clone());
194            let context = SourceContext::new(
195                &topology_context,
196                &component_context,
197                component_registry,
198                shutdown_handle,
199                health_handle,
200                dispatcher,
201            );
202
203            let (alloc_group, source) = source.into_parts();
204            let task_handle = spawn_component(
205                &mut component_tasks,
206                component_context,
207                alloc_group,
208                source.run(context),
209            );
210            component_task_map.insert(task_handle.id(), component_id);
211        }
212
213        // Spawn our relays.
214        for (component_id, relay) in self.relays {
215            let (relay, component_registry) = relay.into_parts();
216
217            let dispatcher = interconnects
218                .take_relay_dispatcher(&component_id)
219                .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
220
221            let shutdown_handle = shutdown_coordinator.register();
222            let health_handle = health_registry
223                .register_component(format!("{}.relays.{}", root_component_name, component_id))
224                .expect("duplicate relay component ID in health registry");
225
226            let component_context = ComponentContext::relay(component_id.clone());
227            let context = RelayContext::new(
228                &topology_context,
229                &component_context,
230                component_registry,
231                shutdown_handle,
232                health_handle,
233                dispatcher,
234            );
235
236            let (alloc_group, relay) = relay.into_parts();
237            let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
238            component_task_map.insert(task_handle.id(), component_id);
239        }
240
241        // Spawn our decoders.
242        for (component_id, decoder) in self.decoders {
243            let (decoder, component_registry) = decoder.into_parts();
244
245            let dispatcher = interconnects
246                .take_decoder_dispatcher(&component_id)
247                .ok_or_else(|| generic_error!("No events dispatcher found for decoder component '{}'", component_id))?;
248
249            let consumer = interconnects
250                .take_decoder_consumer(&component_id)
251                .ok_or_else(|| generic_error!("No payloads consumer found for decoder component '{}'", component_id))?;
252
253            let health_handle = health_registry
254                .register_component(format!("{}.decoders.{}", root_component_name, component_id))
255                .expect("duplicate decoder component ID in health registry");
256
257            let component_context = ComponentContext::decoder(component_id.clone());
258            let context = DecoderContext::new(
259                &topology_context,
260                &component_context,
261                component_registry,
262                health_handle,
263                dispatcher,
264                consumer,
265            );
266
267            let (alloc_group, decoder) = decoder.into_parts();
268            let task_handle = spawn_component(
269                &mut component_tasks,
270                component_context,
271                alloc_group,
272                decoder.run(context),
273            );
274            component_task_map.insert(task_handle.id(), component_id);
275        }
276
277        // Spawn our transforms.
278        for (component_id, transform) in self.transforms {
279            let (transform, component_registry) = transform.into_parts();
280
281            let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
282                generic_error!("No events dispatcher found for transform component '{}'", component_id)
283            })?;
284
285            let consumer = interconnects
286                .take_transform_consumer(&component_id)
287                .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
288
289            let health_handle = health_registry
290                .register_component(format!("{}.transforms.{}", root_component_name, component_id))
291                .expect("duplicate transform component ID in health registry");
292
293            let component_context = ComponentContext::transform(component_id.clone());
294            let context = TransformContext::new(
295                &topology_context,
296                &component_context,
297                component_registry,
298                health_handle,
299                dispatcher,
300                consumer,
301            );
302
303            let (alloc_group, transform) = transform.into_parts();
304            let task_handle = spawn_component(
305                &mut component_tasks,
306                component_context,
307                alloc_group,
308                transform.run(context),
309            );
310            component_task_map.insert(task_handle.id(), component_id);
311        }
312
313        // Spawn our destinations.
314        for (component_id, destination) in self.destinations {
315            let (destination, component_registry) = destination.into_parts();
316
317            let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
318                generic_error!("No events consumer found for destination component '{}'", component_id)
319            })?;
320
321            let health_handle = health_registry
322                .register_component(format!("{}.destinations.{}", root_component_name, component_id))
323                .expect("duplicate destination component ID in health registry");
324
325            let component_context = ComponentContext::destination(component_id.clone());
326            let context = DestinationContext::new(
327                &topology_context,
328                &component_context,
329                component_registry,
330                health_handle,
331                consumer,
332            );
333
334            let (alloc_group, destination) = destination.into_parts();
335            let task_handle = spawn_component(
336                &mut component_tasks,
337                component_context,
338                alloc_group,
339                destination.run(context),
340            );
341            component_task_map.insert(task_handle.id(), component_id);
342        }
343
344        // Spawn our encoders.
345        for (component_id, encoder) in self.encoders {
346            let (encoder, component_registry) = encoder.into_parts();
347
348            let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
349                generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
350            })?;
351
352            let consumer = interconnects
353                .take_encoder_consumer(&component_id)
354                .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
355
356            let health_handle = health_registry
357                .register_component(format!("{}.encoders.{}", root_component_name, component_id))
358                .expect("duplicate encoder component ID in health registry");
359
360            let component_context = ComponentContext::encoder(component_id.clone());
361            let context = EncoderContext::new(
362                &topology_context,
363                &component_context,
364                component_registry,
365                health_handle,
366                dispatcher,
367                consumer,
368            );
369
370            let (alloc_group, encoder) = encoder.into_parts();
371            let task_handle = spawn_component(
372                &mut component_tasks,
373                component_context,
374                alloc_group,
375                encoder.run(context),
376            );
377            component_task_map.insert(task_handle.id(), component_id);
378        }
379
380        // Spawn our forwarders.
381        for (component_id, forwarder) in self.forwarders {
382            let (forwarder, component_registry) = forwarder.into_parts();
383
384            let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
385                generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
386            })?;
387
388            let health_handle = health_registry
389                .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
390                .expect("duplicate forwarder component ID in health registry");
391
392            let component_context = ComponentContext::forwarder(component_id.clone());
393            let context = ForwarderContext::new(
394                &topology_context,
395                &component_context,
396                component_registry,
397                health_handle,
398                consumer,
399            );
400
401            let (alloc_group, forwarder) = forwarder.into_parts();
402            let task_handle = spawn_component(
403                &mut component_tasks,
404                component_context,
405                alloc_group,
406                forwarder.run(context),
407            );
408            component_task_map.insert(task_handle.id(), component_id);
409        }
410
411        Ok(RunningTopology::from_parts(
412            shutdown_coordinator,
413            component_tasks,
414            component_task_map,
415        ))
416    }
417}
418
/// The dispatchers (upstream side) and consumers (downstream side) that wire components together.
///
/// Everything is generated from the topology graph in one pass and then handed out piecemeal, via the
/// `take_*` methods, as each component is spawned.
///
/// Consumer maps hold the sending half next to the consumer so that the sender can be cloned for every
/// upstream component that feeds the same downstream component.
struct ComponentInterconnects {
    /// Capacity used for every channel created by this type.
    interconnect_capacity: NonZeroUsize,
    /// Events dispatchers for sources, keyed by component ID.
    source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Payloads dispatchers for relays, keyed by component ID.
    relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Payloads sender/consumer pairs for decoders, keyed by component ID.
    decoder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
    /// Events dispatchers for decoders, keyed by component ID.
    decoder_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Events sender/consumer pairs for transforms, keyed by component ID.
    transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events dispatchers for transforms, keyed by component ID.
    transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Events sender/consumer pairs for destinations, keyed by component ID.
    destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events sender/consumer pairs for encoders, keyed by component ID.
    encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Payloads dispatchers for encoders, keyed by component ID.
    encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Payloads sender/consumer pairs for forwarders, keyed by component ID.
    forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
}
432
433impl ComponentInterconnects {
434    fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
435        let mut interconnects = Self {
436            interconnect_capacity,
437            source_dispatchers: HashMap::new(),
438            relay_dispatchers: HashMap::new(),
439            decoder_consumers: HashMap::new(),
440            decoder_dispatchers: HashMap::new(),
441            transform_consumers: HashMap::new(),
442            transform_dispatchers: HashMap::new(),
443            destination_consumers: HashMap::new(),
444            encoder_consumers: HashMap::new(),
445            encoder_dispatchers: HashMap::new(),
446            forwarder_consumers: HashMap::new(),
447        };
448
449        interconnects.generate_interconnects(graph)?;
450        Ok(interconnects)
451    }
452
453    fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
454        self.source_dispatchers.remove(component_id)
455    }
456
457    fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
458        self.relay_dispatchers.remove(component_id)
459    }
460
461    fn take_decoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
462        self.decoder_dispatchers.remove(component_id)
463    }
464
465    fn take_decoder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
466        self.decoder_consumers
467            .remove(component_id)
468            .map(|(_, consumer)| consumer)
469    }
470
471    fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
472        self.transform_dispatchers.remove(component_id)
473    }
474
475    fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
476        self.encoder_dispatchers.remove(component_id)
477    }
478
479    fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
480        self.transform_consumers
481            .remove(component_id)
482            .map(|(_, consumer)| consumer)
483    }
484
485    fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
486        self.destination_consumers
487            .remove(component_id)
488            .map(|(_, consumer)| consumer)
489    }
490
491    fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
492        self.encoder_consumers
493            .remove(component_id)
494            .map(|(_, consumer)| consumer)
495    }
496
497    fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
498        self.forwarder_consumers
499            .remove(component_id)
500            .map(|(_, consumer)| consumer)
501    }
502
503    fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
504        // Collect and iterate over each outbound edge in the topology graph.
505        //
506        // For each upstream component ("from" side of the edge), we attach each downstream component ("to" side of the edge) to it,
507        // creating the relevant dispatcher or consumer if necessary.
508        let outbound_edges = graph.get_outbound_directed_edges();
509        for (upstream_id, output_map) in outbound_edges {
510            match upstream_id.component_type() {
511                ComponentType::Source | ComponentType::Decoder | ComponentType::Transform => {
512                    self.generate_event_interconnect(upstream_id, output_map)?;
513                }
514                ComponentType::Relay | ComponentType::Encoder => {
515                    self.generate_payload_interconnect(upstream_id, output_map)?
516                }
517                _ => panic!(
518                    "Only sources, decoders, transforms, relays, and encoders can dispatch events/payloads to downstream components."
519                ),
520            }
521        }
522
523        Ok(())
524    }
525
526    fn generate_event_interconnect(
527        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
528    ) -> Result<(), GenericError> {
529        for (upstream_output_id, downstream_ids) in output_map {
530            let mut senders = Vec::new();
531            for downstream_id in downstream_ids {
532                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
533                let sender = self.get_or_create_events_sender(downstream_id);
534                senders.push(sender);
535            }
536
537            let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
538            dispatcher.add_output(upstream_output_id.clone())?;
539
540            for sender in senders {
541                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
542            }
543        }
544
545        Ok(())
546    }
547
548    fn generate_payload_interconnect(
549        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
550    ) -> Result<(), GenericError> {
551        for (upstream_output_id, downstream_ids) in output_map {
552            let mut senders = Vec::new();
553            for downstream_id in downstream_ids {
554                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
555                let sender = self.get_or_create_payloads_sender(downstream_id);
556                senders.push(sender);
557            }
558
559            let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
560            dispatcher.add_output(upstream_output_id.clone())?;
561
562            for sender in senders {
563                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
564            }
565        }
566
567        Ok(())
568    }
569
570    fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
571        let (component_id, component_type, component_context) = component_id.into_parts();
572
573        match component_type {
574            ComponentType::Source => self
575                .source_dispatchers
576                .entry(component_id)
577                .or_insert_with(|| EventsDispatcher::new(component_context)),
578            ComponentType::Decoder => self
579                .decoder_dispatchers
580                .entry(component_id)
581                .or_insert_with(|| EventsDispatcher::new(component_context)),
582            ComponentType::Transform => self
583                .transform_dispatchers
584                .entry(component_id)
585                .or_insert_with(|| EventsDispatcher::new(component_context)),
586            _ => {
587                panic!("Only sources, decoders, and transforms can dispatch events to downstream components.")
588            }
589        }
590    }
591
592    fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
593        let (component_id, component_type, component_context) = component_id.into_parts();
594        let interconnect_capacity = self.interconnect_capacity;
595
596        let (sender, _) = match component_type {
597            ComponentType::Transform => self
598                .transform_consumers
599                .entry(component_id)
600                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
601            ComponentType::Destination => self
602                .destination_consumers
603                .entry(component_id)
604                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
605            ComponentType::Encoder => self
606                .encoder_consumers
607                .entry(component_id)
608                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
609            _ => panic!("Only transforms, destinations, and encoders can consume events."),
610        };
611
612        sender.clone()
613    }
614
615    fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
616        let (component_id, component_type, component_context) = component_id.into_parts();
617
618        match component_type {
619            ComponentType::Relay => self
620                .relay_dispatchers
621                .entry(component_id)
622                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
623            ComponentType::Encoder => self
624                .encoder_dispatchers
625                .entry(component_id)
626                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
627            _ => {
628                panic!("Only relays and encoders can dispatch payloads to downstream components.")
629            }
630        }
631    }
632
633    fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
634        let (component_id, component_type, component_context) = component_id.into_parts();
635        let interconnect_capacity = self.interconnect_capacity;
636
637        let (sender, _) = match component_type {
638            ComponentType::Decoder => self
639                .decoder_consumers
640                .entry(component_id)
641                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
642            ComponentType::Forwarder => self
643                .forwarder_consumers
644                .entry(component_id)
645                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
646            _ => panic!("Only decoders and forwarders can consume payloads."),
647        };
648
649        sender.clone()
650    }
651}
652
653fn build_events_consumer_pair(
654    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
655) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
656    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
657    let consumer = EventsConsumer::new(component_context, receiver);
658    (sender, consumer)
659}
660
661fn build_payloads_consumer_pair(
662    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
663) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
664    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
665    let consumer = PayloadsConsumer::new(component_context, receiver);
666    (sender, consumer)
667}
668
669fn spawn_component<F>(
670    join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
671    allocation_group_token: AllocationGroupToken, component_future: F,
672) -> AbortHandle
673where
674    F: Future<Output = Result<(), GenericError>> + Send + 'static,
675{
676    let component_span = error_span!(
677        "component",
678        "type" = context.component_type().as_str(),
679        id = %context.component_id(),
680    );
681
682    let _span = component_span.enter();
683    let _guard = allocation_group_token.enter();
684
685    let component_task_name = format!(
686        "topology-{}-{}",
687        context.component_type().as_str(),
688        context.component_id()
689    );
690    join_set.spawn_traced_named(component_task_name, component_future)
691}
692
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Building interconnects only succeeds when each dispatcher output is added before senders are
    /// attached to it; this exercises both the events and the payloads paths.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let capacity = NonZeroUsize::new(10).unwrap();

        // source1 -> transform1 -> { encoder1 -> forwarder1, dest1 }
        let mut topology_graph = Graph::default();
        topology_graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        let built = ComponentInterconnects::from_graph(capacity, &topology_graph);
        let _ = built.expect("should build interconnects successfully");
    }
}