1use std::{collections::HashMap, num::NonZeroUsize};
2
3use memory_accounting::{allocator::Track as _, ComponentRegistry, UsageExpr};
4use saluki_error::{ErrorContext as _, GenericError};
5use snafu::Snafu;
6
7use super::{
8 built::BuiltTopology,
9 graph::{Graph, GraphError},
10 ComponentId, RegisteredComponent,
11};
12use crate::{
13 components::{
14 decoders::DecoderBuilder, destinations::DestinationBuilder, encoders::EncoderBuilder,
15 forwarders::ForwarderBuilder, relays::RelayBuilder, sources::SourceBuilder, transforms::TransformBuilder,
16 ComponentContext,
17 },
18 data_model::event::Event,
19 topology::{EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
20};
21
22#[derive(Debug, Snafu)]
24#[snafu(context(suffix(false)))]
25pub enum BlueprintError {
26 #[snafu(display("Failed to build/validate topology graph: {}", source))]
28 InvalidGraph {
29 source: GraphError,
31 },
32
33 #[snafu(display("Failed to build component '{}': {}", id, source))]
35 FailedToBuildComponent {
36 id: ComponentId,
38
39 source: GenericError,
41 },
42}
43
44pub struct TopologyBlueprint {
46 name: String,
47 graph: Graph,
48 sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
49 relays: HashMap<ComponentId, RegisteredComponent<Box<dyn RelayBuilder + Send>>>,
50 decoders: HashMap<ComponentId, RegisteredComponent<Box<dyn DecoderBuilder + Send>>>,
51 transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
52 destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
53 encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
54 forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
55 component_registry: ComponentRegistry,
56 interconnect_capacity: NonZeroUsize,
57}
58
59impl TopologyBlueprint {
60 pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
62 let component_registry = component_registry.get_or_create("topology").get_or_create(name);
64
65 Self {
66 name: name.to_string(),
67 graph: Graph::default(),
68 sources: HashMap::new(),
69 relays: HashMap::new(),
70 decoders: HashMap::new(),
71 transforms: HashMap::new(),
72 destinations: HashMap::new(),
73 encoders: HashMap::new(),
74 forwarders: HashMap::new(),
75 component_registry,
76 interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
77 }
78 }
79
80 pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
88 self.interconnect_capacity = capacity;
89 self.recalculate_bounds();
90 self
91 }
92
93 fn recalculate_bounds(&mut self) {
94 let interconnect_capacity = self.interconnect_capacity.get();
95
96 let mut bounds_builder = self.component_registry.bounds_builder();
97 let mut bounds_builder = bounds_builder.subcomponent("interconnects");
98 bounds_builder.reset();
99
100 let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
105 bounds_builder
106 .minimum()
107 .with_array::<EventsBuffer>("events", total_interconnect_capacity);
108
109 let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
120 + self.sources.len()
121 + self.decoders.len()
122 + self.transforms.len();
123
124 bounds_builder
125 .firm()
126 .with_expr(UsageExpr::product(
128 "events",
129 UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
130 UsageExpr::sum(
131 "",
132 UsageExpr::struct_size::<EventsBuffer>("events buffer"),
133 UsageExpr::product(
134 "",
135 UsageExpr::struct_size::<Event>("event"),
136 UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
137 ),
138 ),
139 ));
140 }
141
142 pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
148 where
149 I: AsRef<str>,
150 B: SourceBuilder + Send + 'static,
151 {
152 let component_id = self
153 .graph
154 .add_source(component_id, &builder)
155 .error_context("Failed to add source to topology graph.")?;
156
157 let mut source_registry = self
158 .component_registry
159 .get_or_create(format!("components.sources.{}", component_id));
160 let mut bounds_builder = source_registry.bounds_builder();
161 builder.specify_bounds(&mut bounds_builder);
162
163 self.recalculate_bounds();
164
165 let _ = self.sources.insert(
166 component_id,
167 RegisteredComponent::new(Box::new(builder), source_registry),
168 );
169
170 Ok(self)
171 }
172
173 pub fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
179 where
180 I: AsRef<str>,
181 B: RelayBuilder + Send + 'static,
182 {
183 let component_id = self
184 .graph
185 .add_relay(component_id, &builder)
186 .error_context("Failed to add relay to topology graph.")?;
187
188 let mut relay_registry = self
189 .component_registry
190 .get_or_create(format!("components.relays.{}", component_id));
191 let mut bounds_builder = relay_registry.bounds_builder();
192 builder.specify_bounds(&mut bounds_builder);
193
194 self.recalculate_bounds();
195
196 let _ = self.relays.insert(
197 component_id,
198 RegisteredComponent::new(Box::new(builder), relay_registry),
199 );
200
201 Ok(self)
202 }
203
204 pub fn add_decoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
210 where
211 I: AsRef<str>,
212 B: DecoderBuilder + Send + 'static,
213 {
214 let component_id = self
215 .graph
216 .add_decoder(component_id, &builder)
217 .error_context("Failed to add decoder to topology graph.")?;
218
219 let mut decoder_registry = self
220 .component_registry
221 .get_or_create(format!("components.decoders.{}", component_id));
222 let mut bounds_builder = decoder_registry.bounds_builder();
223 builder.specify_bounds(&mut bounds_builder);
224
225 self.recalculate_bounds();
226
227 let _ = self.decoders.insert(
228 component_id,
229 RegisteredComponent::new(Box::new(builder), decoder_registry),
230 );
231
232 Ok(self)
233 }
234
235 pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
241 where
242 I: AsRef<str>,
243 B: TransformBuilder + Send + 'static,
244 {
245 let component_id = self
246 .graph
247 .add_transform(component_id, &builder)
248 .error_context("Failed to add transform to topology graph.")?;
249
250 let mut transform_registry = self
251 .component_registry
252 .get_or_create(format!("components.transforms.{}", component_id));
253 let mut bounds_builder = transform_registry.bounds_builder();
254 builder.specify_bounds(&mut bounds_builder);
255
256 self.recalculate_bounds();
257
258 let _ = self.transforms.insert(
259 component_id,
260 RegisteredComponent::new(Box::new(builder), transform_registry),
261 );
262
263 Ok(self)
264 }
265
266 pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
272 where
273 I: AsRef<str>,
274 B: DestinationBuilder + Send + 'static,
275 {
276 let component_id = self
277 .graph
278 .add_destination(component_id, &builder)
279 .error_context("Failed to add destination to topology graph.")?;
280
281 let mut destination_registry = self
282 .component_registry
283 .get_or_create(format!("components.destinations.{}", component_id));
284 let mut bounds_builder = destination_registry.bounds_builder();
285 builder.specify_bounds(&mut bounds_builder);
286
287 self.recalculate_bounds();
288
289 let _ = self.destinations.insert(
290 component_id,
291 RegisteredComponent::new(Box::new(builder), destination_registry),
292 );
293
294 Ok(self)
295 }
296
297 pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
303 where
304 I: AsRef<str>,
305 B: EncoderBuilder + Send + 'static,
306 {
307 let component_id = self
308 .graph
309 .add_encoder(component_id, &builder)
310 .error_context("Failed to add encoder to topology graph.")?;
311
312 let mut encoder_registry = self
313 .component_registry
314 .get_or_create(format!("components.encoders.{}", component_id));
315 let mut bounds_builder = encoder_registry.bounds_builder();
316 builder.specify_bounds(&mut bounds_builder);
317
318 self.recalculate_bounds();
319
320 let _ = self.encoders.insert(
321 component_id,
322 RegisteredComponent::new(Box::new(builder), encoder_registry),
323 );
324
325 Ok(self)
326 }
327
328 pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
334 where
335 I: AsRef<str>,
336 B: ForwarderBuilder + Send + 'static,
337 {
338 let component_id = self
339 .graph
340 .add_forwarder(component_id, &builder)
341 .error_context("Failed to add forwarder to topology graph.")?;
342
343 let mut forwarder_registry = self
344 .component_registry
345 .get_or_create(format!("components.forwarders.{}", component_id));
346 let mut bounds_builder = forwarder_registry.bounds_builder();
347 builder.specify_bounds(&mut bounds_builder);
348
349 self.recalculate_bounds();
350
351 let _ = self.forwarders.insert(
352 component_id,
353 RegisteredComponent::new(Box::new(builder), forwarder_registry),
354 );
355
356 Ok(self)
357 }
358
359 pub fn connect_component<DI, SI, I>(
366 &mut self, destination_component_id: DI, source_output_component_ids: SI,
367 ) -> Result<&mut Self, GenericError>
368 where
369 DI: AsRef<str>,
370 SI: IntoIterator<Item = I>,
371 I: AsRef<str>,
372 {
373 for source_output_component_id in source_output_component_ids.into_iter() {
374 self.graph
375 .add_edge(source_output_component_id, destination_component_id.as_ref())
376 .error_context("Failed to add component connection to topology graph.")?;
377 }
378
379 Ok(self)
380 }
381
382 pub async fn build(mut self) -> Result<BuiltTopology, GenericError> {
388 self.graph.validate().error_context("Failed to build topology graph.")?;
389
390 let mut sources = HashMap::new();
391 for (id, builder) in self.sources {
392 let (builder, mut component_registry) = builder.into_parts();
393 let allocation_token = component_registry.token();
394
395 let component_context = ComponentContext::source(id.clone());
396 let source = builder
397 .build(component_context)
398 .track_allocations(allocation_token)
399 .await
400 .with_error_context(|| format!("Failed to build source '{}'.", id))?;
401
402 sources.insert(
403 id,
404 RegisteredComponent::new(source.track_allocations(allocation_token), component_registry),
405 );
406 }
407
408 let mut relays = HashMap::new();
409 for (id, builder) in self.relays {
410 let (builder, mut component_registry) = builder.into_parts();
411 let allocation_token = component_registry.token();
412
413 let component_context = ComponentContext::relay(id.clone());
414 let relay = builder
415 .build(component_context)
416 .track_allocations(allocation_token)
417 .await
418 .with_error_context(|| format!("Failed to build relay '{}'.", id))?;
419
420 relays.insert(
421 id,
422 RegisteredComponent::new(relay.track_allocations(allocation_token), component_registry),
423 );
424 }
425
426 let mut decoders = HashMap::new();
427 for (id, builder) in self.decoders {
428 let (builder, mut component_registry) = builder.into_parts();
429 let allocation_token = component_registry.token();
430
431 let component_context = ComponentContext::decoder(id.clone());
432 let decoder = builder
433 .build(component_context)
434 .track_allocations(allocation_token)
435 .await
436 .with_error_context(|| format!("Failed to build decoder '{}'.", id))?;
437
438 decoders.insert(
439 id,
440 RegisteredComponent::new(decoder.track_allocations(allocation_token), component_registry),
441 );
442 }
443
444 let mut transforms = HashMap::new();
445 for (id, builder) in self.transforms {
446 let (builder, mut component_registry) = builder.into_parts();
447 let allocation_token = component_registry.token();
448
449 let component_context = ComponentContext::transform(id.clone());
450 let transform = builder
451 .build(component_context)
452 .track_allocations(allocation_token)
453 .await
454 .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
455
456 transforms.insert(
457 id,
458 RegisteredComponent::new(transform.track_allocations(allocation_token), component_registry),
459 );
460 }
461
462 let mut destinations = HashMap::new();
463 for (id, builder) in self.destinations {
464 let (builder, mut component_registry) = builder.into_parts();
465 let allocation_token = component_registry.token();
466
467 let component_context = ComponentContext::destination(id.clone());
468 let destination = builder
469 .build(component_context)
470 .track_allocations(allocation_token)
471 .await
472 .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
473
474 destinations.insert(
475 id,
476 RegisteredComponent::new(destination.track_allocations(allocation_token), component_registry),
477 );
478 }
479
480 let mut encoders = HashMap::new();
481 for (id, builder) in self.encoders {
482 let (builder, mut component_registry) = builder.into_parts();
483 let allocation_token = component_registry.token();
484
485 let component_context = ComponentContext::encoder(id.clone());
486 let encoder = builder
487 .build(component_context)
488 .track_allocations(allocation_token)
489 .await
490 .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
491
492 encoders.insert(
493 id,
494 RegisteredComponent::new(encoder.track_allocations(allocation_token), component_registry),
495 );
496 }
497
498 let mut forwarders = HashMap::new();
499 for (id, builder) in self.forwarders {
500 let (builder, mut component_registry) = builder.into_parts();
501 let allocation_token = component_registry.token();
502
503 let component_context = ComponentContext::forwarder(id.clone());
504 let forwarder = builder
505 .build(component_context)
506 .track_allocations(allocation_token)
507 .await
508 .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
509
510 forwarders.insert(
511 id,
512 RegisteredComponent::new(forwarder.track_allocations(allocation_token), component_registry),
513 );
514 }
515
516 Ok(BuiltTopology::from_parts(
517 self.name,
518 self.graph,
519 sources,
520 relays,
521 decoders,
522 transforms,
523 destinations,
524 encoders,
525 forwarders,
526 self.component_registry.token(),
527 self.interconnect_capacity,
528 ))
529 }
530}