Skip to main content

saluki_components/decoders/otlp/
mod.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9    components::{
10        decoders::{Decoder, DecoderBuilder, DecoderContext},
11        ComponentContext,
12    },
13    data_model::{event::EventType, payload::PayloadType},
14    topology::interconnect::EventBufferManager,
15};
16use saluki_error::{generic_error, ErrorContext as _, GenericError};
17use serde::Deserialize;
18use tokio::{
19    select,
20    time::{interval, MissedTickBehavior},
21};
22use tracing::{debug, error, warn};
23
24use crate::common::otlp::traces::translator::OtlpTracesTranslator;
25use crate::common::otlp::{
26    build_metrics, config::TracesConfig, Metrics, OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH,
27    OTLP_TRACES_GRPC_SERVICE_PATH,
28};
29
30/// Configuration for the OTLP decoder.
31#[derive(Deserialize, Default)]
32pub struct OtlpDecoderConfiguration {
33    #[serde(default)]
34    otlp_config: OtlpDecoderConfig,
35}
36
37/// OTLP configuration for the decoder.
38#[derive(Deserialize, Default)]
39struct OtlpDecoderConfig {
40    #[serde(default)]
41    traces: TracesConfig,
42}
43
44impl OtlpDecoderConfiguration {
45    /// Creates a new `OtlpDecoderConfiguration` from the given configuration.
46    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
47        config
48            .as_typed()
49            .error_context("Failed to load OTLP decoder configuration")
50    }
51}
52
53#[async_trait]
54impl DecoderBuilder for OtlpDecoderConfiguration {
55    fn input_payload_type(&self) -> PayloadType {
56        PayloadType::Grpc
57    }
58
59    fn output_event_type(&self) -> EventType {
60        EventType::Trace
61    }
62
63    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Decoder + Send>, GenericError> {
64        let metrics = build_metrics(&context);
65        let traces_interner_size =
66            std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
67                .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
68        let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
69
70        Ok(Box::new(OtlpDecoder {
71            traces_translator,
72            metrics,
73        }))
74    }
75}
76
77impl MemoryBounds for OtlpDecoderConfiguration {
78    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
79        builder.minimum().with_single_value::<OtlpDecoder>("decoder struct");
80    }
81}
82
83/// OTLP decoder.
84pub struct OtlpDecoder {
85    traces_translator: OtlpTracesTranslator,
86    metrics: Metrics,
87}
88
89#[async_trait]
90impl Decoder for OtlpDecoder {
91    async fn run(self: Box<Self>, mut context: DecoderContext) -> Result<(), GenericError> {
92        let Self {
93            mut traces_translator,
94            metrics,
95        } = *self;
96        let mut health = context.take_health_handle();
97        health.mark_ready();
98
99        debug!("OTLP decoder started.");
100
101        // Set a buffer flush interval of 100ms to ensure we flush buffered events periodically.
102        let mut buffer_flush = interval(Duration::from_millis(100));
103        buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
104
105        let mut event_buffer_manager = EventBufferManager::default();
106
107        loop {
108            select! {
109                maybe_payload = context.payloads().next() => {
110                    let payload = match maybe_payload {
111                        Some(payload) => payload,
112                        None => {
113                            debug!("Payloads stream closed, shutting down decoder.");
114                            break;
115                        }
116                    };
117
118                    let grpc_payload = match payload.try_into_grpc_payload() {
119                        Some(grpc) => grpc,
120                        None => {
121                            warn!("Received non-gRPC payload in OTLP decoder. Dropping payload.");
122                            continue;
123                        }
124                    };
125
126                    match grpc_payload.service_path() {
127                        path if path == &*OTLP_TRACES_GRPC_SERVICE_PATH => {
128                            let (_, _, _, body) = grpc_payload.into_parts();
129                            let request = match ExportTraceServiceRequest::decode(body.into_bytes()) {
130                                Ok(req) => req,
131                                Err(e) => {
132                                    error!(error = %e, "Failed to decode OTLP trace request.");
133                                    continue;
134                                }
135                            };
136
137                            for resource_spans in request.resource_spans {
138                                for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
139                                    if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
140                                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
141                                            error!(error = %e, "Failed to dispatch trace events.");
142                                        }
143                                    }
144                                }
145                            }
146                        }
147                        path if path == &*OTLP_METRICS_GRPC_SERVICE_PATH => {
148                            warn!("OTLP metrics decoding not yet implemented. Dropping metrics payload.");
149                        }
150                        path if path == &*OTLP_LOGS_GRPC_SERVICE_PATH => {
151                            warn!("OTLP logs decoding not yet implemented. Dropping logs payload.");
152                        }
153                        path => {
154                            warn!(service_path = path, "Received gRPC payload with unknown service path. Dropping payload.");
155                        }
156                    }
157                },
158                _ = buffer_flush.tick() => {
159                    if let Some(event_buffer) = event_buffer_manager.consume() {
160                        if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
161                            error!(error = %e, "Failed to dispatch buffered trace events.");
162                        }
163                    }
164                },
165                _ = health.live() => continue,
166            }
167        }
168
169        if let Some(event_buffer) = event_buffer_manager.consume() {
170            if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
171                error!(error = %e, "Failed to dispatch final trace events.");
172            }
173        }
174
175        debug!("OTLP decoder stopped.");
176
177        Ok(())
178    }
179}