saluki_core/topology/
built.rs

1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4    allocator::{AllocationGroupToken, Tracked},
5    MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use saluki_health::HealthRegistry;
10use tokio::{
11    sync::mpsc,
12    task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
16use super::{
17    graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
18    EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
19};
20use crate::{
21    components::{
22        destinations::{Destination, DestinationContext},
23        encoders::{Encoder, EncoderContext},
24        forwarders::{Forwarder, ForwarderContext},
25        relays::{Relay, RelayContext},
26        sources::{Source, SourceContext},
27        transforms::{Transform, TransformContext},
28        ComponentContext, ComponentType,
29    },
30    topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
31};
32
/// A built topology.
///
/// Built topologies represent a topology blueprint where each configured component, along with their associated
/// connections to other components, was validated and built successfully.
///
/// A built topology must be spawned via [`spawn`][Self::spawn].
pub struct BuiltTopology {
    /// Topology name; used to derive the root health registry name (`topology.<name>`) when spawning.
    name: String,
    /// Component graph from which the interconnects between components are generated.
    graph: Graph,
    /// Built source components, keyed by component ID.
    sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
    /// Built relay components, keyed by component ID.
    relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
    /// Built transform components, keyed by component ID.
    transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
    /// Built destination components, keyed by component ID.
    destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
    /// Built encoder components, keyed by component ID.
    encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
    /// Built forwarder components, keyed by component ID.
    forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
    /// Allocation group token that is entered for the duration of [`spawn`][Self::spawn].
    component_token: AllocationGroupToken,
    /// Capacity used for every channel created between two components.
    interconnect_capacity: NonZeroUsize,
}
51
52impl BuiltTopology {
53    #[allow(clippy::too_many_arguments)]
54    pub(crate) fn from_parts(
55        name: String, graph: Graph,
56        sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
57        relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
58        transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
59        destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
60        encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
61        forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
62        component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
63    ) -> Self {
64        Self {
65            name,
66            graph,
67            sources,
68            relays,
69            transforms,
70            destinations,
71            encoders,
72            forwarders,
73            component_token,
74            interconnect_capacity,
75        }
76    }
77
    /// Spawns the topology.
    ///
    /// A dedicated multi-threaded Tokio runtime is created for the topology's thread pool, the interconnects
    /// between components are generated from the graph, and then every component is spawned as its own task:
    /// sources, relays, transforms, destinations, encoders, and finally forwarders.
    ///
    /// A handle is returned that can be used to trigger the topology to shut down.
    ///
    /// ## Errors
    ///
    /// If an error occurs while spawning the topology, an error is returned. This covers failing to build the
    /// thread pool runtime, failing to build the component interconnects, and a component having no
    /// dispatcher/consumer generated for it.
    ///
    /// ## Panics
    ///
    /// Panics if registering a component with the health registry does not yield a handle. The panic messages
    /// attribute this to a duplicate component ID in the health registry.
    pub async fn spawn(
        self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
    ) -> Result<RunningTopology, GenericError> {
        let root_component_name = format!("topology.{}", self.name);

        // Enter the topology's allocation group for the remainder of this function.
        // NOTE(review): presumably this attributes the allocations made below to the topology itself -- confirm.
        let _guard = self.component_token.enter();

        // Build a dedicated multi-threaded runtime (fixed at 8 worker threads) whose handle is shared with
        // components through the topology context.
        let thread_pool = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(8)
            .enable_all()
            .build()
            .error_context("Failed to build asynchronous thread pool runtime.")?;
        let thread_pool_handle = thread_pool.handle().clone();

        // Move the runtime onto a background OS thread that blocks on a future which never completes, so the
        // runtime is kept alive rather than being dropped when this function returns. The thread is detached:
        // its join handle is discarded.
        std::thread::spawn(move || {
            thread_pool.block_on(std::future::pending::<()>());
        });

        let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);

        // All component tasks live in a single join set; the map lets a finished task's ID be translated back
        // into the component it belongs to.
        let mut component_tasks = JoinSet::new();
        let mut component_task_map = HashMap::new();

        // Build our interconnects, which we'll grab from piecemeal as we spawn our components.
        let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
            .error_context("Failed to build component interconnects.")?;

        // Only sources and relays register with the shutdown coordinator below.
        // NOTE(review): presumably the remaining components stop once their upstream channels close -- confirm.
        let mut shutdown_coordinator = ComponentShutdownCoordinator::default();

        // Spawn our sources.
        for (component_id, source) in self.sources {
            let (source, component_registry) = source.into_parts();

            let dispatcher = interconnects
                .take_source_dispatcher(&component_id)
                .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;

            let shutdown_handle = shutdown_coordinator.register();
            let health_handle = health_registry
                .register_component(format!("{}.sources.{}", root_component_name, component_id))
                .expect("duplicate source component ID in health registry");

            let component_context = ComponentContext::source(component_id.clone());
            let context = SourceContext::new(
                &topology_context,
                &component_context,
                component_registry,
                shutdown_handle,
                health_handle,
                dispatcher,
            );

            // Split the tracked component into its allocation group token and the component itself.
            let (alloc_group, source) = source.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                source.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our relays.
        for (component_id, relay) in self.relays {
            let (relay, component_registry) = relay.into_parts();

            let dispatcher = interconnects
                .take_relay_dispatcher(&component_id)
                .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;

            let shutdown_handle = shutdown_coordinator.register();
            let health_handle = health_registry
                .register_component(format!("{}.relays.{}", root_component_name, component_id))
                .expect("duplicate relay component ID in health registry");

            let component_context = ComponentContext::relay(component_id.clone());
            let context = RelayContext::new(
                &topology_context,
                &component_context,
                component_registry,
                shutdown_handle,
                health_handle,
                dispatcher,
            );

            let (alloc_group, relay) = relay.into_parts();
            let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our transforms, which both consume events and dispatch events.
        for (component_id, transform) in self.transforms {
            let (transform, component_registry) = transform.into_parts();

            let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
                generic_error!("No events dispatcher found for transform component '{}'", component_id)
            })?;

            let consumer = interconnects
                .take_transform_consumer(&component_id)
                .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;

            let health_handle = health_registry
                .register_component(format!("{}.transforms.{}", root_component_name, component_id))
                .expect("duplicate transform component ID in health registry");

            let component_context = ComponentContext::transform(component_id.clone());
            let context = TransformContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                dispatcher,
                consumer,
            );

            let (alloc_group, transform) = transform.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                transform.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our destinations, which only consume events.
        for (component_id, destination) in self.destinations {
            let (destination, component_registry) = destination.into_parts();

            let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
                generic_error!("No events consumer found for destination component '{}'", component_id)
            })?;

            let health_handle = health_registry
                .register_component(format!("{}.destinations.{}", root_component_name, component_id))
                .expect("duplicate destination component ID in health registry");

            let component_context = ComponentContext::destination(component_id.clone());
            let context = DestinationContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                consumer,
            );

            let (alloc_group, destination) = destination.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                destination.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our encoders, which consume events and dispatch payloads.
        for (component_id, encoder) in self.encoders {
            let (encoder, component_registry) = encoder.into_parts();

            let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
                generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
            })?;

            let consumer = interconnects
                .take_encoder_consumer(&component_id)
                .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;

            let health_handle = health_registry
                .register_component(format!("{}.encoders.{}", root_component_name, component_id))
                .expect("duplicate encoder component ID in health registry");

            let component_context = ComponentContext::encoder(component_id.clone());
            let context = EncoderContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                dispatcher,
                consumer,
            );

            let (alloc_group, encoder) = encoder.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                encoder.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Spawn our forwarders, which only consume payloads.
        for (component_id, forwarder) in self.forwarders {
            let (forwarder, component_registry) = forwarder.into_parts();

            let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
                generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
            })?;

            let health_handle = health_registry
                .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
                .expect("duplicate forwarder component ID in health registry");

            let component_context = ComponentContext::forwarder(component_id.clone());
            let context = ForwarderContext::new(
                &topology_context,
                &component_context,
                component_registry,
                health_handle,
                consumer,
            );

            let (alloc_group, forwarder) = forwarder.into_parts();
            let task_handle = spawn_component(
                &mut component_tasks,
                component_context,
                alloc_group,
                forwarder.run(context),
            );
            component_task_map.insert(task_handle.id(), component_id);
        }

        // Hand the shutdown coordinator, the spawned tasks, and the task-to-component map to the running topology.
        Ok(RunningTopology::from_parts(
            shutdown_coordinator,
            component_tasks,
            component_task_map,
        ))
    }
