1use std::{collections::HashMap, num::NonZeroUsize};
2
3use memory_accounting::{allocator::Track as _, ComponentRegistry, UsageExpr};
4use saluki_error::{ErrorContext as _, GenericError};
5use snafu::Snafu;
6
7use super::{
8 built::BuiltTopology,
9 graph::{Graph, GraphError},
10 ComponentId, RegisteredComponent,
11};
12use crate::{
13 components::{
14 destinations::DestinationBuilder, encoders::EncoderBuilder, forwarders::ForwarderBuilder, relays::RelayBuilder,
15 sources::SourceBuilder, transforms::TransformBuilder, ComponentContext,
16 },
17 data_model::event::Event,
18 topology::{EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
19};
20
21#[derive(Debug, Snafu)]
23#[snafu(context(suffix(false)))]
24pub enum BlueprintError {
25 #[snafu(display("Failed to build/validate topology graph: {}", source))]
27 InvalidGraph {
28 source: GraphError,
30 },
31
32 #[snafu(display("Failed to build component '{}': {}", id, source))]
34 FailedToBuildComponent {
35 id: ComponentId,
37
38 source: GenericError,
40 },
41}
42
43pub struct TopologyBlueprint {
45 name: String,
46 graph: Graph,
47 sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
48 relays: HashMap<ComponentId, RegisteredComponent<Box<dyn RelayBuilder + Send>>>,
49 transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
50 destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
51 encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
52 forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
53 component_registry: ComponentRegistry,
54 interconnect_capacity: NonZeroUsize,
55}
56
57impl TopologyBlueprint {
58 pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
60 let component_registry = component_registry.get_or_create("topology").get_or_create(name);
62
63 Self {
64 name: name.to_string(),
65 graph: Graph::default(),
66 sources: HashMap::new(),
67 relays: HashMap::new(),
68 transforms: HashMap::new(),
69 destinations: HashMap::new(),
70 encoders: HashMap::new(),
71 forwarders: HashMap::new(),
72 component_registry,
73 interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
74 }
75 }
76
77 pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
85 self.interconnect_capacity = capacity;
86 self.recalculate_bounds();
87 self
88 }
89
90 fn recalculate_bounds(&mut self) {
91 let interconnect_capacity = self.interconnect_capacity.get();
92
93 let mut bounds_builder = self.component_registry.bounds_builder();
94 let mut bounds_builder = bounds_builder.subcomponent("interconnects");
95 bounds_builder.reset();
96
97 let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
102 bounds_builder
103 .minimum()
104 .with_array::<EventsBuffer>("events", total_interconnect_capacity);
105
106 let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
117 + self.sources.len()
118 + self.transforms.len();
119
120 bounds_builder
121 .firm()
122 .with_expr(UsageExpr::product(
124 "events",
125 UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
126 UsageExpr::sum(
127 "",
128 UsageExpr::struct_size::<EventsBuffer>("events buffer"),
129 UsageExpr::product(
130 "",
131 UsageExpr::struct_size::<Event>("event"),
132 UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
133 ),
134 ),
135 ));
136 }
137
138 pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
144 where
145 I: AsRef<str>,
146 B: SourceBuilder + Send + 'static,
147 {
148 let component_id = self
149 .graph
150 .add_source(component_id, &builder)
151 .error_context("Failed to add source to topology graph.")?;
152
153 let mut source_registry = self
154 .component_registry
155 .get_or_create(format!("components.sources.{}", component_id));
156 let mut bounds_builder = source_registry.bounds_builder();
157 builder.specify_bounds(&mut bounds_builder);
158
159 self.recalculate_bounds();
160
161 let _ = self.sources.insert(
162 component_id,
163 RegisteredComponent::new(Box::new(builder), source_registry),
164 );
165
166 Ok(self)
167 }
168
169 pub fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
175 where
176 I: AsRef<str>,
177 B: RelayBuilder + Send + 'static,
178 {
179 let component_id = self
180 .graph
181 .add_relay(component_id, &builder)
182 .error_context("Failed to add relay to topology graph.")?;
183
184 let mut relay_registry = self
185 .component_registry
186 .get_or_create(format!("components.relays.{}", component_id));
187 let mut bounds_builder = relay_registry.bounds_builder();
188 builder.specify_bounds(&mut bounds_builder);
189
190 self.recalculate_bounds();
191
192 let _ = self.relays.insert(
193 component_id,
194 RegisteredComponent::new(Box::new(builder), relay_registry),
195 );
196
197 Ok(self)
198 }
199
200 pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
206 where
207 I: AsRef<str>,
208 B: TransformBuilder + Send + 'static,
209 {
210 let component_id = self
211 .graph
212 .add_transform(component_id, &builder)
213 .error_context("Failed to add transform to topology graph.")?;
214
215 let mut transform_registry = self
216 .component_registry
217 .get_or_create(format!("components.transforms.{}", component_id));
218 let mut bounds_builder = transform_registry.bounds_builder();
219 builder.specify_bounds(&mut bounds_builder);
220
221 self.recalculate_bounds();
222
223 let _ = self.transforms.insert(
224 component_id,
225 RegisteredComponent::new(Box::new(builder), transform_registry),
226 );
227
228 Ok(self)
229 }
230
231 pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
237 where
238 I: AsRef<str>,
239 B: DestinationBuilder + Send + 'static,
240 {
241 let component_id = self
242 .graph
243 .add_destination(component_id, &builder)
244 .error_context("Failed to add destination to topology graph.")?;
245
246 let mut destination_registry = self
247 .component_registry
248 .get_or_create(format!("components.destinations.{}", component_id));
249 let mut bounds_builder = destination_registry.bounds_builder();
250 builder.specify_bounds(&mut bounds_builder);
251
252 self.recalculate_bounds();
253
254 let _ = self.destinations.insert(
255 component_id,
256 RegisteredComponent::new(Box::new(builder), destination_registry),
257 );
258
259 Ok(self)
260 }
261
262 pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
268 where
269 I: AsRef<str>,
270 B: EncoderBuilder + Send + 'static,
271 {
272 let component_id = self
273 .graph
274 .add_encoder(component_id, &builder)
275 .error_context("Failed to add encoder to topology graph.")?;
276
277 let mut encoder_registry = self
278 .component_registry
279 .get_or_create(format!("components.encoders.{}", component_id));
280 let mut bounds_builder = encoder_registry.bounds_builder();
281 builder.specify_bounds(&mut bounds_builder);
282
283 self.recalculate_bounds();
284
285 let _ = self.encoders.insert(
286 component_id,
287 RegisteredComponent::new(Box::new(builder), encoder_registry),
288 );
289
290 Ok(self)
291 }
292
293 pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
299 where
300 I: AsRef<str>,
301 B: ForwarderBuilder + Send + 'static,
302 {
303 let component_id = self
304 .graph
305 .add_forwarder(component_id, &builder)
306 .error_context("Failed to add forwarder to topology graph.")?;
307
308 let mut forwarder_registry = self
309 .component_registry
310 .get_or_create(format!("components.forwarders.{}", component_id));
311 let mut bounds_builder = forwarder_registry.bounds_builder();
312 builder.specify_bounds(&mut bounds_builder);
313
314 self.recalculate_bounds();
315
316 let _ = self.forwarders.insert(
317 component_id,
318 RegisteredComponent::new(Box::new(builder), forwarder_registry),
319 );
320
321 Ok(self)
322 }
323
324 pub fn connect_component<DI, SI, I>(
331 &mut self, destination_component_id: DI, source_output_component_ids: SI,
332 ) -> Result<&mut Self, GenericError>
333 where
334 DI: AsRef<str>,
335 SI: IntoIterator<Item = I>,
336 I: AsRef<str>,
337 {
338 for source_output_component_id in source_output_component_ids.into_iter() {
339 self.graph
340 .add_edge(source_output_component_id, destination_component_id.as_ref())
341 .error_context("Failed to add component connection to topology graph.")?;
342 }
343
344 Ok(self)
345 }
346
347 pub async fn build(mut self) -> Result<BuiltTopology, GenericError> {
353 self.graph.validate().error_context("Failed to build topology graph.")?;
354
355 let mut sources = HashMap::new();
356 for (id, builder) in self.sources {
357 let (builder, mut component_registry) = builder.into_parts();
358 let allocation_token = component_registry.token();
359
360 let component_context = ComponentContext::source(id.clone());
361 let source = builder
362 .build(component_context)
363 .track_allocations(allocation_token)
364 .await
365 .with_error_context(|| format!("Failed to build source '{}'.", id))?;
366
367 sources.insert(
368 id,
369 RegisteredComponent::new(source.track_allocations(allocation_token), component_registry),
370 );
371 }
372
373 let mut relays = HashMap::new();
374 for (id, builder) in self.relays {
375 let (builder, mut component_registry) = builder.into_parts();
376 let allocation_token = component_registry.token();
377
378 let component_context = ComponentContext::relay(id.clone());
379 let relay = builder
380 .build(component_context)
381 .track_allocations(allocation_token)
382 .await
383 .with_error_context(|| format!("Failed to build relay '{}'.", id))?;
384
385 relays.insert(
386 id,
387 RegisteredComponent::new(relay.track_allocations(allocation_token), component_registry),
388 );
389 }
390
391 let mut transforms = HashMap::new();
392 for (id, builder) in self.transforms {
393 let (builder, mut component_registry) = builder.into_parts();
394 let allocation_token = component_registry.token();
395
396 let component_context = ComponentContext::transform(id.clone());
397 let transform = builder
398 .build(component_context)
399 .track_allocations(allocation_token)
400 .await
401 .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
402
403 transforms.insert(
404 id,
405 RegisteredComponent::new(transform.track_allocations(allocation_token), component_registry),
406 );
407 }
408
409 let mut destinations = HashMap::new();
410 for (id, builder) in self.destinations {
411 let (builder, mut component_registry) = builder.into_parts();
412 let allocation_token = component_registry.token();
413
414 let component_context = ComponentContext::destination(id.clone());
415 let destination = builder
416 .build(component_context)
417 .track_allocations(allocation_token)
418 .await
419 .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
420
421 destinations.insert(
422 id,
423 RegisteredComponent::new(destination.track_allocations(allocation_token), component_registry),
424 );
425 }
426
427 let mut encoders = HashMap::new();
428 for (id, builder) in self.encoders {
429 let (builder, mut component_registry) = builder.into_parts();
430 let allocation_token = component_registry.token();
431
432 let component_context = ComponentContext::encoder(id.clone());
433 let encoder = builder
434 .build(component_context)
435 .track_allocations(allocation_token)
436 .await
437 .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
438
439 encoders.insert(
440 id,
441 RegisteredComponent::new(encoder.track_allocations(allocation_token), component_registry),
442 );
443 }
444
445 let mut forwarders = HashMap::new();
446 for (id, builder) in self.forwarders {
447 let (builder, mut component_registry) = builder.into_parts();
448 let allocation_token = component_registry.token();
449
450 let component_context = ComponentContext::forwarder(id.clone());
451 let forwarder = builder
452 .build(component_context)
453 .track_allocations(allocation_token)
454 .await
455 .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
456
457 forwarders.insert(
458 id,
459 RegisteredComponent::new(forwarder.track_allocations(allocation_token), component_registry),
460 );
461 }
462
463 Ok(BuiltTopology::from_parts(
464 self.name,
465 self.graph,
466 sources,
467 relays,
468 transforms,
469 destinations,
470 encoders,
471 forwarders,
472 self.component_registry.token(),
473 self.interconnect_capacity,
474 ))
475 }
476}