Skip to main content

saluki_core/topology/
built.rs

1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use resource_accounting::{MemoryLimiter, ResourceGroupToken, Tracked};
4use saluki_common::task::JoinSetExt as _;
5use saluki_error::{generic_error, ErrorContext as _, GenericError};
6use tokio::{
7    runtime::Handle,
8    sync::mpsc,
9    task::{AbortHandle, JoinSet},
10};
11use tracing::{debug, error_span};
12
13/// Configuration for the worker pool used by the topology.
14///
15/// This controls where tasks spawned by components themselves are executed. When components
16/// have heavy, compute-bound tasks that they need to execute, they should generally be spawned on the worker pool (also known as the "global thread pool") to
17/// avoid scheduling contention/starvation with the core component tasks.
18pub enum WorkerPoolConfiguration {
19    /// Use the ambient Tokio runtime (that's, `Handle::current()`).
20    ///
21    /// Component subtasks are spawned on whatever runtime is currently active.
22    /// Useful when the topology is embedded in an application that already
23    /// manages its own runtime, such as a Lambda extension.
24    Ambient,
25
26    /// Create a dedicated, multi-threaded Tokio runtime with 8 worker threads.
27    ///
28    /// This is the default behavior.
29    Dedicated,
30
31    /// Use an externally provided runtime.
32    ///
33    /// Component subtasks are spawned on the runtime associated with the given handle.
34    Explicit(Handle),
35}
36
37use super::{
38    graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
39    EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
40};
41use crate::health::HealthRegistry;
42use crate::{
43    components::{
44        decoders::{Decoder, DecoderContext},
45        destinations::{Destination, DestinationContext},
46        encoders::{Encoder, EncoderContext},
47        forwarders::{Forwarder, ForwarderContext},
48        relays::{Relay, RelayContext},
49        sources::{Source, SourceContext},
50        transforms::{Transform, TransformContext},
51        ComponentContext, ComponentType,
52    },
53    topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
54};
55
/// A built topology.
///
/// Built topologies represent a topology blueprint where each configured component, along with their associated
/// connections to other components, was validated and built successfully.
///
/// A built topology must be spawned via [`spawn`][Self::spawn].
pub struct BuiltTopology {
    // Topology name; `spawn` uses it to build the `topology.<name>` prefix of health registry component names.
    name: String,
    // Component graph; `spawn` derives the inter-component dispatchers/consumers from its outbound edges.
    graph: Graph,
    // Built components, one map per component kind, keyed by component ID.
    sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
    relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
    decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
    transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
    destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
    encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
    forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
    // Resource group token that is entered for the duration of `spawn`.
    component_token: ResourceGroupToken,
    // Capacity of each bounded channel created between an upstream and a downstream component.
    interconnect_capacity: NonZeroUsize,
    // Selects the runtime whose handle is given to the topology context at spawn time.
    worker_pool_config: WorkerPoolConfiguration,
}
76
77impl BuiltTopology {
78    #[allow(clippy::too_many_arguments)]
79    pub(crate) fn from_parts(
80        name: String, graph: Graph,
81        sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
82        relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
83        decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
84        transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
85        destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
86        encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
87        forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
88        component_token: ResourceGroupToken, interconnect_capacity: NonZeroUsize,
89    ) -> Self {
90        Self {
91            name,
92            graph,
93            sources,
94            relays,
95            decoders,
96            transforms,
97            destinations,
98            encoders,
99            forwarders,
100            component_token,
101            interconnect_capacity,
102            worker_pool_config: WorkerPoolConfiguration::Dedicated,
103        }
104    }
105
106    /// Configures the topology to use the ambient Tokio runtime.
107    ///
108    /// Component subtasks will be spawned on whatever runtime is currently active
109    /// when `spawn` is called. This avoids creating a dedicated thread pool,
110    /// which is useful for resource-constrained environments like Lambda.
111    pub fn with_ambient_worker_pool(mut self) -> Self {
112        self.worker_pool_config = WorkerPoolConfiguration::Ambient;
113        self
114    }
115
116    /// Configures the topology to use an externally provided Tokio runtime.
117    ///
118    /// Component subtasks will be spawned on the runtime associated with the given handle.
119    pub fn with_explicit_worker_pool(mut self, handle: Handle) -> Self {
120        self.worker_pool_config = WorkerPoolConfiguration::Explicit(handle);
121        self
122    }
123
124    fn resolve_worker_pool_handle(&self) -> Result<Handle, GenericError> {
125        match &self.worker_pool_config {
126            WorkerPoolConfiguration::Ambient => Ok(Handle::current()),
127            WorkerPoolConfiguration::Dedicated => {
128                let thread_pool = tokio::runtime::Builder::new_multi_thread()
129                    .worker_threads(8)
130                    .enable_all()
131                    .build()
132                    .error_context("Failed to build asynchronous thread pool runtime.")?;
133                let handle = thread_pool.handle().clone();
134
135                std::thread::spawn(move || {
136                    thread_pool.block_on(std::future::pending::<()>());
137                });
138
139                Ok(handle)
140            }
141            WorkerPoolConfiguration::Explicit(handle) => Ok(handle.clone()),
142        }
143    }
144
145    /// Spawns the topology.
146    ///
147    /// By default, a dedicated, multi-threaded Tokio runtime (8 threads) is created for
148    /// components to spawn compute-heavy subtasks on. Use
149    /// [`with_ambient_worker_pool`][Self::with_ambient_worker_pool] or
150    /// [`with_explicit_worker_pool`][Self::with_explicit_worker_pool] to change this
151    /// before calling `spawn`.
152    ///
153    /// A handle is returned that can be used to trigger the topology to shutdown.
154    ///
155    /// ## Errors
156    ///
157    /// If an error occurs while spawning the topology, an error is returned.
158    pub async fn spawn(
159        self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
160    ) -> Result<RunningTopology, GenericError> {
161        let root_component_name = format!("topology.{}", self.name);
162
163        let _guard = self.component_token.enter();
164
165        let thread_pool_handle = self.resolve_worker_pool_handle()?;
166        let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
167
168        let mut component_tasks = JoinSet::new();
169        let mut component_task_map = HashMap::new();
170
171        // Build our interconnects, which we'll grab from piecemeal as we spawn our components.
172        let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
173            .error_context("Failed to build component interconnects.")?;
174
175        let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
176
177        // Spawn our sources.
178        for (component_id, source) in self.sources {
179            let (source, component_registry) = source.into_parts();
180
181            let dispatcher = interconnects
182                .take_source_dispatcher(&component_id)
183                .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
184
185            let shutdown_handle = shutdown_coordinator.register();
186            let health_handle = health_registry
187                .register_component(format!("{}.sources.{}", root_component_name, component_id))
188                .expect("duplicate source component ID in health registry");
189
190            let component_context = ComponentContext::source(component_id.clone());
191            let context = SourceContext::new(
192                &topology_context,
193                &component_context,
194                component_registry,
195                shutdown_handle,
196                health_handle,
197                dispatcher,
198            );
199
200            let (alloc_group, source) = source.into_parts();
201            let task_handle = spawn_component(
202                &mut component_tasks,
203                component_context,
204                alloc_group,
205                source.run(context),
206            );
207            component_task_map.insert(task_handle.id(), component_id);
208        }
209
210        // Spawn our relays.
211        for (component_id, relay) in self.relays {
212            let (relay, component_registry) = relay.into_parts();
213
214            let dispatcher = interconnects
215                .take_relay_dispatcher(&component_id)
216                .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
217
218            let shutdown_handle = shutdown_coordinator.register();
219            let health_handle = health_registry
220                .register_component(format!("{}.relays.{}", root_component_name, component_id))
221                .expect("duplicate relay component ID in health registry");
222
223            let component_context = ComponentContext::relay(component_id.clone());
224            let context = RelayContext::new(
225                &topology_context,
226                &component_context,
227                component_registry,
228                shutdown_handle,
229                health_handle,
230                dispatcher,
231            );
232
233            let (alloc_group, relay) = relay.into_parts();
234            let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
235            component_task_map.insert(task_handle.id(), component_id);
236        }
237
238        // Spawn our decoders.
239        for (component_id, decoder) in self.decoders {
240            let (decoder, component_registry) = decoder.into_parts();
241
242            let dispatcher = interconnects
243                .take_decoder_dispatcher(&component_id)
244                .ok_or_else(|| generic_error!("No events dispatcher found for decoder component '{}'", component_id))?;
245
246            let consumer = interconnects
247                .take_decoder_consumer(&component_id)
248                .ok_or_else(|| generic_error!("No payloads consumer found for decoder component '{}'", component_id))?;
249
250            let health_handle = health_registry
251                .register_component(format!("{}.decoders.{}", root_component_name, component_id))
252                .expect("duplicate decoder component ID in health registry");
253
254            let component_context = ComponentContext::decoder(component_id.clone());
255            let context = DecoderContext::new(
256                &topology_context,
257                &component_context,
258                component_registry,
259                health_handle,
260                dispatcher,
261                consumer,
262            );
263
264            let (alloc_group, decoder) = decoder.into_parts();
265            let task_handle = spawn_component(
266                &mut component_tasks,
267                component_context,
268                alloc_group,
269                decoder.run(context),
270            );
271            component_task_map.insert(task_handle.id(), component_id);
272        }
273
274        // Spawn our transforms.
275        for (component_id, transform) in self.transforms {
276            let (transform, component_registry) = transform.into_parts();
277
278            let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
279                generic_error!("No events dispatcher found for transform component '{}'", component_id)
280            })?;
281
282            let consumer = interconnects
283                .take_transform_consumer(&component_id)
284                .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
285
286            let health_handle = health_registry
287                .register_component(format!("{}.transforms.{}", root_component_name, component_id))
288                .expect("duplicate transform component ID in health registry");
289
290            let component_context = ComponentContext::transform(component_id.clone());
291            let context = TransformContext::new(
292                &topology_context,
293                &component_context,
294                component_registry,
295                health_handle,
296                dispatcher,
297                consumer,
298            );
299
300            let (alloc_group, transform) = transform.into_parts();
301            let task_handle = spawn_component(
302                &mut component_tasks,
303                component_context,
304                alloc_group,
305                transform.run(context),
306            );
307            component_task_map.insert(task_handle.id(), component_id);
308        }
309
310        // Spawn our destinations.
311        for (component_id, destination) in self.destinations {
312            let (destination, component_registry) = destination.into_parts();
313
314            let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
315                generic_error!("No events consumer found for destination component '{}'", component_id)
316            })?;
317
318            let health_handle = health_registry
319                .register_component(format!("{}.destinations.{}", root_component_name, component_id))
320                .expect("duplicate destination component ID in health registry");
321
322            let component_context = ComponentContext::destination(component_id.clone());
323            let context = DestinationContext::new(
324                &topology_context,
325                &component_context,
326                component_registry,
327                health_handle,
328                consumer,
329            );
330
331            let (alloc_group, destination) = destination.into_parts();
332            let task_handle = spawn_component(
333                &mut component_tasks,
334                component_context,
335                alloc_group,
336                destination.run(context),
337            );
338            component_task_map.insert(task_handle.id(), component_id);
339        }
340
341        // Spawn our encoders.
342        for (component_id, encoder) in self.encoders {
343            let (encoder, component_registry) = encoder.into_parts();
344
345            let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
346                generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
347            })?;
348
349            let consumer = interconnects
350                .take_encoder_consumer(&component_id)
351                .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
352
353            let health_handle = health_registry
354                .register_component(format!("{}.encoders.{}", root_component_name, component_id))
355                .expect("duplicate encoder component ID in health registry");
356
357            let component_context = ComponentContext::encoder(component_id.clone());
358            let context = EncoderContext::new(
359                &topology_context,
360                &component_context,
361                component_registry,
362                health_handle,
363                dispatcher,
364                consumer,
365            );
366
367            let (alloc_group, encoder) = encoder.into_parts();
368            let task_handle = spawn_component(
369                &mut component_tasks,
370                component_context,
371                alloc_group,
372                encoder.run(context),
373            );
374            component_task_map.insert(task_handle.id(), component_id);
375        }
376
377        // Spawn our forwarders.
378        for (component_id, forwarder) in self.forwarders {
379            let (forwarder, component_registry) = forwarder.into_parts();
380
381            let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
382                generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
383            })?;
384
385            let health_handle = health_registry
386                .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
387                .expect("duplicate forwarder component ID in health registry");
388
389            let component_context = ComponentContext::forwarder(component_id.clone());
390            let context = ForwarderContext::new(
391                &topology_context,
392                &component_context,
393                component_registry,
394                health_handle,
395                consumer,
396            );
397
398            let (alloc_group, forwarder) = forwarder.into_parts();
399            let task_handle = spawn_component(
400                &mut component_tasks,
401                component_context,
402                alloc_group,
403                forwarder.run(context),
404            );
405            component_task_map.insert(task_handle.id(), component_id);
406        }
407
408        Ok(RunningTopology::from_parts(
409            shutdown_coordinator,
410            component_tasks,
411            component_task_map,
412        ))
413    }
414}
415
// Holds every dispatcher (upstream side) and consumer (downstream side) generated from the topology graph, grouped
// by component kind and keyed by component ID, until the owning component claims them via the `take_*` methods.
struct ComponentInterconnects {
    // Capacity used for every bounded channel backing a consumer.
    interconnect_capacity: NonZeroUsize,
    source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    // Consumer maps keep the channel sender next to the consumer so that a clone of the sender can be handed to each
    // upstream dispatcher; the stored sender is discarded when the consumer is taken.
    decoder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
    decoder_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
}
429
430impl ComponentInterconnects {
431    fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
432        let mut interconnects = Self {
433            interconnect_capacity,
434            source_dispatchers: HashMap::new(),
435            relay_dispatchers: HashMap::new(),
436            decoder_consumers: HashMap::new(),
437            decoder_dispatchers: HashMap::new(),
438            transform_consumers: HashMap::new(),
439            transform_dispatchers: HashMap::new(),
440            destination_consumers: HashMap::new(),
441            encoder_consumers: HashMap::new(),
442            encoder_dispatchers: HashMap::new(),
443            forwarder_consumers: HashMap::new(),
444        };
445
446        interconnects.generate_interconnects(graph)?;
447        Ok(interconnects)
448    }
449
450    fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
451        self.source_dispatchers.remove(component_id)
452    }
453
454    fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
455        self.relay_dispatchers.remove(component_id)
456    }
457
458    fn take_decoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
459        self.decoder_dispatchers.remove(component_id)
460    }
461
462    fn take_decoder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
463        self.decoder_consumers
464            .remove(component_id)
465            .map(|(_, consumer)| consumer)
466    }
467
468    fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
469        self.transform_dispatchers.remove(component_id)
470    }
471
472    fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
473        self.encoder_dispatchers.remove(component_id)
474    }
475
476    fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
477        self.transform_consumers
478            .remove(component_id)
479            .map(|(_, consumer)| consumer)
480    }
481
482    fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
483        self.destination_consumers
484            .remove(component_id)
485            .map(|(_, consumer)| consumer)
486    }
487
488    fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
489        self.encoder_consumers
490            .remove(component_id)
491            .map(|(_, consumer)| consumer)
492    }
493
494    fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
495        self.forwarder_consumers
496            .remove(component_id)
497            .map(|(_, consumer)| consumer)
498    }
499
500    fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
501        // Collect and iterate over each outbound edge in the topology graph.
502        //
503        // For each upstream component ("from" side of the edge), we attach each downstream component ("to" side of the edge) to it,
504        // creating the relevant dispatcher or consumer if necessary.
505        let outbound_edges = graph.get_outbound_directed_edges();
506        for (upstream_id, output_map) in outbound_edges {
507            match upstream_id.component_type() {
508                ComponentType::Source | ComponentType::Decoder | ComponentType::Transform => {
509                    self.generate_event_interconnect(upstream_id, output_map)?;
510                }
511                ComponentType::Relay | ComponentType::Encoder => {
512                    self.generate_payload_interconnect(upstream_id, output_map)?
513                }
514                _ => panic!(
515                    "Only sources, decoders, transforms, relays, and encoders can dispatch events/payloads to downstream components."
516                ),
517            }
518        }
519
520        Ok(())
521    }
522
523    fn generate_event_interconnect(
524        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
525    ) -> Result<(), GenericError> {
526        for (upstream_output_id, downstream_ids) in output_map {
527            let mut senders = Vec::new();
528            for downstream_id in downstream_ids {
529                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
530                let sender = self.get_or_create_events_sender(downstream_id);
531                senders.push(sender);
532            }
533
534            let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
535            dispatcher.add_output(upstream_output_id.clone())?;
536
537            for sender in senders {
538                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
539            }
540        }
541
542        Ok(())
543    }
544
545    fn generate_payload_interconnect(
546        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
547    ) -> Result<(), GenericError> {
548        for (upstream_output_id, downstream_ids) in output_map {
549            let mut senders = Vec::new();
550            for downstream_id in downstream_ids {
551                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
552                let sender = self.get_or_create_payloads_sender(downstream_id);
553                senders.push(sender);
554            }
555
556            let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
557            dispatcher.add_output(upstream_output_id.clone())?;
558
559            for sender in senders {
560                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
561            }
562        }
563
564        Ok(())
565    }
566
567    fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
568        let (component_id, component_type, component_context) = component_id.into_parts();
569
570        match component_type {
571            ComponentType::Source => self
572                .source_dispatchers
573                .entry(component_id)
574                .or_insert_with(|| EventsDispatcher::new(component_context)),
575            ComponentType::Decoder => self
576                .decoder_dispatchers
577                .entry(component_id)
578                .or_insert_with(|| EventsDispatcher::new(component_context)),
579            ComponentType::Transform => self
580                .transform_dispatchers
581                .entry(component_id)
582                .or_insert_with(|| EventsDispatcher::new(component_context)),
583            _ => {
584                panic!("Only sources, decoders, and transforms can dispatch events to downstream components.")
585            }
586        }
587    }
588
589    fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
590        let (component_id, component_type, component_context) = component_id.into_parts();
591        let interconnect_capacity = self.interconnect_capacity;
592
593        let (sender, _) = match component_type {
594            ComponentType::Transform => self
595                .transform_consumers
596                .entry(component_id)
597                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
598            ComponentType::Destination => self
599                .destination_consumers
600                .entry(component_id)
601                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
602            ComponentType::Encoder => self
603                .encoder_consumers
604                .entry(component_id)
605                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
606            _ => panic!("Only transforms, destinations, and encoders can consume events."),
607        };
608
609        sender.clone()
610    }
611
612    fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
613        let (component_id, component_type, component_context) = component_id.into_parts();
614
615        match component_type {
616            ComponentType::Relay => self
617                .relay_dispatchers
618                .entry(component_id)
619                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
620            ComponentType::Encoder => self
621                .encoder_dispatchers
622                .entry(component_id)
623                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
624            _ => {
625                panic!("Only relays and encoders can dispatch payloads to downstream components.")
626            }
627        }
628    }
629
630    fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
631        let (component_id, component_type, component_context) = component_id.into_parts();
632        let interconnect_capacity = self.interconnect_capacity;
633
634        let (sender, _) = match component_type {
635            ComponentType::Decoder => self
636                .decoder_consumers
637                .entry(component_id)
638                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
639            ComponentType::Forwarder => self
640                .forwarder_consumers
641                .entry(component_id)
642                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
643            _ => panic!("Only decoders and forwarders can consume payloads."),
644        };
645
646        sender.clone()
647    }
648}
649
650fn build_events_consumer_pair(
651    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
652) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
653    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
654    let consumer = EventsConsumer::new(component_context, receiver);
655    (sender, consumer)
656}
657
658fn build_payloads_consumer_pair(
659    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
660) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
661    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
662    let consumer = PayloadsConsumer::new(component_context, receiver);
663    (sender, consumer)
664}
665
666fn spawn_component<F>(
667    join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
668    resource_group_token: ResourceGroupToken, component_future: F,
669) -> AbortHandle
670where
671    F: Future<Output = Result<(), GenericError>> + Send + 'static,
672{
673    let component_span = error_span!(
674        "component",
675        "type" = context.component_type().as_str(),
676        id = %context.component_id(),
677    );
678
679    let _span = component_span.enter();
680    let _guard = resource_group_token.enter();
681
682    let component_task_name = format!(
683        "topology-{}-{}",
684        context.component_type().as_str(),
685        context.component_id()
686    );
687    join_set.spawn_traced_named(component_task_name, component_future)
688}
689
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Building interconnects only succeeds if each dispatcher output is added before senders are attached to it.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let capacity = NonZeroUsize::new(10).unwrap();

        // Topology under test:
        //
        //   source1 -> transform1 -> encoder1 -> forwarder1
        //                         \-> dest1
        let mut topology_graph = Graph::default();
        topology_graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        // Only successful construction matters here; the interconnects themselves are discarded.
        let _ = ComponentInterconnects::from_graph(capacity, &topology_graph)
            .expect("should build interconnects successfully");
    }
}