1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4 allocator::{AllocationGroupToken, Tracked},
5 MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use saluki_health::HealthRegistry;
10use tokio::{
11 sync::mpsc,
12 task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
16use super::{
17 graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
18 EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
19};
20use crate::{
21 components::{
22 decoders::{Decoder, DecoderContext},
23 destinations::{Destination, DestinationContext},
24 encoders::{Encoder, EncoderContext},
25 forwarders::{Forwarder, ForwarderContext},
26 relays::{Relay, RelayContext},
27 sources::{Source, SourceContext},
28 transforms::{Transform, TransformContext},
29 ComponentContext, ComponentType,
30 },
31 topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
32};
33
34pub struct BuiltTopology {
41 name: String,
42 graph: Graph,
43 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
44 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
45 decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
46 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
47 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
48 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
49 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
50 component_token: AllocationGroupToken,
51 interconnect_capacity: NonZeroUsize,
52}
53
54impl BuiltTopology {
55 #[allow(clippy::too_many_arguments)]
56 pub(crate) fn from_parts(
57 name: String, graph: Graph,
58 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
59 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
60 decoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Decoder + Send>>>>,
61 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
62 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
63 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
64 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
65 component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
66 ) -> Self {
67 Self {
68 name,
69 graph,
70 sources,
71 relays,
72 decoders,
73 transforms,
74 destinations,
75 encoders,
76 forwarders,
77 component_token,
78 interconnect_capacity,
79 }
80 }
81
82 pub async fn spawn(
90 self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
91 ) -> Result<RunningTopology, GenericError> {
92 let root_component_name = format!("topology.{}", self.name);
93
94 let _guard = self.component_token.enter();
95
96 let thread_pool = tokio::runtime::Builder::new_multi_thread()
97 .worker_threads(8)
98 .enable_all()
99 .build()
100 .error_context("Failed to build asynchronous thread pool runtime.")?;
101 let thread_pool_handle = thread_pool.handle().clone();
102
103 std::thread::spawn(move || {
104 thread_pool.block_on(std::future::pending::<()>());
105 });
106
107 let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
108
109 let mut component_tasks = JoinSet::new();
110 let mut component_task_map = HashMap::new();
111
112 let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
114 .error_context("Failed to build component interconnects.")?;
115
116 let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
117
118 for (component_id, source) in self.sources {
120 let (source, component_registry) = source.into_parts();
121
122 let dispatcher = interconnects
123 .take_source_dispatcher(&component_id)
124 .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
125
126 let shutdown_handle = shutdown_coordinator.register();
127 let health_handle = health_registry
128 .register_component(format!("{}.sources.{}", root_component_name, component_id))
129 .expect("duplicate source component ID in health registry");
130
131 let component_context = ComponentContext::source(component_id.clone());
132 let context = SourceContext::new(
133 &topology_context,
134 &component_context,
135 component_registry,
136 shutdown_handle,
137 health_handle,
138 dispatcher,
139 );
140
141 let (alloc_group, source) = source.into_parts();
142 let task_handle = spawn_component(
143 &mut component_tasks,
144 component_context,
145 alloc_group,
146 source.run(context),
147 );
148 component_task_map.insert(task_handle.id(), component_id);
149 }
150
151 for (component_id, relay) in self.relays {
153 let (relay, component_registry) = relay.into_parts();
154
155 let dispatcher = interconnects
156 .take_relay_dispatcher(&component_id)
157 .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
158
159 let shutdown_handle = shutdown_coordinator.register();
160 let health_handle = health_registry
161 .register_component(format!("{}.relays.{}", root_component_name, component_id))
162 .expect("duplicate relay component ID in health registry");
163
164 let component_context = ComponentContext::relay(component_id.clone());
165 let context = RelayContext::new(
166 &topology_context,
167 &component_context,
168 component_registry,
169 shutdown_handle,
170 health_handle,
171 dispatcher,
172 );
173
174 let (alloc_group, relay) = relay.into_parts();
175 let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
176 component_task_map.insert(task_handle.id(), component_id);
177 }
178
179 for (component_id, decoder) in self.decoders {
181 let (decoder, component_registry) = decoder.into_parts();
182
183 let dispatcher = interconnects
184 .take_decoder_dispatcher(&component_id)
185 .ok_or_else(|| generic_error!("No events dispatcher found for decoder component '{}'", component_id))?;
186
187 let consumer = interconnects
188 .take_decoder_consumer(&component_id)
189 .ok_or_else(|| generic_error!("No payloads consumer found for decoder component '{}'", component_id))?;
190
191 let health_handle = health_registry
192 .register_component(format!("{}.decoders.{}", root_component_name, component_id))
193 .expect("duplicate decoder component ID in health registry");
194
195 let component_context = ComponentContext::decoder(component_id.clone());
196 let context = DecoderContext::new(
197 &topology_context,
198 &component_context,
199 component_registry,
200 health_handle,
201 dispatcher,
202 consumer,
203 );
204
205 let (alloc_group, decoder) = decoder.into_parts();
206 let task_handle = spawn_component(
207 &mut component_tasks,
208 component_context,
209 alloc_group,
210 decoder.run(context),
211 );
212 component_task_map.insert(task_handle.id(), component_id);
213 }
214
215 for (component_id, transform) in self.transforms {
217 let (transform, component_registry) = transform.into_parts();
218
219 let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
220 generic_error!("No events dispatcher found for transform component '{}'", component_id)
221 })?;
222
223 let consumer = interconnects
224 .take_transform_consumer(&component_id)
225 .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
226
227 let health_handle = health_registry
228 .register_component(format!("{}.transforms.{}", root_component_name, component_id))
229 .expect("duplicate transform component ID in health registry");
230
231 let component_context = ComponentContext::transform(component_id.clone());
232 let context = TransformContext::new(
233 &topology_context,
234 &component_context,
235 component_registry,
236 health_handle,
237 dispatcher,
238 consumer,
239 );
240
241 let (alloc_group, transform) = transform.into_parts();
242 let task_handle = spawn_component(
243 &mut component_tasks,
244 component_context,
245 alloc_group,
246 transform.run(context),
247 );
248 component_task_map.insert(task_handle.id(), component_id);
249 }
250
251 for (component_id, destination) in self.destinations {
253 let (destination, component_registry) = destination.into_parts();
254
255 let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
256 generic_error!("No events consumer found for destination component '{}'", component_id)
257 })?;
258
259 let health_handle = health_registry
260 .register_component(format!("{}.destinations.{}", root_component_name, component_id))
261 .expect("duplicate destination component ID in health registry");
262
263 let component_context = ComponentContext::destination(component_id.clone());
264 let context = DestinationContext::new(
265 &topology_context,
266 &component_context,
267 component_registry,
268 health_handle,
269 consumer,
270 );
271
272 let (alloc_group, destination) = destination.into_parts();
273 let task_handle = spawn_component(
274 &mut component_tasks,
275 component_context,
276 alloc_group,
277 destination.run(context),
278 );
279 component_task_map.insert(task_handle.id(), component_id);
280 }
281
282 for (component_id, encoder) in self.encoders {
284 let (encoder, component_registry) = encoder.into_parts();
285
286 let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
287 generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
288 })?;
289
290 let consumer = interconnects
291 .take_encoder_consumer(&component_id)
292 .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
293
294 let health_handle = health_registry
295 .register_component(format!("{}.encoders.{}", root_component_name, component_id))
296 .expect("duplicate encoder component ID in health registry");
297
298 let component_context = ComponentContext::encoder(component_id.clone());
299 let context = EncoderContext::new(
300 &topology_context,
301 &component_context,
302 component_registry,
303 health_handle,
304 dispatcher,
305 consumer,
306 );
307
308 let (alloc_group, encoder) = encoder.into_parts();
309 let task_handle = spawn_component(
310 &mut component_tasks,
311 component_context,
312 alloc_group,
313 encoder.run(context),
314 );
315 component_task_map.insert(task_handle.id(), component_id);
316 }
317
318 for (component_id, forwarder) in self.forwarders {
320 let (forwarder, component_registry) = forwarder.into_parts();
321
322 let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
323 generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
324 })?;
325
326 let health_handle = health_registry
327 .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
328 .expect("duplicate forwarder component ID in health registry");
329
330 let component_context = ComponentContext::forwarder(component_id.clone());
331 let context = ForwarderContext::new(
332 &topology_context,
333 &component_context,
334 component_registry,
335 health_handle,
336 consumer,
337 );
338
339 let (alloc_group, forwarder) = forwarder.into_parts();
340 let task_handle = spawn_component(
341 &mut component_tasks,
342 component_context,
343 alloc_group,
344 forwarder.run(context),
345 );
346 component_task_map.insert(task_handle.id(), component_id);
347 }
348
349 Ok(RunningTopology::from_parts(
350 shutdown_coordinator,
351 component_tasks,
352 component_task_map,
353 ))
354 }
355}
356
357struct ComponentInterconnects {
358 interconnect_capacity: NonZeroUsize,
359 source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
360 relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
361 decoder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
362 decoder_dispatchers: HashMap<ComponentId, EventsDispatcher>,
363 transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
364 transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
365 destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
366 encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
367 encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
368 forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
369}
370
371impl ComponentInterconnects {
372 fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
373 let mut interconnects = Self {
374 interconnect_capacity,
375 source_dispatchers: HashMap::new(),
376 relay_dispatchers: HashMap::new(),
377 decoder_consumers: HashMap::new(),
378 decoder_dispatchers: HashMap::new(),
379 transform_consumers: HashMap::new(),
380 transform_dispatchers: HashMap::new(),
381 destination_consumers: HashMap::new(),
382 encoder_consumers: HashMap::new(),
383 encoder_dispatchers: HashMap::new(),
384 forwarder_consumers: HashMap::new(),
385 };
386
387 interconnects.generate_interconnects(graph)?;
388 Ok(interconnects)
389 }
390
391 fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
392 self.source_dispatchers.remove(component_id)
393 }
394
395 fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
396 self.relay_dispatchers.remove(component_id)
397 }
398
399 fn take_decoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
400 self.decoder_dispatchers.remove(component_id)
401 }
402
403 fn take_decoder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
404 self.decoder_consumers
405 .remove(component_id)
406 .map(|(_, consumer)| consumer)
407 }
408
409 fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
410 self.transform_dispatchers.remove(component_id)
411 }
412
413 fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
414 self.encoder_dispatchers.remove(component_id)
415 }
416
417 fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
418 self.transform_consumers
419 .remove(component_id)
420 .map(|(_, consumer)| consumer)
421 }
422
423 fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
424 self.destination_consumers
425 .remove(component_id)
426 .map(|(_, consumer)| consumer)
427 }
428
429 fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
430 self.encoder_consumers
431 .remove(component_id)
432 .map(|(_, consumer)| consumer)
433 }
434
435 fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
436 self.forwarder_consumers
437 .remove(component_id)
438 .map(|(_, consumer)| consumer)
439 }
440
441 fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
442 let outbound_edges = graph.get_outbound_directed_edges();
447 for (upstream_id, output_map) in outbound_edges {
448 match upstream_id.component_type() {
449 ComponentType::Source | ComponentType::Decoder | ComponentType::Transform => {
450 self.generate_event_interconnect(upstream_id, output_map)?;
451 }
452 ComponentType::Relay | ComponentType::Encoder => {
453 self.generate_payload_interconnect(upstream_id, output_map)?
454 }
455 _ => panic!(
456 "Only sources, decoders, transforms, relays, and encoders can dispatch events/payloads to downstream components."
457 ),
458 }
459 }
460
461 Ok(())
462 }
463
464 fn generate_event_interconnect(
465 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
466 ) -> Result<(), GenericError> {
467 for (upstream_output_id, downstream_ids) in output_map {
468 let mut senders = Vec::new();
469 for downstream_id in downstream_ids {
470 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
471 let sender = self.get_or_create_events_sender(downstream_id);
472 senders.push(sender);
473 }
474
475 let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
476 dispatcher.add_output(upstream_output_id.clone())?;
477
478 for sender in senders {
479 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
480 }
481 }
482
483 Ok(())
484 }
485
486 fn generate_payload_interconnect(
487 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
488 ) -> Result<(), GenericError> {
489 for (upstream_output_id, downstream_ids) in output_map {
490 let mut senders = Vec::new();
491 for downstream_id in downstream_ids {
492 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
493 let sender = self.get_or_create_payloads_sender(downstream_id);
494 senders.push(sender);
495 }
496
497 let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
498 dispatcher.add_output(upstream_output_id.clone())?;
499
500 for sender in senders {
501 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
502 }
503 }
504
505 Ok(())
506 }
507
508 fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
509 let (component_id, component_type, component_context) = component_id.into_parts();
510
511 match component_type {
512 ComponentType::Source => self
513 .source_dispatchers
514 .entry(component_id)
515 .or_insert_with(|| EventsDispatcher::new(component_context)),
516 ComponentType::Decoder => self
517 .decoder_dispatchers
518 .entry(component_id)
519 .or_insert_with(|| EventsDispatcher::new(component_context)),
520 ComponentType::Transform => self
521 .transform_dispatchers
522 .entry(component_id)
523 .or_insert_with(|| EventsDispatcher::new(component_context)),
524 _ => {
525 panic!("Only sources, decoders, and transforms can dispatch events to downstream components.")
526 }
527 }
528 }
529
530 fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
531 let (component_id, component_type, component_context) = component_id.into_parts();
532 let interconnect_capacity = self.interconnect_capacity;
533
534 let (sender, _) = match component_type {
535 ComponentType::Transform => self
536 .transform_consumers
537 .entry(component_id)
538 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
539 ComponentType::Destination => self
540 .destination_consumers
541 .entry(component_id)
542 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
543 ComponentType::Encoder => self
544 .encoder_consumers
545 .entry(component_id)
546 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
547 _ => panic!("Only transforms, destinations, and encoders can consume events."),
548 };
549
550 sender.clone()
551 }
552
553 fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
554 let (component_id, component_type, component_context) = component_id.into_parts();
555
556 match component_type {
557 ComponentType::Relay => self
558 .relay_dispatchers
559 .entry(component_id)
560 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
561 ComponentType::Encoder => self
562 .encoder_dispatchers
563 .entry(component_id)
564 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
565 _ => {
566 panic!("Only relays and encoders can dispatch payloads to downstream components.")
567 }
568 }
569 }
570
571 fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
572 let (component_id, component_type, component_context) = component_id.into_parts();
573 let interconnect_capacity = self.interconnect_capacity;
574
575 let (sender, _) = match component_type {
576 ComponentType::Decoder => self
577 .decoder_consumers
578 .entry(component_id)
579 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
580 ComponentType::Forwarder => self
581 .forwarder_consumers
582 .entry(component_id)
583 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
584 _ => panic!("Only decoders and forwarders can consume payloads."),
585 };
586
587 sender.clone()
588 }
589}
590
591fn build_events_consumer_pair(
592 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
593) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
594 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
595 let consumer = EventsConsumer::new(component_context, receiver);
596 (sender, consumer)
597}
598
599fn build_payloads_consumer_pair(
600 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
601) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
602 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
603 let consumer = PayloadsConsumer::new(component_context, receiver);
604 (sender, consumer)
605}
606
607fn spawn_component<F>(
608 join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
609 allocation_group_token: AllocationGroupToken, component_future: F,
610) -> AbortHandle
611where
612 F: Future<Output = Result<(), GenericError>> + Send + 'static,
613{
614 let component_span = error_span!(
615 "component",
616 "type" = context.component_type().as_str(),
617 id = %context.component_id(),
618 );
619
620 let _span = component_span.enter();
621 let _guard = allocation_group_token.enter();
622
623 let component_task_name = format!(
624 "topology-{}-{}",
625 context.component_type().as_str(),
626 context.component_id()
627 );
628 join_set.spawn_traced_named(component_task_name, component_future)
629}
630
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Builds a small graph that mixes event and payload edges and checks that interconnect generation succeeds.
    ///
    /// In the graph, `transform1` fans out to both an encoder and a destination.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let capacity = NonZeroUsize::new(10).unwrap();

        let mut graph = Graph::default();
        graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        ComponentInterconnects::from_graph(capacity, &graph).expect("should build interconnects successfully");
    }
}