Skip to main content

saluki_core/topology/
mod.rs

1//! Topology building.
2
3use std::num::NonZeroUsize;
4
5use resource_accounting::ComponentRegistry;
6
7use crate::data_model::payload::Payload;
8use crate::topology::interconnect::FixedSizeEventBuffer;
9
10mod blueprint;
11pub use self::blueprint::{BlueprintError, TopologyBlueprint, TopologyReady};
12
13mod built;
14
15mod context;
16pub use self::context::TopologyContext;
17
18mod graph;
19
20pub mod ids;
21pub use self::ids::{
22    ComponentId, ComponentOutputId, OutputDefinition, OutputName, TypedComponentId, TypedComponentOutputId,
23};
24
25pub mod interconnect;
26use self::interconnect::{Consumer, Dispatcher};
27
28mod running;
29
30#[cfg(test)]
31pub(super) mod test_util;
32
/// Const-generic size parameter used by the default `EventsBuffer` type.
const DEFAULT_EVENTS_BUFFER_CAPACITY: usize = 1024;

/// Default interconnect capacity.
///
/// The value is constructed during constant evaluation, so a zero literal here would fail the build rather than
/// panic at runtime.
const DEFAULT_INTERCONNECT_CAPACITY: NonZeroUsize = match NonZeroUsize::new(128) {
    Some(capacity) => capacity,
    None => panic!("default interconnect capacity must be non-zero"),
};
36
37// Topology-wide defaults.
38
39/// Default type for dispatching/consuming items from event-based components.
40pub type EventsBuffer = FixedSizeEventBuffer<DEFAULT_EVENTS_BUFFER_CAPACITY>;
41
42/// Default dispatcher for event-based components.
43pub type EventsDispatcher = Dispatcher<EventsBuffer>;
44
45/// Default consumer for event-based components.
46pub type EventsConsumer = Consumer<EventsBuffer>;
47
48/// Default type for dispatching/consuming items from payload-based components.
49pub type PayloadsBuffer = Payload;
50
51/// Default dispatcher for payload-based components.
52pub type PayloadsDispatcher = Dispatcher<PayloadsBuffer>;
53
54/// Default consumer for payload-based components.
55pub type PayloadsConsumer = Consumer<PayloadsBuffer>;
56
/// Builds the root name under which a topology's components are registered in the health registry.
///
/// The root is the literal prefix `topology.` followed by `name`. Components register under dotted names beneath
/// this root (for example, `topology.primary.sources.dsd_in`). Deriving the root in one place keeps component
/// registration (when a topology is spawned) and topology readiness waiting (`TopologyReady`) in sync.
fn health_component_root(name: &str) -> String {
    const ROOT_PREFIX: &str = "topology.";

    // Size the buffer up front so the two appends never reallocate.
    let mut root = String::with_capacity(ROOT_PREFIX.len() + name.len());
    root.push_str(ROOT_PREFIX);
    root.push_str(name);
    root
}
65
66pub(super) struct RegisteredComponent<T> {
67    component: T,
68    component_registry: ComponentRegistry,
69}
70
71impl<T> RegisteredComponent<T> {
72    fn new(component: T, component_registry: ComponentRegistry) -> Self {
73        Self {
74            component,
75            component_registry,
76        }
77    }
78    fn into_parts(self) -> (T, ComponentRegistry) {
79        (self.component, self.component_registry)
80    }
81}