saluki_components/transforms/chained/
mod.rs1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use saluki_common::strings::lower_alphanumeric;
4use saluki_core::data_model::event::EventType;
5use saluki_core::{
6 components::{transforms::*, ComponentContext},
7 topology::OutputDefinition,
8};
9use saluki_error::GenericError;
10use tokio::select;
11use tracing::{debug, error};
12
13#[derive(Default)]
23pub struct ChainedConfiguration {
24 subtransform_builders: Vec<(String, Box<dyn SynchronousTransformBuilder + Send + Sync>)>,
25}
26
27impl ChainedConfiguration {
28 pub fn with_transform_builder<TB>(mut self, subtransform_name: &str, subtransform_builder: TB) -> Self
30 where
31 TB: SynchronousTransformBuilder + Send + Sync + 'static,
32 {
33 let subtransform_id = format!(
34 "{}_{}",
35 self.subtransform_builders.len(),
36 lower_alphanumeric(subtransform_name)
37 );
38 self.subtransform_builders
39 .push((subtransform_id, Box::new(subtransform_builder)));
40 self
41 }
42}
43
44impl MemoryBounds for ChainedConfiguration {
45 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
46 builder.minimum().with_single_value::<Chained>("component struct");
48
49 for (subtransform_id, subtransform_builder) in self.subtransform_builders.iter() {
50 let mut subtransform_bounds_builder = builder.subcomponent(subtransform_id);
51 subtransform_builder.specify_bounds(&mut subtransform_bounds_builder);
52 }
53 }
54}
55
56#[async_trait]
57impl TransformBuilder for ChainedConfiguration {
58 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
59 let mut subtransforms = Vec::new();
60 for (subtransform_id, subtransform_builder) in &self.subtransform_builders {
61 let subtransform = subtransform_builder.build(context.clone()).await?;
62 subtransforms.push((subtransform_id.clone(), subtransform));
63 }
64
65 Ok(Box::new(Chained { subtransforms }))
66 }
67
68 fn input_event_type(&self) -> EventType {
69 EventType::all_bits()
70 }
71
72 fn outputs(&self) -> &[OutputDefinition] {
73 static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::all_bits())];
74
75 OUTPUTS
76 }
77}
78
79pub struct Chained {
80 subtransforms: Vec<(String, Box<dyn SynchronousTransform + Send>)>,
81}
82
83#[async_trait]
84impl Transform for Chained {
85 async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
86 let mut health = context.take_health_handle();
87
88 debug!(
89 "Chained transform started with {} synchronous subtransform(s) present.",
90 self.subtransforms.len()
91 );
92
93 let mut subtransforms = self
96 .subtransforms
97 .into_iter()
98 .map(|(subtransform_id, subtransform)| {
99 (
100 context.component_registry().get_or_create(subtransform_id).token(),
101 subtransform,
102 )
103 })
104 .collect::<Vec<_>>();
105
106 health.mark_ready();
107 debug!("Chained transform started.");
108
109 loop {
110 select! {
111 _ = health.live() => continue,
112 maybe_events = context.events().next() => match maybe_events {
113 Some(mut event_buffer) => {
114 for (allocation_token, transform) in &mut subtransforms {
115 let _guard = allocation_token.enter();
116 transform.transform_buffer(&mut event_buffer);
117 }
118
119 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
120 error!(error = %e, "Failed to dispatch events.");
121 }
122 },
123 None => break,
124 },
125 }
126 }
127
128 debug!("Chained transform stopped.");
129
130 Ok(())
131 }
132}