saluki_components/sources/otlp/mod.rs

use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use async_trait::async_trait;
use axum::body::Bytes;
use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
use prost::Message;
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_context::ContextResolver;
use saluki_core::topology::interconnect::EventBufferManager;
use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
use saluki_core::{
    components::{
        sources::{Source, SourceBuilder, SourceContext},
        ComponentContext,
    },
    data_model::event::{Event, EventType},
    topology::{EventsBuffer, OutputDefinition},
};
use saluki_env::WorkloadProvider;
use saluki_error::ErrorContext as _;
use saluki_error::{generic_error, GenericError};
use saluki_io::net::ListenAddress;
use serde::Deserialize;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error};

use crate::common::otlp::config::{OtlpConfig, TracesConfig};
use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};

mod attributes;
mod logs;
mod metrics;
mod origin;
mod resolver;
pub mod traces;
use self::logs::translator::OtlpLogsTranslator;
use self::metrics::translator::OtlpMetricsTranslator;
use self::origin::OtlpOriginTagResolver;
use self::resolver::build_context_resolver;
use self::traces::translator::OtlpTracesTranslator;

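// Serde default helpers for the context resolver settings on `OtlpConfiguration` below.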
const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}

const fn default_cached_contexts_limit() -> usize {
    500_000
}

const fn default_cached_tagsets_limit() -> usize {
    500_000
}

const fn default_allow_context_heap_allocations() -> bool {
    true
}

/// Configuration for the OTLP source.
#[derive(Deserialize, Default)]
pub struct OtlpConfiguration {
    otlp_config: OtlpConfig,

    /// Total size of the string interner used for contexts.
    ///
    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
    /// `allow_context_heap_allocations`.
    #[serde(
        rename = "otlp_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// The maximum number of cached contexts to allow.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
    cached_contexts_limit: usize,

    /// The maximum number of cached tagsets to allow.
    ///
    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "otlp_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Workload provider to use for origin detection/enrichment.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}

impl OtlpConfiguration {
    /// Creates a new `OtlpConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets the workload provider to use for configuring origin detection/enrichment.
    ///
    /// A workload provider must be set, otherwise origin detection/enrichment will not be enabled.
    ///
    /// Defaults to unset.
    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
    where
        W: WorkloadProvider + Send + Sync + 'static,
    {
        self.workload_provider = Some(Arc::new(workload_provider));
        self
    }
}
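// Example (sketch, not taken verbatim from this repository): building the source configuration and
// enabling origin detection/enrichment. How `config` and `workload_provider` are obtained is assumed
// here and depends on the surrounding topology setup.
//
//     let otlp_config = OtlpConfiguration::from_configuration(&config)?
//         .with_workload_provider(workload_provider);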

#[async_trait]
impl SourceBuilder for OtlpConfiguration {
    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("logs", EventType::Log),
                OutputDefinition::named_output("traces", EventType::Trace),
            ]
        });

        &OUTPUTS
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
            return Err(generic_error!(
                "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
            ));
        }

        let grpc_listen_str = format!(
            "{}://{}",
            self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
        );
        let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
            .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;

        // Enforce the current limitation that we only support TCP for gRPC.
        if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
            return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
        }

        let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
            generic_error!(
                "Invalid HTTP endpoint address '{}': {}",
                self.otlp_config.receiver.protocols.http.endpoint,
                e
            )
        })?;

        let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);

        let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
        let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default().with_remapping(true);
        let traces_config = self.otlp_config.traces.clone();
        let grpc_max_recv_msg_size_bytes =
            self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
        let metrics = build_metrics(&context);

        Ok(Box::new(Otlp {
            context_resolver,
            origin_tag_resolver: maybe_origin_tags_resolver,
            grpc_endpoint,
            http_endpoint: ListenAddress::Tcp(http_socket_addr),
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_config,
            metrics,
        }))
    }
}

impl MemoryBounds for OtlpConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}

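/// OTLP source.
///
/// Receives OTLP metrics, logs, and traces over gRPC and HTTP, translates them into events, and forwards
/// them to the `metrics`, `logs`, and `traces` outputs.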
pub struct Otlp {
    context_resolver: ContextResolver,
    origin_tag_resolver: Option<OtlpOriginTagResolver>,
    grpc_endpoint: ListenAddress,
    http_endpoint: ListenAddress,
    grpc_max_recv_msg_size_bytes: usize,
    metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
    traces_config: TracesConfig,
    metrics: Metrics, // Internal telemetry metrics, not Datadog-native metric events.
}

#[async_trait]
impl Source for Otlp {
    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let Self {
            context_resolver,
            origin_tag_resolver,
            grpc_endpoint,
            http_endpoint,
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_config,
            metrics,
        } = *self;

        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();
        let memory_limiter = context.topology_context().memory_limiter();

        // Create the internal channel for decoupling the servers from the converter.
        let (tx, rx) = mpsc::channel::<OtlpResource>(1024);

        let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();

        let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver);

        let thread_pool_handle = context.topology_context().global_thread_pool().clone();

        // Spawn the converter task. This task is shared by both servers.
        thread_pool_handle.spawn_traced_named(
            "otlp-resource-converter",
            run_converter(
                rx,
                context.clone(),
                origin_tag_resolver,
                converter_shutdown_coordinator.register(),
                metrics_translator,
                metrics.clone(),
                traces_config,
            ),
        );

        let handler = SourceHandler::new(tx);
        let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);

        let (http_shutdown, mut http_error) = server_builder
            .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
            .await?;

        health.mark_ready();
        debug!("OTLP source started.");

        // Wait for the global shutdown signal, then notify the converter to shut down.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping OTLP source...");

        http_shutdown.shutdown();
        converter_shutdown_coordinator.shutdown().await;

        debug!("OTLP source stopped.");

        Ok(())
    }
}

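/// A decoded OTLP resource payload, sent from the server handler to the converter task for translation
/// into events.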
enum OtlpResource {
    Metrics(OtlpResourceMetrics),
    Logs(OtlpResourceLogs),
    Traces(OtlpResourceSpans),
}

/// Handler that decodes OTLP bytes and sends resources to the converter.
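///
/// Each handler decodes the full export request and forwards its resources individually over a bounded
/// channel, which applies backpressure to the servers if the converter falls behind.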
struct SourceHandler {
    tx: mpsc::Sender<OtlpResource>,
}

impl SourceHandler {
    fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
        Self { tx }
    }
}

#[async_trait]
impl OtlpHandler for SourceHandler {
    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
        let request =
            ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;

        for resource_metrics in request.resource_metrics {
            self.tx
                .send(OtlpResource::Metrics(resource_metrics))
                .await
                .error_context("Failed to send resource metrics to converter: channel is closed.")?;
        }
        Ok(())
    }

    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
        let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;

        for resource_logs in request.resource_logs {
            self.tx
                .send(OtlpResource::Logs(resource_logs))
                .await
                .error_context("Failed to send resource logs to converter: channel is closed.")?;
        }
        Ok(())
    }

    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
        let request =
            ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;

        for resource_spans in request.resource_spans {
            self.tx
                .send(OtlpResource::Traces(resource_spans))
                .await
                .error_context("Failed to send resource spans to converter: channel is closed.")?;
        }
        Ok(())
    }
}

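/// Dispatches buffered events to their named outputs.
///
/// Trace and log events are extracted and sent to the `traces` and `logs` outputs respectively; the
/// remaining (metric) events are dispatched to the `metrics` output.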
async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
    if events.is_empty() {
        return;
    }

    if events.has_event_type(EventType::Trace) {
        let mut buffered_dispatcher = source_context
            .dispatcher()
            .buffered_named("traces")
            .expect("traces output should exist");
        for trace_event in events.extract(Event::is_trace) {
            if let Err(e) = buffered_dispatcher.push(trace_event).await {
                error!(error = %e, "Failed to dispatch trace(s).");
            }
        }
        if let Err(e) = buffered_dispatcher.flush().await {
            error!(error = %e, "Failed to flush trace(s).");
        }
    }

    if events.has_event_type(EventType::Log) {
        let mut buffered_dispatcher = source_context
            .dispatcher()
            .buffered_named("logs")
            .expect("logs output should exist");

        for log_event in events.extract(Event::is_log) {
            if let Err(e) = buffered_dispatcher.push(log_event).await {
                error!(error = %e, "Failed to dispatch log(s).");
            }
        }

        if let Err(e) = buffered_dispatcher.flush().await {
            error!(error = %e, "Failed to flush log(s).");
        }
    }

    let len = events.len();
    if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
        error!(error = %e, "Failed to dispatch metric events.");
    } else {
        debug!(events_len = len, "Dispatched metric events.");
    }
}

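/// Converter task shared by the gRPC and HTTP servers.
///
/// Receives decoded OTLP resources, translates them into events, and dispatches them to the source's
/// outputs, flushing any buffered events periodically and once more on shutdown.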
async fn run_converter(
    mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
    origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
    mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, traces_config: TracesConfig,
) {
    tokio::pin!(shutdown_handle);
    debug!("OTLP resource converter task started.");

    // Set a buffer flush interval of 100ms, which ensures we always flush buffered events at least every 100ms if
    // we're otherwise idle and not receiving payloads from clients.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut event_buffer_manager = EventBufferManager::default();

    let traces_translator = OtlpTracesTranslator::new(traces_config);

    loop {
        select! {
            Some(otlp_resource) = receiver.recv() => {
                match otlp_resource {
                    OtlpResource::Metrics(resource_metrics) => {
                        match metrics_translator.map_metrics(resource_metrics, &metrics) {
                            Ok(events) => {
                                for event in events {
                                    if let Some(event_buffer) = event_buffer_manager.try_push(event) {
                                        dispatch_events(event_buffer, &source_context).await;
                                    }
                                }
                            }
                            Err(e) => {
                                error!(error = %e, "Failed to handle resource metrics.");
                            }
                        }
                    }
                    OtlpResource::Logs(resource_logs) => {
                        let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
                        for log_event in translator {
                            metrics.logs_received().increment(1);

                            if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
                                dispatch_events(event_buffer, &source_context).await;
                            }
                        }
                    }
                    OtlpResource::Traces(resource_spans) => {
                        let trace_events =
                            traces_translator.translate_resource_spans(resource_spans, &metrics);
                        for trace_event in trace_events {
                            if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
                                dispatch_events(event_buffer, &source_context).await;
                            }
                        }
                    }
                }
            },
            _ = buffer_flush.tick() => {
                if let Some(event_buffer) = event_buffer_manager.consume() {
                    dispatch_events(event_buffer, &source_context).await;
                }
            },
            _ = &mut shutdown_handle => {
                debug!("Converter task received shutdown signal.");
                break;
            }
        }
    }

    if let Some(event_buffer) = event_buffer_manager.consume() {
        dispatch_events(event_buffer, &source_context).await;
    }

    debug!("OTLP resource converter task stopped.");
}