saluki_components/transforms/chained/mod.rs

1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use saluki_common::strings::lower_alphanumeric;
4use saluki_core::data_model::event::EventType;
5use saluki_core::{
6    components::{transforms::*, ComponentContext},
7    topology::OutputDefinition,
8};
9use saluki_error::GenericError;
10use tokio::select;
11use tracing::{debug, error};
12
13/// Chained transform.
14///
15/// Allows chaining multiple transforms together in a single component, which can avoid the overhead of receiving and
16/// sending events multiple times when concurrency is not required for processing.
17///
18/// ## Synchronous transforms
19///
20/// This component works with synchronous transforms only. If you need to chain asynchronous transforms, they must be
21/// added to the topology normally.
22#[derive(Default)]
23pub struct ChainedConfiguration {
24    subtransform_builders: Vec<(String, Box<dyn SynchronousTransformBuilder + Send + Sync>)>,
25}
26
27impl ChainedConfiguration {
28    /// Adds a new synchronous transform to the chain.
29    pub fn with_transform_builder<TB>(mut self, subtransform_name: &str, subtransform_builder: TB) -> Self
30    where
31        TB: SynchronousTransformBuilder + Send + Sync + 'static,
32    {
33        let subtransform_id = format!(
34            "{}_{}",
35            self.subtransform_builders.len(),
36            lower_alphanumeric(subtransform_name)
37        );
38        self.subtransform_builders
39            .push((subtransform_id, Box::new(subtransform_builder)));
40        self
41    }
42}
43
44impl MemoryBounds for ChainedConfiguration {
45    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
46        // Capture the size of the heap allocation when the component is built.
47        builder.minimum().with_single_value::<Chained>("component struct");
48
49        for (subtransform_id, subtransform_builder) in self.subtransform_builders.iter() {
50            let mut subtransform_bounds_builder = builder.subcomponent(subtransform_id);
51            subtransform_builder.specify_bounds(&mut subtransform_bounds_builder);
52        }
53    }
54}
55
56#[async_trait]
57impl TransformBuilder for ChainedConfiguration {
58    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
59        let mut subtransforms = Vec::new();
60        for (subtransform_id, subtransform_builder) in &self.subtransform_builders {
61            let subtransform = subtransform_builder.build(context.clone()).await?;
62            subtransforms.push((subtransform_id.clone(), subtransform));
63        }
64
65        Ok(Box::new(Chained { subtransforms }))
66    }
67
68    fn input_event_type(&self) -> EventType {
69        EventType::all_bits()
70    }
71
72    fn outputs(&self) -> &[OutputDefinition<EventType>] {
73        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::all_bits())];
74        OUTPUTS
75    }
76}
77
78pub struct Chained {
79    subtransforms: Vec<(String, Box<dyn SynchronousTransform + Send>)>,
80}
81
82#[async_trait]
83impl Transform for Chained {
84    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
85        let mut health = context.take_health_handle();
86
87        debug!(
88            "Chained transform started with {} synchronous subtransform(s) present.",
89            self.subtransforms.len()
90        );
91
92        // We have to re-associate each subtransform with their allocation group token here, as we don't have access to
93        // it when the bounds are initially defined.
94        let mut subtransforms = self
95            .subtransforms
96            .into_iter()
97            .map(|(subtransform_id, subtransform)| {
98                (
99                    context.component_registry().get_or_create(subtransform_id).token(),
100                    subtransform,
101                )
102            })
103            .collect::<Vec<_>>();
104
105        health.mark_ready();
106        debug!("Chained transform started.");
107
108        loop {
109            select! {
110                _ = health.live() => continue,
111                maybe_events = context.events().next() => match maybe_events {
112                    Some(mut event_buffer) => {
113                        for (allocation_token, transform) in &mut subtransforms {
114                            let _guard = allocation_token.enter();
115                            transform.transform_buffer(&mut event_buffer);
116                        }
117
118                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
119                            error!(error = %e, "Failed to dispatch events.");
120                        }
121                    },
122                    None => break,
123                },
124            }
125        }
126
127        debug!("Chained transform stopped.");
128
129        Ok(())
130    }
131}