1use std::{collections::HashMap, num::NonZeroUsize};
2
3use memory_accounting::{allocator::Track as _, ComponentRegistry, UsageExpr};
4use saluki_error::{ErrorContext as _, GenericError};
5use snafu::Snafu;
6
7use super::{
8 built::BuiltTopology,
9 graph::{Graph, GraphError},
10 ComponentId, RegisteredComponent,
11};
12use crate::{
13 components::{
14 destinations::DestinationBuilder, encoders::EncoderBuilder, forwarders::ForwarderBuilder,
15 sources::SourceBuilder, transforms::TransformBuilder, ComponentContext,
16 },
17 data_model::event::Event,
18 topology::{EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
19};
20
21#[derive(Debug, Snafu)]
23#[snafu(context(suffix(false)))]
24pub enum BlueprintError {
25 #[snafu(display("Failed to build/validate topology graph: {}", source))]
27 InvalidGraph {
28 source: GraphError,
30 },
31
32 #[snafu(display("Failed to build component '{}': {}", id, source))]
34 FailedToBuildComponent {
35 id: ComponentId,
37
38 source: GenericError,
40 },
41}
42
43pub struct TopologyBlueprint {
45 name: String,
46 graph: Graph,
47 sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
48 transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
49 destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
50 encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
51 forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
52 component_registry: ComponentRegistry,
53 interconnect_capacity: NonZeroUsize,
54}
55
56impl TopologyBlueprint {
57 pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
59 let component_registry = component_registry.get_or_create("topology").get_or_create(name);
61
62 Self {
63 name: name.to_string(),
64 graph: Graph::default(),
65 sources: HashMap::new(),
66 transforms: HashMap::new(),
67 destinations: HashMap::new(),
68 encoders: HashMap::new(),
69 forwarders: HashMap::new(),
70 component_registry,
71 interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
72 }
73 }
74
75 pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
83 self.interconnect_capacity = capacity;
84 self.recalculate_bounds();
85 self
86 }
87
88 fn recalculate_bounds(&mut self) {
89 let interconnect_capacity = self.interconnect_capacity.get();
90
91 let mut bounds_builder = self.component_registry.bounds_builder();
92 let mut bounds_builder = bounds_builder.subcomponent("interconnects");
93 bounds_builder.reset();
94
95 let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
100 bounds_builder
101 .minimum()
102 .with_array::<EventsBuffer>("events", total_interconnect_capacity);
103
104 let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
115 + self.sources.len()
116 + self.transforms.len();
117
118 bounds_builder
119 .firm()
120 .with_expr(UsageExpr::product(
122 "events",
123 UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
124 UsageExpr::sum(
125 "",
126 UsageExpr::struct_size::<EventsBuffer>("events buffer"),
127 UsageExpr::product(
128 "",
129 UsageExpr::struct_size::<Event>("event"),
130 UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
131 ),
132 ),
133 ));
134 }
135
136 pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
142 where
143 I: AsRef<str>,
144 B: SourceBuilder + Send + 'static,
145 {
146 let component_id = self
147 .graph
148 .add_source(component_id, &builder)
149 .error_context("Failed to add source to topology graph.")?;
150
151 let mut source_registry = self
152 .component_registry
153 .get_or_create(format!("components.sources.{}", component_id));
154 let mut bounds_builder = source_registry.bounds_builder();
155 builder.specify_bounds(&mut bounds_builder);
156
157 self.recalculate_bounds();
158
159 let _ = self.sources.insert(
160 component_id,
161 RegisteredComponent::new(Box::new(builder), source_registry),
162 );
163
164 Ok(self)
165 }
166
167 pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
173 where
174 I: AsRef<str>,
175 B: TransformBuilder + Send + 'static,
176 {
177 let component_id = self
178 .graph
179 .add_transform(component_id, &builder)
180 .error_context("Failed to add transform to topology graph.")?;
181
182 let mut transform_registry = self
183 .component_registry
184 .get_or_create(format!("components.transforms.{}", component_id));
185 let mut bounds_builder = transform_registry.bounds_builder();
186 builder.specify_bounds(&mut bounds_builder);
187
188 self.recalculate_bounds();
189
190 let _ = self.transforms.insert(
191 component_id,
192 RegisteredComponent::new(Box::new(builder), transform_registry),
193 );
194
195 Ok(self)
196 }
197
198 pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
204 where
205 I: AsRef<str>,
206 B: DestinationBuilder + Send + 'static,
207 {
208 let component_id = self
209 .graph
210 .add_destination(component_id, &builder)
211 .error_context("Failed to add destination to topology graph.")?;
212
213 let mut destination_registry = self
214 .component_registry
215 .get_or_create(format!("components.destinations.{}", component_id));
216 let mut bounds_builder = destination_registry.bounds_builder();
217 builder.specify_bounds(&mut bounds_builder);
218
219 self.recalculate_bounds();
220
221 let _ = self.destinations.insert(
222 component_id,
223 RegisteredComponent::new(Box::new(builder), destination_registry),
224 );
225
226 Ok(self)
227 }
228
229 pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
235 where
236 I: AsRef<str>,
237 B: EncoderBuilder + Send + 'static,
238 {
239 let component_id = self
240 .graph
241 .add_encoder(component_id, &builder)
242 .error_context("Failed to add encoder to topology graph.")?;
243
244 let mut encoder_registry = self
245 .component_registry
246 .get_or_create(format!("components.encoders.{}", component_id));
247 let mut bounds_builder = encoder_registry.bounds_builder();
248 builder.specify_bounds(&mut bounds_builder);
249
250 self.recalculate_bounds();
251
252 let _ = self.encoders.insert(
253 component_id,
254 RegisteredComponent::new(Box::new(builder), encoder_registry),
255 );
256
257 Ok(self)
258 }
259
260 pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
266 where
267 I: AsRef<str>,
268 B: ForwarderBuilder + Send + 'static,
269 {
270 let component_id = self
271 .graph
272 .add_forwarder(component_id, &builder)
273 .error_context("Failed to add forwarder to topology graph.")?;
274
275 let mut forwarder_registry = self
276 .component_registry
277 .get_or_create(format!("components.forwarders.{}", component_id));
278 let mut bounds_builder = forwarder_registry.bounds_builder();
279 builder.specify_bounds(&mut bounds_builder);
280
281 self.recalculate_bounds();
282
283 let _ = self.forwarders.insert(
284 component_id,
285 RegisteredComponent::new(Box::new(builder), forwarder_registry),
286 );
287
288 Ok(self)
289 }
290
291 pub fn connect_component<DI, SI, I>(
298 &mut self, destination_component_id: DI, source_output_component_ids: SI,
299 ) -> Result<&mut Self, GenericError>
300 where
301 DI: AsRef<str>,
302 SI: IntoIterator<Item = I>,
303 I: AsRef<str>,
304 {
305 for source_output_component_id in source_output_component_ids.into_iter() {
306 self.graph
307 .add_edge(source_output_component_id, destination_component_id.as_ref())
308 .error_context("Failed to add component connection to topology graph.")?;
309 }
310
311 Ok(self)
312 }
313
314 pub async fn build(mut self) -> Result<BuiltTopology, GenericError> {
320 self.graph.validate().error_context("Failed to build topology graph.")?;
321
322 let mut sources = HashMap::new();
323 for (id, builder) in self.sources {
324 let (builder, mut component_registry) = builder.into_parts();
325 let allocation_token = component_registry.token();
326
327 let component_context = ComponentContext::source(id.clone());
328 let source = builder
329 .build(component_context)
330 .track_allocations(allocation_token)
331 .await
332 .with_error_context(|| format!("Failed to build source '{}'.", id))?;
333
334 sources.insert(
335 id,
336 RegisteredComponent::new(source.track_allocations(allocation_token), component_registry),
337 );
338 }
339
340 let mut transforms = HashMap::new();
341 for (id, builder) in self.transforms {
342 let (builder, mut component_registry) = builder.into_parts();
343 let allocation_token = component_registry.token();
344
345 let component_context = ComponentContext::transform(id.clone());
346 let transform = builder
347 .build(component_context)
348 .track_allocations(allocation_token)
349 .await
350 .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
351
352 transforms.insert(
353 id,
354 RegisteredComponent::new(transform.track_allocations(allocation_token), component_registry),
355 );
356 }
357
358 let mut destinations = HashMap::new();
359 for (id, builder) in self.destinations {
360 let (builder, mut component_registry) = builder.into_parts();
361 let allocation_token = component_registry.token();
362
363 let component_context = ComponentContext::destination(id.clone());
364 let destination = builder
365 .build(component_context)
366 .track_allocations(allocation_token)
367 .await
368 .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
369
370 destinations.insert(
371 id,
372 RegisteredComponent::new(destination.track_allocations(allocation_token), component_registry),
373 );
374 }
375
376 let mut encoders = HashMap::new();
377 for (id, builder) in self.encoders {
378 let (builder, mut component_registry) = builder.into_parts();
379 let allocation_token = component_registry.token();
380
381 let component_context = ComponentContext::encoder(id.clone());
382 let encoder = builder
383 .build(component_context)
384 .track_allocations(allocation_token)
385 .await
386 .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
387
388 encoders.insert(
389 id,
390 RegisteredComponent::new(encoder.track_allocations(allocation_token), component_registry),
391 );
392 }
393
394 let mut forwarders = HashMap::new();
395 for (id, builder) in self.forwarders {
396 let (builder, mut component_registry) = builder.into_parts();
397 let allocation_token = component_registry.token();
398
399 let component_context = ComponentContext::forwarder(id.clone());
400 let forwarder = builder
401 .build(component_context)
402 .track_allocations(allocation_token)
403 .await
404 .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
405
406 forwarders.insert(
407 id,
408 RegisteredComponent::new(forwarder.track_allocations(allocation_token), component_registry),
409 );
410 }
411
412 Ok(BuiltTopology::from_parts(
413 self.name,
414 self.graph,
415 sources,
416 transforms,
417 destinations,
418 encoders,
419 forwarders,
420 self.component_registry.token(),
421 self.interconnect_capacity,
422 ))
423 }
424}