saluki_components/encoders/buffered_incremental/mod.rs

use std::time::Duration;

use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_common::task::HandleExt as _;
use saluki_core::{
    components::{encoders::*, ComponentContext},
    data_model::{event::EventType, payload::PayloadType},
    observability::ComponentMetricsExt,
};
use saluki_error::GenericError;
use saluki_metrics::MetricsBuilder;
use tokio::{select, time::sleep};
use tracing::{debug, error};

mod telemetry;
use self::telemetry::ComponentTelemetry;

const DEFAULT_FLUSH_TIMEOUT: Duration = Duration::from_secs(2);

/// Buffered incremental encoder.
///
/// Wraps an `IncrementalEncoder` and drives it with incoming events, buffering them via a configurable flush
/// timeout. Payloads are encoded on the global thread pool to avoid affecting latency-sensitive tasks.
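///
/// # Example
///
/// A minimal sketch of constructing the configuration, assuming a hypothetical `MyEncoderBuilder` type that
/// implements `IncrementalEncoderBuilder`:
///
/// ```ignore
/// // `MyEncoderBuilder` is illustrative only; any `IncrementalEncoderBuilder` works here.
/// let config = BufferedIncrementalConfiguration::from_encoder_builder(MyEncoderBuilder::default());
/// ```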
pub struct BufferedIncrementalConfiguration<EB> {
    /// Flush timeout for pending requests.
    ///
    /// When the encoder has written events to the in-flight request payload, but the payload has not yet reached the
    /// size limits that would force a flush, the encoder waits for a period of time before flushing it. This gives
    /// other events a chance to be processed and written into the same payload, maximizing payload size and reducing
    /// the overall number of requests generated and sent.
    ///
    /// Defaults to 2 seconds.
    flush_timeout: Duration,

    encoder_builder: EB,
}

impl<EB> BufferedIncrementalConfiguration<EB>
where
    EB: IncrementalEncoderBuilder,
{
    /// Creates a new `BufferedIncrementalConfiguration` from the given incremental encoder builder.
    pub fn from_encoder_builder(encoder_builder: EB) -> Self {
        Self {
            flush_timeout: DEFAULT_FLUSH_TIMEOUT,
            encoder_builder,
        }
    }
}

#[async_trait]
impl<EB> EncoderBuilder for BufferedIncrementalConfiguration<EB>
where
    EB: IncrementalEncoderBuilder + Sync,
    EB::Output: Send + 'static,
{
    fn input_event_type(&self) -> EventType {
        self.encoder_builder.input_event_type()
    }

    fn output_payload_type(&self) -> PayloadType {
        self.encoder_builder.output_payload_type()
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);

        let encoder = self.encoder_builder.build(context).await?;

        let flush_timeout = match self.flush_timeout {
            // We always give ourselves a minimum flush timeout of 10ms to allow for a minimal amount of batching
            // while still flushing things almost immediately.
            Duration::ZERO => Duration::from_millis(10),
            dur => dur,
        };

        Ok(Box::new(BufferedIncremental {
            encoder,
            telemetry,
            flush_timeout,
        }))
    }
}

impl<EB> MemoryBounds for BufferedIncrementalConfiguration<EB>
where
    EB: IncrementalEncoderBuilder,
{
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
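        // Account for the component struct itself, then let the wrapped encoder builder declare its own bounds.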
        builder
            .minimum()
            .with_single_value::<BufferedIncremental<EB::Output>>("component struct");

        self.encoder_builder.specify_bounds(builder);
    }
}

pub struct BufferedIncremental<E> {
    encoder: E,
    telemetry: ComponentTelemetry,
    flush_timeout: Duration,
}

#[async_trait]
impl<E> Encoder for BufferedIncremental<E>
where
    E: IncrementalEncoder + Send + 'static,
{
    async fn run(mut self: Box<Self>, context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            encoder,
            telemetry,
            flush_timeout,
        } = *self;

        // Spawn our background incremental encoder task.
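        //
        // Encoding runs on the global thread pool so that CPU-heavy serialization doesn't interfere with
        // latency-sensitive tasks (see the type-level documentation).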
        let thread_pool_handle = context.topology_context().global_thread_pool().clone();
        let runner_name = format!(
            "{}-incremental-encoder",
            context.component_context().component_id().replace("_", "-")
        );
        let runner = run_incremental_encoder(context, encoder, telemetry, flush_timeout);
        let runner_handle = thread_pool_handle.spawn_traced_named(runner_name, runner);

        debug!("Buffered Incremental encoder started.");

        // Simply wait for the runner to finish.
        //
        // It handles all of the health checking, event consuming, encoding, dispatching, etc.
        match runner_handle.await {
            Ok(Ok(())) => debug!("Incremental encoder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Incremental encoder task failed."),
            Err(e) => error!(error = %e, "Incremental encoder task panicked."),
        }

        debug!("Buffered Incremental encoder stopped.");

        Ok(())
    }
}

async fn run_incremental_encoder<E>(
    mut context: EncoderContext, mut encoder: E, telemetry: ComponentTelemetry, flush_timeout: Duration,
) -> Result<(), GenericError>
where
    E: IncrementalEncoder,
{
    let mut health = context.take_health_handle();

    health.mark_ready();

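    // A single, reusable flush timer: pinned so that `select!` can poll it across loop iterations, and only
    // polled at all while `pending_flush` indicates there are buffered events waiting to be flushed.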
    let mut pending_flush = false;
    let pending_flush_timeout = sleep(flush_timeout);
    tokio::pin!(pending_flush_timeout);

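    // Main processing loop: interleaves liveness probes, incoming event buffers, and the pending flush timeout.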
    loop {
        select! {
            _ = health.live() => continue,
            maybe_event_buffer = context.events().next() => {
                // Break out of our loop when the events channel is closed.
                let event_buffer = match maybe_event_buffer {
                    Some(event_buffer) => event_buffer,
                    None => break,
                };

                for event in event_buffer {
                    // Try to process the event.
                    //
                    // If we're informed that we need to flush, we'll hold on to this event, trigger a flush, and then
                    // retry processing it after the flush completes.
                    let event_to_retry = match encoder.process_event(event).await? {
                        ProcessResult::Continue => continue,
                        ProcessResult::FlushRequired(event) => event,
                    };

                    // Flush the encoder, dispatching any payloads it has generated.
                    encoder.flush(context.dispatcher()).await?;

                    // Now try to process the event again.
                    //
                    // If this fails, then we drop the event, because it's a logical bug to not be able to encode an
                    // event after flushing, and we don't want to get stuck in an infinite loop.
                    match encoder.process_event(event_to_retry).await? {
                        ProcessResult::Continue => {},
                        ProcessResult::FlushRequired(_) => {
                            error!("Failed to process event after flushing.");
                            telemetry.events_dropped_encoder().increment(1);
                        },
                    }
                }

                debug!("Processed event buffer.");

                // If we're not already pending a flush, we'll start the countdown.
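                //
                // Notably, the timer is armed when the first unflushed events are written and is not reset by
                // subsequent event buffers, which bounds flush latency to `flush_timeout` from that first write.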
                if !pending_flush {
                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
                    pending_flush = true;
                }
            },
            _ = &mut pending_flush_timeout, if pending_flush => {
                debug!("Flushing encoder of any pending payload(s).");

                pending_flush = false;

                encoder.flush(context.dispatcher()).await?;

                debug!("All pending payloads flushed.");
            }
        }
    }

    // Do a final flush, since we may have had pending payloads before breaking out of the loop.
    encoder.flush(context.dispatcher()).await?;

    Ok(())
}