Skip to main content

saluki_core/topology/
blueprint.rs

1use std::{collections::HashMap, future::Future, num::NonZeroUsize, pin::Pin, sync::Mutex, time::Duration};
2
3use async_trait::async_trait;
4use resource_accounting::{ComponentRegistry, MemoryLimiter, Track as _, UsageExpr};
5use saluki_common::sync::shutdown::ShutdownHandle;
6use saluki_error::{generic_error, ErrorContext as _, GenericError};
7use snafu::Snafu;
8use tokio::{pin, runtime::Handle, select, sync::oneshot};
9use tracing::{error, info};
10
11use super::{
12    built::{BuiltTopology, WorkerPoolConfiguration},
13    graph::{Graph, GraphError},
14    ComponentId, RegisteredComponent,
15};
16use crate::{
17    components::{
18        decoders::DecoderBuilder, destinations::DestinationBuilder, encoders::EncoderBuilder,
19        forwarders::ForwarderBuilder, relays::RelayBuilder, sources::SourceBuilder, transforms::TransformBuilder,
20        ComponentContext,
21    },
22    data_model::event::Event,
23    health::HealthRegistry,
24    runtime::{state::DataspaceRegistry, InitializationError, ShutdownStrategy, Supervisable, SupervisorFuture},
25    topology::{ids::AsComponentIds, EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
26};
27
/// Default amount of time the topology waits for its components to stop during a graceful shutdown.
///
/// Overridable per blueprint via `TopologyBlueprint::with_shutdown_timeout`.
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
29
30/// A topology blueprint error.
31#[derive(Debug, Snafu)]
32#[snafu(context(suffix(false)))]
33pub enum BlueprintError {
34    /// Adding a component/connection lead to an invalid graph.
35    #[snafu(display("Failed to build/validate topology graph: {}", source))]
36    InvalidGraph {
37        /// The underlying graph error.
38        source: GraphError,
39    },
40
41    /// Failed to build a component.
42    #[snafu(display("Failed to build component '{}': {}", id, source))]
43    FailedToBuildComponent {
44        /// Component ID for the component that failed to build.
45        id: ComponentId,
46
47        /// The underlying component build error.
48        source: GenericError,
49    },
50}
51
52/// A topology blueprint represents a directed graph of components.
53///
54/// A blueprint is assembled by adding components and connecting them together, and then run by adding it to a
55/// [`Supervisor`][crate::runtime::Supervisor]: `TopologyBlueprint` implements [`Supervisable`], so there is no
56/// standalone spawn/run method. A blueprint can only be initialized (and thus run) once.
57pub struct TopologyBlueprint {
58    name: String,
59    build_state: Mutex<Option<TopologyBuildState>>,
60    health_registry: Option<HealthRegistry>,
61    memory_limiter: Option<MemoryLimiter>,
62}
63
64/// The consumable build state of a [`TopologyBlueprint`].
65///
66/// This is taken out of the blueprint when it's first initialized, at which point the topology is built and spawned.
67struct TopologyBuildState {
68    graph: Graph,
69    sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
70    relays: HashMap<ComponentId, RegisteredComponent<Box<dyn RelayBuilder + Send>>>,
71    decoders: HashMap<ComponentId, RegisteredComponent<Box<dyn DecoderBuilder + Send>>>,
72    transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
73    destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
74    encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
75    forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
76    component_registry: ComponentRegistry,
77    interconnect_capacity: NonZeroUsize,
78    shutdown_timeout: Duration,
79    worker_pool_config: WorkerPoolConfiguration,
80    environment_ready: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
81    ready_signal: Option<oneshot::Sender<()>>,
82}
83
84impl TopologyBlueprint {
85    /// Creates an empty `TopologyBlueprint` with the given name.
86    pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
87        // Create a nested component registry for this topology.
88        let component_registry = component_registry.get_or_create("topology").get_or_create(name);
89
90        let build_state = TopologyBuildState {
91            graph: Graph::default(),
92            sources: HashMap::new(),
93            relays: HashMap::new(),
94            decoders: HashMap::new(),
95            transforms: HashMap::new(),
96            destinations: HashMap::new(),
97            encoders: HashMap::new(),
98            forwarders: HashMap::new(),
99            component_registry,
100            interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
101            shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
102            worker_pool_config: WorkerPoolConfiguration::Dedicated,
103            environment_ready: None,
104            ready_signal: None,
105        };
106
107        Self {
108            name: name.to_string(),
109            build_state: Mutex::new(Some(build_state)),
110            health_registry: None,
111            memory_limiter: None,
112        }
113    }
114
115    /// Gets a mutable reference to the build state.
116    ///
117    /// # Panics
118    ///
119    /// Panics if the blueprint has already been initialized (its build state having been consumed).
120    fn state_mut(&mut self) -> &mut TopologyBuildState {
121        self.build_state
122            .get_mut()
123            .expect("topology blueprint mutex poisoned")
124            .as_mut()
125            .expect("topology blueprint already initialized")
126    }
127
128    /// Sets the capacity of interconnects in the topology.
129    ///
130    /// Interconnects are used to connect components to one another. Once their capacity is reached, no more items can be sent
131    /// through until in-flight items are processed. This will apply backpressure to the upstream components. Raising or lowering
132    /// the capacity allows trading off throughput at the expense of memory usage.
133    ///
134    /// Defaults to 128.
135    pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
136        self.state_mut().set_interconnect_capacity(capacity);
137        self
138    }
139
140    /// Sets how long the topology waits for components to stop during graceful shutdown.
141    ///
142    /// Defaults to 30 seconds.
143    pub fn with_shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
144        self.state_mut().shutdown_timeout = timeout;
145        self
146    }
147
148    /// Sets the health registry used when the topology is spawned.
149    ///
150    /// This must be set before the blueprint is added to a supervisor; initialization fails otherwise.
151    pub fn with_health_registry(&mut self, health_registry: HealthRegistry) -> &mut Self {
152        self.health_registry = Some(health_registry);
153        self
154    }
155
156    /// Sets the memory limiter used when the topology is spawned.
157    ///
158    /// This must be set before the blueprint is added to a supervisor; initialization fails otherwise.
159    pub fn with_memory_limiter(&mut self, memory_limiter: MemoryLimiter) -> &mut Self {
160        self.memory_limiter = Some(memory_limiter);
161        self
162    }
163
164    /// Sets a readiness signal that must resolve before the topology starts its components.
165    ///
166    /// When set, the topology is still built up front during initialization, but its components are not spawned until
167    /// the given future resolves (or the topology is asked to shut down first). This is used to defer the topology from
168    /// processing data until its dependencies -- such as the environment provider's metadata collectors -- are ready.
169    pub fn with_environment_readiness<F>(&mut self, ready: F) -> &mut Self
170    where
171        F: Future<Output = ()> + Send + 'static,
172    {
173        self.state_mut().environment_ready = Some(Box::pin(ready));
174        self
175    }
176
177    /// Returns a handle for awaiting the readiness of the topology once it's running.
178    ///
179    /// This handle depends on observing the readiness of the individual topology components, and so must be called after
180    /// [`with_health_registry`][Self::with_health_registry].
181    ///
182    /// # Panics
183    ///
184    /// Panics if the health registry has not been set, or if the blueprint has already been initialized.
185    pub fn topology_ready(&mut self) -> TopologyReady {
186        let health_registry = self
187            .health_registry
188            .clone()
189            .expect("health registry must be set before acquiring a topology readiness handle");
190        let component_prefix = format!("{}.", super::health_component_root(&self.name));
191
192        let (registered_tx, registered_rx) = oneshot::channel();
193        self.state_mut().ready_signal = Some(registered_tx);
194
195        TopologyReady {
196            registered_rx,
197            health_registry,
198            component_prefix,
199        }
200    }
201
202    /// Configures the topology to use the ambient Tokio runtime for component subtasks.
203    ///
204    /// Component subtasks will be spawned on whatever runtime is currently active when the topology is initialized.
205    /// This avoids creating a dedicated thread pool, which is useful for resource-constrained environments.
206    pub fn with_ambient_worker_pool(&mut self) -> &mut Self {
207        self.state_mut().worker_pool_config = WorkerPoolConfiguration::Ambient;
208        self
209    }
210
211    /// Configures the topology to use an externally provided Tokio runtime for component subtasks.
212    ///
213    /// Component subtasks will be spawned on the runtime associated with the given handle.
214    pub fn with_explicit_worker_pool(&mut self, handle: Handle) -> &mut Self {
215        self.state_mut().worker_pool_config = WorkerPoolConfiguration::Explicit(handle);
216        self
217    }
218
219    /// Adds a source component to the blueprint.
220    ///
221    /// # Errors
222    ///
223    /// If the component ID is invalid or the component can't be added to the graph, an error is returned.
224    pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
225    where
226        I: AsRef<str>,
227        B: SourceBuilder + Send + 'static,
228    {
229        self.state_mut().add_source(component_id, builder)?;
230        Ok(self)
231    }
232
233    /// Adds a relay component to the blueprint.
234    ///
235    /// # Errors
236    ///
237    /// If the component ID is invalid or the component can't be added to the graph, an error is returned.
238    pub fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
239    where
240        I: AsRef<str>,
241        B: RelayBuilder + Send + 'static,
242    {
243        self.state_mut().add_relay(component_id, builder)?;
244        Ok(self)
245    }
246
247    /// Adds a decoder component to the blueprint.
248    ///
249    /// # Errors
250    ///
251    /// If the component ID is invalid or the component can't be added to the graph, an error is returned.
252    pub fn add_decoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
253    where
254        I: AsRef<str>,
255        B: DecoderBuilder + Send + 'static,
256    {
257        self.state_mut().add_decoder(component_id, builder)?;
258        Ok(self)
259    }
260
261    /// Adds a transform component to the blueprint.
262    ///
263    /// # Errors
264    ///
265    /// If the component ID is invalid or the component can't be added to the graph, an error is returned.
266    pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
267    where
268        I: AsRef<str>,
269        B: TransformBuilder + Send + 'static,
270    {
271        self.state_mut().add_transform(component_id, builder)?;
272        Ok(self)
273    }
274
275    /// Adds a destination component to the blueprint.
276    ///
277    /// # Errors
278    ///
279    /// If the component ID is invalid or the component can't be added to the graph, an error is returned.
280    pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
281    where
282        I: AsRef<str>,
283        B: DestinationBuilder + Send + 'static,
284    {
285        self.state_mut().add_destination(component_id, builder)?;
286        Ok(self)
287    }
288
289    /// Adds an encoder component to the blueprint.
290    ///
291    /// # Errors
292    ///
293    /// If the component ID is invalid or the component can't be added to the graph, an error is returned.
294    pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
295    where
296        I: AsRef<str>,
297        B: EncoderBuilder + Send + 'static,
298    {
299        self.state_mut().add_encoder(component_id, builder)?;
300        Ok(self)
301    }
302
303    /// Adds a forwarder component to the blueprint.
304    ///
305    /// # Errors
306    ///
307    /// If the component ID is invalid or the component can't be added to the graph, an error is returned.
308    pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
309    where
310        I: AsRef<str>,
311        B: ForwarderBuilder + Send + 'static,
312    {
313        self.state_mut().add_forwarder(component_id, builder)?;
314        Ok(self)
315    }
316
317    /// Connects one or more upstream component outputs to one or more downstream components.
318    ///
319    /// This method allows for ergonomically defining many-to-one, one-to-many, and many-to-many connections to
320    /// facilitate common patterns like fanning in many upstream components to a single downstream component, or fanning
321    /// out a single upstream component to many downstream components.
322    ///
323    /// When both there are both multiple upstream _and_ downstream component IDs, connections resemble a mesh: every
324    /// upstream component will be connected to every downstream component. This should be rare, but is technically
325    /// supported.
326    ///
327    /// # Errors
328    ///
329    /// If any of the upstream or downstream component IDs are invalid or don't exist, or if the data types between one
330    /// of the upstream/downstream component pairs is incompatible, an error is returned.
331    pub fn connect_components<MS, SI, MD, DI>(
332        &mut self, upstream_output_component_ids: SI, downstream_component_ids: DI,
333    ) -> Result<&mut Self, GenericError>
334    where
335        SI: AsComponentIds<MS>,
336        DI: AsComponentIds<MD>,
337    {
338        self.state_mut()
339            .connect_components(upstream_output_component_ids, downstream_component_ids)?;
340        Ok(self)
341    }
342
343    /// Connects a set of component IDs to one another in a pairwise fashion.
344    ///
345    /// This can be used to connect multiple components -- each sharing only a single edge between one another -- in a
346    /// single call instead of multiple calls.
347    ///
348    /// For example, passing `["first", "second", "third"]` would connect `first`'s output to `second`'s input, and
349    /// `second`'s output to `third`'s input.
350    ///
351    /// One caveat is that only the default output of a component can be used for connections past the first pair, as
352    /// the identifier given must be able to describe both the component ID to _send_ to as well as the component output
353    /// ID to connect to the subsequent component. This limitation does not exist on the first component ID, since it is
354    /// only used in the context of being a component output ID.
355    ///
356    /// # Errors
357    ///
358    /// If any of the component IDs are invalid or don't exist, or if the data types between one of the
359    /// upstream/downstream component pairs is incompatible, or if less than two component IDs are provided, an error is
360    /// returned.
361    ///
362    /// Care should be taken on failure as this method will not rollback any previously successful connections, which
363    /// could leave the blueprint in an indeterminate state if some connections are made prior to hitting an error.
364    pub fn connect_components_in_order<IT, I>(&mut self, ordered_component_ids: IT) -> Result<&mut Self, GenericError>
365    where
366        IT: IntoIterator<Item = I>,
367        I: AsRef<str>,
368    {
369        self.state_mut().connect_components_in_order(ordered_component_ids)?;
370        Ok(self)
371    }
372}
373
374/// A handle for awaiting the readiness of a running topology.
375pub struct TopologyReady {
376    registered_rx: oneshot::Receiver<()>,
377    health_registry: HealthRegistry,
378    component_prefix: String,
379}
380
381impl TopologyReady {
382    /// Waits until the topology has registered its components and all of them have reported ready.
383    ///
384    /// Returns `true` once the topology is fully ready, or `false` if the topology was torn down before it finished
385    /// registering its components. The topology might be torn down before readiness is achieved if shutdown is
386    /// requested while still waiting on an upstream dependency such as the environment provider.
387    pub async fn wait(self) -> bool {
388        // First, wait for the topology to actually register its components in the health registry.
389        //
390        // If we didn't do this, we could observe `all_ready_matching` return immediately (due to no matching components)
391        // which would not correctly represent the topology being ready.
392        if self.registered_rx.await.is_err() {
393            return false;
394        }
395
396        // Now wait for all registered topology components to actually become ready.
397        self.health_registry
398            .all_ready_matching(|name| name.starts_with(&self.component_prefix))
399            .await;
400
401        true
402    }
403}
404
405impl TopologyBuildState {
406    fn set_interconnect_capacity(&mut self, capacity: NonZeroUsize) {
407        self.interconnect_capacity = capacity;
408        self.recalculate_bounds();
409    }
410
411    fn recalculate_bounds(&mut self) {
412        let interconnect_capacity = self.interconnect_capacity.get();
413
414        let mut bounds_builder = self.component_registry.bounds_builder();
415        let mut bounds_builder = bounds_builder.subcomponent("interconnects");
416        bounds_builder.reset();
417
418        // Adjust the bounds related to interconnects.
419        //
420        // This deals with the minimum size of the interconnects themselves, since they're bounded and thus allocated
421        // up-front. Every non-source component has an interconnect.
422        let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
423        bounds_builder
424            .minimum()
425            .with_array::<EventsBuffer>("events", total_interconnect_capacity);
426
427        // TODO: Add a minimum subitem for payloads when we have payload interconnects.
428
429        // Adjust the bounds related to event buffers themselves.
430        //
431        // We calculate the maximum number of event buffers by adding up the total capacity of all non-source components, plus the count
432        // of non-destination components. This is the effective upper bound because once all component channels are full, sending
433        // components can only allocate one more event buffer before being blocked on sending, which is then the effective upper bound.
434        //
435        // TODO: Somewhat fragile. Need to revisit this.
436        // TODO: Add a firm subitem for payloads when we have payload interconnects.
437        let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
438            + self.sources.len()
439            + self.decoders.len()
440            + self.transforms.len();
441
442        bounds_builder
443            .firm()
444            // max_in_flight_event_buffers * (size_of<EventsContainer> + (size_of<Event> * default_event_buffer_capacity))
445            .with_expr(UsageExpr::product(
446                "events",
447                UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
448                UsageExpr::sum(
449                    "",
450                    UsageExpr::struct_size::<EventsBuffer>("events buffer"),
451                    UsageExpr::product(
452                        "",
453                        UsageExpr::struct_size::<Event>("event"),
454                        UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
455                    ),
456                ),
457            ));
458    }
459
460    fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
461    where
462        I: AsRef<str>,
463        B: SourceBuilder + Send + 'static,
464    {
465        let component_id = self
466            .graph
467            .add_source(component_id, &builder)
468            .error_context("Failed to add source to topology graph.")?;
469
470        let mut source_registry = self
471            .component_registry
472            .get_or_create(format!("components.sources.{}", component_id));
473        let mut bounds_builder = source_registry.bounds_builder();
474        builder.specify_bounds(&mut bounds_builder);
475
476        self.recalculate_bounds();
477
478        let _ = self.sources.insert(
479            component_id,
480            RegisteredComponent::new(Box::new(builder), source_registry),
481        );
482
483        Ok(())
484    }
485
486    fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
487    where
488        I: AsRef<str>,
489        B: RelayBuilder + Send + 'static,
490    {
491        let component_id = self
492            .graph
493            .add_relay(component_id, &builder)
494            .error_context("Failed to add relay to topology graph.")?;
495
496        let mut relay_registry = self
497            .component_registry
498            .get_or_create(format!("components.relays.{}", component_id));
499        let mut bounds_builder = relay_registry.bounds_builder();
500        builder.specify_bounds(&mut bounds_builder);
501
502        self.recalculate_bounds();
503
504        let _ = self.relays.insert(
505            component_id,
506            RegisteredComponent::new(Box::new(builder), relay_registry),
507        );
508
509        Ok(())
510    }
511
512    fn add_decoder<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
513    where
514        I: AsRef<str>,
515        B: DecoderBuilder + Send + 'static,
516    {
517        let component_id = self
518            .graph
519            .add_decoder(component_id, &builder)
520            .error_context("Failed to add decoder to topology graph.")?;
521
522        let mut decoder_registry = self
523            .component_registry
524            .get_or_create(format!("components.decoders.{}", component_id));
525        let mut bounds_builder = decoder_registry.bounds_builder();
526        builder.specify_bounds(&mut bounds_builder);
527
528        self.recalculate_bounds();
529
530        let _ = self.decoders.insert(
531            component_id,
532            RegisteredComponent::new(Box::new(builder), decoder_registry),
533        );
534
535        Ok(())
536    }
537
538    fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
539    where
540        I: AsRef<str>,
541        B: TransformBuilder + Send + 'static,
542    {
543        let component_id = self
544            .graph
545            .add_transform(component_id, &builder)
546            .error_context("Failed to add transform to topology graph.")?;
547
548        let mut transform_registry = self
549            .component_registry
550            .get_or_create(format!("components.transforms.{}", component_id));
551        let mut bounds_builder = transform_registry.bounds_builder();
552        builder.specify_bounds(&mut bounds_builder);
553
554        self.recalculate_bounds();
555
556        let _ = self.transforms.insert(
557            component_id,
558            RegisteredComponent::new(Box::new(builder), transform_registry),
559        );
560
561        Ok(())
562    }
563
564    fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
565    where
566        I: AsRef<str>,
567        B: DestinationBuilder + Send + 'static,
568    {
569        let component_id = self
570            .graph
571            .add_destination(component_id, &builder)
572            .error_context("Failed to add destination to topology graph.")?;
573
574        let mut destination_registry = self
575            .component_registry
576            .get_or_create(format!("components.destinations.{}", component_id));
577        let mut bounds_builder = destination_registry.bounds_builder();
578        builder.specify_bounds(&mut bounds_builder);
579
580        self.recalculate_bounds();
581
582        let _ = self.destinations.insert(
583            component_id,
584            RegisteredComponent::new(Box::new(builder), destination_registry),
585        );
586
587        Ok(())
588    }
589
590    fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
591    where
592        I: AsRef<str>,
593        B: EncoderBuilder + Send + 'static,
594    {
595        let component_id = self
596            .graph
597            .add_encoder(component_id, &builder)
598            .error_context("Failed to add encoder to topology graph.")?;
599
600        let mut encoder_registry = self
601            .component_registry
602            .get_or_create(format!("components.encoders.{}", component_id));
603        let mut bounds_builder = encoder_registry.bounds_builder();
604        builder.specify_bounds(&mut bounds_builder);
605
606        self.recalculate_bounds();
607
608        let _ = self.encoders.insert(
609            component_id,
610            RegisteredComponent::new(Box::new(builder), encoder_registry),
611        );
612
613        Ok(())
614    }
615
616    fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
617    where
618        I: AsRef<str>,
619        B: ForwarderBuilder + Send + 'static,
620    {
621        let component_id = self
622            .graph
623            .add_forwarder(component_id, &builder)
624            .error_context("Failed to add forwarder to topology graph.")?;
625
626        let mut forwarder_registry = self
627            .component_registry
628            .get_or_create(format!("components.forwarders.{}", component_id));
629        let mut bounds_builder = forwarder_registry.bounds_builder();
630        builder.specify_bounds(&mut bounds_builder);
631
632        self.recalculate_bounds();
633
634        let _ = self.forwarders.insert(
635            component_id,
636            RegisteredComponent::new(Box::new(builder), forwarder_registry),
637        );
638
639        Ok(())
640    }
641
642    fn connect_components<MS, SI, MD, DI>(
643        &mut self, upstream_output_component_ids: SI, downstream_component_ids: DI,
644    ) -> Result<(), GenericError>
645    where
646        SI: AsComponentIds<MS>,
647        DI: AsComponentIds<MD>,
648    {
649        for upstream_output_component_id in upstream_output_component_ids.as_component_ids() {
650            for downstream_component_id in downstream_component_ids.as_component_ids() {
651                self.graph
652                    .add_edge(upstream_output_component_id.as_ref(), downstream_component_id.as_ref())
653                    .error_context("Failed to add component connection to topology graph.")?;
654            }
655        }
656
657        Ok(())
658    }
659
660    fn connect_components_in_order<IT, I>(&mut self, ordered_component_ids: IT) -> Result<(), GenericError>
661    where
662        IT: IntoIterator<Item = I>,
663        I: AsRef<str>,
664    {
665        let mut pending_output_component_id: Option<I> = None;
666        let mut connected_any = false;
667
668        for component_id in ordered_component_ids.into_iter() {
669            if let Some(output_component_id) = pending_output_component_id.take() {
670                self.graph
671                    .add_edge(output_component_id.as_ref(), component_id.as_ref())
672                    .error_context("Failed to add component connection to topology graph.")?;
673
674                connected_any = true;
675            }
676
677            // Store the _current_ component ID so we can chain its connection to the next component, and so on.
678            pending_output_component_id = Some(component_id);
679        }
680
681        // Make sure we connected at least one pair of components together, otherwise this is an invalid connection attempt.
682        if !connected_any {
683            return Err(generic_error!(
684                "Two or more components must be provided for connection."
685            ));
686        }
687
688        Ok(())
689    }
690
691    /// Builds the topology.
692    ///
693    /// # Errors
694    ///
695    /// If any of the components couldn't be built, an error is returned.
696    async fn build(mut self, name: String) -> Result<BuiltTopology, GenericError> {
697        self.graph.validate().error_context("Failed to build topology graph.")?;
698
699        let mut sources = HashMap::new();
700        for (id, builder) in self.sources {
701            let (builder, mut component_registry) = builder.into_parts();
702            let allocation_token = component_registry.token();
703
704            let component_context = ComponentContext::source(id.clone());
705            let source = builder
706                .build(component_context)
707                .track_resources(allocation_token)
708                .await
709                .with_error_context(|| format!("Failed to build source '{}'.", id))?;
710
711            sources.insert(
712                id,
713                RegisteredComponent::new(source.track_resources(allocation_token), component_registry),
714            );
715        }
716
717        let mut relays = HashMap::new();
718        for (id, builder) in self.relays {
719            let (builder, mut component_registry) = builder.into_parts();
720            let allocation_token = component_registry.token();
721
722            let component_context = ComponentContext::relay(id.clone());
723            let relay = builder
724                .build(component_context)
725                .track_resources(allocation_token)
726                .await
727                .with_error_context(|| format!("Failed to build relay '{}'.", id))?;
728
729            relays.insert(
730                id,
731                RegisteredComponent::new(relay.track_resources(allocation_token), component_registry),
732            );
733        }
734
735        let mut decoders = HashMap::new();
736        for (id, builder) in self.decoders {
737            let (builder, mut component_registry) = builder.into_parts();
738            let allocation_token = component_registry.token();
739
740            let component_context = ComponentContext::decoder(id.clone());
741            let decoder = builder
742                .build(component_context)
743                .track_resources(allocation_token)
744                .await
745                .with_error_context(|| format!("Failed to build decoder '{}'.", id))?;
746
747            decoders.insert(
748                id,
749                RegisteredComponent::new(decoder.track_resources(allocation_token), component_registry),
750            );
751        }
752
753        let mut transforms = HashMap::new();
754        for (id, builder) in self.transforms {
755            let (builder, mut component_registry) = builder.into_parts();
756            let allocation_token = component_registry.token();
757
758            let component_context = ComponentContext::transform(id.clone());
759            let transform = builder
760                .build(component_context)
761                .track_resources(allocation_token)
762                .await
763                .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
764
765            transforms.insert(
766                id,
767                RegisteredComponent::new(transform.track_resources(allocation_token), component_registry),
768            );
769        }
770
771        let mut destinations = HashMap::new();
772        for (id, builder) in self.destinations {
773            let (builder, mut component_registry) = builder.into_parts();
774            let allocation_token = component_registry.token();
775
776            let component_context = ComponentContext::destination(id.clone());
777            let destination = builder
778                .build(component_context)
779                .track_resources(allocation_token)
780                .await
781                .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
782
783            destinations.insert(
784                id,
785                RegisteredComponent::new(destination.track_resources(allocation_token), component_registry),
786            );
787        }
788
789        let mut encoders = HashMap::new();
790        for (id, builder) in self.encoders {
791            let (builder, mut component_registry) = builder.into_parts();
792            let allocation_token = component_registry.token();
793
794            let component_context = ComponentContext::encoder(id.clone());
795            let encoder = builder
796                .build(component_context)
797                .track_resources(allocation_token)
798                .await
799                .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
800
801            encoders.insert(
802                id,
803                RegisteredComponent::new(encoder.track_resources(allocation_token), component_registry),
804            );
805        }
806
807        let mut forwarders = HashMap::new();
808        for (id, builder) in self.forwarders {
809            let (builder, mut component_registry) = builder.into_parts();
810            let allocation_token = component_registry.token();
811
812            let component_context = ComponentContext::forwarder(id.clone());
813            let forwarder = builder
814                .build(component_context)
815                .track_resources(allocation_token)
816                .await
817                .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
818
819            forwarders.insert(
820                id,
821                RegisteredComponent::new(forwarder.track_resources(allocation_token), component_registry),
822            );
823        }
824
825        Ok(BuiltTopology::from_parts(
826            name,
827            self.graph,
828            sources,
829            relays,
830            decoders,
831            transforms,
832            destinations,
833            encoders,
834            forwarders,
835            self.component_registry.token(),
836            self.interconnect_capacity,
837            self.worker_pool_config,
838        ))
839    }
840}
841
842#[async_trait]
843impl Supervisable for TopologyBlueprint {
844    fn name(&self) -> &str {
845        &self.name
846    }
847
848    fn shutdown_strategy(&self) -> ShutdownStrategy {
849        // Set an infinitely long (effectively) graceful shutdown timeout because we enforce our _own_ realistic graceful
850        // shutdown as part of the supervisor future we generate.
851        ShutdownStrategy::Graceful(Duration::MAX)
852    }
853
854    async fn initialize(&self, shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
855        // Consume the build state.
856        //
857        // Topologies currently can't be initialized more than once.
858        let mut build_state = self
859            .build_state
860            .lock()
861            .expect("topology blueprint mutex poisoned")
862            .take()
863            .ok_or_else(|| generic_error!("Topology has already been initialized and cannot be run more than once."))?;
864
865        let health_registry = self
866            .health_registry
867            .clone()
868            .ok_or_else(|| generic_error!("Topology blueprint is missing its health registry."))?;
869        let memory_limiter = self
870            .memory_limiter
871            .clone()
872            .ok_or_else(|| generic_error!("Topology blueprint is missing its memory limiter."))?;
873
874        let dataspace = DataspaceRegistry::try_current()
875            .ok_or_else(|| generic_error!("Topology must be initialized within a supervised process context."))?;
876
877        // Build our topology components.
878        //
879        // This creates the topology components but does not actually spawn them or run them in any way.
880        //
881        // We do this outside of the supervisor future to ensure that we fail during initialization, which bubbles up as
882        // a non-restartable error that ultimately leads to the process exiting. This is the desired behavior at present
883        // time, but maybe change in the future.
884        let environment_ready = build_state.environment_ready.take();
885        let ready_signal = build_state.ready_signal.take();
886        let shutdown_timeout = build_state.shutdown_timeout;
887        let built = build_state.build(self.name.clone()).await?;
888
889        Ok(Box::pin(async move {
890            pin!(shutdown);
891
892            // If a readiness signal was provided, wait for it before spawning the components, but remain responsive to
893            // shutdown so we exit promptly if asked to stop before we've started.
894            if let Some(environment_ready) = environment_ready {
895                select! {
896                    _ = &mut shutdown => return Ok(()),
897                    _ = environment_ready => {},
898                }
899            }
900
901            let mut running = built.spawn_inner(&health_registry, memory_limiter, dataspace).await?;
902
903            // Signal that the topology has registered all of its components in the health registry, so any readiness
904            // handle can begin waiting on those components. We send this only after `spawn_inner` so readiness can't be
905            // observed before the topology's components exist.
906            if let Some(ready_signal) = ready_signal {
907                let _ = ready_signal.send(());
908            }
909
910            let mut topology_failed = false;
911            select! {
912                // The supervisor requested shutdown.
913                _ = &mut shutdown => {
914                    info!("Topology received shutdown signal. Shutting down...");
915                },
916
917                // A component finished before shutdown was requested, which we treat as a failure of the topology.
918                _ = running.wait_for_unexpected_finish() => {
919                    error!("Topology component unexpectedly finished. Shutting down...");
920                    topology_failed = true;
921                },
922            }
923
924            // Trigger graceful shutdown and wait for all components to stop.
925            let shutdown_result = running.shutdown_with_timeout(shutdown_timeout).await;
926            match (shutdown_result, topology_failed) {
927                (Ok(()), false) => Ok(()),
928                (Ok(()), true) => Err(generic_error!(
929                    "Topology shut down after a component unexpectedly finished."
930                )),
931                (Err(e), _) => Err(e),
932            }
933        }))
934    }
935}
936
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use resource_accounting::{ComponentRegistry, MemoryLimiter};
    use tokio::sync::oneshot;

    use super::{TopologyBlueprint, TopologyReady};
    use crate::{
        data_model::event::EventType,
        health::HealthRegistry,
        runtime::{RestartMode, RestartStrategy, Supervisor, SupervisorError},
        topology::test_util::{TestDestinationBuilder, TestSourceBuilder, TestTransformBuilder},
    };

    /// Creates an empty blueprint named `test`, backed by a throwaway component registry.
    fn empty_blueprint() -> TopologyBlueprint {
        let registry = ComponentRegistry::default();
        TopologyBlueprint::new("test", &registry)
    }

    /// Returns a blueprint holding the components `source`, `transform`, and `destination`, all of which produce and/or
    /// consume event-D events.
    ///
    /// The components are left unconnected.
    fn blueprint_with_components() -> TopologyBlueprint {
        let mut blueprint = empty_blueprint();

        blueprint
            .add_source("source", TestSourceBuilder::default_output(EventType::EventD))
            .expect("should not fail to add source");
        blueprint
            .add_transform(
                "transform",
                TestTransformBuilder::default_output(EventType::EventD, EventType::EventD),
            )
            .expect("should not fail to add transform");
        blueprint
            .add_destination("destination", TestDestinationBuilder::with_input_type(EventType::EventD))
            .expect("should not fail to add destination");

        blueprint
    }

    /// Returns a blueprint with one event-D source per entry in `source_ids` and one event-D destination per entry in
    /// `destination_ids`.
    ///
    /// The components are left unconnected.
    fn blueprint_with_sources_and_destinations(source_ids: &[&str], destination_ids: &[&str]) -> TopologyBlueprint {
        let mut blueprint = empty_blueprint();

        for &id in source_ids {
            blueprint
                .add_source(id, TestSourceBuilder::default_output(EventType::EventD))
                .expect("should not fail to add source");
        }

        for &id in destination_ids {
            blueprint
                .add_destination(id, TestDestinationBuilder::with_input_type(EventType::EventD))
                .expect("should not fail to add destination");
        }

        blueprint
    }

    /// Lists every directed edge in the blueprint's graph as an `(upstream, downstream)` pair of component IDs.
    ///
    /// The pairs are sorted.
    fn connected_pairs(blueprint: &TopologyBlueprint) -> Vec<(String, String)> {
        let guard = blueprint.build_state.lock().expect("topology blueprint mutex poisoned");
        let state = guard.as_ref().expect("topology blueprint already initialized");
        let outbound_edges = state.graph.get_outbound_directed_edges();

        let mut pairs = Vec::new();
        for (upstream, outputs) in &outbound_edges {
            let upstream_id = upstream.component_id().to_string();
            for downstream in outputs.values().flatten() {
                pairs.push((upstream_id.clone(), downstream.component_id().to_string()));
            }
        }

        pairs.sort_unstable();
        pairs
    }

    /// Converts borrowed `(upstream, downstream)` pairs into the owned form that `connected_pairs` returns.
    fn owned_pairs(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|&(upstream, downstream)| (upstream.to_string(), downstream.to_string()))
            .collect()
    }

    #[test]
    fn connect_components_in_order_errors_with_fewer_than_two_ids() {
        let mut blueprint = blueprint_with_components();

        // An empty list of IDs cannot form any connection.
        let no_ids = blueprint.connect_components_in_order(Vec::<&str>::new()).map(|_| ());
        assert!(no_ids.is_err());

        // Neither can a lone ID: there is nothing to connect it to.
        let one_id = blueprint.connect_components_in_order(["source"]).map(|_| ());
        assert!(one_id.is_err());

        // Both rejected calls must leave the graph untouched.
        assert!(connected_pairs(&blueprint).is_empty());
    }

    #[test]
    fn connect_components_in_order_connects_pairwise_left_to_right() {
        let mut blueprint = blueprint_with_components();
        blueprint
            .connect_components_in_order(["source", "transform", "destination"])
            .expect("should not fail to connect components in order");

        // Each neighbouring pair gets exactly one left-to-right edge: `source` -> `transform` -> `destination`.
        let expected = owned_pairs(&[("source", "transform"), ("transform", "destination")]);
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    #[test]
    fn connect_component_one_to_many_fans_out() {
        // Fan-out: a bare string (`Single`) upstream ID feeds a slice (`Multiple`) of downstream IDs.
        let mut blueprint = blueprint_with_sources_and_destinations(&["source"], &["dest_a", "dest_b"]);
        blueprint
            .connect_components("source", ["dest_a", "dest_b"])
            .expect("should not fail to connect component");

        let expected = owned_pairs(&[("source", "dest_a"), ("source", "dest_b")]);
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    #[test]
    fn connect_component_many_to_one_fans_in() {
        // Fan-in: a slice (`Multiple`) of upstream IDs converges on a bare string (`Single`) downstream ID.
        let mut blueprint = blueprint_with_sources_and_destinations(&["source_a", "source_b"], &["dest"]);
        blueprint
            .connect_components(["source_a", "source_b"], "dest")
            .expect("should not fail to connect component");

        let expected = owned_pairs(&[("source_a", "dest"), ("source_b", "dest")]);
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    #[test]
    fn connect_component_many_to_many_creates_mesh() {
        // Mesh: with slices (`Multiple`) on both sides, every upstream ID must be linked to every downstream ID.
        let mut blueprint = blueprint_with_sources_and_destinations(&["source_a", "source_b"], &["dest_a", "dest_b"]);
        blueprint
            .connect_components(["source_a", "source_b"], ["dest_a", "dest_b"])
            .expect("should not fail to connect component");

        let expected = owned_pairs(&[
            ("source_a", "dest_a"),
            ("source_a", "dest_b"),
            ("source_b", "dest_a"),
            ("source_b", "dest_b"),
        ]);
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    /// Returns `source` -> `transform` -> `destination`, already connected, using the immediate-exit test components.
    fn connected_blueprint() -> TopologyBlueprint {
        let mut blueprint = blueprint_with_components();
        blueprint
            .connect_components_in_order(["source", "transform", "destination"])
            .expect("should not fail to connect components");
        blueprint
    }

    /// Like `connected_blueprint`, but also given the health registry and memory limiter that initialization requires.
    fn runnable_blueprint() -> TopologyBlueprint {
        let mut blueprint = connected_blueprint();
        blueprint
            .with_health_registry(HealthRegistry::new())
            .with_memory_limiter(MemoryLimiter::noop());
        blueprint
    }

    #[tokio::test]
    async fn topology_failure_shuts_down_supervisor() {
        // Every test component exits right away, which the topology treats as an unexpected finish, i.e. a topology
        // failure. With zero restarts permitted, the supervisor itself has to fail (in the real binary, the process
        // then exits).
        let blueprint = runnable_blueprint();

        let no_restarts = RestartStrategy::new(RestartMode::OneForOne, 0, Duration::from_secs(5));
        let mut supervisor = Supervisor::new("test-topology")
            .expect("should not fail to create supervisor")
            .with_restart_strategy(no_restarts);
        supervisor.add_worker(blueprint);

        // Keep the sender alive so shutdown is never requested externally.
        let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let outcome = tokio::time::timeout(Duration::from_secs(5), supervisor.run_with_shutdown(shutdown_rx))
            .await
            .expect("supervisor should exit promptly");

        assert!(matches!(outcome, Err(SupervisorError::Shutdown)));
    }

    #[tokio::test]
    async fn topology_cannot_be_initialized_more_than_once() {
        // The default restart strategy allows one restart, and the topology fails at runtime because its components
        // finish immediately. The supervisor therefore attempts a restart. The second initialization finds the build
        // state already consumed and fails, which surfaces as a non-restartable initialization failure.
        let blueprint = runnable_blueprint();

        let mut supervisor = Supervisor::new("test-topology").expect("should not fail to create supervisor");
        supervisor.add_worker(blueprint);

        // Keep the sender alive so shutdown is never requested externally.
        let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let outcome = tokio::time::timeout(Duration::from_secs(5), supervisor.run_with_shutdown(shutdown_rx))
            .await
            .expect("supervisor should exit promptly");

        assert!(matches!(outcome, Err(SupervisorError::FailedToInitialize { .. })));
    }

    #[tokio::test]
    async fn topology_waits_for_environment_readiness_before_starting() {
        // The readiness signal here never resolves, so the topology must never spawn its components. Had it spawned
        // them, they would finish immediately and fail the supervisor. Requesting shutdown while the topology sits at
        // the readiness gate should therefore lead to a clean supervisor exit.
        let mut blueprint = runnable_blueprint();
        blueprint.with_environment_readiness(std::future::pending::<()>());

        let mut supervisor = Supervisor::new("test-topology").expect("should not fail to create supervisor");
        supervisor.add_worker(blueprint);

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let supervisor_task = tokio::spawn(async move { supervisor.run_with_shutdown(shutdown_rx).await });

        // Let the supervisor start up and park at the readiness gate before shutdown is requested.
        tokio::time::sleep(Duration::from_millis(50)).await;
        shutdown_tx.send(()).expect("should send shutdown signal");

        let outcome = tokio::time::timeout(Duration::from_secs(5), supervisor_task)
            .await
            .expect("supervisor should exit promptly")
            .expect("supervisor task should not panic");

        assert!(outcome.is_ok(), "supervisor should shut down cleanly, got: {:?}", outcome);
    }

    #[test]
    fn topology_ready_waits_for_registration_before_checking_readiness() {
        use tokio_test::{assert_pending, assert_ready, task};

        let registry = HealthRegistry::new();

        // An unrelated subsystem in the same registry is already registered and ready. A naive check against the whole
        // registry could wrongly report readiness before the topology has registered anything.
        let mut unrelated = registry
            .register_component("env_provider.workload.foo")
            .expect("should register component");
        unrelated.mark_ready();

        let (registered_tx, registered_rx) = oneshot::channel();
        let readiness = TopologyReady {
            registered_rx,
            health_registry: registry.clone(),
            component_prefix: "topology.primary.".to_string(),
        };
        let mut waiter = task::spawn(readiness.wait());

        // No registration signal yet, so `wait` must stay pending. That gate is what prevents a false-ready result.
        assert_pending!(waiter.poll());

        // Register a topology component, as spawning the topology would, but don't mark it ready yet.
        let mut topology_source = registry
            .register_component("topology.primary.sources.in")
            .expect("should register component");

        // After the registration signal, `wait` moves on to the prefix-scoped readiness check. That check stays pending
        // while the topology component is not ready.
        registered_tx.send(()).expect("receiver should be alive");
        assert_pending!(waiter.poll());

        // With the topology component ready, `wait` finally resolves to `true`.
        topology_source.mark_ready();
        assert!(assert_ready!(waiter.poll()));
    }

    #[tokio::test]
    async fn topology_ready_returns_false_when_torn_down_before_registration() {
        let registry = HealthRegistry::new();

        let (registered_tx, registered_rx) = oneshot::channel::<()>();
        let readiness = TopologyReady {
            registered_rx,
            health_registry: registry,
            component_prefix: "topology.primary.".to_string(),
        };

        // A topology torn down before registering its components drops the sender without signaling. `wait` must
        // then report that readiness will never be reached.
        drop(registered_tx);

        assert!(!readiness.wait().await);
    }
}