saluki_core/topology/
blueprint.rs

1use std::{collections::HashMap, num::NonZeroUsize};
2
3use memory_accounting::{allocator::Track as _, ComponentRegistry, UsageExpr};
4use saluki_error::{ErrorContext as _, GenericError};
5use snafu::Snafu;
6
7use super::{
8    built::BuiltTopology,
9    graph::{Graph, GraphError},
10    ComponentId, RegisteredComponent,
11};
12use crate::{
13    components::{
14        destinations::DestinationBuilder, encoders::EncoderBuilder, forwarders::ForwarderBuilder,
15        sources::SourceBuilder, transforms::TransformBuilder, ComponentContext,
16    },
17    data_model::event::Event,
18    topology::{EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
19};
20
21/// A topology blueprint error.
22#[derive(Debug, Snafu)]
23#[snafu(context(suffix(false)))]
24pub enum BlueprintError {
25    /// Adding a component/connection lead to an invalid graph.
26    #[snafu(display("Failed to build/validate topology graph: {}", source))]
27    InvalidGraph {
28        /// The underlying graph error.
29        source: GraphError,
30    },
31
32    /// Failed to build a component.
33    #[snafu(display("Failed to build component '{}': {}", id, source))]
34    FailedToBuildComponent {
35        /// Component ID for the component that failed to build.
36        id: ComponentId,
37
38        /// The underlying component build error.
39        source: GenericError,
40    },
41}
42
43/// A topology blueprint represents a directed graph of components.
44pub struct TopologyBlueprint {
45    name: String,
46    graph: Graph,
47    sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
48    transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
49    destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
50    encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
51    forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
52    component_registry: ComponentRegistry,
53    interconnect_capacity: NonZeroUsize,
54}
55
56impl TopologyBlueprint {
57    /// Creates an empty `TopologyBlueprint` with the given name.
58    pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
59        // Create a nested component registry for this topology.
60        let component_registry = component_registry.get_or_create("topology").get_or_create(name);
61
62        Self {
63            name: name.to_string(),
64            graph: Graph::default(),
65            sources: HashMap::new(),
66            transforms: HashMap::new(),
67            destinations: HashMap::new(),
68            encoders: HashMap::new(),
69            forwarders: HashMap::new(),
70            component_registry,
71            interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
72        }
73    }
74
75    /// Sets the capacity of interconnects in the topology.
76    ///
77    /// Interconnects are used to connect components to one another. Once their capacity is reached, no more items can be sent
78    /// through until in-flight items are processed. This will apply backpressure to the upstream components. Raising or lowering
79    /// the capacity allows trading off throughput at the expense of memory usage.
80    ///
81    /// Defaults to 128.
82    pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
83        self.interconnect_capacity = capacity;
84        self.recalculate_bounds();
85        self
86    }
87
88    fn recalculate_bounds(&mut self) {
89        let interconnect_capacity = self.interconnect_capacity.get();
90
91        let mut bounds_builder = self.component_registry.bounds_builder();
92        let mut bounds_builder = bounds_builder.subcomponent("interconnects");
93        bounds_builder.reset();
94
95        // Adjust the bounds related to interconnects.
96        //
97        // This deals with the minimum size of the interconnects themselves, since they're bounded and thus allocated
98        // up-front. Every non-source component has an interconnect.
99        let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
100        bounds_builder
101            .minimum()
102            .with_array::<EventsBuffer>("events", total_interconnect_capacity);
103
104        // TODO: Add a minimum subitem for payloads when we have payload interconnects.
105
106        // Adjust the bounds related to event buffers themselves.
107        //
108        // We calculate the maximum number of event buffers by adding up the total capacity of all non-source components, plus the count
109        // of non-destination components. This is the effective upper bound because once all component channels are full, sending
110        // components can only allocate one more event buffer before being blocked on sending, which is then the effective upper bound.
111        //
112        // TODO: Somewhat fragile. Need to revisit this.
113        // TODO: Add a firm subitem for payloads when we have payload interconnects.
114        let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
115            + self.sources.len()
116            + self.transforms.len();
117
118        bounds_builder
119            .firm()
120            // max_in_flight_event_buffers * (size_of<EventsContainer> + (size_of<Event> * default_event_buffer_capacity))
121            .with_expr(UsageExpr::product(
122                "events",
123                UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
124                UsageExpr::sum(
125                    "",
126                    UsageExpr::struct_size::<EventsBuffer>("events buffer"),
127                    UsageExpr::product(
128                        "",
129                        UsageExpr::struct_size::<Event>("event"),
130                        UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
131                    ),
132                ),
133            ));
134    }
135
136    /// Adds a source component to the blueprint.
137    ///
138    /// # Errors
139    ///
140    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
141    pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
142    where
143        I: AsRef<str>,
144        B: SourceBuilder + Send + 'static,
145    {
146        let component_id = self
147            .graph
148            .add_source(component_id, &builder)
149            .error_context("Failed to add source to topology graph.")?;
150
151        let mut source_registry = self
152            .component_registry
153            .get_or_create(format!("components.sources.{}", component_id));
154        let mut bounds_builder = source_registry.bounds_builder();
155        builder.specify_bounds(&mut bounds_builder);
156
157        self.recalculate_bounds();
158
159        let _ = self.sources.insert(
160            component_id,
161            RegisteredComponent::new(Box::new(builder), source_registry),
162        );
163
164        Ok(self)
165    }
166
167    /// Adds a transform component to the blueprint.
168    ///
169    /// # Errors
170    ///
171    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
172    pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
173    where
174        I: AsRef<str>,
175        B: TransformBuilder + Send + 'static,
176    {
177        let component_id = self
178            .graph
179            .add_transform(component_id, &builder)
180            .error_context("Failed to add transform to topology graph.")?;
181
182        let mut transform_registry = self
183            .component_registry
184            .get_or_create(format!("components.transforms.{}", component_id));
185        let mut bounds_builder = transform_registry.bounds_builder();
186        builder.specify_bounds(&mut bounds_builder);
187
188        self.recalculate_bounds();
189
190        let _ = self.transforms.insert(
191            component_id,
192            RegisteredComponent::new(Box::new(builder), transform_registry),
193        );
194
195        Ok(self)
196    }
197
198    /// Adds a destination component to the blueprint.
199    ///
200    /// # Errors
201    ///
202    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
203    pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
204    where
205        I: AsRef<str>,
206        B: DestinationBuilder + Send + 'static,
207    {
208        let component_id = self
209            .graph
210            .add_destination(component_id, &builder)
211            .error_context("Failed to add destination to topology graph.")?;
212
213        let mut destination_registry = self
214            .component_registry
215            .get_or_create(format!("components.destinations.{}", component_id));
216        let mut bounds_builder = destination_registry.bounds_builder();
217        builder.specify_bounds(&mut bounds_builder);
218
219        self.recalculate_bounds();
220
221        let _ = self.destinations.insert(
222            component_id,
223            RegisteredComponent::new(Box::new(builder), destination_registry),
224        );
225
226        Ok(self)
227    }
228
229    /// Adds an encoder component to the blueprint.
230    ///
231    /// # Errors
232    ///
233    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
234    pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
235    where
236        I: AsRef<str>,
237        B: EncoderBuilder + Send + 'static,
238    {
239        let component_id = self
240            .graph
241            .add_encoder(component_id, &builder)
242            .error_context("Failed to add encoder to topology graph.")?;
243
244        let mut encoder_registry = self
245            .component_registry
246            .get_or_create(format!("components.encoders.{}", component_id));
247        let mut bounds_builder = encoder_registry.bounds_builder();
248        builder.specify_bounds(&mut bounds_builder);
249
250        self.recalculate_bounds();
251
252        let _ = self.encoders.insert(
253            component_id,
254            RegisteredComponent::new(Box::new(builder), encoder_registry),
255        );
256
257        Ok(self)
258    }
259
260    /// Adds a forwarder component to the blueprint.
261    ///
262    /// # Errors
263    ///
264    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
265    pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
266    where
267        I: AsRef<str>,
268        B: ForwarderBuilder + Send + 'static,
269    {
270        let component_id = self
271            .graph
272            .add_forwarder(component_id, &builder)
273            .error_context("Failed to add forwarder to topology graph.")?;
274
275        let mut forwarder_registry = self
276            .component_registry
277            .get_or_create(format!("components.forwarders.{}", component_id));
278        let mut bounds_builder = forwarder_registry.bounds_builder();
279        builder.specify_bounds(&mut bounds_builder);
280
281        self.recalculate_bounds();
282
283        let _ = self.forwarders.insert(
284            component_id,
285            RegisteredComponent::new(Box::new(builder), forwarder_registry),
286        );
287
288        Ok(self)
289    }
290
291    /// Connects one or more source component outputs to a destination component.
292    ///
293    /// # Errors
294    ///
295    /// If the destination component ID, or any of the source component IDs, are invalid or do not exist, or if the data
296    /// types between one of the source/destination component pairs is incompatible, an error is returned.
297    pub fn connect_component<DI, SI, I>(
298        &mut self, destination_component_id: DI, source_output_component_ids: SI,
299    ) -> Result<&mut Self, GenericError>
300    where
301        DI: AsRef<str>,
302        SI: IntoIterator<Item = I>,
303        I: AsRef<str>,
304    {
305        for source_output_component_id in source_output_component_ids.into_iter() {
306            self.graph
307                .add_edge(source_output_component_id, destination_component_id.as_ref())
308                .error_context("Failed to add component connection to topology graph.")?;
309        }
310
311        Ok(self)
312    }
313
314    /// Builds the topology.
315    ///
316    /// # Errors
317    ///
318    /// If any of the components could not be built, an error is returned.
319    pub async fn build(mut self) -> Result<BuiltTopology, GenericError> {
320        self.graph.validate().error_context("Failed to build topology graph.")?;
321
322        let mut sources = HashMap::new();
323        for (id, builder) in self.sources {
324            let (builder, mut component_registry) = builder.into_parts();
325            let allocation_token = component_registry.token();
326
327            let component_context = ComponentContext::source(id.clone());
328            let source = builder
329                .build(component_context)
330                .track_allocations(allocation_token)
331                .await
332                .with_error_context(|| format!("Failed to build source '{}'.", id))?;
333
334            sources.insert(
335                id,
336                RegisteredComponent::new(source.track_allocations(allocation_token), component_registry),
337            );
338        }
339
340        let mut transforms = HashMap::new();
341        for (id, builder) in self.transforms {
342            let (builder, mut component_registry) = builder.into_parts();
343            let allocation_token = component_registry.token();
344
345            let component_context = ComponentContext::transform(id.clone());
346            let transform = builder
347                .build(component_context)
348                .track_allocations(allocation_token)
349                .await
350                .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
351
352            transforms.insert(
353                id,
354                RegisteredComponent::new(transform.track_allocations(allocation_token), component_registry),
355            );
356        }
357
358        let mut destinations = HashMap::new();
359        for (id, builder) in self.destinations {
360            let (builder, mut component_registry) = builder.into_parts();
361            let allocation_token = component_registry.token();
362
363            let component_context = ComponentContext::destination(id.clone());
364            let destination = builder
365                .build(component_context)
366                .track_allocations(allocation_token)
367                .await
368                .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
369
370            destinations.insert(
371                id,
372                RegisteredComponent::new(destination.track_allocations(allocation_token), component_registry),
373            );
374        }
375
376        let mut encoders = HashMap::new();
377        for (id, builder) in self.encoders {
378            let (builder, mut component_registry) = builder.into_parts();
379            let allocation_token = component_registry.token();
380
381            let component_context = ComponentContext::encoder(id.clone());
382            let encoder = builder
383                .build(component_context)
384                .track_allocations(allocation_token)
385                .await
386                .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
387
388            encoders.insert(
389                id,
390                RegisteredComponent::new(encoder.track_allocations(allocation_token), component_registry),
391            );
392        }
393
394        let mut forwarders = HashMap::new();
395        for (id, builder) in self.forwarders {
396            let (builder, mut component_registry) = builder.into_parts();
397            let allocation_token = component_registry.token();
398
399            let component_context = ComponentContext::forwarder(id.clone());
400            let forwarder = builder
401                .build(component_context)
402                .track_allocations(allocation_token)
403                .await
404                .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
405
406            forwarders.insert(
407                id,
408                RegisteredComponent::new(forwarder.track_allocations(allocation_token), component_registry),
409            );
410        }
411
412        Ok(BuiltTopology::from_parts(
413            self.name,
414            self.graph,
415            sources,
416            transforms,
417            destinations,
418            encoders,
419            forwarders,
420            self.component_registry.token(),
421            self.interconnect_capacity,
422        ))
423    }
424}