saluki_components/encoders/buffered_incremental/
mod.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::task::HandleExt as _;
6use saluki_core::{
7 components::{encoders::*, ComponentContext},
8 data_model::{event::EventType, payload::PayloadType},
9 observability::ComponentMetricsExt,
10};
11use saluki_error::GenericError;
12use saluki_metrics::MetricsBuilder;
13use tokio::{select, time::sleep};
14use tracing::{debug, error};
15
16mod telemetry;
17use self::telemetry::ComponentTelemetry;
18
/// Flush timeout that `BufferedIncrementalConfiguration::from_encoder_builder` assigns to new configurations.
const DEFAULT_FLUSH_TIMEOUT: Duration = Duration::from_secs(2);
20
/// Configuration for the buffered incremental encoder.
///
/// Pairs an incremental encoder builder with the timeout used to flush payloads that are still pending
/// after an event buffer has been processed.
pub struct BufferedIncrementalConfiguration<EB> {
    /// How long to wait, once a flush has been scheduled, before flushing the encoder.
    ///
    /// A zero duration is replaced with 10 milliseconds when the encoder is built.
    flush_timeout: Duration,

    /// Builder for the incremental encoder that this component drives.
    encoder_builder: EB,
}
39
40impl<EB> BufferedIncrementalConfiguration<EB>
41where
42 EB: IncrementalEncoderBuilder,
43{
44 pub fn from_encoder_builder(encoder_builder: EB) -> Self {
46 Self {
47 flush_timeout: DEFAULT_FLUSH_TIMEOUT,
48 encoder_builder,
49 }
50 }
51}
52
53#[async_trait]
54impl<EB> EncoderBuilder for BufferedIncrementalConfiguration<EB>
55where
56 EB: IncrementalEncoderBuilder + Sync,
57 EB::Output: Send + 'static,
58{
59 fn input_event_type(&self) -> EventType {
60 self.encoder_builder.input_event_type()
61 }
62
63 fn output_payload_type(&self) -> PayloadType {
64 self.encoder_builder.output_payload_type()
65 }
66
67 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
68 let metrics_builder = MetricsBuilder::from_component_context(&context);
69 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
70
71 let encoder = self.encoder_builder.build(context).await?;
72
73 let flush_timeout = match self.flush_timeout {
74 Duration::ZERO => Duration::from_millis(10),
77 dur => dur,
78 };
79
80 Ok(Box::new(BufferedIncremental {
81 encoder,
82 telemetry,
83 flush_timeout,
84 }))
85 }
86}
87
88impl<EB> MemoryBounds for BufferedIncrementalConfiguration<EB>
89where
90 EB: IncrementalEncoderBuilder,
91{
92 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
93 builder
94 .minimum()
95 .with_single_value::<BufferedIncremental<EB::Output>>("component struct");
96
97 self.encoder_builder.specify_bounds(builder);
98 }
99}
100
101pub struct BufferedIncremental<E> {
102 encoder: E,
103 telemetry: ComponentTelemetry,
104 flush_timeout: Duration,
105}
106
107#[async_trait]
108impl<E> Encoder for BufferedIncremental<E>
109where
110 E: IncrementalEncoder + Send + 'static,
111{
112 async fn run(mut self: Box<Self>, context: EncoderContext) -> Result<(), GenericError> {
113 let Self {
114 encoder,
115 telemetry,
116 flush_timeout,
117 } = *self;
118
119 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
121 let runner_name = format!(
122 "{}-incremental-encoder",
123 context.component_context().component_id().replace("_", "-")
124 );
125 let runner = run_incremental_encoder(context, encoder, telemetry, flush_timeout);
126 let runner_handle = thread_pool_handle.spawn_traced_named(runner_name, runner);
127
128 debug!("Buffered Incremental encoder started.");
129
130 match runner_handle.await {
134 Ok(Ok(())) => debug!("Incremental encoder task stopped."),
135 Ok(Err(e)) => error!(error = %e, "Incremental encoder task failed."),
136 Err(e) => error!(error = %e, "Incremental encoder task panicked."),
137 }
138
139 debug!("Buffered Incremental encoder stopped.");
140
141 Ok(())
142 }
143}
144
145async fn run_incremental_encoder<E>(
146 mut context: EncoderContext, mut encoder: E, telemetry: ComponentTelemetry, flush_timeout: Duration,
147) -> Result<(), GenericError>
148where
149 E: IncrementalEncoder,
150{
151 let mut health = context.take_health_handle();
152
153 health.mark_ready();
154
155 let mut pending_flush = false;
156 let pending_flush_timeout = sleep(flush_timeout);
157 tokio::pin!(pending_flush_timeout);
158
159 loop {
160 select! {
161 _ = health.live() => continue,
162 maybe_event_buffer = context.events().next() => {
163 let event_buffer = match maybe_event_buffer {
165 Some(event_buffer) => event_buffer,
166 None => break,
167 };
168
169 for event in event_buffer {
170 let event_to_retry = match encoder.process_event(event).await? {
175 ProcessResult::Continue => continue,
176 ProcessResult::FlushRequired(event) => event,
177 };
178
179 encoder.flush(context.dispatcher()).await?;
181
182 match encoder.process_event(event_to_retry).await? {
187 ProcessResult::Continue => {},
188 ProcessResult::FlushRequired(_) => {
189 error!("Failed to process event after flushing.");
190 telemetry.events_dropped_encoder().increment(1);
191 },
192 }
193 }
194
195 debug!("Processed event buffer.");
196
197 if !pending_flush {
199 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
200 pending_flush = true;
201 }
202 },
203 _ = &mut pending_flush_timeout, if pending_flush => {
204 debug!("Flushing encoder of any pending payload(s).");
205
206 pending_flush = false;
207
208 encoder.flush(context.dispatcher()).await?;
209
210 debug!("All pending payloads flushed.");
211 }
212 }
213 }
214
215 encoder.flush(context.dispatcher()).await?;
217
218 Ok(())
219}