//! OTLP source component (`saluki_components/sources/otlp/mod.rs`).
1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
11use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
12use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
13use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
14use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
15use prost::Message;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::ContextResolver;
19use saluki_core::topology::interconnect::BufferedDispatcher;
20use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
21use saluki_core::{
22    components::{
23        sources::{Source, SourceBuilder, SourceContext},
24        ComponentContext,
25    },
26    data_model::event::EventType,
27    topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::ErrorContext as _;
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::ListenAddress;
33use serde::Deserialize;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::time::{interval, MissedTickBehavior};
37use tracing::{debug, error};
38
39use crate::common::otlp::config::OtlpConfig;
40use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
41
42mod logs;
43mod metrics;
44mod resolver;
45use self::logs::translator::OtlpLogsTranslator;
46use self::metrics::translator::OtlpMetricsTranslator;
47use self::resolver::build_context_resolver;
48use crate::common::otlp::origin::OtlpOriginTagResolver;
49use crate::common::otlp::traces::translator::OtlpTracesTranslator;
50
51const fn default_context_string_interner_size() -> ByteSize {
52    ByteSize::mib(2)
53}
54
/// Default maximum number of cached contexts: 500,000.
const fn default_cached_contexts_limit() -> usize {
    const LIMIT: usize = 500_000;
    LIMIT
}
58
/// Default maximum number of cached tagsets: 500,000.
const fn default_cached_tagsets_limit() -> usize {
    const LIMIT: usize = 500_000;
    LIMIT
}
62
/// Default for allowing heap allocations when the context interner is full: `true`.
const fn default_allow_context_heap_allocations() -> bool {
    const ALLOW: bool = true;
    ALLOW
}
66
/// Configuration for the OTLP source.
#[derive(Deserialize, Default)]
#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
#[cfg_attr(test, derive_where(PartialEq))]
pub struct OtlpConfiguration {
    /// Core OTLP receiver configuration, deserialized from the `otlp_config` key.
    otlp_config: OtlpConfig,

    /// Total size of the string interner used for contexts.
    ///
    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
    /// `allow_context_heap_allocations`.
    #[serde(
        rename = "otlp_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// The maximum number of cached contexts to allow.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
    cached_contexts_limit: usize,

    /// The maximum number of cached tagsets to allow.
    ///
    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "otlp_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Workload provider to utilize for origin detection/enrichment.
    ///
    /// Not deserialized from configuration; set programmatically via `with_workload_provider`.
    #[serde(skip)]
    #[cfg_attr(test, derive_where(skip))]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
125
126impl OtlpConfiguration {
127    /// Creates a new `OTLPConfiguration` from the given configuration.
128    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
129        let mut cfg: Self = config.as_typed()?;
130        cfg.otlp_config.traces.apply_env_overrides(config)?;
131        Ok(cfg)
132    }
133
134    /// Sets the workload provider to use for configuring origin detection/enrichment.
135    ///
136    /// A workload provider must be set otherwise origin detection/enrichment will not be enabled.
137    ///
138    /// Defaults to unset.
139    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
140    where
141        W: WorkloadProvider + Send + Sync + 'static,
142    {
143        self.workload_provider = Some(Arc::new(workload_provider));
144        self
145    }
146}
147
#[async_trait]
impl SourceBuilder for OtlpConfiguration {
    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        // The output set is fixed for every OTLP source instance, so it is built once and cached
        // in a process-wide static.
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("logs", EventType::Log),
                OutputDefinition::named_output("traces", EventType::Trace),
            ]
        });

        &OUTPUTS
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        // At least one signal type must be enabled for the source to do any work.
        if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
            return Err(generic_error!(
                "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
            ));
        }

        // Build the gRPC listen address from the configured transport + endpoint pair.
        let grpc_listen_str = format!(
            "{}://{}",
            self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
        );
        let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
            .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;

        // Enforce the current limitation that we only support TCP for gRPC.
        if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
            return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
        }

        // The HTTP endpoint is parsed directly as a socket address (no transport prefix).
        let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
            generic_error!(
                "Invalid HTTP endpoint address '{}': {}",
                self.otlp_config.receiver.protocols.http.endpoint,
                e
            )
        })?;

        // Origin detection/enrichment is only enabled when a workload provider was supplied.
        let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);

        let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
        let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default()
            .with_remapping(true)
            .with_quantiles(true);
        // NOTE(review): the error message says `string_interner_size` while the field read here is
        // `string_interner_bytes` -- confirm which name is actually exposed in configuration.
        let traces_interner_size =
            std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
                .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
        let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
        // Convert the configured mebibyte limit to a byte count for the gRPC server.
        let grpc_max_recv_msg_size_bytes =
            self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
        let metrics = build_metrics(&context);

        Ok(Box::new(Otlp {
            context_resolver,
            origin_tag_resolver: maybe_origin_tags_resolver,
            grpc_endpoint,
            http_endpoint: ListenAddress::Tcp(http_socket_addr),
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_translator,
            metrics,
        }))
    }
}
215
impl MemoryBounds for OtlpConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum footprint: the source struct itself plus the handler handed to the servers.
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}
224
/// OTLP source.
///
/// Receives OTLP metrics, logs and traces over gRPC/HTTP and dispatches the translated events to
/// the `metrics`, `logs` and `traces` outputs.
pub struct Otlp {
    // Resolver used to intern/cache metric contexts during metrics translation.
    context_resolver: ContextResolver,
    // Optional origin tag resolver; `None` when no workload provider was configured.
    origin_tag_resolver: Option<OtlpOriginTagResolver>,
    // Listen address for the gRPC server (TCP only, enforced at build time).
    grpc_endpoint: ListenAddress,
    // Listen address for the HTTP server.
    http_endpoint: ListenAddress,
    // Maximum size, in bytes, of a single received gRPC message.
    grpc_max_recv_msg_size_bytes: usize,
    // Configuration for the OTLP -> Saluki metrics translator.
    metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
    // Translator for OTLP resource spans.
    traces_translator: OtlpTracesTranslator,
    metrics: Metrics, // Telemetry metrics, not DD native metrics.
}
235
#[async_trait]
impl Source for Otlp {
    /// Runs the OTLP source: spawns the resource converter task, starts the OTLP servers, and then
    /// waits until either a global shutdown is signaled or the server reports an error, at which
    /// point the servers and the converter are shut down in order.
    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let Self {
            context_resolver,
            origin_tag_resolver,
            grpc_endpoint,
            http_endpoint,
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_translator,
            metrics,
        } = *self;

        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();
        let memory_limiter = context.topology_context().memory_limiter();

