saluki_core/topology/
blueprint.rs

1use std::{collections::HashMap, num::NonZeroUsize};
2
3use memory_accounting::{allocator::Track as _, ComponentRegistry, UsageExpr};
4use saluki_error::{ErrorContext as _, GenericError};
5use snafu::Snafu;
6
7use super::{
8    built::BuiltTopology,
9    graph::{Graph, GraphError},
10    ComponentId, RegisteredComponent,
11};
12use crate::{
13    components::{
14        decoders::DecoderBuilder, destinations::DestinationBuilder, encoders::EncoderBuilder,
15        forwarders::ForwarderBuilder, relays::RelayBuilder, sources::SourceBuilder, transforms::TransformBuilder,
16        ComponentContext,
17    },
18    data_model::event::Event,
19    topology::{EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
20};
21
22/// A topology blueprint error.
23#[derive(Debug, Snafu)]
24#[snafu(context(suffix(false)))]
25pub enum BlueprintError {
26    /// Adding a component/connection lead to an invalid graph.
27    #[snafu(display("Failed to build/validate topology graph: {}", source))]
28    InvalidGraph {
29        /// The underlying graph error.
30        source: GraphError,
31    },
32
33    /// Failed to build a component.
34    #[snafu(display("Failed to build component '{}': {}", id, source))]
35    FailedToBuildComponent {
36        /// Component ID for the component that failed to build.
37        id: ComponentId,
38
39        /// The underlying component build error.
40        source: GenericError,
41    },
42}
43
44/// A topology blueprint represents a directed graph of components.
45pub struct TopologyBlueprint {
46    name: String,
47    graph: Graph,
48    sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
49    relays: HashMap<ComponentId, RegisteredComponent<Box<dyn RelayBuilder + Send>>>,
50    decoders: HashMap<ComponentId, RegisteredComponent<Box<dyn DecoderBuilder + Send>>>,
51    transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
52    destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
53    encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
54    forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
55    component_registry: ComponentRegistry,
56    interconnect_capacity: NonZeroUsize,
57}
58
59impl TopologyBlueprint {
60    /// Creates an empty `TopologyBlueprint` with the given name.
61    pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
62        // Create a nested component registry for this topology.
63        let component_registry = component_registry.get_or_create("topology").get_or_create(name);
64
65        Self {
66            name: name.to_string(),
67            graph: Graph::default(),
68            sources: HashMap::new(),
69            relays: HashMap::new(),
70            decoders: HashMap::new(),
71            transforms: HashMap::new(),
72            destinations: HashMap::new(),
73            encoders: HashMap::new(),
74            forwarders: HashMap::new(),
75            component_registry,
76            interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
77        }
78    }
79
80    /// Sets the capacity of interconnects in the topology.
81    ///
82    /// Interconnects are used to connect components to one another. Once their capacity is reached, no more items can be sent
83    /// through until in-flight items are processed. This will apply backpressure to the upstream components. Raising or lowering
84    /// the capacity allows trading off throughput at the expense of memory usage.
85    ///
86    /// Defaults to 128.
87    pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
88        self.interconnect_capacity = capacity;
89        self.recalculate_bounds();
90        self
91    }
92
93    fn recalculate_bounds(&mut self) {
94        let interconnect_capacity = self.interconnect_capacity.get();
95
96        let mut bounds_builder = self.component_registry.bounds_builder();
97        let mut bounds_builder = bounds_builder.subcomponent("interconnects");
98        bounds_builder.reset();
99
100        // Adjust the bounds related to interconnects.
101        //
102        // This deals with the minimum size of the interconnects themselves, since they're bounded and thus allocated
103        // up-front. Every non-source component has an interconnect.
104        let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
105        bounds_builder
106            .minimum()
107            .with_array::<EventsBuffer>("events", total_interconnect_capacity);
108
109        // TODO: Add a minimum subitem for payloads when we have payload interconnects.
110
111        // Adjust the bounds related to event buffers themselves.
112        //
113        // We calculate the maximum number of event buffers by adding up the total capacity of all non-source components, plus the count
114        // of non-destination components. This is the effective upper bound because once all component channels are full, sending
115        // components can only allocate one more event buffer before being blocked on sending, which is then the effective upper bound.
116        //
117        // TODO: Somewhat fragile. Need to revisit this.
118        // TODO: Add a firm subitem for payloads when we have payload interconnects.
119        let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
120            + self.sources.len()
121            + self.decoders.len()
122            + self.transforms.len();
123
124        bounds_builder
125            .firm()
126            // max_in_flight_event_buffers * (size_of<EventsContainer> + (size_of<Event> * default_event_buffer_capacity))
127            .with_expr(UsageExpr::product(
128                "events",
129                UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
130                UsageExpr::sum(
131                    "",
132                    UsageExpr::struct_size::<EventsBuffer>("events buffer"),
133                    UsageExpr::product(
134                        "",
135                        UsageExpr::struct_size::<Event>("event"),
136                        UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
137                    ),
138                ),
139            ));
140    }
141
142    /// Adds a source component to the blueprint.
143    ///
144    /// # Errors
145    ///
146    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
147    pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
148    where
149        I: AsRef<str>,
150        B: SourceBuilder + Send + 'static,
151    {
152        let component_id = self
153            .graph
154            .add_source(component_id, &builder)
155            .error_context("Failed to add source to topology graph.")?;
156
157        let mut source_registry = self
158            .component_registry
159            .get_or_create(format!("components.sources.{}", component_id));
160        let mut bounds_builder = source_registry.bounds_builder();
161        builder.specify_bounds(&mut bounds_builder);
162
163        self.recalculate_bounds();
164
165        let _ = self.sources.insert(
166            component_id,
167            RegisteredComponent::new(Box::new(builder), source_registry),
168        );
169
170        Ok(self)
171    }
172
173    /// Adds a relay component to the blueprint.
174    ///
175    /// # Errors
176    ///
177    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
178    pub fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
179    where
180        I: AsRef<str>,
181        B: RelayBuilder + Send + 'static,
182    {
183        let component_id = self
184            .graph
185            .add_relay(component_id, &builder)
186            .error_context("Failed to add relay to topology graph.")?;
187
188        let mut relay_registry = self
189            .component_registry
190            .get_or_create(format!("components.relays.{}", component_id));
191        let mut bounds_builder = relay_registry.bounds_builder();
192        builder.specify_bounds(&mut bounds_builder);
193
194        self.recalculate_bounds();
195
196        let _ = self.relays.insert(
197            component_id,
198            RegisteredComponent::new(Box::new(builder), relay_registry),
199        );
200
201        Ok(self)
202    }
203
204    /// Adds a decoder component to the blueprint.
205    ///
206    /// # Errors
207    ///
208    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
209    pub fn add_decoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
210    where
211        I: AsRef<str>,
212        B: DecoderBuilder + Send + 'static,
213    {
214        let component_id = self
215            .graph
216            .add_decoder(component_id, &builder)
217            .error_context("Failed to add decoder to topology graph.")?;
218
219        let mut decoder_registry = self
220            .component_registry
221            .get_or_create(format!("components.decoders.{}", component_id));
222        let mut bounds_builder = decoder_registry.bounds_builder();
223        builder.specify_bounds(&mut bounds_builder);
224
225        self.recalculate_bounds();
226
227        let _ = self.decoders.insert(
228            component_id,
229            RegisteredComponent::new(Box::new(builder), decoder_registry),
230        );
231
232        Ok(self)
233    }
234
235    /// Adds a transform component to the blueprint.
236    ///
237    /// # Errors
238    ///
239    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
240    pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
241    where
242        I: AsRef<str>,
243        B: TransformBuilder + Send + 'static,
244    {
245        let component_id = self
246            .graph
247            .add_transform(component_id, &builder)
248            .error_context("Failed to add transform to topology graph.")?;
249
250        let mut transform_registry = self
251            .component_registry
252            .get_or_create(format!("components.transforms.{}", component_id));
253        let mut bounds_builder = transform_registry.bounds_builder();
254        builder.specify_bounds(&mut bounds_builder);
255
256        self.recalculate_bounds();
257
258        let _ = self.transforms.insert(
259            component_id,
260            RegisteredComponent::new(Box::new(builder), transform_registry),
261        );
262
263        Ok(self)
264    }
265
266    /// Adds a destination component to the blueprint.
267    ///
268    /// # Errors
269    ///
270    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
271    pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
272    where
273        I: AsRef<str>,
274        B: DestinationBuilder + Send + 'static,
275    {
276        let component_id = self
277            .graph
278            .add_destination(component_id, &builder)
279            .error_context("Failed to add destination to topology graph.")?;
280
281        let mut destination_registry = self
282            .component_registry
283            .get_or_create(format!("components.destinations.{}", component_id));
284        let mut bounds_builder = destination_registry.bounds_builder();
285        builder.specify_bounds(&mut bounds_builder);
286
287        self.recalculate_bounds();
288
289        let _ = self.destinations.insert(
290            component_id,
291            RegisteredComponent::new(Box::new(builder), destination_registry),
292        );
293
294        Ok(self)
295    }
296
297    /// Adds an encoder component to the blueprint.
298    ///
299    /// # Errors
300    ///
301    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
302    pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
303    where
304        I: AsRef<str>,
305        B: EncoderBuilder + Send + 'static,
306    {
307        let component_id = self
308            .graph
309            .add_encoder(component_id, &builder)
310            .error_context("Failed to add encoder to topology graph.")?;
311
312        let mut encoder_registry = self
313            .component_registry
314            .get_or_create(format!("components.encoders.{}", component_id));
315        let mut bounds_builder = encoder_registry.bounds_builder();
316        builder.specify_bounds(&mut bounds_builder);
317
318        self.recalculate_bounds();
319
320        let _ = self.encoders.insert(
321            component_id,
322            RegisteredComponent::new(Box::new(builder), encoder_registry),
323        );
324
325        Ok(self)
326    }
327
328    /// Adds a forwarder component to the blueprint.
329    ///
330    /// # Errors
331    ///
332    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
333    pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
334    where
335        I: AsRef<str>,
336        B: ForwarderBuilder + Send + 'static,
337    {
338        let component_id = self
339            .graph
340            .add_forwarder(component_id, &builder)
341            .error_context("Failed to add forwarder to topology graph.")?;
342
343        let mut forwarder_registry = self
344            .component_registry
345            .get_or_create(format!("components.forwarders.{}", component_id));
346        let mut bounds_builder = forwarder_registry.bounds_builder();
347        builder.specify_bounds(&mut bounds_builder);
348
349        self.recalculate_bounds();
350
351        let _ = self.forwarders.insert(
352            component_id,
353            RegisteredComponent::new(Box::new(builder), forwarder_registry),
354        );
355
356        Ok(self)
357    }
358
359    /// Connects one or more source component outputs to a destination component.
360    ///
361    /// # Errors
362    ///
363    /// If the destination component ID, or any of the source component IDs, are invalid or do not exist, or if the data
364    /// types between one of the source/destination component pairs is incompatible, an error is returned.
365    pub fn connect_component<DI, SI, I>(
366        &mut self, destination_component_id: DI, source_output_component_ids: SI,
367    ) -> Result<&mut Self, GenericError>
368    where
369        DI: AsRef<str>,
370        SI: IntoIterator<Item = I>,
371        I: AsRef<str>,
372    {
373        for source_output_component_id in source_output_component_ids.into_iter() {
374            self.graph
375                .add_edge(source_output_component_id, destination_component_id.as_ref())
376                .error_context("Failed to add component connection to topology graph.")?;
377        }
378
379        Ok(self)
380    }
381
382    /// Builds the topology.
383    ///
384    /// # Errors
385    ///
386    /// If any of the components could not be built, an error is returned.
387    pub async fn build(mut self) -> Result<BuiltTopology, GenericError> {
388        self.graph.validate().error_context("Failed to build topology graph.")?;
389
390        let mut sources = HashMap::new();
391        for (id, builder) in self.sources {
392            let (builder, mut component_registry) = builder.into_parts();
393            let allocation_token = component_registry.token();
394
395            let component_context = ComponentContext::source(id.clone());
396            let source = builder
397                .build(component_context)
398                .track_allocations(allocation_token)
399                .await
400                .with_error_context(|| format!("Failed to build source '{}'.", id))?;
401
402            sources.insert(
403                id,
404                RegisteredComponent::new(source.track_allocations(allocation_token), component_registry),
405            );
406        }
407
408        let mut relays = HashMap::new();
409        for (id, builder) in self.relays {
410            let (builder, mut component_registry) = builder.into_parts();
411            let allocation_token = component_registry.token();
412
413            let component_context = ComponentContext::relay(id.clone());
414            let relay = builder
415                .build(component_context)
416                .track_allocations(allocation_token)
417                .await
418                .with_error_context(|| format!("Failed to build relay '{}'.", id))?;
419
420            relays.insert(
421                id,
422                RegisteredComponent::new(relay.track_allocations(allocation_token), component_registry),
423            );
424        }
425
426        let mut decoders = HashMap::new();
427        for (id, builder) in self.decoders {
428            let (builder, mut component_registry) = builder.into_parts();
429            let allocation_token = component_registry.token();
430
431            let component_context = ComponentContext::decoder(id.clone());
432            let decoder = builder
433                .build(component_context)
434                .track_allocations(allocation_token)
435                .await
436                .with_error_context(|| format!("Failed to build decoder '{}'.", id))?;
437
438            decoders.insert(
439                id,
440                RegisteredComponent::new(decoder.track_allocations(allocation_token), component_registry),
441            );
442        }
443
444        let mut transforms = HashMap::new();
445        for (id, builder) in self.transforms {
446            let (builder, mut component_registry) = builder.into_parts();
447            let allocation_token = component_registry.token();
448
449            let component_context = ComponentContext::transform(id.clone());
450            let transform = builder
451                .build(component_context)
452                .track_allocations(allocation_token)
453                .await
454                .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
455
456            transforms.insert(
457                id,
458                RegisteredComponent::new(transform.track_allocations(allocation_token), component_registry),
459            );
460        }
461
462        let mut destinations = HashMap::new();
463        for (id, builder) in self.destinations {
464            let (builder, mut component_registry) = builder.into_parts();
465            let allocation_token = component_registry.token();
466
467            let component_context = ComponentContext::destination(id.clone());
468            let destination = builder
469                .build(component_context)
470                .track_allocations(allocation_token)
471                .await
472                .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
473
474            destinations.insert(
475                id,
476                RegisteredComponent::new(destination.track_allocations(allocation_token), component_registry),
477            );
478        }
479
480        let mut encoders = HashMap::new();
481        for (id, builder) in self.encoders {
482            let (builder, mut component_registry) = builder.into_parts();
483            let allocation_token = component_registry.token();
484
485            let component_context = ComponentContext::encoder(id.clone());
486            let encoder = builder
487                .build(component_context)
488                .track_allocations(allocation_token)
489                .await
490                .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
491
492            encoders.insert(
493                id,
494                RegisteredComponent::new(encoder.track_allocations(allocation_token), component_registry),
495            );
496        }
497
498        let mut forwarders = HashMap::new();
499        for (id, builder) in self.forwarders {
500            let (builder, mut component_registry) = builder.into_parts();
501            let allocation_token = component_registry.token();
502
503            let component_context = ComponentContext::forwarder(id.clone());
504            let forwarder = builder
505                .build(component_context)
506                .track_allocations(allocation_token)
507                .await
508                .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
509
510            forwarders.insert(
511                id,
512                RegisteredComponent::new(forwarder.track_allocations(allocation_token), component_registry),
513            );
514        }
515
516        Ok(BuiltTopology::from_parts(
517            self.name,
518            self.graph,
519            sources,
520            relays,
521            decoders,
522            transforms,
523            destinations,
524            encoders,
525            forwarders,
526            self.component_registry.token(),
527            self.interconnect_capacity,
528        ))
529    }
530}