// saluki_components/sources/otlp/mod.rs

1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
9use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
11use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
12use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
13use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
14use prost::Message;
15use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
16use saluki_common::sync::shutdown::{ShutdownCoordinator, ShutdownHandle};
17use saluki_common::task::HandleExt as _;
18use saluki_config::GenericConfiguration;
19use saluki_context::ContextResolver;
20use saluki_core::topology::interconnect::BufferedDispatcher;
21use saluki_core::{
22    components::{
23        sources::{Source, SourceBuilder, SourceContext},
24        ComponentContext,
25    },
26    data_model::event::EventType,
27    topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::ErrorContext as _;
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::ListenAddress;
33use serde::Deserialize;
34use tokio::pin;
35use tokio::select;
36use tokio::sync::mpsc;
37use tokio::time::{interval, MissedTickBehavior};
38use tracing::{debug, error};
39
40use crate::common::otlp::config::OtlpConfig;
41use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
42
43mod logs;
44mod metrics;
45mod resolver;
46use self::logs::translator::OtlpLogsTranslator;
47use self::metrics::translator::OtlpMetricsTranslator;
48use self::resolver::build_context_resolver;
49use crate::common::otlp::origin::OtlpOriginTagResolver;
50use crate::common::otlp::traces::translator::OtlpTracesTranslator;
51
52const fn default_context_string_interner_size() -> ByteSize {
53    ByteSize::mib(2)
54}
55
/// Serde default for `otlp_cached_contexts_limit`: half a million cached contexts.
const fn default_cached_contexts_limit() -> usize {
    5 * 100_000
}
59
/// Serde default for `otlp_cached_tagsets_limit`: half a million cached tagsets.
const fn default_cached_tagsets_limit() -> usize {
    5 * 100_000
}
63
/// Serde default for `otlp_allow_context_heap_allocs`.
///
/// Falling back to heap allocation when the interner is full is enabled unless configured otherwise.
const fn default_allow_context_heap_allocations() -> bool {
    true
}
67
68/// Configuration for the OTLP source.
69#[derive(Deserialize, Default)]
70#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
71#[cfg_attr(test, derive_where(PartialEq))]
72pub struct OtlpConfiguration {
73    otlp_config: OtlpConfig,
74
75    /// Total size of the string interner used for contexts.
76    ///
77    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
78    /// metrics with contexts that haven't already been resolved may or may not be dropped, depending on the value of
79    /// `allow_context_heap_allocations`.
80    #[serde(
81        rename = "otlp_string_interner_size",
82        default = "default_context_string_interner_size"
83    )]
84    context_string_interner_bytes: ByteSize,
85
86    /// The maximum number of cached contexts to allow.
87    ///
88    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit doesn't affect
89    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
90    /// and whether or not heap allocations are allowed.
91    ///
92    /// Defaults to 500,000.
93    #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
94    cached_contexts_limit: usize,
95
96    /// The maximum number of cached tagsets to allow.
97    ///
98    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit doesn't affect
99    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
100    /// and whether or not heap allocations are allowed.
101    ///
102    /// Defaults to 500,000.
103    #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
104    cached_tagsets_limit: usize,
105
106    /// Whether or not to allow heap allocations when resolving contexts.
107    ///
108    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
109    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
110    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
111    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags can't be
112    /// interned, the metric is skipped.
113    ///
114    /// Defaults to `true`.
115    #[serde(
116        rename = "otlp_allow_context_heap_allocs",
117        default = "default_allow_context_heap_allocations"
118    )]
119    allow_context_heap_allocations: bool,
120
121    /// Workload provider to utilize for origin detection/enrichment.
122    #[serde(skip)]
123    #[cfg_attr(test, derive_where(skip))]
124    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
125}
126
127impl OtlpConfiguration {
128    /// Creates a new `OTLPConfiguration` from the given configuration.
129    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
130        let mut cfg: Self = config.as_typed()?;
131        cfg.otlp_config.traces.apply_env_overrides(config)?;
132        Ok(cfg)
133    }
134
135    /// Sets the workload provider to use for configuring origin detection/enrichment.
136    ///
137    /// A workload provider must be set otherwise origin detection/enrichment won't be enabled.
138    ///
139    /// Defaults to unset.
140    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
141    where
142        W: WorkloadProvider + Send + Sync + 'static,
143    {
144        self.workload_provider = Some(Arc::new(workload_provider));
145        self
146    }
147}
148
149#[async_trait]
150impl SourceBuilder for OtlpConfiguration {
151    fn outputs(&self) -> &[OutputDefinition<EventType>] {
152        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
153            vec![
154                OutputDefinition::named_output("metrics", EventType::Metric),
155                OutputDefinition::named_output("logs", EventType::Log),
156                OutputDefinition::named_output("traces", EventType::Trace),
157            ]
158        });
159
160        &OUTPUTS
161    }
162
163    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
164        if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
165            return Err(generic_error!(
166                "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
167            ));
168        }
169
170        let grpc_listen_str = format!(
171            "{}://{}",
172            self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
173        );
174        let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
175            .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
176
177        // Enforce the current limitation that we only support TCP for gRPC.
178        if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
179            return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
180        }
181
182        let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
183            generic_error!(
184                "Invalid HTTP endpoint address '{}': {}",
185                self.otlp_config.receiver.protocols.http.endpoint,
186                e
187            )
188        })?;
189
190        let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
191
192        let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
193        let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default()
194            .with_remapping(true)
195            .with_quantiles(true);
196        let traces_interner_size =
197            std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
198                .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
199        let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
200        let grpc_max_recv_msg_size_bytes =
201            self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
202        let metrics = build_metrics(&context);
203
204        Ok(Box::new(Otlp {
205            context_resolver,
206            origin_tag_resolver: maybe_origin_tags_resolver,
207            grpc_endpoint,
208            http_endpoint: ListenAddress::Tcp(http_socket_addr),
209            grpc_max_recv_msg_size_bytes,
210            metrics_translator_config,
211            traces_translator,
212            metrics,
213        }))
214    }
215}
216
impl MemoryBounds for OtlpConfiguration {
    /// Declares this source's minimum memory usage: one `Otlp` value and one `SourceHandler` value.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // NOTE(review): nothing here accounts for the context string interner sized by
        // `context_string_interner_bytes`, the traces string interner, or the 1024-entry converter channel created in
        // `Otlp::run`. Confirm whether those are accounted for elsewhere or should be added here.
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}
225
/// The OTLP source, produced by `OtlpConfiguration`'s `SourceBuilder::build` and driven by `Source::run`.
pub struct Otlp {
    /// Context resolver handed to the metrics translator when the source starts running.
    context_resolver: ContextResolver,
    /// Origin tag resolver, present only when a workload provider was configured; passed to the logs translator.
    origin_tag_resolver: Option<OtlpOriginTagResolver>,
    /// Address the gRPC server listens on (only TCP is accepted at build time).
    grpc_endpoint: ListenAddress,
    /// Address the HTTP server listens on.
    http_endpoint: ListenAddress,
    /// Maximum gRPC message size in bytes, derived from `max_recv_msg_size_mib`.
    grpc_max_recv_msg_size_bytes: usize,
    /// Metrics translator settings (built with remapping and quantiles enabled).
    metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
    /// Translator that turns OTLP resource spans into trace events.
    traces_translator: OtlpTracesTranslator,
    /// Telemetry metrics, not DD native metrics.
    metrics: Metrics,
}
236
237#[async_trait]
238impl Source for Otlp {
239    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
240        let Self {
241            context_resolver,
242            origin_tag_resolver,
243            grpc_endpoint,
244            http_endpoint,
245            grpc_max_recv_msg_size_bytes,
246            metrics_translator_config,
247            traces_translator,
248            metrics,
249        } = *self;
250
251        let global_shutdown = context.take_shutdown_handle();
252        pin!(global_shutdown);
253
254        let mut health = context.take_health_handle();
255        let memory_limiter = context.topology_context().memory_limiter();
256
257        // Create the internal channel for decoupling the servers from the converter.
258        let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
259
260        let mut converter_shutdown_coordinator = ShutdownCoordinator::default();
261
262        let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver)?;
263
264        let thread_pool_handle = context.topology_context().global_thread_pool().clone();
265
266        // Spawn the converter task. This task is shared by both servers.
267        thread_pool_handle.spawn_traced_named(
268            "otlp-resource-converter",
269            run_converter(
270                rx,
271                context.clone(),
272                origin_tag_resolver,
273                converter_shutdown_coordinator.register(),
274                metrics_translator,
275                metrics.clone(),
276                traces_translator,
277            ),
278        );
279
280        let handler = SourceHandler::new(tx);
281        let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);
282
283        let (http_shutdown, mut http_error) = server_builder
284            .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
285            .await?;
286
287        health.mark_ready();
288        debug!("OTLP source started.");
289
290        // Wait for the global shutdown signal, then notify converter to shutdown.
291        loop {
292            select! {
293                _ = &mut global_shutdown => {
294                    debug!("Received shutdown signal.");
295                    break
296                },
297                error = &mut http_error => {
298                    if let Some(error) = error {
299                        debug!(%error, "HTTP server error.");
300                    }
301                    break;
302                },
303                _ = health.live() => continue,
304            }
305        }
306
307        debug!("Stopping OTLP source...");
308
309        http_shutdown.shutdown();
310        converter_shutdown_coordinator.shutdown_and_wait().await;
311
312        debug!("OTLP source stopped.");
313
314        Ok(())
315    }
316}
317
/// A single decoded OTLP resource, queued by `SourceHandler` for the converter task.
enum OtlpResource {
    /// One `ResourceMetrics` entry taken from an `ExportMetricsServiceRequest`.
    Metrics(OtlpResourceMetrics),
    /// One `ResourceLogs` entry taken from an `ExportLogsServiceRequest`.
    Logs(OtlpResourceLogs),
    /// One `ResourceSpans` entry taken from an `ExportTraceServiceRequest`.
    Traces(OtlpResourceSpans),
}
323
324/// Handler that decodes OTLP bytes and sends resources to the converter.
325struct SourceHandler {
326    tx: mpsc::Sender<OtlpResource>,
327}
328
329impl SourceHandler {
330    fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
331        Self { tx }
332    }
333}
334
335#[async_trait]
336impl OtlpHandler for SourceHandler {
337    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
338        let request =
339            ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;
340
341        for resource_metrics in request.resource_metrics {
342            self.tx
343                .send(OtlpResource::Metrics(resource_metrics))
344                .await
345                .error_context("Failed to send resource metrics to converter: channel is closed.")?;
346        }
347        Ok(())
348    }
349
350    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
351        let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;
352
353        for resource_logs in request.resource_logs {
354            self.tx
355                .send(OtlpResource::Logs(resource_logs))
356                .await
357                .error_context("Failed to send resource logs to converter: channel is closed.")?;
358        }
359        Ok(())
360    }
361
362    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
363        let request =
364            ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;
365
366        for resource_spans in request.resource_spans {
367            self.tx
368                .send(OtlpResource::Traces(resource_spans))
369                .await
370                .error_context("Failed to send resource spans to converter: channel is closed.")?;
371        }
372        Ok(())
373    }
374}
375
376async fn run_converter(
377    mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
378    origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: ShutdownHandle,
379    mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, mut traces_translator: OtlpTracesTranslator,
380) {
381    pin!(shutdown_handle);
382
383    debug!("OTLP resource converter task started.");
384
385    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
386    // we're otherwise idle and not receiving packets from the client.
387    let mut buffer_flush = interval(Duration::from_millis(100));
388    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
389
390    let mut metrics_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
391    let mut logs_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
392    let mut traces_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
393
394    loop {
395        select! {
396            Some(otlp_resource) = receiver.recv() => {
397                match otlp_resource {
398                    OtlpResource::Metrics(resource_metrics) => {
399                        match metrics_translator.translate_metrics(resource_metrics, &metrics) {
400                            Ok(events) => {
401                                for event in events {
402                                    let dispatcher = metrics_dispatcher.get_or_insert_with(|| {
403                                        source_context
404                                            .dispatcher()
405                                            .buffered_named("metrics")
406                                            .expect("metrics output should exist")
407                                    });
408                                    if let Err(e) = dispatcher.push(event).await {
409                                        error!(error = %e, "Failed to dispatch metric event.");
410                                    }
411                                }
412                            }
413                            Err(e) => {
414                                error!(error = %e, "Failed to handle resource metrics.");
415                            }
416                        }
417                    }
418                    OtlpResource::Logs(resource_logs) => {
419                        let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
420                        for log_event in translator {
421                            metrics.logs_received().increment(1);
422
423                            let dispatcher = logs_dispatcher.get_or_insert_with(|| {
424                                source_context
425                                    .dispatcher()
426                                    .buffered_named("logs")
427                                    .expect("logs output should exist")
428                            });
429                            if let Err(e) = dispatcher.push(log_event).await {
430                                error!(error = %e, "Failed to dispatch log event.");
431                            }
432                        }
433                    }
434                    OtlpResource::Traces(resource_spans) => {
435                        for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
436                            let dispatcher = traces_dispatcher.get_or_insert_with(|| {
437                                source_context
438                                    .dispatcher()
439                                    .buffered_named("traces")
440                                    .expect("traces output should exist")
441                            });
442                            if let Err(e) = dispatcher.push(trace_event).await {
443                                error!(error = %e, "Failed to dispatch trace event.");
444                            }
445                        }
446                    }
447                }
448            },
449            _ = buffer_flush.tick() => {
450                if let Some(dispatcher) = metrics_dispatcher.take() {
451                    if let Err(e) = dispatcher.flush().await {
452                        error!(error = %e, "Failed to flush metric events.");
453                    }
454                }
455                if let Some(dispatcher) = logs_dispatcher.take() {
456                    if let Err(e) = dispatcher.flush().await {
457                        error!(error = %e, "Failed to flush log events.");
458                    }
459                }
460                if let Some(dispatcher) = traces_dispatcher.take() {
461                    if let Err(e) = dispatcher.flush().await {
462                        error!(error = %e, "Failed to flush trace events.");
463                    }
464                }
465            },
466            _ = &mut shutdown_handle => {
467                debug!("Converter task received shutdown signal.");
468                break;
469            }
470        }
471    }
472
473    if let Some(dispatcher) = metrics_dispatcher.take() {
474        if let Err(e) = dispatcher.flush().await {
475            error!(error = %e, "Failed to flush metric events.");
476        }
477    }
478    if let Some(dispatcher) = logs_dispatcher.take() {
479        if let Err(e) = dispatcher.flush().await {
480            error!(error = %e, "Failed to flush log events.");
481        }
482    }
483    if let Some(dispatcher) = traces_dispatcher.take() {
484        if let Err(e) = dispatcher.flush().await {
485            error!(error = %e, "Failed to flush trace events.");
486        }
487    }
488
489    debug!("OTLP resource converter task stopped.");
490}
491
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::{config_registry::structs, run_config_smoke_tests};
    use serde_json::json;

    use super::OtlpConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared configuration smoke tests against `OtlpConfiguration::from_configuration`.
    #[tokio::test]
    async fn smoke_test() {
        // `otlp_config` carries no serde default, so the base configuration supplies an empty object for it.
        let base_config = json!({ "otlp_config": {} });

        run_config_smoke_tests(
            structs::OTLP_CONFIGURATION,
            &[],
            base_config,
            |cfg| OtlpConfiguration::from_configuration(&cfg).expect("OtlpConfiguration should deserialize"),
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }
}