saluki_core/topology/
built.rs

1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4    allocator::{AllocationGroupToken, Tracked},
5    MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use saluki_health::HealthRegistry;
10use tokio::{
11    sync::mpsc,
12    task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
16use super::{
17    graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
18    EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
19};
20use crate::{
21    components::{
22        destinations::{Destination, DestinationContext},
23        encoders::{Encoder, EncoderContext},
24        forwarders::{Forwarder, ForwarderContext},
25        relays::{Relay, RelayContext},
26        sources::{Source, SourceContext},
27        transforms::{Transform, TransformContext},
28        ComponentContext, ComponentType,
29    },
30    topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
31};
32
33/// A built topology.
34///
35/// Built topologies represent a topology blueprint where each configured component, along with their associated
36/// connections to other components, was validated and built successfully.
37///
38/// A built topology must be spawned via [`spawn`][Self::spawn].
39pub struct BuiltTopology {
40    name: String,
41    graph: Graph,
42    sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
43    relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
44    transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
45    destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
46    encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
47    forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
48    component_token: AllocationGroupToken,
49    interconnect_capacity: NonZeroUsize,
50}
51
52impl BuiltTopology {
53    #[allow(clippy::too_many_arguments)]
54    pub(crate) fn from_parts(
55        name: String, graph: Graph,
56        sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
57        relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
58        transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
59        destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
60        encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
61        forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
62        component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
63    ) -> Self {
64        Self {
65            name,
66            graph,
67            sources,
68            relays,
69            transforms,
70            destinations,
71            encoders,
72            forwarders,
73            component_token,
74            interconnect_capacity,
75        }
76    }
77
78    /// Spawns the topology.
79    ///
80    /// A handle is returned that can be used to trigger the topology to shutdown.
81    ///
82    /// ## Errors
83    ///
84    /// If an error occurs while spawning the topology, an error is returned.
85    pub async fn spawn(
86        self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
87    ) -> Result<RunningTopology, GenericError> {
88        let root_component_name = format!("topology.{}", self.name);
89
90        let _guard = self.component_token.enter();
91
92        let thread_pool = tokio::runtime::Builder::new_multi_thread()
93            .worker_threads(8)
94            .enable_all()
95            .build()
96            .error_context("Failed to build asynchronous thread pool runtime.")?;
97        let thread_pool_handle = thread_pool.handle().clone();
98
99        let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
100
101        let mut component_tasks = JoinSet::new();
102        let mut component_task_map = HashMap::new();
103
104        // Build our interconnects, which we'll grab from piecemeal as we spawn our components.
105        let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
106            .error_context("Failed to build component interconnects.")?;
107
108        let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
109
110        // Spawn our sources.
111        for (component_id, source) in self.sources {
112            let (source, component_registry) = source.into_parts();
113
114            let dispatcher = interconnects
115                .take_source_dispatcher(&component_id)
116                .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
117
118            let shutdown_handle = shutdown_coordinator.register();
119            let health_handle = health_registry
120                .register_component(format!("{}.sources.{}", root_component_name, component_id))
121                .expect("duplicate source component ID in health registry");
122
123            let component_context = ComponentContext::source(component_id.clone());
124            let context = SourceContext::new(
125                &topology_context,
126                &component_context,
127                component_registry,
128                shutdown_handle,
129                health_handle,
130                dispatcher,
131            );
132
133            let (alloc_group, source) = source.into_parts();
134            let task_handle = spawn_component(
135                &mut component_tasks,
136                component_context,
137                alloc_group,
138                source.run(context),
139            );
140            component_task_map.insert(task_handle.id(), component_id);
141        }
142
143        // Spawn our relays.
144        for (component_id, relay) in self.relays {
145            let (relay, component_registry) = relay.into_parts();
146
147            let dispatcher = interconnects
148                .take_relay_dispatcher(&component_id)
149                .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
150
151            let shutdown_handle = shutdown_coordinator.register();
152            let health_handle = health_registry
153                .register_component(format!("{}.relays.{}", root_component_name, component_id))
154                .expect("duplicate relay component ID in health registry");
155
156            let component_context = ComponentContext::relay(component_id.clone());
157            let context = RelayContext::new(
158                &topology_context,
159                &component_context,
160                component_registry,
161                shutdown_handle,
162                health_handle,
163                dispatcher,
164            );
165
166            let (alloc_group, relay) = relay.into_parts();
167            let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
168            component_task_map.insert(task_handle.id(), component_id);
169        }
170
171        // Spawn our transforms.
172        for (component_id, transform) in self.transforms {
173            let (transform, component_registry) = transform.into_parts();
174
175            let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
176                generic_error!("No events dispatcher found for transform component '{}'", component_id)
177            })?;
178
179            let consumer = interconnects
180                .take_transform_consumer(&component_id)
181                .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
182
183            let health_handle = health_registry
184                .register_component(format!("{}.transforms.{}", root_component_name, component_id))
185                .expect("duplicate transform component ID in health registry");
186
187            let component_context = ComponentContext::transform(component_id.clone());
188            let context = TransformContext::new(
189                &topology_context,
190                &component_context,
191                component_registry,
192                health_handle,
193                dispatcher,
194                consumer,
195            );
196
197            let (alloc_group, transform) = transform.into_parts();
198            let task_handle = spawn_component(
199                &mut component_tasks,
200                component_context,
201                alloc_group,
202                transform.run(context),
203            );
204            component_task_map.insert(task_handle.id(), component_id);
205        }
206
207        // Spawn our destinations.
208        for (component_id, destination) in self.destinations {
209            let (destination, component_registry) = destination.into_parts();
210
211            let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
212                generic_error!("No events consumer found for destination component '{}'", component_id)
213            })?;
214
215            let health_handle = health_registry
216                .register_component(format!("{}.destinations.{}", root_component_name, component_id))
217                .expect("duplicate destination component ID in health registry");
218
219            let component_context = ComponentContext::destination(component_id.clone());
220            let context = DestinationContext::new(
221                &topology_context,
222                &component_context,
223                component_registry,
224                health_handle,
225                consumer,
226            );
227
228            let (alloc_group, destination) = destination.into_parts();
229            let task_handle = spawn_component(
230                &mut component_tasks,
231                component_context,
232                alloc_group,
233                destination.run(context),
234            );
235            component_task_map.insert(task_handle.id(), component_id);
236        }
237
238        // Spawn our encoders.
239        for (component_id, encoder) in self.encoders {
240            let (encoder, component_registry) = encoder.into_parts();
241
242            let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
243                generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
244            })?;
245
246            let consumer = interconnects
247                .take_encoder_consumer(&component_id)
248                .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
249
250            let health_handle = health_registry
251                .register_component(format!("{}.encoders.{}", root_component_name, component_id))
252                .expect("duplicate encoder component ID in health registry");
253
254            let component_context = ComponentContext::encoder(component_id.clone());
255            let context = EncoderContext::new(
256                &topology_context,
257                &component_context,
258                component_registry,
259                health_handle,
260                dispatcher,
261                consumer,
262            );
263
264            let (alloc_group, encoder) = encoder.into_parts();
265            let task_handle = spawn_component(
266                &mut component_tasks,
267                component_context,
268                alloc_group,
269                encoder.run(context),
270            );
271            component_task_map.insert(task_handle.id(), component_id);
272        }
273
274        // Spawn our forwarders.
275        for (component_id, forwarder) in self.forwarders {
276            let (forwarder, component_registry) = forwarder.into_parts();
277
278            let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
279                generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
280            })?;
281
282            let health_handle = health_registry
283                .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
284                .expect("duplicate forwarder component ID in health registry");
285
286            let component_context = ComponentContext::forwarder(component_id.clone());
287            let context = ForwarderContext::new(
288                &topology_context,
289                &component_context,
290                component_registry,
291                health_handle,
292                consumer,
293            );
294
295            let (alloc_group, forwarder) = forwarder.into_parts();
296            let task_handle = spawn_component(
297                &mut component_tasks,
298                component_context,
299                alloc_group,
300                forwarder.run(context),
301            );
302            component_task_map.insert(task_handle.id(), component_id);
303        }
304
305        Ok(RunningTopology::from_parts(
306            thread_pool,
307            shutdown_coordinator,
308            component_tasks,
309            component_task_map,
310        ))
311    }
312}
313
314struct ComponentInterconnects {
315    interconnect_capacity: NonZeroUsize,
316    source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
317    relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
318    transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
319    transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
320    destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
321    encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
322    encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
323    forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
324}
325
326impl ComponentInterconnects {
327    fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
328        let mut interconnects = Self {
329            interconnect_capacity,
330            source_dispatchers: HashMap::new(),
331            relay_dispatchers: HashMap::new(),
332            transform_consumers: HashMap::new(),
333            transform_dispatchers: HashMap::new(),
334            destination_consumers: HashMap::new(),
335            encoder_consumers: HashMap::new(),
336            encoder_dispatchers: HashMap::new(),
337            forwarder_consumers: HashMap::new(),
338        };
339
340        interconnects.generate_interconnects(graph)?;
341        Ok(interconnects)
342    }
343
344    fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
345        self.source_dispatchers.remove(component_id)
346    }
347
348    fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
349        self.relay_dispatchers.remove(component_id)
350    }
351
352    fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
353        self.transform_dispatchers.remove(component_id)
354    }
355
356    fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
357        self.encoder_dispatchers.remove(component_id)
358    }
359
360    fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
361        self.transform_consumers
362            .remove(component_id)
363            .map(|(_, consumer)| consumer)
364    }
365
366    fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
367        self.destination_consumers
368            .remove(component_id)
369            .map(|(_, consumer)| consumer)
370    }
371
372    fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
373        self.encoder_consumers
374            .remove(component_id)
375            .map(|(_, consumer)| consumer)
376    }
377
378    fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
379        self.forwarder_consumers
380            .remove(component_id)
381            .map(|(_, consumer)| consumer)
382    }
383
384    fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
385        // Collect and iterate over each outbound edge in the topology graph.
386        //
387        // For each upstream component ("from" side of the edge), we attach each downstream component ("to" side of the edge) to it,
388        // creating the relevant dispatcher or consumer if necessary.
389        let outbound_edges = graph.get_outbound_directed_edges();
390        for (upstream_id, output_map) in outbound_edges {
391            match upstream_id.component_type() {
392                ComponentType::Source | ComponentType::Transform => {
393                    self.generate_event_interconnect(upstream_id, output_map)?;
394                }
395                ComponentType::Relay | ComponentType::Encoder => {
396                    self.generate_payload_interconnect(upstream_id, output_map)?
397                }
398                _ => panic!(
399                    "Only sources, transforms, relays, and encoders can dispatch events/payloads to downstream components."
400                ),
401            }
402        }
403
404        Ok(())
405    }
406
407    fn generate_event_interconnect(
408        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
409    ) -> Result<(), GenericError> {
410        for (upstream_output_id, downstream_ids) in output_map {
411            let mut senders = Vec::new();
412            for downstream_id in downstream_ids {
413                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
414                let sender = self.get_or_create_events_sender(downstream_id);
415                senders.push(sender);
416            }
417
418            let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
419            dispatcher.add_output(upstream_output_id.clone())?;
420
421            for sender in senders {
422                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
423            }
424        }
425
426        Ok(())
427    }
428
429    fn generate_payload_interconnect(
430        &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
431    ) -> Result<(), GenericError> {
432        for (upstream_output_id, downstream_ids) in output_map {
433            let mut senders = Vec::new();
434            for downstream_id in downstream_ids {
435                debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
436                let sender = self.get_or_create_payloads_sender(downstream_id);
437                senders.push(sender);
438            }
439
440            let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
441            dispatcher.add_output(upstream_output_id.clone())?;
442
443            for sender in senders {
444                dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
445            }
446        }
447
448        Ok(())
449    }
450
451    fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
452        let (component_id, component_type, component_context) = component_id.into_parts();
453
454        match component_type {
455            ComponentType::Source => self
456                .source_dispatchers
457                .entry(component_id)
458                .or_insert_with(|| EventsDispatcher::new(component_context)),
459            ComponentType::Transform => self
460                .transform_dispatchers
461                .entry(component_id)
462                .or_insert_with(|| EventsDispatcher::new(component_context)),
463            _ => {
464                panic!("Only sources and transforms can dispatch events to downstream components.")
465            }
466        }
467    }
468
469    fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
470        let (component_id, component_type, component_context) = component_id.into_parts();
471        let interconnect_capacity = self.interconnect_capacity;
472
473        let (sender, _) = match component_type {
474            ComponentType::Transform => self
475                .transform_consumers
476                .entry(component_id)
477                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
478            ComponentType::Destination => self
479                .destination_consumers
480                .entry(component_id)
481                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
482            ComponentType::Encoder => self
483                .encoder_consumers
484                .entry(component_id)
485                .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
486            _ => panic!("Only transforms, destinations, and encoders can consume events."),
487        };
488
489        sender.clone()
490    }
491
492    fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
493        let (component_id, component_type, component_context) = component_id.into_parts();
494
495        match component_type {
496            ComponentType::Relay => self
497                .relay_dispatchers
498                .entry(component_id)
499                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
500            ComponentType::Encoder => self
501                .encoder_dispatchers
502                .entry(component_id)
503                .or_insert_with(|| PayloadsDispatcher::new(component_context)),
504            _ => {
505                panic!("Only relays and encoders can dispatch payloads to downstream components.")
506            }
507        }
508    }
509
510    fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
511        let (component_id, component_type, component_context) = component_id.into_parts();
512        let interconnect_capacity = self.interconnect_capacity;
513
514        let (sender, _) = match component_type {
515            ComponentType::Forwarder => self
516                .forwarder_consumers
517                .entry(component_id)
518                .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
519            _ => panic!("Only forwarders can consume payloads."),
520        };
521
522        sender.clone()
523    }
524}
525
526fn build_events_consumer_pair(
527    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
528) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
529    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
530    let consumer = EventsConsumer::new(component_context, receiver);
531    (sender, consumer)
532}
533
534fn build_payloads_consumer_pair(
535    component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
536) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
537    let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
538    let consumer = PayloadsConsumer::new(component_context, receiver);
539    (sender, consumer)
540}
541
542fn spawn_component<F>(
543    join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
544    allocation_group_token: AllocationGroupToken, component_future: F,
545) -> AbortHandle
546where
547    F: Future<Output = Result<(), GenericError>> + Send + 'static,
548{
549    let component_span = error_span!(
550        "component",
551        "type" = context.component_type().as_str(),
552        id = %context.component_id(),
553    );
554
555    let _span = component_span.enter();
556    let _guard = allocation_group_token.enter();
557
558    let component_task_name = format!(
559        "topology-{}-{}",
560        context.component_type().as_str(),
561        context.component_id()
562    );
563    join_set.spawn_traced_named(component_task_name, component_future)
564}
565
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Interconnects can be generated for a graph with fan-out and both events and payloads edges.
    ///
    /// A sender can only be attached to a dispatcher output that already exists, so this fails if the
    /// interconnect generation ever attaches senders before adding the output.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let mut topology_graph = Graph::default();

        // source1 -> transform1 -> { encoder1 -> forwarder1, dest1 }
        topology_graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        let capacity = NonZeroUsize::new(10).unwrap();
        let _ = ComponentInterconnects::from_graph(capacity, &topology_graph)
            .expect("should build interconnects successfully");
    }
}