saluki_components/transforms/chained/
mod.rs1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use saluki_common::strings::lower_alphanumeric;
4use saluki_core::data_model::event::EventType;
5use saluki_core::{
6 components::{transforms::*, ComponentContext},
7 topology::OutputDefinition,
8};
9use saluki_error::GenericError;
10use tokio::select;
11use tracing::{debug, error};
12
13#[derive(Default)]
23pub struct ChainedConfiguration {
24 subtransform_builders: Vec<(String, Box<dyn SynchronousTransformBuilder + Send + Sync>)>,
25}
26
27impl ChainedConfiguration {
28 pub fn with_transform_builder<TB>(mut self, subtransform_name: &str, subtransform_builder: TB) -> Self
30 where
31 TB: SynchronousTransformBuilder + Send + Sync + 'static,
32 {
33 let subtransform_id = format!(
34 "{}_{}",
35 self.subtransform_builders.len(),
36 lower_alphanumeric(subtransform_name)
37 );
38 self.subtransform_builders
39 .push((subtransform_id, Box::new(subtransform_builder)));
40 self
41 }
42}
43
44impl MemoryBounds for ChainedConfiguration {
45 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
46 builder.minimum().with_single_value::<Chained>("component struct");
48
49 for (subtransform_id, subtransform_builder) in self.subtransform_builders.iter() {
50 let mut subtransform_bounds_builder = builder.subcomponent(subtransform_id);
51 subtransform_builder.specify_bounds(&mut subtransform_bounds_builder);
52 }
53 }
54}
55
56#[async_trait]
57impl TransformBuilder for ChainedConfiguration {
58 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
59 let mut subtransforms = Vec::new();
60 for (subtransform_id, subtransform_builder) in &self.subtransform_builders {
61 let subtransform = subtransform_builder.build(context.clone()).await?;
62 subtransforms.push((subtransform_id.clone(), subtransform));
63 }
64
65 Ok(Box::new(Chained { subtransforms }))
66 }
67
68 fn input_event_type(&self) -> EventType {
69 EventType::all_bits()
70 }
71
72 fn outputs(&self) -> &[OutputDefinition<EventType>] {
73 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::all_bits())];
74 OUTPUTS
75 }
76}
77
78pub struct Chained {
79 subtransforms: Vec<(String, Box<dyn SynchronousTransform + Send>)>,
80}
81
82#[async_trait]
83impl Transform for Chained {
84 async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
85 let mut health = context.take_health_handle();
86
87 debug!(
88 "Chained transform started with {} synchronous subtransform(s) present.",
89 self.subtransforms.len()
90 );
91
92 let mut subtransforms = self
95 .subtransforms
96 .into_iter()
97 .map(|(subtransform_id, subtransform)| {
98 (
99 context.component_registry().get_or_create(subtransform_id).token(),
100 subtransform,
101 )
102 })
103 .collect::<Vec<_>>();
104
105 health.mark_ready();
106 debug!("Chained transform started.");
107
108 loop {
109 select! {
110 _ = health.live() => continue,
111 maybe_events = context.events().next() => match maybe_events {
112 Some(mut event_buffer) => {
113 for (allocation_token, transform) in &mut subtransforms {
114 let _guard = allocation_token.enter();
115 transform.transform_buffer(&mut event_buffer);
116 }
117
118 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
119 error!(error = %e, "Failed to dispatch events.");
120 }
121 },
122 None => break,
123 },
124 }
125 }
126
127 debug!("Chained transform stopped.");
128
129 Ok(())
130 }
131}