1use std::{collections::HashMap, future::Future, num::NonZeroUsize};
2
3use memory_accounting::{
4 allocator::{AllocationGroupToken, Tracked},
5 MemoryLimiter,
6};
7use saluki_common::task::JoinSetExt as _;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use saluki_health::HealthRegistry;
10use tokio::{
11 sync::mpsc,
12 task::{AbortHandle, JoinSet},
13};
14use tracing::{debug, error_span};
15
16use super::{
17 graph::Graph, running::RunningTopology, shutdown::ComponentShutdownCoordinator, ComponentId, EventsBuffer,
18 EventsConsumer, OutputName, PayloadsConsumer, RegisteredComponent, TypedComponentId,
19};
20use crate::{
21 components::{
22 destinations::{Destination, DestinationContext},
23 encoders::{Encoder, EncoderContext},
24 forwarders::{Forwarder, ForwarderContext},
25 relays::{Relay, RelayContext},
26 sources::{Source, SourceContext},
27 transforms::{Transform, TransformContext},
28 ComponentContext, ComponentType,
29 },
30 topology::{context::TopologyContext, EventsDispatcher, PayloadsBuffer, PayloadsDispatcher},
31};
32
/// A topology whose components have been constructed but not yet started.
///
/// Holds the component graph plus every registered component, keyed by component ID and grouped by component kind.
/// Calling `spawn` consumes this value and turns each component into a running task.
pub struct BuiltTopology {
    /// Topology name; used to build the `topology.<name>` prefix under which components are registered with the
    /// health registry.
    name: String,
    /// Component graph that the interconnects between components are generated from.
    graph: Graph,
    /// Source components, keyed by component ID.
    sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
    /// Relay components, keyed by component ID.
    relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
    /// Transform components, keyed by component ID.
    transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
    /// Destination components, keyed by component ID.
    destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
    /// Encoder components, keyed by component ID.
    encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
    /// Forwarder components, keyed by component ID.
    forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
    /// Allocation group token that is entered while the topology is being spawned.
    component_token: AllocationGroupToken,
    /// Capacity used for every channel created between components.
    interconnect_capacity: NonZeroUsize,
}
51
52impl BuiltTopology {
53 #[allow(clippy::too_many_arguments)]
54 pub(crate) fn from_parts(
55 name: String, graph: Graph,
56 sources: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Source + Send>>>>,
57 relays: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Relay + Send>>>>,
58 transforms: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Transform + Send>>>>,
59 destinations: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Destination + Send>>>>,
60 encoders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Encoder + Send>>>>,
61 forwarders: HashMap<ComponentId, RegisteredComponent<Tracked<Box<dyn Forwarder + Send>>>>,
62 component_token: AllocationGroupToken, interconnect_capacity: NonZeroUsize,
63 ) -> Self {
64 Self {
65 name,
66 graph,
67 sources,
68 relays,
69 transforms,
70 destinations,
71 encoders,
72 forwarders,
73 component_token,
74 interconnect_capacity,
75 }
76 }
77
78 pub async fn spawn(
86 self, health_registry: &HealthRegistry, memory_limiter: MemoryLimiter,
87 ) -> Result<RunningTopology, GenericError> {
88 let root_component_name = format!("topology.{}", self.name);
89
90 let _guard = self.component_token.enter();
91
92 let thread_pool = tokio::runtime::Builder::new_multi_thread()
93 .worker_threads(8)
94 .enable_all()
95 .build()
96 .error_context("Failed to build asynchronous thread pool runtime.")?;
97 let thread_pool_handle = thread_pool.handle().clone();
98
99 let topology_context = TopologyContext::new(memory_limiter, health_registry.clone(), thread_pool_handle);
100
101 let mut component_tasks = JoinSet::new();
102 let mut component_task_map = HashMap::new();
103
104 let mut interconnects = ComponentInterconnects::from_graph(self.interconnect_capacity, &self.graph)
106 .error_context("Failed to build component interconnects.")?;
107
108 let mut shutdown_coordinator = ComponentShutdownCoordinator::default();
109
110 for (component_id, source) in self.sources {
112 let (source, component_registry) = source.into_parts();
113
114 let dispatcher = interconnects
115 .take_source_dispatcher(&component_id)
116 .ok_or_else(|| generic_error!("No events dispatcher found for source component '{}'", component_id))?;
117
118 let shutdown_handle = shutdown_coordinator.register();
119 let health_handle = health_registry
120 .register_component(format!("{}.sources.{}", root_component_name, component_id))
121 .expect("duplicate source component ID in health registry");
122
123 let component_context = ComponentContext::source(component_id.clone());
124 let context = SourceContext::new(
125 &topology_context,
126 &component_context,
127 component_registry,
128 shutdown_handle,
129 health_handle,
130 dispatcher,
131 );
132
133 let (alloc_group, source) = source.into_parts();
134 let task_handle = spawn_component(
135 &mut component_tasks,
136 component_context,
137 alloc_group,
138 source.run(context),
139 );
140 component_task_map.insert(task_handle.id(), component_id);
141 }
142
143 for (component_id, relay) in self.relays {
145 let (relay, component_registry) = relay.into_parts();
146
147 let dispatcher = interconnects
148 .take_relay_dispatcher(&component_id)
149 .ok_or_else(|| generic_error!("No payloads dispatcher found for relay component '{}'", component_id))?;
150
151 let shutdown_handle = shutdown_coordinator.register();
152 let health_handle = health_registry
153 .register_component(format!("{}.relays.{}", root_component_name, component_id))
154 .expect("duplicate relay component ID in health registry");
155
156 let component_context = ComponentContext::relay(component_id.clone());
157 let context = RelayContext::new(
158 &topology_context,
159 &component_context,
160 component_registry,
161 shutdown_handle,
162 health_handle,
163 dispatcher,
164 );
165
166 let (alloc_group, relay) = relay.into_parts();
167 let task_handle = spawn_component(&mut component_tasks, component_context, alloc_group, relay.run(context));
168 component_task_map.insert(task_handle.id(), component_id);
169 }
170
171 for (component_id, transform) in self.transforms {
173 let (transform, component_registry) = transform.into_parts();
174
175 let dispatcher = interconnects.take_transform_dispatcher(&component_id).ok_or_else(|| {
176 generic_error!("No events dispatcher found for transform component '{}'", component_id)
177 })?;
178
179 let consumer = interconnects
180 .take_transform_consumer(&component_id)
181 .ok_or_else(|| generic_error!("No events consumer found for transform component '{}'", component_id))?;
182
183 let health_handle = health_registry
184 .register_component(format!("{}.transforms.{}", root_component_name, component_id))
185 .expect("duplicate transform component ID in health registry");
186
187 let component_context = ComponentContext::transform(component_id.clone());
188 let context = TransformContext::new(
189 &topology_context,
190 &component_context,
191 component_registry,
192 health_handle,
193 dispatcher,
194 consumer,
195 );
196
197 let (alloc_group, transform) = transform.into_parts();
198 let task_handle = spawn_component(
199 &mut component_tasks,
200 component_context,
201 alloc_group,
202 transform.run(context),
203 );
204 component_task_map.insert(task_handle.id(), component_id);
205 }
206
207 for (component_id, destination) in self.destinations {
209 let (destination, component_registry) = destination.into_parts();
210
211 let consumer = interconnects.take_destination_consumer(&component_id).ok_or_else(|| {
212 generic_error!("No events consumer found for destination component '{}'", component_id)
213 })?;
214
215 let health_handle = health_registry
216 .register_component(format!("{}.destinations.{}", root_component_name, component_id))
217 .expect("duplicate destination component ID in health registry");
218
219 let component_context = ComponentContext::destination(component_id.clone());
220 let context = DestinationContext::new(
221 &topology_context,
222 &component_context,
223 component_registry,
224 health_handle,
225 consumer,
226 );
227
228 let (alloc_group, destination) = destination.into_parts();
229 let task_handle = spawn_component(
230 &mut component_tasks,
231 component_context,
232 alloc_group,
233 destination.run(context),
234 );
235 component_task_map.insert(task_handle.id(), component_id);
236 }
237
238 for (component_id, encoder) in self.encoders {
240 let (encoder, component_registry) = encoder.into_parts();
241
242 let dispatcher = interconnects.take_encoder_dispatcher(&component_id).ok_or_else(|| {
243 generic_error!("No payloads dispatcher found for encoder component '{}'", component_id)
244 })?;
245
246 let consumer = interconnects
247 .take_encoder_consumer(&component_id)
248 .ok_or_else(|| generic_error!("No events consumer found for encoder component '{}'", component_id))?;
249
250 let health_handle = health_registry
251 .register_component(format!("{}.encoders.{}", root_component_name, component_id))
252 .expect("duplicate encoder component ID in health registry");
253
254 let component_context = ComponentContext::encoder(component_id.clone());
255 let context = EncoderContext::new(
256 &topology_context,
257 &component_context,
258 component_registry,
259 health_handle,
260 dispatcher,
261 consumer,
262 );
263
264 let (alloc_group, encoder) = encoder.into_parts();
265 let task_handle = spawn_component(
266 &mut component_tasks,
267 component_context,
268 alloc_group,
269 encoder.run(context),
270 );
271 component_task_map.insert(task_handle.id(), component_id);
272 }
273
274 for (component_id, forwarder) in self.forwarders {
276 let (forwarder, component_registry) = forwarder.into_parts();
277
278 let consumer = interconnects.take_forwarder_consumer(&component_id).ok_or_else(|| {
279 generic_error!("No payloads consumer found for forwarder component '{}'", component_id)
280 })?;
281
282 let health_handle = health_registry
283 .register_component(format!("{}.forwarders.{}", root_component_name, component_id))
284 .expect("duplicate forwarder component ID in health registry");
285
286 let component_context = ComponentContext::forwarder(component_id.clone());
287 let context = ForwarderContext::new(
288 &topology_context,
289 &component_context,
290 component_registry,
291 health_handle,
292 consumer,
293 );
294
295 let (alloc_group, forwarder) = forwarder.into_parts();
296 let task_handle = spawn_component(
297 &mut component_tasks,
298 component_context,
299 alloc_group,
300 forwarder.run(context),
301 );
302 component_task_map.insert(task_handle.id(), component_id);
303 }
304
305 Ok(RunningTopology::from_parts(
306 thread_pool,
307 shutdown_coordinator,
308 component_tasks,
309 component_task_map,
310 ))
311 }
312}
313
/// The dispatchers and consumers that connect components to one another, keyed by component ID.
///
/// Consumer maps store the channel sender alongside the consumer so that further upstream components can be handed
/// a clone of the sender for the same downstream component.
struct ComponentInterconnects {
    /// Capacity of every channel created for a consumer.
    interconnect_capacity: NonZeroUsize,
    /// Events dispatchers for sources.
    source_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Payloads dispatchers for relays.
    relay_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Events sender/consumer pairs for transforms.
    transform_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events dispatchers for transforms.
    transform_dispatchers: HashMap<ComponentId, EventsDispatcher>,
    /// Events sender/consumer pairs for destinations.
    destination_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Events sender/consumer pairs for encoders.
    encoder_consumers: HashMap<ComponentId, (mpsc::Sender<EventsBuffer>, EventsConsumer)>,
    /// Payloads dispatchers for encoders.
    encoder_dispatchers: HashMap<ComponentId, PayloadsDispatcher>,
    /// Payloads sender/consumer pairs for forwarders.
    forwarder_consumers: HashMap<ComponentId, (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer)>,
}
325
326impl ComponentInterconnects {
327 fn from_graph(interconnect_capacity: NonZeroUsize, graph: &Graph) -> Result<Self, GenericError> {
328 let mut interconnects = Self {
329 interconnect_capacity,
330 source_dispatchers: HashMap::new(),
331 relay_dispatchers: HashMap::new(),
332 transform_consumers: HashMap::new(),
333 transform_dispatchers: HashMap::new(),
334 destination_consumers: HashMap::new(),
335 encoder_consumers: HashMap::new(),
336 encoder_dispatchers: HashMap::new(),
337 forwarder_consumers: HashMap::new(),
338 };
339
340 interconnects.generate_interconnects(graph)?;
341 Ok(interconnects)
342 }
343
344 fn take_source_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
345 self.source_dispatchers.remove(component_id)
346 }
347
348 fn take_relay_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
349 self.relay_dispatchers.remove(component_id)
350 }
351
352 fn take_transform_dispatcher(&mut self, component_id: &ComponentId) -> Option<EventsDispatcher> {
353 self.transform_dispatchers.remove(component_id)
354 }
355
356 fn take_encoder_dispatcher(&mut self, component_id: &ComponentId) -> Option<PayloadsDispatcher> {
357 self.encoder_dispatchers.remove(component_id)
358 }
359
360 fn take_transform_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
361 self.transform_consumers
362 .remove(component_id)
363 .map(|(_, consumer)| consumer)
364 }
365
366 fn take_destination_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
367 self.destination_consumers
368 .remove(component_id)
369 .map(|(_, consumer)| consumer)
370 }
371
372 fn take_encoder_consumer(&mut self, component_id: &ComponentId) -> Option<EventsConsumer> {
373 self.encoder_consumers
374 .remove(component_id)
375 .map(|(_, consumer)| consumer)
376 }
377
378 fn take_forwarder_consumer(&mut self, component_id: &ComponentId) -> Option<PayloadsConsumer> {
379 self.forwarder_consumers
380 .remove(component_id)
381 .map(|(_, consumer)| consumer)
382 }
383
384 fn generate_interconnects(&mut self, graph: &Graph) -> Result<(), GenericError> {
385 let outbound_edges = graph.get_outbound_directed_edges();
390 for (upstream_id, output_map) in outbound_edges {
391 match upstream_id.component_type() {
392 ComponentType::Source | ComponentType::Transform => {
393 self.generate_event_interconnect(upstream_id, output_map)?;
394 }
395 ComponentType::Relay | ComponentType::Encoder => {
396 self.generate_payload_interconnect(upstream_id, output_map)?
397 }
398 _ => panic!(
399 "Only sources, transforms, relays, and encoders can dispatch events/payloads to downstream components."
400 ),
401 }
402 }
403
404 Ok(())
405 }
406
407 fn generate_event_interconnect(
408 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
409 ) -> Result<(), GenericError> {
410 for (upstream_output_id, downstream_ids) in output_map {
411 let mut senders = Vec::new();
412 for downstream_id in downstream_ids {
413 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
414 let sender = self.get_or_create_events_sender(downstream_id);
415 senders.push(sender);
416 }
417
418 let dispatcher = self.get_or_create_events_dispatcher(upstream_id.clone());
419 dispatcher.add_output(upstream_output_id.clone())?;
420
421 for sender in senders {
422 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
423 }
424 }
425
426 Ok(())
427 }
428
429 fn generate_payload_interconnect(
430 &mut self, upstream_id: TypedComponentId, output_map: HashMap<OutputName, Vec<TypedComponentId>>,
431 ) -> Result<(), GenericError> {
432 for (upstream_output_id, downstream_ids) in output_map {
433 let mut senders = Vec::new();
434 for downstream_id in downstream_ids {
435 debug!(upstream_id = %upstream_id.component_id(), %upstream_output_id, downstream_id = %downstream_id.component_id(), "Adding dispatcher output.");
436 let sender = self.get_or_create_payloads_sender(downstream_id);
437 senders.push(sender);
438 }
439
440 let dispatcher = self.get_or_create_payloads_dispatcher(upstream_id.clone());
441 dispatcher.add_output(upstream_output_id.clone())?;
442
443 for sender in senders {
444 dispatcher.attach_sender_to_output(&upstream_output_id, sender)?;
445 }
446 }
447
448 Ok(())
449 }
450
451 fn get_or_create_events_dispatcher(&mut self, component_id: TypedComponentId) -> &mut EventsDispatcher {
452 let (component_id, component_type, component_context) = component_id.into_parts();
453
454 match component_type {
455 ComponentType::Source => self
456 .source_dispatchers
457 .entry(component_id)
458 .or_insert_with(|| EventsDispatcher::new(component_context)),
459 ComponentType::Transform => self
460 .transform_dispatchers
461 .entry(component_id)
462 .or_insert_with(|| EventsDispatcher::new(component_context)),
463 _ => {
464 panic!("Only sources and transforms can dispatch events to downstream components.")
465 }
466 }
467 }
468
469 fn get_or_create_events_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<EventsBuffer> {
470 let (component_id, component_type, component_context) = component_id.into_parts();
471 let interconnect_capacity = self.interconnect_capacity;
472
473 let (sender, _) = match component_type {
474 ComponentType::Transform => self
475 .transform_consumers
476 .entry(component_id)
477 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
478 ComponentType::Destination => self
479 .destination_consumers
480 .entry(component_id)
481 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
482 ComponentType::Encoder => self
483 .encoder_consumers
484 .entry(component_id)
485 .or_insert_with(|| build_events_consumer_pair(component_context, interconnect_capacity)),
486 _ => panic!("Only transforms, destinations, and encoders can consume events."),
487 };
488
489 sender.clone()
490 }
491
492 fn get_or_create_payloads_dispatcher(&mut self, component_id: TypedComponentId) -> &mut PayloadsDispatcher {
493 let (component_id, component_type, component_context) = component_id.into_parts();
494
495 match component_type {
496 ComponentType::Relay => self
497 .relay_dispatchers
498 .entry(component_id)
499 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
500 ComponentType::Encoder => self
501 .encoder_dispatchers
502 .entry(component_id)
503 .or_insert_with(|| PayloadsDispatcher::new(component_context)),
504 _ => {
505 panic!("Only relays and encoders can dispatch payloads to downstream components.")
506 }
507 }
508 }
509
510 fn get_or_create_payloads_sender(&mut self, component_id: TypedComponentId) -> mpsc::Sender<PayloadsBuffer> {
511 let (component_id, component_type, component_context) = component_id.into_parts();
512 let interconnect_capacity = self.interconnect_capacity;
513
514 let (sender, _) = match component_type {
515 ComponentType::Forwarder => self
516 .forwarder_consumers
517 .entry(component_id)
518 .or_insert_with(|| build_payloads_consumer_pair(component_context, interconnect_capacity)),
519 _ => panic!("Only forwarders can consume payloads."),
520 };
521
522 sender.clone()
523 }
524}
525
526fn build_events_consumer_pair(
527 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
528) -> (mpsc::Sender<EventsBuffer>, EventsConsumer) {
529 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
530 let consumer = EventsConsumer::new(component_context, receiver);
531 (sender, consumer)
532}
533
534fn build_payloads_consumer_pair(
535 component_context: ComponentContext, interconnect_capacity: NonZeroUsize,
536) -> (mpsc::Sender<PayloadsBuffer>, PayloadsConsumer) {
537 let (sender, receiver) = mpsc::channel(interconnect_capacity.get());
538 let consumer = PayloadsConsumer::new(component_context, receiver);
539 (sender, consumer)
540}
541
542fn spawn_component<F>(
543 join_set: &mut JoinSet<Result<(), GenericError>>, context: ComponentContext,
544 allocation_group_token: AllocationGroupToken, component_future: F,
545) -> AbortHandle
546where
547 F: Future<Output = Result<(), GenericError>> + Send + 'static,
548{
549 let component_span = error_span!(
550 "component",
551 "type" = context.component_type().as_str(),
552 id = %context.component_id(),
553 );
554
555 let _span = component_span.enter();
556 let _guard = allocation_group_token.enter();
557
558 let component_task_name = format!(
559 "topology-{}-{}",
560 context.component_type().as_str(),
561 context.component_id()
562 );
563 join_set.spawn_traced_named(component_task_name, component_future)
564}
565
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::data_model::event::EventType;
    use crate::data_model::payload::PayloadType;
    use crate::topology::graph::Graph;

    /// Building interconnects for a graph with event and payload edges, including a transform that fans out to two
    /// downstream components, must succeed.
    #[test]
    fn component_interconnects_adds_output_before_attaching() {
        let capacity = NonZeroUsize::new(10).unwrap();

        // source1 -> transform1 -> { encoder1 -> forwarder1, dest1 }
        let mut topology_graph = Graph::default();
        topology_graph
            .with_source("source1", EventType::EventD)
            .with_transform("transform1", EventType::EventD, EventType::EventD)
            .with_encoder("encoder1", EventType::EventD, PayloadType::Raw)
            .with_forwarder("forwarder1", PayloadType::Raw)
            .with_destination("dest1", EventType::EventD)
            .with_edge("source1", "transform1")
            .with_edge("transform1", "encoder1")
            .with_edge("encoder1", "forwarder1")
            .with_edge("transform1", "dest1");

        let build_result = ComponentInterconnects::from_graph(capacity, &topology_graph);
        let _ = build_result.expect("should build interconnects successfully");
    }
}