saluki_components/decoders/otlp/
mod.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9 components::{
10 decoders::{Decoder, DecoderBuilder, DecoderContext},
11 ComponentContext,
12 },
13 data_model::{event::EventType, payload::PayloadType},
14 topology::interconnect::EventBufferManager,
15};
16use saluki_error::{ErrorContext as _, GenericError};
17use serde::Deserialize;
18use tokio::{
19 select,
20 time::{interval, MissedTickBehavior},
21};
22use tracing::{debug, error, warn};
23
24use crate::common::otlp::traces::translator::OtlpTracesTranslator;
25use crate::common::otlp::{
26 build_metrics, config::TracesConfig, Metrics, OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH,
27 OTLP_TRACES_GRPC_SERVICE_PATH,
28};
29
30#[derive(Deserialize, Default)]
32pub struct OtlpDecoderConfiguration {
33 #[serde(default)]
34 otlp_config: OtlpDecoderConfig,
35}
36
37#[derive(Deserialize, Default)]
39struct OtlpDecoderConfig {
40 #[serde(default)]
41 traces: TracesConfig,
42}
43
44impl OtlpDecoderConfiguration {
45 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
47 config
48 .as_typed()
49 .error_context("Failed to load OTLP decoder configuration")
50 }
51}
52
53#[async_trait]
54impl DecoderBuilder for OtlpDecoderConfiguration {
55 fn input_payload_type(&self) -> PayloadType {
56 PayloadType::Grpc
57 }
58
59 fn output_event_type(&self) -> EventType {
60 EventType::Trace
61 }
62
63 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Decoder + Send>, GenericError> {
64 let metrics = build_metrics(&context);
65 let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone());
66
67 Ok(Box::new(OtlpDecoder {
68 traces_translator,
69 metrics,
70 }))
71 }
72}
73
74impl MemoryBounds for OtlpDecoderConfiguration {
75 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
76 builder.minimum().with_single_value::<OtlpDecoder>("decoder struct");
77 }
78}
79
80pub struct OtlpDecoder {
82 traces_translator: OtlpTracesTranslator,
83 metrics: Metrics,
84}
85
86#[async_trait]
87impl Decoder for OtlpDecoder {
88 async fn run(self: Box<Self>, mut context: DecoderContext) -> Result<(), GenericError> {
89 let mut health = context.take_health_handle();
90 health.mark_ready();
91
92 debug!("OTLP decoder started.");
93
94 let mut buffer_flush = interval(Duration::from_millis(100));
96 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
97
98 let mut event_buffer_manager = EventBufferManager::default();
99
100 loop {
101 select! {
102 maybe_payload = context.payloads().next() => {
103 let payload = match maybe_payload {
104 Some(payload) => payload,
105 None => {
106 debug!("Payloads stream closed, shutting down decoder.");
107 break;
108 }
109 };
110
111 let grpc_payload = match payload.try_into_grpc_payload() {
112 Some(grpc) => grpc,
113 None => {
114 warn!("Received non-gRPC payload in OTLP decoder. Dropping payload.");
115 continue;
116 }
117 };
118
119 match grpc_payload.service_path() {
120 path if path == &*OTLP_TRACES_GRPC_SERVICE_PATH => {
121 let (_, _, _, body) = grpc_payload.into_parts();
122 let request = match ExportTraceServiceRequest::decode(body.into_bytes()) {
123 Ok(req) => req,
124 Err(e) => {
125 error!(error = %e, "Failed to decode OTLP trace request.");
126 continue;
127 }
128 };
129
130 for resource_spans in request.resource_spans {
131 let trace_events = self.traces_translator.translate_resource_spans(resource_spans, &self.metrics);
132 for trace_event in trace_events {
133 if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
134 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
135 error!(error = %e, "Failed to dispatch trace events.");
136 }
137 }
138 }
139 }
140 }
141 path if path == &*OTLP_METRICS_GRPC_SERVICE_PATH => {
142 warn!("OTLP metrics decoding not yet implemented. Dropping metrics payload.");
143 }
144 path if path == &*OTLP_LOGS_GRPC_SERVICE_PATH => {
145 warn!("OTLP logs decoding not yet implemented. Dropping logs payload.");
146 }
147 path => {
148 warn!(service_path = path, "Received gRPC payload with unknown service path. Dropping payload.");
149 }
150 }
151 },
152 _ = buffer_flush.tick() => {
153 if let Some(event_buffer) = event_buffer_manager.consume() {
154 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
155 error!(error = %e, "Failed to dispatch buffered trace events.");
156 }
157 }
158 },
159 _ = health.live() => continue,
160 }
161 }
162
163 if let Some(event_buffer) = event_buffer_manager.consume() {
164 if let Err(e) = context.dispatcher().dispatch(event_buffer).await {
165 error!(error = %e, "Failed to dispatch final trace events.");
166 }
167 }
168
169 debug!("OTLP decoder stopped.");
170
171 Ok(())
172 }
173}