saluki_components/decoders/otlp/
mod.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9    components::{
10        decoders::{Decoder, DecoderBuilder, DecoderContext},
11        ComponentContext,
12    },
13    data_model::{event::EventType, payload::PayloadType},
14    topology::interconnect::EventBufferManager,
15};
16use saluki_error::{ErrorContext as _, GenericError};
17use serde::Deserialize;
18use tokio::{
19    select,
20    time::{interval, MissedTickBehavior},
21};
22use tracing::{debug, error, warn};
23
24use crate::common::otlp::traces::translator::OtlpTracesTranslator;
25use crate::common::otlp::{
26    build_metrics, config::TracesConfig, Metrics, OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH,
27    OTLP_TRACES_GRPC_SERVICE_PATH,
28};
29
30/// Configuration for the OTLP decoder.
31#[derive(Deserialize, Default)]
32pub struct OtlpDecoderConfiguration {
33    #[serde(default)]
34    otlp_config: OtlpDecoderConfig,
35}
36
37/// OTLP configuration for the decoder.
38#[derive(Deserialize, Default)]
39struct OtlpDecoderConfig {
40    #[serde(default)]
41    traces: TracesConfig,
42}
43
44impl OtlpDecoderConfiguration {
45    /// Creates a new `OtlpDecoderConfiguration` from the given configuration.
46    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
47        config
48            .as_typed()
49            .error_context("Failed to load OTLP decoder configuration")
50    }
51}
52
53#[async_trait]
54impl DecoderBuilder for OtlpDecoderConfiguration {
55    fn input_payload_type(&self) -> PayloadType {
56        PayloadType::Grpc
57    }
58
59    fn output_event_type(&self) -> EventType {
60        EventType::Trace
61    }
62
63    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Decoder + Send>, GenericError> {
64        let metrics = build_metrics(&context);
65        let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone());
66
67        Ok(Box::new(OtlpDecoder {
68            traces_translator,
69            metrics,
70        }))
71    }
72}
73
74impl MemoryBounds for OtlpDecoderConfiguration {
75    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
76        builder.minimum().with_single_value::<OtlpDecoder>("decoder struct");
77    }
78}
79
80/// OTLP decoder.
81pub struct OtlpDecoder {
82    traces_translator: OtlpTracesTranslator,
83    metrics: Metrics,
84}
85
86#[async_trait]
87impl Decoder for OtlpDecoder {
88    async fn run(self: Box<Self>, mut context: DecoderContext) -> Result<(), GenericError> {
89        let mut health = context.take_health_handle();
90        health.mark_ready();
91
92        debug!("OTLP decoder started.");
93
94        // Set a buffer flush interval of 100ms to ensure we flush buffered events periodically.
95        let mut buffer_flush = interval(Duration::from_millis(100));
96        buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
97
98        let mut event_buffer_manager = EventBufferManager::default();
99
100        loop {
101            select! {
102                maybe_payload = context.payloads().next() => {
103                    let payload = match maybe_payload {
104                        Some(payload) => payload,
105                        None => {
106                            debug!("Payloads stream closed, shutting down decoder.");
107                            break;
108                        }
109                    };
110
111                    let grpc_payload = match payload.try_into_grpc_payload() {
112                        Some(grpc) => grpc,
113                        None => {
114                            warn!("Received non-gRPC payload in OTLP decoder. Dropping payload.");
115                            continue;
116                        }
117                    };
118
119                    match grpc_payload.service_path() {
120                        path if path == &*OTLP_TRACES_GRPC_SERVICE_PATH => {
121                            let (_, _, _, body) = grpc_payload.into_parts();
122                            let request = match ExportTraceServiceRequest::decode(body.into_bytes()) {
123                                Ok(req) => req,
124                                Err(e) => {
125                                    error!(error = %e, "Failed to decode OTLP trace request.");
126                                    continue;
127                                }
128                            };
129
130                            for resource_spans in request.resource_spans {
131                                let trace_events = self.traces_translator.translate_resource_spans(resource_spans, &self.metrics);
132                                for trace_event in trace_events {
133                                    if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
134                                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
135                                            error!(error = %e, "Failed to dispatch trace events.");
136                                        }
137                                    }
138                                }
139                            }
140                        }
141                        path if path == &*OTLP_METRICS_GRPC_SERVICE_PATH => {
142                            warn!("OTLP metrics decoding not yet implemented. Dropping metrics payload.");
143                        }
144                        path if path == &*OTLP_LOGS_GRPC_SERVICE_PATH => {
145                            warn!("OTLP logs decoding not yet implemented. Dropping logs payload.");
146                        }
147                        path => {
148                            warn!(service_path = path, "Received gRPC payload with unknown service path. Dropping payload.");
149                        }
150                    }
151                },
152                _ = buffer_flush.tick() => {
153                    if let Some(event_buffer) = event_buffer_manager.consume() {
154                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
155                            error!(error = %e, "Failed to dispatch buffered trace events.");
156                        }
157                    }
158                },
159                _ = health.live() => continue,
160            }
161        }
162
163        if let Some(event_buffer) = event_buffer_manager.consume() {
164            if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
165                error!(error = %e, "Failed to dispatch final trace events.");
166            }
167        }
168
169        debug!("OTLP decoder stopped.");
170
171        Ok(())
172    }
173}