315}
316
/// The set of dispatchers and consumers that connect the components of a topology together.
///
/// Entries are generated from the topology graph and then removed one at a time, via the `take_*` methods, as
/// each component is spawned.
struct ComponentInterconnects {
    /// Capacity of every channel created for a consuming component.
    interconnect_capacity: NonZeroUsize,
    /// Events dispatchers for sources, keyed by component ID.
    source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Payloads dispatchers for relays, keyed by component ID.
    relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Events consumers for transforms, each stored with the sender half that is cloned for every upstream.
    transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events dispatchers for transforms, keyed by component ID.
    transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Events consumers for destinations, each stored with the sender half that is cloned for every upstream.
    destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events consumers for encoders, each stored with the sender half that is cloned for every upstream.
    encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Payloads dispatchers for encoders, keyed by component ID.
    encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Payloads consumers for forwarders, each stored with the sender half that is cloned for every upstream.
    forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
}
328
329impl ComponentInterconnects {
330    fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
331        let mut interconnects = Self {
332            interconnect_capacity,
333            source_dispatchers: HashMap::new(),
334            relay_dispatchers: HashMap::new(),
335            transform_consumers: HashMap::new(),
336            transform_dispatchers: HashMap::new(),
337            destination_consumers: HashMap::new(),
338            encoder_consumers: HashMap::new(),
339            encoder_dispatchers: HashMap::new(),
340            forwarder_consumers: HashMap::new(),
341        };
342
343        interconnects.generate_interconnects(graph)?;
344        Ok(interconnects)
345    }
346
    /// Removes and returns the events dispatcher generated for the given source.
    ///
    /// Returns `None` if no dispatcher is stored for `component_id`, e.g. because none was generated or it was
    /// already taken.
    fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
        self.source_dispatchers.remove(component_id)
    }
