// saluki_components/sources/otlp/mod.rs

use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use async_trait::async_trait;
use axum::body::Bytes;
use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
use prost::Message;
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_context::ContextResolver;
use saluki_core::topology::interconnect::BufferedDispatcher;
use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
use saluki_core::{
    components::{
        sources::{Source, SourceBuilder, SourceContext},
        ComponentContext,
    },
    data_model::event::EventType,
    topology::{EventsBuffer, OutputDefinition},
};
use saluki_env::WorkloadProvider;
use saluki_error::ErrorContext as _;
use saluki_error::{generic_error, GenericError};
use saluki_io::net::ListenAddress;
use serde::Deserialize;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error};

use crate::common::otlp::config::OtlpConfig;
use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};

mod logs;
mod metrics;
mod resolver;
use self::logs::translator::OtlpLogsTranslator;
use self::metrics::translator::OtlpMetricsTranslator;
use self::resolver::build_context_resolver;
use crate::common::otlp::origin::OtlpOriginTagResolver;
use crate::common::otlp::traces::translator::OtlpTracesTranslator;

const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}

const fn default_cached_contexts_limit() -> usize {
    500_000
}

const fn default_cached_tagsets_limit() -> usize {
    500_000
}

const fn default_allow_context_heap_allocations() -> bool {
    true
}

/// Configuration for the OTLP source.
#[derive(Deserialize, Default)]
pub struct OtlpConfiguration {
    otlp_config: OtlpConfig,

    /// Total size of the string interner used for contexts.
    ///
    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
    /// `allow_context_heap_allocations`.
    #[serde(
        rename = "otlp_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// The maximum number of cached contexts to allow.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
    cached_contexts_limit: usize,

    /// The maximum number of cached tagsets to allow.
    ///
    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "otlp_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Workload provider to utilize for origin detection/enrichment.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}
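// Rough sketch of how these settings might appear in a YAML configuration, using the key names
// from the `serde(rename)` attributes above (values are illustrative only, mirroring the defaults;
// the exact string form accepted for byte sizes depends on how `ByteSize` is deserialized):
//
//     otlp_string_interner_size: "2MiB"
//     otlp_cached_contexts_limit: 500000
//     otlp_cached_tagsets_limit: 500000
//     otlp_allow_context_heap_allocs: true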

impl OtlpConfiguration {
    /// Creates a new `OtlpConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets the workload provider to use for configuring origin detection/enrichment.
    ///
    /// A workload provider must be set; otherwise, origin detection/enrichment will not be enabled.
    ///
    /// Defaults to unset.
    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
    where
        W: WorkloadProvider + Send + Sync + 'static,
    {
        self.workload_provider = Some(Arc::new(workload_provider));
        self
    }
}
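// Sketch of typical construction, assuming a `GenericConfiguration` and a workload provider are
// already in hand:
//
//     let otlp_config = OtlpConfiguration::from_configuration(&config)?
//         .with_workload_provider(workload_provider);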

#[async_trait]
impl SourceBuilder for OtlpConfiguration {
    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("logs", EventType::Log),
                OutputDefinition::named_output("traces", EventType::Trace),
            ]
        });

        &OUTPUTS
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
            return Err(generic_error!(
                "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
            ));
        }

        let grpc_listen_str = format!(
            "{}://{}",
            self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
        );
        let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
            .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;

        // Enforce the current limitation that we only support TCP for gRPC.
        if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
            return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
        }

        let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
            generic_error!(
                "Invalid HTTP endpoint address '{}': {}",
                self.otlp_config.receiver.protocols.http.endpoint,
                e
            )
        })?;

        let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);

        let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
        let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default().with_remapping(true);
        let traces_interner_size =
            std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
                .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
        let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
        let grpc_max_recv_msg_size_bytes =
            self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
        let metrics = build_metrics(&context);

        Ok(Box::new(Otlp {
            context_resolver,
            origin_tag_resolver: maybe_origin_tags_resolver,
            grpc_endpoint,
            http_endpoint: ListenAddress::Tcp(http_socket_addr),
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_translator,
            metrics,
        }))
    }
}

impl MemoryBounds for OtlpConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}

pub struct Otlp {
    context_resolver: ContextResolver,
    origin_tag_resolver: Option<OtlpOriginTagResolver>,
    grpc_endpoint: ListenAddress,
    http_endpoint: ListenAddress,
    grpc_max_recv_msg_size_bytes: usize,
    metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
    traces_translator: OtlpTracesTranslator,
    metrics: Metrics, // Telemetry metrics, not DD native metrics.
}

#[async_trait]
impl Source for Otlp {
    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let Self {
            context_resolver,
            origin_tag_resolver,
            grpc_endpoint,
            http_endpoint,
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_translator,
            metrics,
        } = *self;

        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();
        let memory_limiter = context.topology_context().memory_limiter();

        // Create the internal channel for decoupling the servers from the converter.
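        // The channel is bounded, so if the converter falls behind, the handlers' `send` calls wait,
        // which in turn applies backpressure to the gRPC/HTTP servers.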
        let (tx, rx) = mpsc::channel::<OtlpResource>(1024);

        let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();

        let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver);

        let thread_pool_handle = context.topology_context().global_thread_pool().clone();

        // Spawn the converter task. This task is shared by both servers.
        thread_pool_handle.spawn_traced_named(
            "otlp-resource-converter",
            run_converter(
                rx,
                context.clone(),
                origin_tag_resolver,
                converter_shutdown_coordinator.register(),
                metrics_translator,
                metrics.clone(),
                traces_translator,
            ),
        );

        let handler = SourceHandler::new(tx);
        let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);

        let (http_shutdown, mut http_error) = server_builder
            .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
            .await?;

        health.mark_ready();
        debug!("OTLP source started.");

        // Wait for the global shutdown signal, then notify the converter to shut down.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping OTLP source...");

        http_shutdown.shutdown();
        converter_shutdown_coordinator.shutdown().await;

        debug!("OTLP source stopped.");

        Ok(())
    }
}

enum OtlpResource {
    Metrics(OtlpResourceMetrics),
    Logs(OtlpResourceLogs),
    Traces(OtlpResourceSpans),
}

/// Handler that decodes OTLP bytes and sends resources to the converter.
struct SourceHandler {
    tx: mpsc::Sender<OtlpResource>,
}

impl SourceHandler {
    fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
        Self { tx }
    }
}

#[async_trait]
impl OtlpHandler for SourceHandler {
    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
        let request =
            ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;

        for resource_metrics in request.resource_metrics {
            self.tx
                .send(OtlpResource::Metrics(resource_metrics))
                .await
                .error_context("Failed to send resource metrics to converter: channel is closed.")?;
        }
        Ok(())
    }

    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
        let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;

        for resource_logs in request.resource_logs {
            self.tx
                .send(OtlpResource::Logs(resource_logs))
                .await
                .error_context("Failed to send resource logs to converter: channel is closed.")?;
        }
        Ok(())
    }

    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
        let request =
            ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;

        for resource_spans in request.resource_spans {
            self.tx
                .send(OtlpResource::Traces(resource_spans))
                .await
                .error_context("Failed to send resource spans to converter: channel is closed.")?;
        }
        Ok(())
    }
}

async fn run_converter(
    mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
    origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
    mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, mut traces_translator: OtlpTracesTranslator,
) {
    tokio::pin!(shutdown_handle);
    debug!("OTLP resource converter task started.");

    // Set a buffer flush interval of 100ms, which ensures we flush buffered events at least every 100ms even
    // when we're otherwise idle and not receiving data from clients.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
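    // `MissedTickBehavior::Delay` schedules the next tick a full interval after a missed one, so a long
    // translation pass doesn't trigger a burst of back-to-back catch-up flushes.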

    let mut metrics_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
    let mut logs_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
    let mut traces_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
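    // Dispatchers are created lazily (via `get_or_insert_with` below), so a buffered dispatcher is only
    // held for outputs that actually receive events; flushing takes it, returning it to the lazy state.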

    loop {
        select! {
            Some(otlp_resource) = receiver.recv() => {
                match otlp_resource {
                    OtlpResource::Metrics(resource_metrics) => {
                        match metrics_translator.translate_metrics(resource_metrics, &metrics) {
                            Ok(events) => {
                                for event in events {
                                    let dispatcher = metrics_dispatcher.get_or_insert_with(|| {
                                        source_context
                                            .dispatcher()
                                            .buffered_named("metrics")
                                            .expect("metrics output should exist")
                                    });
                                    if let Err(e) = dispatcher.push(event).await {
                                        error!(error = %e, "Failed to dispatch metric event.");
                                    }
                                }
                            }
                            Err(e) => {
                                error!(error = %e, "Failed to handle resource metrics.");
                            }
                        }
                    }
                    OtlpResource::Logs(resource_logs) => {
                        let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
                        for log_event in translator {
                            metrics.logs_received().increment(1);

                            let dispatcher = logs_dispatcher.get_or_insert_with(|| {
                                source_context
                                    .dispatcher()
                                    .buffered_named("logs")
                                    .expect("logs output should exist")
                            });
                            if let Err(e) = dispatcher.push(log_event).await {
                                error!(error = %e, "Failed to dispatch log event.");
                            }
                        }
                    }
                    OtlpResource::Traces(resource_spans) => {
                        for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
                            let dispatcher = traces_dispatcher.get_or_insert_with(|| {
                                source_context
                                    .dispatcher()
                                    .buffered_named("traces")
                                    .expect("traces output should exist")
                            });
                            if let Err(e) = dispatcher.push(trace_event).await {
                                error!(error = %e, "Failed to dispatch trace event.");
                            }
                        }
                    }
                }
            },
            _ = buffer_flush.tick() => {
                if let Some(dispatcher) = metrics_dispatcher.take() {
                    if let Err(e) = dispatcher.flush().await {
                        error!(error = %e, "Failed to flush metric events.");
                    }
                }
                if let Some(dispatcher) = logs_dispatcher.take() {
                    if let Err(e) = dispatcher.flush().await {
                        error!(error = %e, "Failed to flush log events.");
                    }
                }
                if let Some(dispatcher) = traces_dispatcher.take() {
                    if let Err(e) = dispatcher.flush().await {
                        error!(error = %e, "Failed to flush trace events.");
                    }
                }
            },
            _ = &mut shutdown_handle => {
                debug!("Converter task received shutdown signal.");
                break;
            }
        }
    }

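    // Flush any remaining buffered events before exiting so nothing is dropped on shutdown.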
    if let Some(dispatcher) = metrics_dispatcher.take() {
        if let Err(e) = dispatcher.flush().await {
            error!(error = %e, "Failed to flush metric events.");
        }
    }
    if let Some(dispatcher) = logs_dispatcher.take() {
        if let Err(e) = dispatcher.flush().await {
            error!(error = %e, "Failed to flush log events.");
        }
    }
    if let Some(dispatcher) = traces_dispatcher.take() {
        if let Err(e) = dispatcher.flush().await {
            error!(error = %e, "Failed to flush trace events.");
        }
    }

    debug!("OTLP resource converter task stopped.");
}