saluki_components/decoders/otlp/
mod.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9 components::{
10 decoders::{Decoder, DecoderBuilder, DecoderContext},
11 ComponentContext,
12 },
13 data_model::{event::EventType, payload::PayloadType},
14 topology::interconnect::EventBufferManager,
15};
16use saluki_error::{generic_error, ErrorContext as _, GenericError};
17use serde::Deserialize;
18use tokio::{
19 select,
20 time::{interval, MissedTickBehavior},
21};
22use tracing::{debug, error, warn};
23
24use crate::common::otlp::traces::translator::OtlpTracesTranslator;
25use crate::common::otlp::{
26 build_metrics, config::TracesConfig, Metrics, OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH,
27 OTLP_TRACES_GRPC_SERVICE_PATH,
28};
29
30#[derive(Deserialize, Default)]
32pub struct OtlpDecoderConfiguration {
33 #[serde(default)]
34 otlp_config: OtlpDecoderConfig,
35}
36
37#[derive(Deserialize, Default)]
39struct OtlpDecoderConfig {
40 #[serde(default)]
41 traces: TracesConfig,
42}
43
44impl OtlpDecoderConfiguration {
45 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
47 config
48 .as_typed()
49 .error_context("Failed to load OTLP decoder configuration")
50 }
51}
52
53#[async_trait]
54impl DecoderBuilder for OtlpDecoderConfiguration {
55 fn input_payload_type(&self) -> PayloadType {
56 PayloadType::Grpc
57 }
58
59 fn output_event_type(&self) -> EventType {
60 EventType::Trace
61 }
62
63 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Decoder + Send>, GenericError> {
64 let metrics = build_metrics(&context);
65 let traces_interner_size =
66 std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
67 .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
68 let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
69
70 Ok(Box::new(OtlpDecoder {
71 traces_translator,
72 metrics,
73 }))
74 }
75}
76
77impl MemoryBounds for OtlpDecoderConfiguration {
78 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
79 builder.minimum().with_single_value::<OtlpDecoder>("decoder struct");
80 }
81}
82
83pub struct OtlpDecoder {
85 traces_translator: OtlpTracesTranslator,
86 metrics: Metrics,
87}
88
89#[async_trait]
90impl Decoder for OtlpDecoder {
91 async fn run(self: Box<Self>, mut context: DecoderContext) -> Result<(), GenericError> {
92 let Self {
93 mut traces_translator,
94 metrics,
95 } = *self;
96 let mut health = context.take_health_handle();
97 health.mark_ready();
98
99 debug!("OTLP decoder started.");
100
101 let mut buffer_flush = interval(Duration::from_millis(100));
103 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
104
105 let mut event_buffer_manager = EventBufferManager::default();
106
107 loop {
108 select! {
109 maybe_payload = context.payloads().next() => {
110 let payload = match maybe_payload {
111 Some(payload) => payload,
112 None => {
113 debug!("Payloads stream closed, shutting down decoder.");
114 break;
115 }
116 };
117
118 let grpc_payload = match payload.try_into_grpc_payload() {
119 Some(grpc) => grpc,
120 None => {
121 warn!("Received non-gRPC payload in OTLP decoder. Dropping payload.");
122 continue;
123 }
124 };
125
126 match grpc_payload.service_path() {
127 path if path == &*OTLP_TRACES_GRPC_SERVICE_PATH => {
128 let (_, _, _, body) = grpc_payload.into_parts();
129 let request = match ExportTraceServiceRequest::decode(body.into_bytes()) {
130 Ok(req) => req,
131 Err(e) => {
132 error!(error = %e, "Failed to decode OTLP trace request.");
133 continue;
134 }
135 };
136
137 for resource_spans in request.resource_spans {
138 for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
139 if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
140 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
141 error!(error = %e, "Failed to dispatch trace events.");
142 }
143 }
144 }
145 }
146 }
147 path if path == &*OTLP_METRICS_GRPC_SERVICE_PATH => {
148 warn!("OTLP metrics decoding not yet implemented. Dropping metrics payload.");
149 }
150 path if path == &*OTLP_LOGS_GRPC_SERVICE_PATH => {
151 warn!("OTLP logs decoding not yet implemented. Dropping logs payload.");
152 }
153 path => {
154 warn!(service_path = path, "Received gRPC payload with unknown service path. Dropping payload.");
155 }
156 }
157 },
158 _ = buffer_flush.tick() => {
159 if let Some(event_buffer) = event_buffer_manager.consume() {
160 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
161 error!(error = %e, "Failed to dispatch buffered trace events.");
162 }
163 }
164 },
165 _ = health.live() => continue,
166 }
167 }
168
169 if let Some(event_buffer) = event_buffer_manager.consume() {
170 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
171 error!(error = %e, "Failed to dispatch final trace events.");
172 }
173 }
174
175 debug!("OTLP decoder stopped.");
176
177 Ok(())
178 }
179}