saluki_core/topology/
blueprint.rs

1use std::{collections::HashMap, num::NonZeroUsize};
2
3use memory_accounting::{allocator::Track as _, ComponentRegistry, UsageExpr};
4use saluki_error::{ErrorContext as _, GenericError};
5use snafu::Snafu;
6
7use super::{
8    built::BuiltTopology,
9    graph::{Graph, GraphError},
10    ComponentId, RegisteredComponent,
11};
12use crate::{
13    components::{
14        destinations::DestinationBuilder, encoders::EncoderBuilder, forwarders::ForwarderBuilder, relays::RelayBuilder,
15        sources::SourceBuilder, transforms::TransformBuilder, ComponentContext,
16    },
17    data_model::event::Event,
18    topology::{EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
19};
20
21/// A topology blueprint error.
22#[derive(Debug, Snafu)]
23#[snafu(context(suffix(false)))]
24pub enum BlueprintError {
25    /// Adding a component/connection lead to an invalid graph.
26    #[snafu(display("Failed to build/validate topology graph: {}", source))]
27    InvalidGraph {
28        /// The underlying graph error.
29        source: GraphError,
30    },
31
32    /// Failed to build a component.
33    #[snafu(display("Failed to build component '{}': {}", id, source))]
34    FailedToBuildComponent {
35        /// Component ID for the component that failed to build.
36        id: ComponentId,
37
38        /// The underlying component build error.
39        source: GenericError,
40    },
41}
42
43/// A topology blueprint represents a directed graph of components.
44pub struct TopologyBlueprint {
45    name: String,
46    graph: Graph,
47    sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
48    relays: HashMap<ComponentId, RegisteredComponent<Box<dyn RelayBuilder + Send>>>,
49    transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
50    destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
51    encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
52    forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
53    component_registry: ComponentRegistry,
54    interconnect_capacity: NonZeroUsize,
55}
56
57impl TopologyBlueprint {
58    /// Creates an empty `TopologyBlueprint` with the given name.
59    pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
60        // Create a nested component registry for this topology.
61        let component_registry = component_registry.get_or_create("topology").get_or_create(name);
62
63        Self {
64            name: name.to_string(),
65            graph: Graph::default(),
66            sources: HashMap::new(),
67            relays: HashMap::new(),
68            transforms: HashMap::new(),
69            destinations: HashMap::new(),
70            encoders: HashMap::new(),
71            forwarders: HashMap::new(),
72            component_registry,
73            interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
74        }
75    }
76
77    /// Sets the capacity of interconnects in the topology.
78    ///
79    /// Interconnects are used to connect components to one another. Once their capacity is reached, no more items can be sent
80    /// through until in-flight items are processed. This will apply backpressure to the upstream components. Raising or lowering
81    /// the capacity allows trading off throughput at the expense of memory usage.
82    ///
83    /// Defaults to 128.
84    pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
85        self.interconnect_capacity = capacity;
86        self.recalculate_bounds();
87        self
88    }
89
90    fn recalculate_bounds(&mut self) {
91        let interconnect_capacity = self.interconnect_capacity.get();
92
93        let mut bounds_builder = self.component_registry.bounds_builder();
94        let mut bounds_builder = bounds_builder.subcomponent("interconnects");
95        bounds_builder.reset();
96
97        // Adjust the bounds related to interconnects.
98        //
99        // This deals with the minimum size of the interconnects themselves, since they're bounded and thus allocated
100        // up-front. Every non-source component has an interconnect.
101        let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
102        bounds_builder
103            .minimum()
104            .with_array::<EventsBuffer>("events", total_interconnect_capacity);
105
106        // TODO: Add a minimum subitem for payloads when we have payload interconnects.
107
108        // Adjust the bounds related to event buffers themselves.
109        //
110        // We calculate the maximum number of event buffers by adding up the total capacity of all non-source components, plus the count
111        // of non-destination components. This is the effective upper bound because once all component channels are full, sending
112        // components can only allocate one more event buffer before being blocked on sending, which is then the effective upper bound.
113        //
114        // TODO: Somewhat fragile. Need to revisit this.
115        // TODO: Add a firm subitem for payloads when we have payload interconnects.
116        let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
117            + self.sources.len()
118            + self.transforms.len();
119
120        bounds_builder
121            .firm()
122            // max_in_flight_event_buffers * (size_of<EventsContainer> + (size_of<Event> * default_event_buffer_capacity))
123            .with_expr(UsageExpr::product(
124                "events",
125                UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
126                UsageExpr::sum(
127                    "",
128                    UsageExpr::struct_size::<EventsBuffer>("events buffer"),
129                    UsageExpr::product(
130                        "",
131                        UsageExpr::struct_size::<Event>("event"),
132                        UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
133                    ),
134                ),
135            ));
136    }
137
138    /// Adds a source component to the blueprint.
139    ///
140    /// # Errors
141    ///
142    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
143    pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
144    where
145        I: AsRef<str>,
146        B: SourceBuilder + Send + 'static,
147    {
148        let component_id = self
149            .graph
150            .add_source(component_id, &builder)
151            .error_context("Failed to add source to topology graph.")?;
152
153        let mut source_registry = self
154            .component_registry
155            .get_or_create(format!("components.sources.{}", component_id));
156        let mut bounds_builder = source_registry.bounds_builder();
157        builder.specify_bounds(&mut bounds_builder);
158
159        self.recalculate_bounds();
160
161        let _ = self.sources.insert(
162            component_id,
163            RegisteredComponent::new(Box::new(builder), source_registry),
164        );
165
166        Ok(self)
167    }
168
169    /// Adds a relay component to the blueprint.
170    ///
171    /// # Errors
172    ///
173    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
174    pub fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
175    where
176        I: AsRef<str>,
177        B: RelayBuilder + Send + 'static,
178    {
179        let component_id = self
180            .graph
181            .add_relay(component_id, &builder)
182            .error_context("Failed to add relay to topology graph.")?;
183
184        let mut relay_registry = self
185            .component_registry
186            .get_or_create(format!("components.relays.{}", component_id));
187        let mut bounds_builder = relay_registry.bounds_builder();
188        builder.specify_bounds(&mut bounds_builder);
189
190        self.recalculate_bounds();
191
192        let _ = self.relays.insert(
193            component_id,
194            RegisteredComponent::new(Box::new(builder), relay_registry),
195        );
196
197        Ok(self)
198    }
199
200    /// Adds a transform component to the blueprint.
201    ///
202    /// # Errors
203    ///
204    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
205    pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
206    where
207        I: AsRef<str>,
208        B: TransformBuilder + Send + 'static,
209    {
210        let component_id = self
211            .graph
212            .add_transform(component_id, &builder)
213            .error_context("Failed to add transform to topology graph.")?;
214
215        let mut transform_registry = self
216            .component_registry
217            .get_or_create(format!("components.transforms.{}", component_id));
218        let mut bounds_builder = transform_registry.bounds_builder();
219        builder.specify_bounds(&mut bounds_builder);
220
221        self.recalculate_bounds();
222
223        let _ = self.transforms.insert(
224            component_id,
225            RegisteredComponent::new(Box::new(builder), transform_registry),
226        );
227
228        Ok(self)
229    }
230
231    /// Adds a destination component to the blueprint.
232    ///
233    /// # Errors
234    ///
235    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
236    pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
237    where
238        I: AsRef<str>,
239        B: DestinationBuilder + Send + 'static,
240    {
241        let component_id = self
242            .graph
243            .add_destination(component_id, &builder)
244            .error_context("Failed to add destination to topology graph.")?;
245
246        let mut destination_registry = self
247            .component_registry
248            .get_or_create(format!("components.destinations.{}", component_id));
249        let mut bounds_builder = destination_registry.bounds_builder();
250        builder.specify_bounds(&mut bounds_builder);
251
252        self.recalculate_bounds();
253
254        let _ = self.destinations.insert(
255            component_id,
256            RegisteredComponent::new(Box::new(builder), destination_registry),
257        );
258
259        Ok(self)
260    }
261
262    /// Adds an encoder component to the blueprint.
263    ///
264    /// # Errors
265    ///
266    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
267    pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
268    where
269        I: AsRef<str>,
270        B: EncoderBuilder + Send + 'static,
271    {
272        let component_id = self
273            .graph
274            .add_encoder(component_id, &builder)
275            .error_context("Failed to add encoder to topology graph.")?;
276
277        let mut encoder_registry = self
278            .component_registry
279            .get_or_create(format!("components.encoders.{}", component_id));
280        let mut bounds_builder = encoder_registry.bounds_builder();
281        builder.specify_bounds(&mut bounds_builder);
282
283        self.recalculate_bounds();
284
285        let _ = self.encoders.insert(
286            component_id,
287            RegisteredComponent::new(Box::new(builder), encoder_registry),
288        );
289
290        Ok(self)
291    }
292
293    /// Adds a forwarder component to the blueprint.
294    ///
295    /// # Errors
296    ///
297    /// If the component ID is invalid or the component cannot be added to the graph, an error is returned.
298    pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
299    where
300        I: AsRef<str>,
301        B: ForwarderBuilder + Send + 'static,
302    {
303        let component_id = self
304            .graph
305            .add_forwarder(component_id, &builder)
306            .error_context("Failed to add forwarder to topology graph.")?;
307
308        let mut forwarder_registry = self
309            .component_registry
310            .get_or_create(format!("components.forwarders.{}", component_id));
311        let mut bounds_builder = forwarder_registry.bounds_builder();
312        builder.specify_bounds(&mut bounds_builder);
313
314        self.recalculate_bounds();
315
316        let _ = self.forwarders.insert(
317            component_id,
318            RegisteredComponent::new(Box::new(builder), forwarder_registry),
319        );
320
321        Ok(self)
322    }
323
324    /// Connects one or more source component outputs to a destination component.
325    ///
326    /// # Errors
327    ///
328    /// If the destination component ID, or any of the source component IDs, are invalid or do not exist, or if the data
329    /// types between one of the source/destination component pairs is incompatible, an error is returned.
330    pub fn connect_component<DI, SI, I>(
331        &mut self, destination_component_id: DI, source_output_component_ids: SI,
332    ) -> Result<&mut Self, GenericError>
333    where
334        DI: AsRef<str>,
335        SI: IntoIterator<Item = I>,
336        I: AsRef<str>,
337    {
338        for source_output_component_id in source_output_component_ids.into_iter() {
339            self.graph
340                .add_edge(source_output_component_id, destination_component_id.as_ref())
341                .error_context("Failed to add component connection to topology graph.")?;
342        }
343
344        Ok(self)
345    }
346
347    /// Builds the topology.
348    ///
349    /// # Errors
350    ///
351    /// If any of the components could not be built, an error is returned.
352    pub async fn build(mut self) -> Result<BuiltTopology, GenericError> {
353        self.graph.validate().error_context("Failed to build topology graph.")?;
354
355        let mut sources = HashMap::new();
356        for (id, builder) in self.sources {
357            let (builder, mut component_registry) = builder.into_parts();
358            let allocation_token = component_registry.token();
359
360            let component_context = ComponentContext::source(id.clone());
361            let source = builder
362                .build(component_context)
363                .track_allocations(allocation_token)
364                .await
365                .with_error_context(|| format!("Failed to build source '{}'.", id))?;
366
367            sources.insert(
368                id,
369                RegisteredComponent::new(source.track_allocations(allocation_token), component_registry),
370            );
371        }
372
373        let mut relays = HashMap::new();
374        for (id, builder) in self.relays {
375            let (builder, mut component_registry) = builder.into_parts();
376            let allocation_token = component_registry.token();
377
378            let component_context = ComponentContext::relay(id.clone());
379            let relay = builder
380                .build(component_context)
381                .track_allocations(allocation_token)
382                .await
383                .with_error_context(|| format!("Failed to build relay '{}'.", id))?;
384
385            relays.insert(
386                id,
387                RegisteredComponent::new(relay.track_allocations(allocation_token), component_registry),
388            );
389        }
390
391        let mut transforms = HashMap::new();
392        for (id, builder) in self.transforms {
393            let (builder, mut component_registry) = builder.into_parts();
394            let allocation_token = component_registry.token();
395
396            let component_context = ComponentContext::transform(id.clone());
397            let transform = builder
398                .build(component_context)
399                .track_allocations(allocation_token)
400                .await
401                .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
402
403            transforms.insert(
404                id,
405                RegisteredComponent::new(transform.track_allocations(allocation_token), component_registry),
406            );
407        }
408
409        let mut destinations = HashMap::new();
410        for (id, builder) in self.destinations {
411            let (builder, mut component_registry) = builder.into_parts();
412            let allocation_token = component_registry.token();
413
414            let component_context = ComponentContext::destination(id.clone());
415            let destination = builder
416                .build(component_context)
417                .track_allocations(allocation_token)
418                .await
419                .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
420
421            destinations.insert(
422                id,
423                RegisteredComponent::new(destination.track_allocations(allocation_token), component_registry),
424            );
425        }
426
427        let mut encoders = HashMap::new();
428        for (id, builder) in self.encoders {
429            let (builder, mut component_registry) = builder.into_parts();
430            let allocation_token = component_registry.token();
431
432            let component_context = ComponentContext::encoder(id.clone());
433            let encoder = builder
434                .build(component_context)
435                .track_allocations(allocation_token)
436                .await
437                .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
438
439            encoders.insert(
440                id,
441                RegisteredComponent::new(encoder.track_allocations(allocation_token), component_registry),
442            );
443        }
444
445        let mut forwarders = HashMap::new();
446        for (id, builder) in self.forwarders {
447            let (builder, mut component_registry) = builder.into_parts();
448            let allocation_token = component_registry.token();
449
450            let component_context = ComponentContext::forwarder(id.clone());
451            let forwarder = builder
452                .build(component_context)
453                .track_allocations(allocation_token)
454                .await
455                .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
456
457            forwarders.insert(
458                id,
459                RegisteredComponent::new(forwarder.track_allocations(allocation_token), component_registry),
460            );
461        }
462
463        Ok(BuiltTopology::from_parts(
464            self.name,
465            self.graph,
466            sources,
467            relays,
468            transforms,
469            destinations,
470            encoders,
471            forwarders,
472            self.component_registry.token(),
473            self.interconnect_capacity,
474        ))
475    }
476}