Skip to main content

saluki_components/decoders/otlp/
mod.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9    components::{
10        decoders::{Decoder, DecoderBuilder, DecoderContext},
11        ComponentContext,
12    },
13    data_model::{event::EventType, payload::PayloadType},
14    topology::interconnect::EventBufferManager,
15};
16use saluki_error::{generic_error, ErrorContext as _, GenericError};
17use serde::Deserialize;
18use tokio::{
19    select,
20    time::{interval, MissedTickBehavior},
21};
22use tracing::{debug, error, warn};
23
24use crate::common::otlp::traces::translator::OtlpTracesTranslator;
25use crate::common::otlp::{
26    build_metrics, config::TracesConfig, Metrics, OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH,
27    OTLP_TRACES_GRPC_SERVICE_PATH,
28};
29
30/// Configuration for the OTLP decoder.
31#[derive(Deserialize, Default)]
32#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
33pub struct OtlpDecoderConfiguration {
34    #[serde(default)]
35    otlp_config: OtlpDecoderConfig,
36}
37
38/// OTLP configuration for the decoder.
39#[derive(Deserialize, Default)]
40#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
41struct OtlpDecoderConfig {
42    #[serde(default)]
43    traces: TracesConfig,
44}
45
46impl OtlpDecoderConfiguration {
47    /// Creates a new `OtlpDecoderConfiguration` from the given configuration.
48    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
49        let mut cfg: Self = config
50            .as_typed()
51            .error_context("Failed to load OTLP decoder configuration")?;
52        cfg.otlp_config.traces.apply_env_overrides(config)?;
53        Ok(cfg)
54    }
55}
56
57#[async_trait]
58impl DecoderBuilder for OtlpDecoderConfiguration {
59    fn input_payload_type(&self) -> PayloadType {
60        PayloadType::Grpc
61    }
62
63    fn output_event_type(&self) -> EventType {
64        EventType::Trace
65    }
66
67    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Decoder + Send>, GenericError> {
68        let metrics = build_metrics(&context);
69        let traces_interner_size =
70            std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
71                .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
72        let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
73
74        Ok(Box::new(OtlpDecoder {
75            traces_translator,
76            metrics,
77        }))
78    }
79}
80
81impl MemoryBounds for OtlpDecoderConfiguration {
82    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
83        builder.minimum().with_single_value::<OtlpDecoder>("decoder struct");
84    }
85}
86
87/// OTLP decoder.
88pub struct OtlpDecoder {
89    traces_translator: OtlpTracesTranslator,
90    metrics: Metrics,
91}
92
93#[async_trait]
94impl Decoder for OtlpDecoder {
95    async fn run(self: Box<Self>, mut context: DecoderContext) -> Result<(), GenericError> {
96        let Self {
97            mut traces_translator,
98            metrics,
99        } = *self;
100        let mut health = context.take_health_handle();
101        health.mark_ready();
102
103        debug!("OTLP decoder started.");
104
105        // Set a buffer flush interval of 100ms to ensure we flush buffered events periodically.
106        let mut buffer_flush = interval(Duration::from_millis(100));
107        buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
108
109        let mut event_buffer_manager = EventBufferManager::default();
110
111        loop {
112            select! {
113                maybe_payload = context.payloads().next() => {
114                    let payload = match maybe_payload {
115                        Some(payload) => payload,
116                        None => {
117                            debug!("Payloads stream closed, shutting down decoder.");
118                            break;
119                        }
120                    };
121
122                    let grpc_payload = match payload.try_into_grpc_payload() {
123                        Some(grpc) => grpc,
124                        None => {
125                            warn!("Received non-gRPC payload in OTLP decoder. Dropping payload.");
126                            continue;
127                        }
128                    };
129
130                    match grpc_payload.service_path() {
131                        path if path == &*OTLP_TRACES_GRPC_SERVICE_PATH => {
132                            let (_, _, _, body) = grpc_payload.into_parts();
133                            let request = match ExportTraceServiceRequest::decode(body.into_bytes()) {
134                                Ok(req) => req,
135                                Err(e) => {
136                                    error!(error = %e, "Failed to decode OTLP trace request.");
137                                    continue;
138                                }
139                            };
140
141                            for resource_spans in request.resource_spans {
142                                for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
143                                    if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
144                                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
145                                            error!(error = %e, "Failed to dispatch trace events.");
146                                        }
147                                    }
148                                }
149                            }
150                        }
151                        path if path == &*OTLP_METRICS_GRPC_SERVICE_PATH => {
152                            warn!("OTLP metrics decoding not yet implemented. Dropping metrics payload.");
153                        }
154                        path if path == &*OTLP_LOGS_GRPC_SERVICE_PATH => {
155                            warn!("OTLP logs decoding not yet implemented. Dropping logs payload.");
156                        }
157                        path => {
158                            warn!(service_path = path, "Received gRPC payload with unknown service path. Dropping payload.");
159                        }
160                    }
161                },
162                _ = buffer_flush.tick() => {
163                    if let Some(event_buffer) = event_buffer_manager.consume() {
164                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
165                            error!(error = %e, "Failed to dispatch buffered trace events.");
166                        }
167                    }
168                },
169                _ = health.live() => continue,
170            }
171        }
172
173        if let Some(event_buffer) = event_buffer_manager.consume() {
174            if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
175                error!(error = %e, "Failed to dispatch final trace events.");
176            }
177        }
178
179        debug!("OTLP decoder stopped.");
180
181        Ok(())
182    }
183}
184
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::OtlpDecoderConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests against the OTLP decoder configuration,
    /// starting from an empty JSON object and loading via `from_configuration`.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::OTLP_DECODER_CONFIGURATION,
            &[],
            json!({}),
            |generic_config| {
                OtlpDecoderConfiguration::from_configuration(&generic_config)
                    .expect("OtlpDecoderConfiguration should deserialize")
            },
        )
        .await
    }
}