saluki_components/decoders/otlp/
mod.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9 components::{
10 decoders::{Decoder, DecoderBuilder, DecoderContext},
11 ComponentContext,
12 },
13 data_model::{event::EventType, payload::PayloadType},
14 topology::interconnect::EventBufferManager,
15};
16use saluki_error::{generic_error, ErrorContext as _, GenericError};
17use serde::Deserialize;
18use tokio::{
19 select,
20 time::{interval, MissedTickBehavior},
21};
22use tracing::{debug, error, warn};
23
24use crate::common::otlp::traces::translator::OtlpTracesTranslator;
25use crate::common::otlp::{
26 build_metrics, config::TracesConfig, Metrics, OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH,
27 OTLP_TRACES_GRPC_SERVICE_PATH,
28};
29
30#[derive(Deserialize, Default)]
32#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
33pub struct OtlpDecoderConfiguration {
34 #[serde(default)]
35 otlp_config: OtlpDecoderConfig,
36}
37
38#[derive(Deserialize, Default)]
40#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
41struct OtlpDecoderConfig {
42 #[serde(default)]
43 traces: TracesConfig,
44}
45
46impl OtlpDecoderConfiguration {
47 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
49 let mut cfg: Self = config
50 .as_typed()
51 .error_context("Failed to load OTLP decoder configuration")?;
52 cfg.otlp_config.traces.apply_env_overrides(config)?;
53 Ok(cfg)
54 }
55}
56
57#[async_trait]
58impl DecoderBuilder for OtlpDecoderConfiguration {
59 fn input_payload_type(&self) -> PayloadType {
60 PayloadType::Grpc
61 }
62
63 fn output_event_type(&self) -> EventType {
64 EventType::Trace
65 }
66
67 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Decoder + Send>, GenericError> {
68 let metrics = build_metrics(&context);
69 let traces_interner_size =
70 std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
71 .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
72 let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
73
74 Ok(Box::new(OtlpDecoder {
75 traces_translator,
76 metrics,
77 }))
78 }
79}
80
81impl MemoryBounds for OtlpDecoderConfiguration {
82 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
83 builder.minimum().with_single_value::<OtlpDecoder>("decoder struct");
84 }
85}
86
87pub struct OtlpDecoder {
89 traces_translator: OtlpTracesTranslator,
90 metrics: Metrics,
91}
92
93#[async_trait]
94impl Decoder for OtlpDecoder {
95 async fn run(self: Box<Self>, mut context: DecoderContext) -> Result<(), GenericError> {
96 let Self {
97 mut traces_translator,
98 metrics,
99 } = *self;
100 let mut health = context.take_health_handle();
101 health.mark_ready();
102
103 debug!("OTLP decoder started.");
104
105 let mut buffer_flush = interval(Duration::from_millis(100));
107 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
108
109 let mut event_buffer_manager = EventBufferManager::default();
110
111 loop {
112 select! {
113 maybe_payload = context.payloads().next() => {
114 let payload = match maybe_payload {
115 Some(payload) => payload,
116 None => {
117 debug!("Payloads stream closed, shutting down decoder.");
118 break;
119 }
120 };
121
122 let grpc_payload = match payload.try_into_grpc_payload() {
123 Some(grpc) => grpc,
124 None => {
125 warn!("Received non-gRPC payload in OTLP decoder. Dropping payload.");
126 continue;
127 }
128 };
129
130 match grpc_payload.service_path() {
131 path if path == &*OTLP_TRACES_GRPC_SERVICE_PATH => {
132 let (_, _, _, body) = grpc_payload.into_parts();
133 let request = match ExportTraceServiceRequest::decode(body.into_bytes()) {
134 Ok(req) => req,
135 Err(e) => {
136 error!(error = %e, "Failed to decode OTLP trace request.");
137 continue;
138 }
139 };
140
141 for resource_spans in request.resource_spans {
142 for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
143 if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
144 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
145 error!(error = %e, "Failed to dispatch trace events.");
146 }
147 }
148 }
149 }
150 }
151 path if path == &*OTLP_METRICS_GRPC_SERVICE_PATH => {
152 warn!("OTLP metrics decoding not yet implemented. Dropping metrics payload.");
153 }
154 path if path == &*OTLP_LOGS_GRPC_SERVICE_PATH => {
155 warn!("OTLP logs decoding not yet implemented. Dropping logs payload.");
156 }
157 path => {
158 warn!(service_path = path, "Received gRPC payload with unknown service path. Dropping payload.");
159 }
160 }
161 },
162 _ = buffer_flush.tick() => {
163 if let Some(event_buffer) = event_buffer_manager.consume() {
164 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
165 error!(error = %e, "Failed to dispatch buffered trace events.");
166 }
167 }
168 },
169 _ = health.live() => continue,
170 }
171 }
172
173 if let Some(event_buffer) = event_buffer_manager.consume() {
174 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
175 error!(error = %e, "Failed to dispatch final trace events.");
176 }
177 }
178
179 debug!("OTLP decoder stopped.");
180
181 Ok(())
182 }
183}
184
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::OtlpDecoderConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests against the OTLP decoder configuration, starting from an empty
    /// JSON object.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::OTLP_DECODER_CONFIGURATION,
            &[],
            json!({}),
            |generic_config| {
                OtlpDecoderConfiguration::from_configuration(&generic_config)
                    .expect("OtlpDecoderConfiguration should deserialize")
            },
        )
        .await
    }
}