saluki_components/transforms/chained/
mod.rs

1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use saluki_common::strings::lower_alphanumeric;
4use saluki_core::data_model::event::EventType;
5use saluki_core::{
6    components::{transforms::*, ComponentContext},
7    topology::OutputDefinition,
8};
9use saluki_error::GenericError;
10use tokio::select;
11use tracing::{debug, error};
12
13/// Chained transform.
14///
15/// Allows chaining multiple transforms together in a single component, which can avoid the overhead of receiving and
16/// sending events multiple times when concurrency is not required for processing.
17///
18/// ## Synchronous transforms
19///
20/// This component works with synchronous transforms only. If you need to chain asynchronous transforms, they must be
21/// added to the topology normally.
22#[derive(Default)]
23pub struct ChainedConfiguration {
24    subtransform_builders: Vec<(String, Box<dyn SynchronousTransformBuilder + Send + Sync>)>,
25}
26
27impl ChainedConfiguration {
28    /// Adds a new synchronous transform to the chain.
29    pub fn with_transform_builder<TB>(mut self, subtransform_name: &str, subtransform_builder: TB) -> Self
30    where
31        TB: SynchronousTransformBuilder + Send + Sync + 'static,
32    {
33        let subtransform_id = format!(
34            "{}_{}",
35            self.subtransform_builders.len(),
36            lower_alphanumeric(subtransform_name)
37        );
38        self.subtransform_builders
39            .push((subtransform_id, Box::new(subtransform_builder)));
40        self
41    }
42}
43
44impl MemoryBounds for ChainedConfiguration {
45    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
46        // Capture the size of the heap allocation when the component is built.
47        builder.minimum().with_single_value::<Chained>("component struct");
48
49        for (subtransform_id, subtransform_builder) in self.subtransform_builders.iter() {
50            let mut subtransform_bounds_builder = builder.subcomponent(subtransform_id);
51            subtransform_builder.specify_bounds(&mut subtransform_bounds_builder);
52        }
53    }
54}
55
56#[async_trait]
57impl TransformBuilder for ChainedConfiguration {
58    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
59        let mut subtransforms = Vec::new();
60        for (subtransform_id, subtransform_builder) in &self.subtransform_builders {
61            let subtransform = subtransform_builder.build(context.clone()).await?;
62            subtransforms.push((subtransform_id.clone(), subtransform));
63        }
64
65        Ok(Box::new(Chained { subtransforms }))
66    }
67
68    fn input_event_type(&self) -> EventType {
69        EventType::all_bits()
70    }
71
72    fn outputs(&self) -> &[OutputDefinition] {
73        static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::all_bits())];
74
75        OUTPUTS
76    }
77}
78
79pub struct Chained {
80    subtransforms: Vec<(String, Box<dyn SynchronousTransform + Send>)>,
81}
82
83#[async_trait]
84impl Transform for Chained {
85    async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
86        let mut health = context.take_health_handle();
87
88        debug!(
89            "Chained transform started with {} synchronous subtransform(s) present.",
90            self.subtransforms.len()
91        );
92
93        // We have to re-associate each subtransform with their allocation group token here, as we don't have access to
94        // it when the bounds are initially defined.
95        let mut subtransforms = self
96            .subtransforms
97            .into_iter()
98            .map(|(subtransform_id, subtransform)| {
99                (
100                    context.component_registry().get_or_create(subtransform_id).token(),
101                    subtransform,
102                )
103            })
104            .collect::<Vec<_>>();
105
106        health.mark_ready();
107        debug!("Chained transform started.");
108
109        loop {
110            select! {
111                _ = health.live() => continue,
112                maybe_events = context.events().next() => match maybe_events {
113                    Some(mut event_buffer) => {
114                        for (allocation_token, transform) in &mut subtransforms {
115                            let _guard = allocation_token.enter();
116                            transform.transform_buffer(&mut event_buffer);
117                        }
118
119                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
120                            error!(error = %e, "Failed to dispatch events.");
121                        }
122                    },
123                    None => break,
124                },
125            }
126        }
127
128        debug!("Chained transform stopped.");
129
130        Ok(())
131    }
132}