350
    /// Removes and returns the payloads dispatcher generated for the given relay.
    ///
    /// Returns `None` if no dispatcher is stored for `component_id`, e.g. because none was generated or it was
    /// already taken.
    fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
        self.relay_dispatchers.remove(component_id)
    }
354
    /// Removes and returns the events dispatcher generated for the given transform.
    ///
    /// Returns `None` if no dispatcher is stored for `component_id`, e.g. because none was generated or it was
    /// already taken.
    fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
        self.transform_dispatchers.remove(component_id)
    }
358
    /// Removes and returns the payloads dispatcher generated for the given encoder.
    ///
    /// Returns `None` if no dispatcher is stored for `component_id`, e.g. because none was generated or it was
    /// already taken.
    fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
        self.encoder_dispatchers.remove(component_id)
    }
362
363    fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
364        self.transform_consumers
365            .remove(component_id)
366            .map(|(_, consumer)| consumer)
367    }
368
369    fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
370        self.destination_consumers
371            .remove(component_id)
372            .map(|(_, consumer)| consumer)
373    }
374
375    fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
376        self.encoder_consumers
377            .remove(component_id)
378            .map(|(_, consumer)| consumer)
379    }
380
381    fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
382        self.forwarder_consumers
383            .remove(component_id)
384            .map(|(_, consumer)| consumer)
385    }
386
387    fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
388        // Collect and iterate over each outbound edge in the topology graph.
389        //
390        // For each upstream component ("from" side of the edge), we attach each downstream component ("to" side of the edge) to it,
391        // creating the relevant dispatcher or consumer if necessary.
392        let outbound_edges = graph.get_outbound_directed_edges();
393        for (upstream_id, output_map) in outbound_edges {
394            match upstream_id.component_type() {
395                ComponentType::Source | ComponentType::Transform => {
396                    self.generate_event_interconnect(upstream_id, output_map)?;
397                }
398                ComponentType::Relay | ComponentType::Encoder => {
399                    self.generate_payload_interconnect(upstream_id, output_map)?
400                }
401                _ => panic!(
402                    "Only sources, transforms, relays, and encoders can dispatch events/payloads to downstream components."
403                ),
404            }
405        }
406
407        Ok(())
408    }
409
410    fn generate_event_interconnect(
411        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
412    ) -> Result<(), GenericError> {
413        for (upstream_output_id, downstream_ids) in output_map {
414            let mut senders = Vec::new();
415            for downstream_id in downstream_ids {
416                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
417                let sender = self.get_or_create_events_sender(downstream_id);
418                senders.push(sender);
419            }
420
421            let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
422            dispatcher.add_output(upstream_output_id.clone())?;
423
424            for sender in senders {
425                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
426            }
427        }
428
429        Ok(())
430    }
431
432    fn generate_payload_interconnect(
433        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
434    ) -> Result<(), GenericError> {
435        for (upstream_output_id, downstream_ids) in output_map {
436            let mut senders = Vec::new();
437            for downstream_id in downstream_ids {
438                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
439                let sender = self.get_or_create_payloads_sender(downstream_id);
440                senders.push(sender);
441            }
442
443            let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
444            dispatcher.add_output(upstream_output_id.clone())?;
445
446            for sender in senders {
447                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
448            }
449        }
450
451        Ok(())
452    }
453
454    fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
455        let (component_id, component_type, component_context) = component_id.into_parts();
456
457        match component_type {
458            ComponentType::Source => self
459                .source_dispatchers
460                .entry(component_id)
461                .or_insert_with(|| EventsDispatcher::new(component_context)),
462            ComponentType::Transform => self
463                .transform_dispatchers
464                .entry(component_id)
465                .or_insert_with(|| EventsDispatcher::new(component_context)),
466            _ => {
467                panic!("Only sources and transforms can dispatch events to downstream components.")
468            }
469        }
470    }
471
472    fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
473        let (component_id, component_type, component_context) = component_id.into_parts();
474        let interconnect_capacity = self.interconnect_capacity;
475
476        let (sender, _) = match component_type {
477            ComponentType::Transform => self
478                .transform_consumers
479                .entry(component_id)
480                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
481            ComponentType::Destination => self
482                .destination_consumers
483                .entry(component_id)
484                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
485            ComponentType::Encoder => self
486                .encoder_consumers
487                .entry(component_id)
488                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
489            _ => panic!("Only transforms, destinations, and encoders can consume events."),
490        };
491
492        sender.clone()
493    }
494
495    fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
496        let (component_id, component_type, component_context) = component_id.into_parts();
497
498        match component_type {
499            ComponentType::Relay => self
500                .relay_dispatchers
501                .entry(component_id)
502                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
503            ComponentType::Encoder => self
504                .encoder_dispatchers
505                .entry(component_id)
506                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
507            _ => {
508                panic!("Only relays and encoders can dispatch payloads to downstream components.")
509            }
510        }
511    }
512
513    fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
514        let (component_id, component_type, component_context) = component_id.into_parts();
515        let interconnect_capacity = self.interconnect_capacity;
516
517        let (sender, _) = match component_type {
518            ComponentType::Forwarder => self
519                .forwarder_consumers
520                .entry(component_id)
521                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
522            _ => panic!("Only forwarders can consume payloads."),
523        };
524
525        sender.clone()
526    }
527}
528
529fn build_events_consumer_pair(
530    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
531) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
532    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
533    let consumer = EventsConsumer::new(component_context, receiver);
534    (sender, consumer)
535}
536
537fn build_payloads_consumer_pair(
538    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
539) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
540    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
541    let consumer = PayloadsConsumer::new(component_context, receiver);
542    (sender, consumer)
543}
544
/// Spawns a component's future onto `join_set` as a named task and returns the task's abort handle.
///
/// The task is named `topology-<component type>-<component id>`. While spawning, a `component` tracing span
/// (carrying the component's type and ID) and the component's allocation group are both entered.
fn spawn_component<F>(
    join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
    allocation_group_token: AllocationGroupToken, component_future: F,
) -> AbortHandle
where
    F: Future<Output = Result<(), GenericError>> + Send + 'static,
{
    let component_span = error_span!(
        "component",
        "type" = context.component_type().as_str(),
        id = %context.component_id(),
    );

    // Both guards stay alive until the end of the function, so the spawn call below happens inside the
    // component's span and allocation group.
    // NOTE(review): presumably `spawn_traced_named` captures the current span/allocation group for the task --
    // confirm against its implementation.
    let _span = component_span.enter();
    let _guard = allocation_group_token.enter();

    let component_task_name = format!(
        "topology-{}-{}",
        context.component_type().as_str(),
        context.component_id()
    );
    join_set.spawn_traced_named(component_task_name, component_future)
}
568
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Building interconnects for a small graph must succeed, which requires each dispatcher output to be
    /// added before any sender is attached to it.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let mut graph = Graph::default();

        // Create a set of components and connect them together:
        //
        //   source1 -> transform1 -> encoder1 -> forwarder1
        //                         \-> dest1
        graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        // Ensure we can properly build the interconnects for them, which requires adding the outputs
        // before attaching senders to them. Only success is checked; the interconnects are discarded.
        let interconnect_capacity = NonZeroUsize::new(10).unwrap();
        let _ = ComponentInterconnects::from_graph(interconnect_capacity, &graph)
            .expect("should build interconnects successfully");
    }
}
600}