1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4 allocator::{AllocationGroupToken, Tracked},
5 MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use saluki_health::HealthRegistry;
10use tokio::{
11 sync::mpsc,
12 task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
16use super::{
17 graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
18 EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
19};
20use crate::{
21 components::{
22 destinations::{Destination, DestinationContext},
23 encoders::{Encoder, EncoderContext},
24 forwarders::{Forwarder, ForwarderContext},
25 relays::{Relay, RelayContext},
26 sources::{Source, SourceContext},
27 transforms::{Transform, TransformContext},
28 ComponentContext, ComponentType,
29 },
30 topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
31};
32
33pub struct BuiltTopology {
40 name: String,
41 graph: Graph,
42 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
43 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
44 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
45 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
46 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
47 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
48 component_token: AllocationGroupToken,
49 interconnect_capacity: NonZeroUsize,
50}
51
52impl BuiltTopology {
53 #[allow(clippy::too_many_arguments)]
54 pub(crate) fn from_parts(
55 name: String, graph: Graph,
56 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
57 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
58 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
59 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
60 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
61 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
62 component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
63 ) -> Self {
64 Self {
65 name,
66 graph,
67 sources,
68 relays,
69 transforms,
70 destinations,
71 encoders,
72 forwarders,
73 component_token,
74 interconnect_capacity,
75 }
76 }
77
78 pub async fn spawn(
86 self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
87 ) -> Result<RunningTopology, GenericError> {
88 let root_component_name = format!("topology.{}", self.name);
89
90 let _guard = self.component_token.enter();
91
92 let thread_pool = tokio::runtime::Builder::new_multi_thread()
93 .worker_threads(8)
94 .enable_all()
95 .build()
96 .error_context("Failed to build asynchronous thread pool runtime.")?;
97 let thread_pool_handle = thread_pool.handle().clone();
98
99 std::thread::spawn(move || {
100 thread_pool.block_on(std::future::pending::<()>());
101 });
102
103 let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
104
105 let mut component_tasks = JoinSet::new();
106 let mut component_task_map = HashMap::new();
107
108 let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
110 .error_context("Failed to build component interconnects.")?;
111
112 let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
113
114 for (component_id, source) in self.sources {
116 let (source, component_registry) = source.into_parts();
117
118 let dispatcher = interconnects
119 .take_source_dispatcher(&component_id)
120 .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
121
122 let shutdown_handle = shutdown_coordinator.register();
123 let health_handle = health_registry
124 .register_component(format!("{}.sources.{}", root_component_name, component_id))
125 .expect("duplicate source component ID in health registry");
126
127 let component_context = ComponentContext::source(component_id.clone());
128 let context = SourceContext::new(
129 &topology_context,
130 &component_context,
131 component_registry,
132 shutdown_handle,
133 health_handle,
134 dispatcher,
135 );
136
137 let (alloc_group, source) = source.into_parts();
138 let task_handle = spawn_component(
139 &mut component_tasks,
140 component_context,
141 alloc_group,
142 source.run(context),
143 );
144 component_task_map.insert(task_handle.id(), component_id);
145 }
146
147 for (component_id, relay) in self.relays {
149 let (relay, component_registry) = relay.into_parts();
150
151 let dispatcher = interconnects
152 .take_relay_dispatcher(&component_id)
153 .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
154
155 let shutdown_handle = shutdown_coordinator.register();
156 let health_handle = health_registry
157 .register_component(format!("{}.relays.{}", root_component_name, component_id))
158 .expect("duplicate relay component ID in health registry");
159
160 let component_context = ComponentContext::relay(component_id.clone());
161 let context = RelayContext::new(
162 &topology_context,
163 &component_context,
164 component_registry,
165 shutdown_handle,
166 health_handle,
167 dispatcher,
168 );
169
170 let (alloc_group, relay) = relay.into_parts();
171 let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
172 component_task_map.insert(task_handle.id(), component_id);
173 }
174
175 for (component_id, transform) in self.transforms {
177 let (transform, component_registry) = transform.into_parts();
178
179 let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
180 generic_error!("No events dispatcher found for transform component '{}'", component_id)
181 })?;
182
183 let consumer = interconnects
184 .take_transform_consumer(&component_id)
185 .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
186
187 let health_handle = health_registry
188 .register_component(format!("{}.transforms.{}", root_component_name, component_id))
189 .expect("duplicate transform component ID in health registry");
190
191 let component_context = ComponentContext::transform(component_id.clone());
192 let context = TransformContext::new(
193 &topology_context,
194 &component_context,
195 component_registry,
196 health_handle,
197 dispatcher,
198 consumer,
199 );
200
201 let (alloc_group, transform) = transform.into_parts();
202 let task_handle = spawn_component(
203 &mut component_tasks,
204 component_context,
205 alloc_group,
206 transform.run(context),
207 );
208 component_task_map.insert(task_handle.id(), component_id);
209 }
210
211 for (component_id, destination) in self.destinations {
213 let (destination, component_registry) = destination.into_parts();
214
215 let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
216 generic_error!("No events consumer found for destination component '{}'", component_id)
217 })?;
218
219 let health_handle = health_registry
220 .register_component(format!("{}.destinations.{}", root_component_name, component_id))
221 .expect("duplicate destination component ID in health registry");
222
223 let component_context = ComponentContext::destination(component_id.clone());
224 let context = DestinationContext::new(
225 &topology_context,
226 &component_context,
227 component_registry,
228 health_handle,
229 consumer,
230 );
231
232 let (alloc_group, destination) = destination.into_parts();
233 let task_handle = spawn_component(
234 &mut component_tasks,
235 component_context,
236 alloc_group,
237 destination.run(context),
238 );
239 component_task_map.insert(task_handle.id(), component_id);
240 }
241
242 for (component_id, encoder) in self.encoders {
244 let (encoder, component_registry) = encoder.into_parts();
245
246 let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
247 generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
248 })?;
249
250 let consumer = interconnects
251 .take_encoder_consumer(&component_id)
252 .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
253
254 let health_handle = health_registry
255 .register_component(format!("{}.encoders.{}", root_component_name, component_id))
256 .expect("duplicate encoder component ID in health registry");
257
258 let component_context = ComponentContext::encoder(component_id.clone());
259 let context = EncoderContext::new(
260 &topology_context,
261 &component_context,
262 component_registry,
263 health_handle,
264 dispatcher,
265 consumer,
266 );
267
268 let (alloc_group, encoder) = encoder.into_parts();
269 let task_handle = spawn_component(
270 &mut component_tasks,
271 component_context,
272 alloc_group,
273 encoder.run(context),
274 );
275 component_task_map.insert(task_handle.id(), component_id);
276 }
277
278 for (component_id, forwarder) in self.forwarders {
280 let (forwarder, component_registry) = forwarder.into_parts();
281
282 let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
283 generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
284 })?;
285
286 let health_handle = health_registry
287 .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
288 .expect("duplicate forwarder component ID in health registry");
289
290 let component_context = ComponentContext::forwarder(component_id.clone());
291 let context = ForwarderContext::new(
292 &topology_context,
293 &component_context,
294 component_registry,
295 health_handle,
296 consumer,
297 );
298
299 let (alloc_group, forwarder) = forwarder.into_parts();
300 let task_handle = spawn_component(
301 &mut component_tasks,
302 component_context,
303 alloc_group,
304 forwarder.run(context),
305 );
306 component_task_map.insert(task_handle.id(), component_id);
307 }
308
309 Ok(RunningTopology::from_parts(
310 shutdown_coordinator,
311 component_tasks,
312 component_task_map,
313 ))
314 }
315}
316
317struct ComponentInterconnects {
318 interconnect_capacity: NonZeroUsize,
319 source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
320 relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
321 transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
322 transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
323 destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
324 encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
325 encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
326 forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
327}
328
329impl ComponentInterconnects {
330 fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
331 let mut interconnects = Self {
332 interconnect_capacity,
333 source_dispatchers: HashMap::new(),
334 relay_dispatchers: HashMap::new(),
335 transform_consumers: HashMap::new(),
336 transform_dispatchers: HashMap::new(),
337 destination_consumers: HashMap::new(),
338 encoder_consumers: HashMap::new(),
339 encoder_dispatchers: HashMap::new(),
340 forwarder_consumers: HashMap::new(),
341 };
342
343 interconnects.generate_interconnects(graph)?;
344 Ok(interconnects)
345 }
346
347 fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
348 self.source_dispatchers.remove(component_id)
349 }
350
351 fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
352 self.relay_dispatchers.remove(component_id)
353 }
354
355 fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
356 self.transform_dispatchers.remove(component_id)
357 }
358
359 fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
360 self.encoder_dispatchers.remove(component_id)
361 }
362
363 fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
364 self.transform_consumers
365 .remove(component_id)
366 .map(|(_, consumer)| consumer)
367 }
368
369 fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
370 self.destination_consumers
371 .remove(component_id)
372 .map(|(_, consumer)| consumer)
373 }
374
375 fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
376 self.encoder_consumers
377 .remove(component_id)
378 .map(|(_, consumer)| consumer)
379 }
380
381 fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
382 self.forwarder_consumers
383 .remove(component_id)
384 .map(|(_, consumer)| consumer)
385 }
386
387 fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
388 let outbound_edges = graph.get_outbound_directed_edges();
393 for (upstream_id, output_map) in outbound_edges {
394 match upstream_id.component_type() {
395 ComponentType::Source | ComponentType::Transform => {
396 self.generate_event_interconnect(upstream_id, output_map)?;
397 }
398 ComponentType::Relay | ComponentType::Encoder => {
399 self.generate_payload_interconnect(upstream_id, output_map)?
400 }
401 _ => panic!(
402 "Only sources, transforms, relays, and encoders can dispatch events/payloads to downstream components."
403 ),
404 }
405 }
406
407 Ok(())
408 }
409
410 fn generate_event_interconnect(
411 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
412 ) -> Result<(), GenericError> {
413 for (upstream_output_id, downstream_ids) in output_map {
414 let mut senders = Vec::new();
415 for downstream_id in downstream_ids {
416 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
417 let sender = self.get_or_create_events_sender(downstream_id);
418 senders.push(sender);
419 }
420
421 let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
422 dispatcher.add_output(upstream_output_id.clone())?;
423
424 for sender in senders {
425 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
426 }
427 }
428
429 Ok(())
430 }
431
432 fn generate_payload_interconnect(
433 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
434 ) -> Result<(), GenericError> {
435 for (upstream_output_id, downstream_ids) in output_map {
436 let mut senders = Vec::new();
437 for downstream_id in downstream_ids {
438 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
439 let sender = self.get_or_create_payloads_sender(downstream_id);
440 senders.push(sender);
441 }
442
443 let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
444 dispatcher.add_output(upstream_output_id.clone())?;
445
446 for sender in senders {
447 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
448 }
449 }
450
451 Ok(())
452 }
453
454 fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
455 let (component_id, component_type, component_context) = component_id.into_parts();
456
457 match component_type {
458 ComponentType::Source => self
459 .source_dispatchers
460 .entry(component_id)
461 .or_insert_with(|| EventsDispatcher::new(component_context)),
462 ComponentType::Transform => self
463 .transform_dispatchers
464 .entry(component_id)
465 .or_insert_with(|| EventsDispatcher::new(component_context)),
466 _ => {
467 panic!("Only sources and transforms can dispatch events to downstream components.")
468 }
469 }
470 }
471
472 fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
473 let (component_id, component_type, component_context) = component_id.into_parts();
474 let interconnect_capacity = self.interconnect_capacity;
475
476 let (sender, _) = match component_type {
477 ComponentType::Transform => self
478 .transform_consumers
479 .entry(component_id)
480 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
481 ComponentType::Destination => self
482 .destination_consumers
483 .entry(component_id)
484 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
485 ComponentType::Encoder => self
486 .encoder_consumers
487 .entry(component_id)
488 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
489 _ => panic!("Only transforms, destinations, and encoders can consume events."),
490 };
491
492 sender.clone()
493 }
494
495 fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
496 let (component_id, component_type, component_context) = component_id.into_parts();
497
498 match component_type {
499 ComponentType::Relay => self
500 .relay_dispatchers
501 .entry(component_id)
502 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
503 ComponentType::Encoder => self
504 .encoder_dispatchers
505 .entry(component_id)
506 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
507 _ => {
508 panic!("Only relays and encoders can dispatch payloads to downstream components.")
509 }
510 }
511 }
512
513 fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
514 let (component_id, component_type, component_context) = component_id.into_parts();
515 let interconnect_capacity = self.interconnect_capacity;
516
517 let (sender, _) = match component_type {
518 ComponentType::Forwarder => self
519 .forwarder_consumers
520 .entry(component_id)
521 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
522 _ => panic!("Only forwarders can consume payloads."),
523 };
524
525 sender.clone()
526 }
527}
528
529fn build_events_consumer_pair(
530 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
531) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
532 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
533 let consumer = EventsConsumer::new(component_context, receiver);
534 (sender, consumer)
535}
536
537fn build_payloads_consumer_pair(
538 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
539) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
540 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
541 let consumer = PayloadsConsumer::new(component_context, receiver);
542 (sender, consumer)
543}
544
545fn spawn_component<F>(
546 join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
547 allocation_group_token: AllocationGroupToken, component_future: F,
548) -> AbortHandle
549where
550 F: Future<Output = Result<(), GenericError>> + Send + 'static,
551{
552 let component_span = error_span!(
553 "component",
554 "type" = context.component_type().as_str(),
555 id = %context.component_id(),
556 );
557
558 let _span = component_span.enter();
559 let _guard = allocation_group_token.enter();
560
561 let component_task_name = format!(
562 "topology-{}-{}",
563 context.component_type().as_str(),
564 context.component_id()
565 );
566 join_set.spawn_traced_named(component_task_name, component_future)
567}
568
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::{event::EventType, payload::PayloadType};
    use crate::topology::graph::Graph;

    /// Building interconnects for a graph that has both event and payload edges must succeed, which requires each
    /// dispatcher output to be added before senders are attached to it.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let capacity = NonZeroUsize::new(10).unwrap();

        // source1 -> transform1 -> { encoder1 -> forwarder1, dest1 }
        let mut graph = Graph::default();
        graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        let _ = ComponentInterconnects::from_graph(capacity, &graph)
            .expect("should build interconnects successfully");
    }
}