        // Create the internal channel for decoupling the servers from the converter.
        let (tx, rx) = mpsc::channel::<OtlpResource>(1024);

        let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();

        let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver)?;

        let thread_pool_handle = context.topology_context().global_thread_pool().clone();

        // Spawn the converter task. This task is shared by both servers.
        thread_pool_handle.spawn_traced_named(
            "otlp-resource-converter",
            run_converter(
                rx,
                context.clone(),
                origin_tag_resolver,
                converter_shutdown_coordinator.register(),
                metrics_translator,
                metrics.clone(),
                traces_translator,
            ),
        );

        // The handler feeds decoded OTLP payloads to the converter via `tx`.
        let handler = SourceHandler::new(tx);
        let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);

        let (http_shutdown, mut http_error) = server_builder
            .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
            .await?;

        health.mark_ready();
        debug!("OTLP source started.");

        // Wait for the global shutdown signal, then notify converter to shutdown.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                error = &mut http_error => {
                    // A server error also terminates the source; it is logged but not propagated,
                    // so `run` still returns `Ok(())` in that case.
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
                // Respond to liveness probes while idle.
                _ = health.live() => continue,
            }
        }

        debug!("Stopping OTLP source...");

        // Stop the servers first so no new payloads arrive, then drain the converter.
        http_shutdown.shutdown();
        converter_shutdown_coordinator.shutdown().await;

        debug!("OTLP source stopped.");

        Ok(())
    }
}
314
/// A single decoded OTLP resource payload, passed from the server handlers to the converter task.
enum OtlpResource {
    /// Metrics for one resource from an `ExportMetricsServiceRequest`.
    Metrics(OtlpResourceMetrics),
    /// Logs for one resource from an `ExportLogsServiceRequest`.
    Logs(OtlpResourceLogs),
    /// Spans for one resource from an `ExportTraceServiceRequest`.
    Traces(OtlpResourceSpans),
}
320
/// Handler that decodes OTLP bytes and sends resources to the converter.
struct SourceHandler {
    // Sending half of the channel consumed by the converter task in `run_converter`.
    tx: mpsc::Sender<OtlpResource>,
}
325
326impl SourceHandler {
327    fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
328        Self { tx }
329    }
330}
331
332#[async_trait]
333impl OtlpHandler for SourceHandler {
334    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
335        let request =
336            ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;
337
338        for resource_metrics in request.resource_metrics {
339            self.tx
340                .send(OtlpResource::Metrics(resource_metrics))
341                .await
342                .error_context("Failed to send resource metrics to converter: channel is closed.")?;
343        }
344        Ok(())
345    }
346
347    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
348        let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;
349
350        for resource_logs in request.resource_logs {
351            self.tx
352                .send(OtlpResource::Logs(resource_logs))
353                .await
354                .error_context("Failed to send resource logs to converter: channel is closed.")?;
355        }
356        Ok(())
357    }
358
359    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
360        let request =
361            ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;
362
363        for resource_spans in request.resource_spans {
364            self.tx
365                .send(OtlpResource::Traces(resource_spans))
366                .await
367                .error_context("Failed to send resource spans to converter: channel is closed.")?;
368        }
369        Ok(())
370    }
371}
372
/// Converter task: receives decoded OTLP resources from the server handlers, translates them into
/// Saluki events, and dispatches them to the matching named output (`metrics`, `logs`, `traces`).
///
/// Runs until `shutdown_handle` fires, flushing buffered events periodically while running and
/// once more on exit.
async fn run_converter(
    mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
    origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
    mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, mut traces_translator: OtlpTracesTranslator,
) {
    tokio::pin!(shutdown_handle);
    debug!("OTLP resource converter task started.");

    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
    // we're otherwise idle and not receiving packets from the client.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    // Dispatchers are created lazily on first event of each type and taken/dropped on flush, so an
    // idle signal type never holds a buffered dispatcher open.
    let mut metrics_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
    let mut logs_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
    let mut traces_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;

    loop {
        select! {
            // NOTE(review): if all senders are dropped, `recv()` yields `None` and this branch is
            // disabled; the loop then only ticks/awaits shutdown -- confirm that is the intended
            // drain behavior.
            Some(otlp_resource) = receiver.recv() => {
                match otlp_resource {
                    OtlpResource::Metrics(resource_metrics) => {
                        match metrics_translator.translate_metrics(resource_metrics, &metrics) {
                            Ok(events) => {
                                for event in events {
                                    let dispatcher = metrics_dispatcher.get_or_insert_with(|| {
                                        source_context
                                            .dispatcher()
                                            .buffered_named("metrics")
                                            .expect("metrics output should exist")
                                    });
                                    if let Err(e) = dispatcher.push(event).await {
                                        error!(error = %e, "Failed to dispatch metric event.");
                                    }
                                }
                            }
                            Err(e) => {
                                // Translation failures drop the whole resource's metrics.
                                error!(error = %e, "Failed to handle resource metrics.");
                            }
                        }
                    }
                    OtlpResource::Logs(resource_logs) => {
                        // The logs translator is itself an iterator over translated log events.
                        let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
                        for log_event in translator {
                            metrics.logs_received().increment(1);

                            let dispatcher = logs_dispatcher.get_or_insert_with(|| {
                                source_context
                                    .dispatcher()
                                    .buffered_named("logs")
                                    .expect("logs output should exist")
                            });
                            if let Err(e) = dispatcher.push(log_event).await {
                                error!(error = %e, "Failed to dispatch log event.");
                            }
                        }
                    }
                    OtlpResource::Traces(resource_spans) => {
                        for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
                            let dispatcher = traces_dispatcher.get_or_insert_with(|| {
                                source_context
                                    .dispatcher()
                                    .buffered_named("traces")
                                    .expect("traces output should exist")
                            });
                            if let Err(e) = dispatcher.push(trace_event).await {
                                error!(error = %e, "Failed to dispatch trace event.");
                            }
                        }
                    }
                }
            },
            // Periodic flush: take each dispatcher so it is recreated on the next event.
            _ = buffer_flush.tick() => {
                if let Some(dispatcher) = metrics_dispatcher.take() {
                    if let Err(e) = dispatcher.flush().await {
                        error!(error = %e, "Failed to flush metric events.");
                    }
                }
                if let Some(dispatcher) = logs_dispatcher.take() {
                    if let Err(e) = dispatcher.flush().await {
                        error!(error = %e, "Failed to flush log events.");
                    }
                }
                if let Some(dispatcher) = traces_dispatcher.take() {
                    if let Err(e) = dispatcher.flush().await {
                        error!(error = %e, "Failed to flush trace events.");
                    }
                }
            },
            _ = &mut shutdown_handle => {
                debug!("Converter task received shutdown signal.");
                break;
            }
        }
    }

    // Final flush on shutdown so no buffered events are lost.
    if let Some(dispatcher) = metrics_dispatcher.take() {
        if let Err(e) = dispatcher.flush().await {
            error!(error = %e, "Failed to flush metric events.");
        }
    }
    if let Some(dispatcher) = logs_dispatcher.take() {
        if let Err(e) = dispatcher.flush().await {
            error!(error = %e, "Failed to flush log events.");
        }
    }
    if let Some(dispatcher) = traces_dispatcher.take() {
        if let Err(e) = dispatcher.flush().await {
            error!(error = %e, "Failed to flush trace events.");
        }
    }

    debug!("OTLP resource converter task stopped.");
}
487
#[cfg(test)]
mod config_smoke {
    //! Smoke test ensuring `OtlpConfiguration` deserializes via the config registry.

    use serde_json::json;

    use super::OtlpConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    #[tokio::test]
    async fn smoke_test() {
        // An empty `otlp_config` object must deserialize cleanly using all field defaults.
        run_config_smoke_tests(structs::OTLP_CONFIGURATION, &[], json!({ "otlp_config": {} }), |cfg| {
            OtlpConfiguration::from_configuration(&cfg).expect("OtlpConfiguration should deserialize")
        })
        .await
    }
}
503}