1use std::{collections::HashMap, future::Future, num::NonZeroUsize, pin::Pin, sync::Mutex, time::Duration};
2
3use async_trait::async_trait;
4use resource_accounting::{ComponentRegistry, MemoryLimiter, Track as _, UsageExpr};
5use saluki_common::sync::shutdown::ShutdownHandle;
6use saluki_error::{generic_error, ErrorContext as _, GenericError};
7use snafu::Snafu;
8use tokio::{pin, runtime::Handle, select, sync::oneshot};
9use tracing::{error, info};
10
11use super::{
12 built::{BuiltTopology, WorkerPoolConfiguration},
13 graph::{Graph, GraphError},
14 ComponentId, RegisteredComponent,
15};
16use crate::{
17 components::{
18 decoders::DecoderBuilder, destinations::DestinationBuilder, encoders::EncoderBuilder,
19 forwarders::ForwarderBuilder, relays::RelayBuilder, sources::SourceBuilder, transforms::TransformBuilder,
20 ComponentContext,
21 },
22 data_model::event::Event,
23 health::HealthRegistry,
24 runtime::{state::DataspaceRegistry, InitializationError, ShutdownStrategy, Supervisable, SupervisorFuture},
25 topology::{ids::AsComponentIds, EventsBuffer, DEFAULT_EVENTS_BUFFER_CAPACITY},
26};
27
/// Shutdown timeout a new blueprint starts with until `with_shutdown_timeout` overrides it.
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
29
30#[derive(Debug, Snafu)]
32#[snafu(context(suffix(false)))]
33pub enum BlueprintError {
34 #[snafu(display("Failed to build/validate topology graph: {}", source))]
36 InvalidGraph {
37 source: GraphError,
39 },
40
41 #[snafu(display("Failed to build component '{}': {}", id, source))]
43 FailedToBuildComponent {
44 id: ComponentId,
46
47 source: GenericError,
49 },
50}
51
52pub struct TopologyBlueprint {
58 name: String,
59 build_state: Mutex<Option<TopologyBuildState>>,
60 health_registry: Option<HealthRegistry>,
61 memory_limiter: Option<MemoryLimiter>,
62}
63
64struct TopologyBuildState {
68 graph: Graph,
69 sources: HashMap<ComponentId, RegisteredComponent<Box<dyn SourceBuilder + Send>>>,
70 relays: HashMap<ComponentId, RegisteredComponent<Box<dyn RelayBuilder + Send>>>,
71 decoders: HashMap<ComponentId, RegisteredComponent<Box<dyn DecoderBuilder + Send>>>,
72 transforms: HashMap<ComponentId, RegisteredComponent<Box<dyn TransformBuilder + Send>>>,
73 destinations: HashMap<ComponentId, RegisteredComponent<Box<dyn DestinationBuilder + Send>>>,
74 encoders: HashMap<ComponentId, RegisteredComponent<Box<dyn EncoderBuilder + Send>>>,
75 forwarders: HashMap<ComponentId, RegisteredComponent<Box<dyn ForwarderBuilder + Send>>>,
76 component_registry: ComponentRegistry,
77 interconnect_capacity: NonZeroUsize,
78 shutdown_timeout: Duration,
79 worker_pool_config: WorkerPoolConfiguration,
80 environment_ready: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
81 ready_signal: Option<oneshot::Sender<()>>,
82}
83
84impl TopologyBlueprint {
85 pub fn new(name: &str, component_registry: &ComponentRegistry) -> Self {
87 let component_registry = component_registry.get_or_create("topology").get_or_create(name);
89
90 let build_state = TopologyBuildState {
91 graph: Graph::default(),
92 sources: HashMap::new(),
93 relays: HashMap::new(),
94 decoders: HashMap::new(),
95 transforms: HashMap::new(),
96 destinations: HashMap::new(),
97 encoders: HashMap::new(),
98 forwarders: HashMap::new(),
99 component_registry,
100 interconnect_capacity: super::DEFAULT_INTERCONNECT_CAPACITY,
101 shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT,
102 worker_pool_config: WorkerPoolConfiguration::Dedicated,
103 environment_ready: None,
104 ready_signal: None,
105 };
106
107 Self {
108 name: name.to_string(),
109 build_state: Mutex::new(Some(build_state)),
110 health_registry: None,
111 memory_limiter: None,
112 }
113 }
114
115 fn state_mut(&mut self) -> &mut TopologyBuildState {
121 self.build_state
122 .get_mut()
123 .expect("topology blueprint mutex poisoned")
124 .as_mut()
125 .expect("topology blueprint already initialized")
126 }
127
128 pub fn with_interconnect_capacity(&mut self, capacity: NonZeroUsize) -> &mut Self {
136 self.state_mut().set_interconnect_capacity(capacity);
137 self
138 }
139
140 pub fn with_shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
144 self.state_mut().shutdown_timeout = timeout;
145 self
146 }
147
148 pub fn with_health_registry(&mut self, health_registry: HealthRegistry) -> &mut Self {
152 self.health_registry = Some(health_registry);
153 self
154 }
155
156 pub fn with_memory_limiter(&mut self, memory_limiter: MemoryLimiter) -> &mut Self {
160 self.memory_limiter = Some(memory_limiter);
161 self
162 }
163
164 pub fn with_environment_readiness<F>(&mut self, ready: F) -> &mut Self
170 where
171 F: Future<Output = ()> + Send + 'static,
172 {
173 self.state_mut().environment_ready = Some(Box::pin(ready));
174 self
175 }
176
177 pub fn topology_ready(&mut self) -> TopologyReady {
186 let health_registry = self
187 .health_registry
188 .clone()
189 .expect("health registry must be set before acquiring a topology readiness handle");
190 let component_prefix = format!("{}.", super::health_component_root(&self.name));
191
192 let (registered_tx, registered_rx) = oneshot::channel();
193 self.state_mut().ready_signal = Some(registered_tx);
194
195 TopologyReady {
196 registered_rx,
197 health_registry,
198 component_prefix,
199 }
200 }
201
202 pub fn with_ambient_worker_pool(&mut self) -> &mut Self {
207 self.state_mut().worker_pool_config = WorkerPoolConfiguration::Ambient;
208 self
209 }
210
211 pub fn with_explicit_worker_pool(&mut self, handle: Handle) -> &mut Self {
215 self.state_mut().worker_pool_config = WorkerPoolConfiguration::Explicit(handle);
216 self
217 }
218
219 pub fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
225 where
226 I: AsRef<str>,
227 B: SourceBuilder + Send + 'static,
228 {
229 self.state_mut().add_source(component_id, builder)?;
230 Ok(self)
231 }
232
233 pub fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
239 where
240 I: AsRef<str>,
241 B: RelayBuilder + Send + 'static,
242 {
243 self.state_mut().add_relay(component_id, builder)?;
244 Ok(self)
245 }
246
247 pub fn add_decoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
253 where
254 I: AsRef<str>,
255 B: DecoderBuilder + Send + 'static,
256 {
257 self.state_mut().add_decoder(component_id, builder)?;
258 Ok(self)
259 }
260
261 pub fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
267 where
268 I: AsRef<str>,
269 B: TransformBuilder + Send + 'static,
270 {
271 self.state_mut().add_transform(component_id, builder)?;
272 Ok(self)
273 }
274
275 pub fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
281 where
282 I: AsRef<str>,
283 B: DestinationBuilder + Send + 'static,
284 {
285 self.state_mut().add_destination(component_id, builder)?;
286 Ok(self)
287 }
288
289 pub fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
295 where
296 I: AsRef<str>,
297 B: EncoderBuilder + Send + 'static,
298 {
299 self.state_mut().add_encoder(component_id, builder)?;
300 Ok(self)
301 }
302
303 pub fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<&mut Self, GenericError>
309 where
310 I: AsRef<str>,
311 B: ForwarderBuilder + Send + 'static,
312 {
313 self.state_mut().add_forwarder(component_id, builder)?;
314 Ok(self)
315 }
316
317 pub fn connect_components<MS, SI, MD, DI>(
332 &mut self, upstream_output_component_ids: SI, downstream_component_ids: DI,
333 ) -> Result<&mut Self, GenericError>
334 where
335 SI: AsComponentIds<MS>,
336 DI: AsComponentIds<MD>,
337 {
338 self.state_mut()
339 .connect_components(upstream_output_component_ids, downstream_component_ids)?;
340 Ok(self)
341 }
342
343 pub fn connect_components_in_order<IT, I>(&mut self, ordered_component_ids: IT) -> Result<&mut Self, GenericError>
365 where
366 IT: IntoIterator<Item = I>,
367 I: AsRef<str>,
368 {
369 self.state_mut().connect_components_in_order(ordered_component_ids)?;
370 Ok(self)
371 }
372}
373
374pub struct TopologyReady {
376 registered_rx: oneshot::Receiver<()>,
377 health_registry: HealthRegistry,
378 component_prefix: String,
379}
380
381impl TopologyReady {
382 pub async fn wait(self) -> bool {
388 if self.registered_rx.await.is_err() {
393 return false;
394 }
395
396 self.health_registry
398 .all_ready_matching(|name| name.starts_with(&self.component_prefix))
399 .await;
400
401 true
402 }
403}
404
405impl TopologyBuildState {
406 fn set_interconnect_capacity(&mut self, capacity: NonZeroUsize) {
407 self.interconnect_capacity = capacity;
408 self.recalculate_bounds();
409 }
410
411 fn recalculate_bounds(&mut self) {
412 let interconnect_capacity = self.interconnect_capacity.get();
413
414 let mut bounds_builder = self.component_registry.bounds_builder();
415 let mut bounds_builder = bounds_builder.subcomponent("interconnects");
416 bounds_builder.reset();
417
418 let total_interconnect_capacity = interconnect_capacity * (self.transforms.len() + self.destinations.len());
423 bounds_builder
424 .minimum()
425 .with_array::<EventsBuffer>("events", total_interconnect_capacity);
426
427 let max_in_flight_event_buffers = ((self.transforms.len() + self.destinations.len()) * interconnect_capacity)
438 + self.sources.len()
439 + self.decoders.len()
440 + self.transforms.len();
441
442 bounds_builder
443 .firm()
444 .with_expr(UsageExpr::product(
446 "events",
447 UsageExpr::constant("max in-flight event buffers", max_in_flight_event_buffers),
448 UsageExpr::sum(
449 "",
450 UsageExpr::struct_size::<EventsBuffer>("events buffer"),
451 UsageExpr::product(
452 "",
453 UsageExpr::struct_size::<Event>("event"),
454 UsageExpr::constant("default event buffer capacity", DEFAULT_EVENTS_BUFFER_CAPACITY),
455 ),
456 ),
457 ));
458 }
459
460 fn add_source<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
461 where
462 I: AsRef<str>,
463 B: SourceBuilder + Send + 'static,
464 {
465 let component_id = self
466 .graph
467 .add_source(component_id, &builder)
468 .error_context("Failed to add source to topology graph.")?;
469
470 let mut source_registry = self
471 .component_registry
472 .get_or_create(format!("components.sources.{}", component_id));
473 let mut bounds_builder = source_registry.bounds_builder();
474 builder.specify_bounds(&mut bounds_builder);
475
476 self.recalculate_bounds();
477
478 let _ = self.sources.insert(
479 component_id,
480 RegisteredComponent::new(Box::new(builder), source_registry),
481 );
482
483 Ok(())
484 }
485
486 fn add_relay<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
487 where
488 I: AsRef<str>,
489 B: RelayBuilder + Send + 'static,
490 {
491 let component_id = self
492 .graph
493 .add_relay(component_id, &builder)
494 .error_context("Failed to add relay to topology graph.")?;
495
496 let mut relay_registry = self
497 .component_registry
498 .get_or_create(format!("components.relays.{}", component_id));
499 let mut bounds_builder = relay_registry.bounds_builder();
500 builder.specify_bounds(&mut bounds_builder);
501
502 self.recalculate_bounds();
503
504 let _ = self.relays.insert(
505 component_id,
506 RegisteredComponent::new(Box::new(builder), relay_registry),
507 );
508
509 Ok(())
510 }
511
512 fn add_decoder<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
513 where
514 I: AsRef<str>,
515 B: DecoderBuilder + Send + 'static,
516 {
517 let component_id = self
518 .graph
519 .add_decoder(component_id, &builder)
520 .error_context("Failed to add decoder to topology graph.")?;
521
522 let mut decoder_registry = self
523 .component_registry
524 .get_or_create(format!("components.decoders.{}", component_id));
525 let mut bounds_builder = decoder_registry.bounds_builder();
526 builder.specify_bounds(&mut bounds_builder);
527
528 self.recalculate_bounds();
529
530 let _ = self.decoders.insert(
531 component_id,
532 RegisteredComponent::new(Box::new(builder), decoder_registry),
533 );
534
535 Ok(())
536 }
537
538 fn add_transform<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
539 where
540 I: AsRef<str>,
541 B: TransformBuilder + Send + 'static,
542 {
543 let component_id = self
544 .graph
545 .add_transform(component_id, &builder)
546 .error_context("Failed to add transform to topology graph.")?;
547
548 let mut transform_registry = self
549 .component_registry
550 .get_or_create(format!("components.transforms.{}", component_id));
551 let mut bounds_builder = transform_registry.bounds_builder();
552 builder.specify_bounds(&mut bounds_builder);
553
554 self.recalculate_bounds();
555
556 let _ = self.transforms.insert(
557 component_id,
558 RegisteredComponent::new(Box::new(builder), transform_registry),
559 );
560
561 Ok(())
562 }
563
564 fn add_destination<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
565 where
566 I: AsRef<str>,
567 B: DestinationBuilder + Send + 'static,
568 {
569 let component_id = self
570 .graph
571 .add_destination(component_id, &builder)
572 .error_context("Failed to add destination to topology graph.")?;
573
574 let mut destination_registry = self
575 .component_registry
576 .get_or_create(format!("components.destinations.{}", component_id));
577 let mut bounds_builder = destination_registry.bounds_builder();
578 builder.specify_bounds(&mut bounds_builder);
579
580 self.recalculate_bounds();
581
582 let _ = self.destinations.insert(
583 component_id,
584 RegisteredComponent::new(Box::new(builder), destination_registry),
585 );
586
587 Ok(())
588 }
589
590 fn add_encoder<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
591 where
592 I: AsRef<str>,
593 B: EncoderBuilder + Send + 'static,
594 {
595 let component_id = self
596 .graph
597 .add_encoder(component_id, &builder)
598 .error_context("Failed to add encoder to topology graph.")?;
599
600 let mut encoder_registry = self
601 .component_registry
602 .get_or_create(format!("components.encoders.{}", component_id));
603 let mut bounds_builder = encoder_registry.bounds_builder();
604 builder.specify_bounds(&mut bounds_builder);
605
606 self.recalculate_bounds();
607
608 let _ = self.encoders.insert(
609 component_id,
610 RegisteredComponent::new(Box::new(builder), encoder_registry),
611 );
612
613 Ok(())
614 }
615
616 fn add_forwarder<I, B>(&mut self, component_id: I, builder: B) -> Result<(), GenericError>
617 where
618 I: AsRef<str>,
619 B: ForwarderBuilder + Send + 'static,
620 {
621 let component_id = self
622 .graph
623 .add_forwarder(component_id, &builder)
624 .error_context("Failed to add forwarder to topology graph.")?;
625
626 let mut forwarder_registry = self
627 .component_registry
628 .get_or_create(format!("components.forwarders.{}", component_id));
629 let mut bounds_builder = forwarder_registry.bounds_builder();
630 builder.specify_bounds(&mut bounds_builder);
631
632 self.recalculate_bounds();
633
634 let _ = self.forwarders.insert(
635 component_id,
636 RegisteredComponent::new(Box::new(builder), forwarder_registry),
637 );
638
639 Ok(())
640 }
641
642 fn connect_components<MS, SI, MD, DI>(
643 &mut self, upstream_output_component_ids: SI, downstream_component_ids: DI,
644 ) -> Result<(), GenericError>
645 where
646 SI: AsComponentIds<MS>,
647 DI: AsComponentIds<MD>,
648 {
649 for upstream_output_component_id in upstream_output_component_ids.as_component_ids() {
650 for downstream_component_id in downstream_component_ids.as_component_ids() {
651 self.graph
652 .add_edge(upstream_output_component_id.as_ref(), downstream_component_id.as_ref())
653 .error_context("Failed to add component connection to topology graph.")?;
654 }
655 }
656
657 Ok(())
658 }
659
660 fn connect_components_in_order<IT, I>(&mut self, ordered_component_ids: IT) -> Result<(), GenericError>
661 where
662 IT: IntoIterator<Item = I>,
663 I: AsRef<str>,
664 {
665 let mut pending_output_component_id: Option<I> = None;
666 let mut connected_any = false;
667
668 for component_id in ordered_component_ids.into_iter() {
669 if let Some(output_component_id) = pending_output_component_id.take() {
670 self.graph
671 .add_edge(output_component_id.as_ref(), component_id.as_ref())
672 .error_context("Failed to add component connection to topology graph.")?;
673
674 connected_any = true;
675 }
676
677 pending_output_component_id = Some(component_id);
679 }
680
681 if !connected_any {
683 return Err(generic_error!(
684 "Two or more components must be provided for connection."
685 ));
686 }
687
688 Ok(())
689 }
690
691 async fn build(mut self, name: String) -> Result<BuiltTopology, GenericError> {
697 self.graph.validate().error_context("Failed to build topology graph.")?;
698
699 let mut sources = HashMap::new();
700 for (id, builder) in self.sources {
701 let (builder, mut component_registry) = builder.into_parts();
702 let allocation_token = component_registry.token();
703
704 let component_context = ComponentContext::source(id.clone());
705 let source = builder
706 .build(component_context)
707 .track_resources(allocation_token)
708 .await
709 .with_error_context(|| format!("Failed to build source '{}'.", id))?;
710
711 sources.insert(
712 id,
713 RegisteredComponent::new(source.track_resources(allocation_token), component_registry),
714 );
715 }
716
717 let mut relays = HashMap::new();
718 for (id, builder) in self.relays {
719 let (builder, mut component_registry) = builder.into_parts();
720 let allocation_token = component_registry.token();
721
722 let component_context = ComponentContext::relay(id.clone());
723 let relay = builder
724 .build(component_context)
725 .track_resources(allocation_token)
726 .await
727 .with_error_context(|| format!("Failed to build relay '{}'.", id))?;
728
729 relays.insert(
730 id,
731 RegisteredComponent::new(relay.track_resources(allocation_token), component_registry),
732 );
733 }
734
735 let mut decoders = HashMap::new();
736 for (id, builder) in self.decoders {
737 let (builder, mut component_registry) = builder.into_parts();
738 let allocation_token = component_registry.token();
739
740 let component_context = ComponentContext::decoder(id.clone());
741 let decoder = builder
742 .build(component_context)
743 .track_resources(allocation_token)
744 .await
745 .with_error_context(|| format!("Failed to build decoder '{}'.", id))?;
746
747 decoders.insert(
748 id,
749 RegisteredComponent::new(decoder.track_resources(allocation_token), component_registry),
750 );
751 }
752
753 let mut transforms = HashMap::new();
754 for (id, builder) in self.transforms {
755 let (builder, mut component_registry) = builder.into_parts();
756 let allocation_token = component_registry.token();
757
758 let component_context = ComponentContext::transform(id.clone());
759 let transform = builder
760 .build(component_context)
761 .track_resources(allocation_token)
762 .await
763 .with_error_context(|| format!("Failed to build transform '{}'.", id))?;
764
765 transforms.insert(
766 id,
767 RegisteredComponent::new(transform.track_resources(allocation_token), component_registry),
768 );
769 }
770
771 let mut destinations = HashMap::new();
772 for (id, builder) in self.destinations {
773 let (builder, mut component_registry) = builder.into_parts();
774 let allocation_token = component_registry.token();
775
776 let component_context = ComponentContext::destination(id.clone());
777 let destination = builder
778 .build(component_context)
779 .track_resources(allocation_token)
780 .await
781 .with_error_context(|| format!("Failed to build destination '{}'.", id))?;
782
783 destinations.insert(
784 id,
785 RegisteredComponent::new(destination.track_resources(allocation_token), component_registry),
786 );
787 }
788
789 let mut encoders = HashMap::new();
790 for (id, builder) in self.encoders {
791 let (builder, mut component_registry) = builder.into_parts();
792 let allocation_token = component_registry.token();
793
794 let component_context = ComponentContext::encoder(id.clone());
795 let encoder = builder
796 .build(component_context)
797 .track_resources(allocation_token)
798 .await
799 .with_error_context(|| format!("Failed to build encoder '{}'.", id))?;
800
801 encoders.insert(
802 id,
803 RegisteredComponent::new(encoder.track_resources(allocation_token), component_registry),
804 );
805 }
806
807 let mut forwarders = HashMap::new();
808 for (id, builder) in self.forwarders {
809 let (builder, mut component_registry) = builder.into_parts();
810 let allocation_token = component_registry.token();
811
812 let component_context = ComponentContext::forwarder(id.clone());
813 let forwarder = builder
814 .build(component_context)
815 .track_resources(allocation_token)
816 .await
817 .with_error_context(|| format!("Failed to build forwarder '{}'.", id))?;
818
819 forwarders.insert(
820 id,
821 RegisteredComponent::new(forwarder.track_resources(allocation_token), component_registry),
822 );
823 }
824
825 Ok(BuiltTopology::from_parts(
826 name,
827 self.graph,
828 sources,
829 relays,
830 decoders,
831 transforms,
832 destinations,
833 encoders,
834 forwarders,
835 self.component_registry.token(),
836 self.interconnect_capacity,
837 self.worker_pool_config,
838 ))
839 }
840}
841
842#[async_trait]
843impl Supervisable for TopologyBlueprint {
844 fn name(&self) -> &str {
845 &self.name
846 }
847
848 fn shutdown_strategy(&self) -> ShutdownStrategy {
849 ShutdownStrategy::Graceful(Duration::MAX)
852 }
853
854 async fn initialize(&self, shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
855 let mut build_state = self
859 .build_state
860 .lock()
861 .expect("topology blueprint mutex poisoned")
862 .take()
863 .ok_or_else(|| generic_error!("Topology has already been initialized and cannot be run more than once."))?;
864
865 let health_registry = self
866 .health_registry
867 .clone()
868 .ok_or_else(|| generic_error!("Topology blueprint is missing its health registry."))?;
869 let memory_limiter = self
870 .memory_limiter
871 .clone()
872 .ok_or_else(|| generic_error!("Topology blueprint is missing its memory limiter."))?;
873
874 let dataspace = DataspaceRegistry::try_current()
875 .ok_or_else(|| generic_error!("Topology must be initialized within a supervised process context."))?;
876
877 let environment_ready = build_state.environment_ready.take();
885 let ready_signal = build_state.ready_signal.take();
886 let shutdown_timeout = build_state.shutdown_timeout;
887 let built = build_state.build(self.name.clone()).await?;
888
889 Ok(Box::pin(async move {
890 pin!(shutdown);
891
892 if let Some(environment_ready) = environment_ready {
895 select! {
896 _ = &mut shutdown => return Ok(()),
897 _ = environment_ready => {},
898 }
899 }
900
901 let mut running = built.spawn_inner(&health_registry, memory_limiter, dataspace).await?;
902
903 if let Some(ready_signal) = ready_signal {
907 let _ = ready_signal.send(());
908 }
909
910 let mut topology_failed = false;
911 select! {
912 _ = &mut shutdown => {
914 info!("Topology received shutdown signal. Shutting down...");
915 },
916
917 _ = running.wait_for_unexpected_finish() => {
919 error!("Topology component unexpectedly finished. Shutting down...");
920 topology_failed = true;
921 },
922 }
923
924 let shutdown_result = running.shutdown_with_timeout(shutdown_timeout).await;
926 match (shutdown_result, topology_failed) {
927 (Ok(()), false) => Ok(()),
928 (Ok(()), true) => Err(generic_error!(
929 "Topology shut down after a component unexpectedly finished."
930 )),
931 (Err(e), _) => Err(e),
932 }
933 }))
934 }
935}
936
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use resource_accounting::{ComponentRegistry, MemoryLimiter};
    use tokio::sync::oneshot;

    use super::{TopologyBlueprint, TopologyReady};
    use crate::{
        data_model::event::EventType,
        health::HealthRegistry,
        runtime::{RestartMode, RestartStrategy, Supervisor, SupervisorError},
        topology::test_util::{TestDestinationBuilder, TestSourceBuilder, TestTransformBuilder},
    };

    /// Builds the owned `(from, to)` pair used when asserting on graph edges.
    fn edge(from: &str, to: &str) -> (String, String) {
        (from.to_string(), to.to_string())
    }

    /// Blueprint holding one source, one transform, and one destination, not yet connected.
    fn blueprint_with_components() -> TopologyBlueprint {
        let registry = ComponentRegistry::default();
        let mut blueprint = TopologyBlueprint::new("test", &registry);

        blueprint
            .add_source("source", TestSourceBuilder::default_output(EventType::EventD))
            .expect("should not fail to add source");
        blueprint
            .add_transform(
                "transform",
                TestTransformBuilder::default_output(EventType::EventD, EventType::EventD),
            )
            .expect("should not fail to add transform");
        blueprint
            .add_destination(
                "destination",
                TestDestinationBuilder::with_input_type(EventType::EventD),
            )
            .expect("should not fail to add destination");

        blueprint
    }

    /// Blueprint holding the given sources and destinations, not yet connected.
    fn blueprint_with_sources_and_destinations(source_ids: &[&str], destination_ids: &[&str]) -> TopologyBlueprint {
        let registry = ComponentRegistry::default();
        let mut blueprint = TopologyBlueprint::new("test", &registry);

        for &source_id in source_ids {
            blueprint
                .add_source(source_id, TestSourceBuilder::default_output(EventType::EventD))
                .expect("should not fail to add source");
        }

        for &destination_id in destination_ids {
            blueprint
                .add_destination(
                    destination_id,
                    TestDestinationBuilder::with_input_type(EventType::EventD),
                )
                .expect("should not fail to add destination");
        }

        blueprint
    }

    /// Returns every `(upstream, downstream)` component pair in the blueprint's graph, sorted.
    fn connected_pairs(blueprint: &TopologyBlueprint) -> Vec<(String, String)> {
        let state_guard = blueprint.build_state.lock().expect("topology blueprint mutex poisoned");
        let outbound_edges = state_guard
            .as_ref()
            .expect("topology blueprint already initialized")
            .graph
            .get_outbound_directed_edges();

        let mut found = Vec::new();
        for (upstream, outputs) in &outbound_edges {
            for downstreams in outputs.values() {
                for downstream in downstreams {
                    found.push((
                        upstream.component_id().to_string(),
                        downstream.component_id().to_string(),
                    ));
                }
            }
        }

        // Sort so assertions do not depend on the graph's iteration order.
        found.sort();
        found
    }

    #[test]
    fn connect_components_in_order_errors_with_fewer_than_two_ids() {
        let mut blueprint = blueprint_with_components();

        // `map(|_| ())` drops the `&mut` borrow so the blueprint can be used again below.
        let no_ids = blueprint.connect_components_in_order(Vec::<&str>::new()).map(|_| ());
        assert!(no_ids.is_err());

        let single_id = blueprint.connect_components_in_order(["source"]).map(|_| ());
        assert!(single_id.is_err());

        assert!(connected_pairs(&blueprint).is_empty());
    }

    #[test]
    fn connect_components_in_order_connects_pairwise_left_to_right() {
        let mut blueprint = blueprint_with_components();

        blueprint
            .connect_components_in_order(["source", "transform", "destination"])
            .expect("should not fail to connect components in order");

        let expected = vec![edge("source", "transform"), edge("transform", "destination")];
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    #[test]
    fn connect_component_one_to_many_fans_out() {
        let mut blueprint = blueprint_with_sources_and_destinations(&["source"], &["dest_a", "dest_b"]);

        blueprint
            .connect_components("source", ["dest_a", "dest_b"])
            .expect("should not fail to connect component");

        let expected = vec![edge("source", "dest_a"), edge("source", "dest_b")];
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    #[test]
    fn connect_component_many_to_one_fans_in() {
        let mut blueprint = blueprint_with_sources_and_destinations(&["source_a", "source_b"], &["dest"]);

        blueprint
            .connect_components(["source_a", "source_b"], "dest")
            .expect("should not fail to connect component");

        let expected = vec![edge("source_a", "dest"), edge("source_b", "dest")];
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    #[test]
    fn connect_component_many_to_many_creates_mesh() {
        let mut blueprint = blueprint_with_sources_and_destinations(&["source_a", "source_b"], &["dest_a", "dest_b"]);

        blueprint
            .connect_components(["source_a", "source_b"], ["dest_a", "dest_b"])
            .expect("should not fail to connect component");

        let expected = vec![
            edge("source_a", "dest_a"),
            edge("source_a", "dest_b"),
            edge("source_b", "dest_a"),
            edge("source_b", "dest_b"),
        ];
        assert_eq!(connected_pairs(&blueprint), expected);
    }

    /// Blueprint with source -> transform -> destination already connected.
    fn connected_blueprint() -> TopologyBlueprint {
        let mut blueprint = blueprint_with_components();
        blueprint
            .connect_components_in_order(["source", "transform", "destination"])
            .expect("should not fail to connect components");
        blueprint
    }

    /// Connected blueprint that also carries the health registry and memory limiter that
    /// `initialize` requires.
    fn runnable_blueprint() -> TopologyBlueprint {
        let mut blueprint = connected_blueprint();
        blueprint
            .with_health_registry(HealthRegistry::new())
            .with_memory_limiter(MemoryLimiter::noop());
        blueprint
    }

    #[tokio::test]
    async fn topology_failure_shuts_down_supervisor() {
        let blueprint = runnable_blueprint();

        let mut supervisor = Supervisor::new("test-topology")
            .expect("should not fail to create supervisor")
            .with_restart_strategy(RestartStrategy::new(RestartMode::OneForOne, 0, Duration::from_secs(5)));
        supervisor.add_worker(blueprint);

        // Keep the sender alive so the supervisor never sees an external shutdown.
        let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let outcome = tokio::time::timeout(Duration::from_secs(5), supervisor.run_with_shutdown(shutdown_rx))
            .await
            .expect("supervisor should exit promptly");

        assert!(matches!(outcome, Err(SupervisorError::Shutdown)));
    }

    #[tokio::test]
    async fn topology_cannot_be_initialized_more_than_once() {
        let blueprint = runnable_blueprint();

        let mut supervisor = Supervisor::new("test-topology").expect("should not fail to create supervisor");
        supervisor.add_worker(blueprint);

        // Keep the sender alive so the supervisor never sees an external shutdown.
        let (_shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let outcome = tokio::time::timeout(Duration::from_secs(5), supervisor.run_with_shutdown(shutdown_rx))
            .await
            .expect("supervisor should exit promptly");

        assert!(matches!(outcome, Err(SupervisorError::FailedToInitialize { .. })));
    }

    #[tokio::test]
    async fn topology_waits_for_environment_readiness_before_starting() {
        let mut blueprint = runnable_blueprint();
        // The environment never becomes ready, so the topology must never be spawned.
        blueprint.with_environment_readiness(std::future::pending::<()>());

        let mut supervisor = Supervisor::new("test-topology").expect("should not fail to create supervisor");
        supervisor.add_worker(blueprint);

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let supervisor_task = tokio::spawn(async move { supervisor.run_with_shutdown(shutdown_rx).await });

        // Let the worker reach the readiness wait before asking it to shut down.
        tokio::time::sleep(Duration::from_millis(50)).await;
        shutdown_tx.send(()).expect("should send shutdown signal");

        let result = tokio::time::timeout(Duration::from_secs(5), supervisor_task)
            .await
            .expect("supervisor should exit promptly")
            .expect("supervisor task should not panic");

        assert!(result.is_ok(), "supervisor should shut down cleanly, got: {:?}", result);
    }

    #[test]
    fn topology_ready_waits_for_registration_before_checking_readiness() {
        use tokio_test::{assert_pending, assert_ready, task::spawn};

        let health_registry = HealthRegistry::new();

        // A ready component outside the topology's prefix.
        let mut unrelated = health_registry
            .register_component("env_provider.workload.foo")
            .expect("should register component");
        unrelated.mark_ready();

        let (registered_tx, registered_rx) = oneshot::channel();
        let topology_ready = TopologyReady {
            registered_rx,
            health_registry: health_registry.clone(),
            component_prefix: "topology.primary.".to_string(),
        };

        let mut wait = spawn(topology_ready.wait());

        // The registration signal has not fired yet.
        assert_pending!(wait.poll());

        let mut source = health_registry
            .register_component("topology.primary.sources.in")
            .expect("should register component");

        // The signal has fired, but the topology's own component is not ready yet.
        registered_tx.send(()).expect("receiver should be alive");
        assert_pending!(wait.poll());

        source.mark_ready();
        assert!(assert_ready!(wait.poll()));
    }

    #[tokio::test]
    async fn topology_ready_returns_false_when_torn_down_before_registration() {
        let health_registry = HealthRegistry::new();

        let (registered_tx, registered_rx) = oneshot::channel::<()>();
        let topology_ready = TopologyReady {
            registered_rx,
            health_registry,
            component_prefix: "topology.primary.".to_string(),
        };

        // Dropping the sender stands in for the topology going away before it is spawned.
        drop(registered_tx);

        assert!(!topology_ready.wait().await);
    }
}