saluki_components/sources/otlp/mod.rs

use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use async_trait::async_trait;
use axum::body::Bytes;
use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
use prost::Message;
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_context::ContextResolver;
use saluki_core::topology::interconnect::EventBufferManager;
use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
use saluki_core::{
    components::{
        sources::{Source, SourceBuilder, SourceContext},
        ComponentContext,
    },
    data_model::event::{Event, EventType},
    topology::{EventsBuffer, OutputDefinition},
};
use saluki_env::WorkloadProvider;
use saluki_error::ErrorContext as _;
use saluki_error::{generic_error, GenericError};
use saluki_io::net::ListenAddress;
use serde::Deserialize;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error};

use crate::common::otlp::config::OtlpConfig;
use crate::common::otlp::config::TracesConfig;
use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};

mod logs;
mod metrics;
mod resolver;
use self::logs::translator::OtlpLogsTranslator;
use self::metrics::translator::OtlpMetricsTranslator;
use self::resolver::build_context_resolver;
use crate::common::otlp::origin::OtlpOriginTagResolver;
use crate::common::otlp::traces::translator::OtlpTracesTranslator;

const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}

const fn default_cached_contexts_limit() -> usize {
    500_000
}

const fn default_cached_tagsets_limit() -> usize {
    500_000
}

const fn default_allow_context_heap_allocations() -> bool {
    true
}

/// Configuration for the OTLP source.
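///
/// As a sketch, the tuning options below could be set in configuration roughly as follows. The
/// key names come from the `serde` renames on each field; the exact value formats accepted (in
/// particular the string form of the interner size) depend on the configuration layer and are
/// shown here only as an assumed example:
///
/// ```yaml
/// otlp_string_interner_size: "2MiB"
/// otlp_cached_contexts_limit: 500000
/// otlp_cached_tagsets_limit: 500000
/// otlp_allow_context_heap_allocs: true
/// ```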
#[derive(Deserialize, Default)]
pub struct OtlpConfiguration {
    otlp_config: OtlpConfig,

    /// Total size of the string interner used for contexts.
    ///
    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
    /// `allow_context_heap_allocations`.
    ///
    /// Defaults to 2MiB.
    #[serde(
        rename = "otlp_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// The maximum number of cached contexts to allow.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
    cached_contexts_limit: usize,

    /// The maximum number of cached tagsets to allow.
    ///
    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "otlp_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Workload provider to utilize for origin detection/enrichment.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}

impl OtlpConfiguration {
    /// Creates a new `OtlpConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets the workload provider to use for configuring origin detection/enrichment.
    ///
    /// A workload provider must be set; otherwise, origin detection/enrichment will not be enabled.
    ///
    /// Defaults to unset.
    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
    where
        W: WorkloadProvider + Send + Sync + 'static,
    {
        self.workload_provider = Some(Arc::new(workload_provider));
        self
    }
}

#[async_trait]
impl SourceBuilder for OtlpConfiguration {
    fn outputs(&self) -> &[OutputDefinition<EventType>] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("logs", EventType::Log),
                OutputDefinition::named_output("traces", EventType::Trace),
            ]
        });

        &OUTPUTS
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
            return Err(generic_error!(
                "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
            ));
        }

        let grpc_listen_str = format!(
            "{}://{}",
            self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
        );
        let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
            .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;

        // Enforce the current limitation that we only support TCP for gRPC.
        if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
            return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
        }

        let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
            generic_error!(
                "Invalid HTTP endpoint address '{}': {}",
                self.otlp_config.receiver.protocols.http.endpoint,
                e
            )
        })?;

        let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);

        let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
        let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default().with_remapping(true);
        let traces_config = self.otlp_config.traces.clone();
        let grpc_max_recv_msg_size_bytes =
            self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
        let metrics = build_metrics(&context);

        Ok(Box::new(Otlp {
            context_resolver,
            origin_tag_resolver: maybe_origin_tags_resolver,
            grpc_endpoint,
            http_endpoint: ListenAddress::Tcp(http_socket_addr),
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_config,
            metrics,
        }))
    }
}

impl MemoryBounds for OtlpConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}

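/// The OTLP source.
///
/// Receives OTLP export requests over gRPC and HTTP, decodes the resource metrics, logs, and
/// spans they contain, translates them into internal events, and forwards those events to the
/// `metrics`, `logs`, and `traces` outputs.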
pub struct Otlp {
    context_resolver: ContextResolver,
    origin_tag_resolver: Option<OtlpOriginTagResolver>,
    grpc_endpoint: ListenAddress,
    http_endpoint: ListenAddress,
    grpc_max_recv_msg_size_bytes: usize,
    metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
    traces_config: TracesConfig,
    metrics: Metrics, // Telemetry metrics, not DD native metrics.
}

#[async_trait]
impl Source for Otlp {
    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let Self {
            context_resolver,
            origin_tag_resolver,
            grpc_endpoint,
            http_endpoint,
            grpc_max_recv_msg_size_bytes,
            metrics_translator_config,
            traces_config,
            metrics,
        } = *self;

        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();
        let memory_limiter = context.topology_context().memory_limiter();

        // Create the internal channel for decoupling the servers from the converter.
        let (tx, rx) = mpsc::channel::<OtlpResource>(1024);

        let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();

        let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver);

        let thread_pool_handle = context.topology_context().global_thread_pool().clone();

        // Spawn the converter task. This task is shared by both servers.
        thread_pool_handle.spawn_traced_named(
            "otlp-resource-converter",
            run_converter(
                rx,
                context.clone(),
                origin_tag_resolver,
                converter_shutdown_coordinator.register(),
                metrics_translator,
                metrics.clone(),
                traces_config,
            ),
        );

        let handler = SourceHandler::new(tx);
        let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);

        let (http_shutdown, mut http_error) = server_builder
            .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
            .await?;

        health.mark_ready();
        debug!("OTLP source started.");

        // Wait for the global shutdown signal (or a fatal HTTP server error), then notify the converter to shut down.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping OTLP source...");

        http_shutdown.shutdown();
        converter_shutdown_coordinator.shutdown().await;

        debug!("OTLP source stopped.");

        Ok(())
    }
}

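/// A single decoded OTLP resource payload, sent from the server handlers to the converter task.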
enum OtlpResource {
    Metrics(OtlpResourceMetrics),
    Logs(OtlpResourceLogs),
    Traces(OtlpResourceSpans),
}

/// Handler that decodes OTLP bytes and sends resources to the converter.
struct SourceHandler {
    tx: mpsc::Sender<OtlpResource>,
}

impl SourceHandler {
    fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
        Self { tx }
    }
}

#[async_trait]
impl OtlpHandler for SourceHandler {
    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
        let request =
            ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;

        for resource_metrics in request.resource_metrics {
            self.tx
                .send(OtlpResource::Metrics(resource_metrics))
                .await
                .error_context("Failed to send resource metrics to converter: channel is closed.")?;
        }
        Ok(())
    }

    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
        let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;

        for resource_logs in request.resource_logs {
            self.tx
                .send(OtlpResource::Logs(resource_logs))
                .await
                .error_context("Failed to send resource logs to converter: channel is closed.")?;
        }
        Ok(())
    }

    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
        let request =
            ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;

        for resource_spans in request.resource_spans {
            self.tx
                .send(OtlpResource::Traces(resource_spans))
                .await
                .error_context("Failed to send resource spans to converter: channel is closed.")?;
        }
        Ok(())
    }
}

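/// Dispatches a buffer of translated events to the source's named outputs.
///
/// Trace and log events are extracted and flushed to the `traces` and `logs` outputs; whatever
/// remains in the buffer is dispatched to the `metrics` output as a single batch.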
async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
    if events.is_empty() {
        return;
    }

    if events.has_event_type(EventType::Trace) {
        let mut buffered_dispatcher = source_context
            .dispatcher()
            .buffered_named("traces")
            .expect("traces output should exist");
        for trace_event in events.extract(Event::is_trace) {
            if let Err(e) = buffered_dispatcher.push(trace_event).await {
                error!(error = %e, "Failed to dispatch trace(s).");
            }
        }
        if let Err(e) = buffered_dispatcher.flush().await {
            error!(error = %e, "Failed to flush trace(s).");
        }
    }

    if events.has_event_type(EventType::Log) {
        let mut buffered_dispatcher = source_context
            .dispatcher()
            .buffered_named("logs")
            .expect("logs output should exist");

        for log_event in events.extract(Event::is_log) {
            if let Err(e) = buffered_dispatcher.push(log_event).await {
                error!(error = %e, "Failed to dispatch log(s).");
            }
        }

        if let Err(e) = buffered_dispatcher.flush().await {
            error!(error = %e, "Failed to flush log(s).");
        }
    }

    let len = events.len();
    if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
        error!(error = %e, "Failed to dispatch metric events.");
    } else {
        debug!(events_len = len, "Dispatched metric events.");
    }
}

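/// Runs the converter task shared by the gRPC and HTTP servers: receives decoded OTLP resources,
/// translates them into internal events, and dispatches them downstream.
///
/// Buffered events are flushed whenever a buffer fills up, on a periodic flush tick, and once more
/// on shutdown so that nothing buffered is lost.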
async fn run_converter(
    mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
    origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
    mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, traces_config: TracesConfig,
) {
    tokio::pin!(shutdown_handle);
    debug!("OTLP resource converter task started.");

    // Set a buffer flush interval of 100ms, which ensures we always flush buffered events at least every 100ms if
    // we're otherwise idle and not receiving resources from the servers.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut event_buffer_manager = EventBufferManager::default();

    let traces_translator = OtlpTracesTranslator::new(traces_config);

    loop {
        select! {
            Some(otlp_resource) = receiver.recv() => {
                match otlp_resource {
                    OtlpResource::Metrics(resource_metrics) => {
                        match metrics_translator.map_metrics(resource_metrics, &metrics) {
                            Ok(events) => {
                                for event in events {
                                    if let Some(event_buffer) = event_buffer_manager.try_push(event) {
                                        dispatch_events(event_buffer, &source_context).await;
                                    }
                                }
                            }
                            Err(e) => {
                                error!(error = %e, "Failed to handle resource metrics.");
                            }
                        }
                    }
                    OtlpResource::Logs(resource_logs) => {
                        let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
                        for log_event in translator {
                            metrics.logs_received().increment(1);

                            if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
                                dispatch_events(event_buffer, &source_context).await;
                            }
                        }
                    }
                    OtlpResource::Traces(resource_spans) => {
                        let trace_events =
                            traces_translator.translate_resource_spans(resource_spans, &metrics);
                        for trace_event in trace_events {
                            if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
                                dispatch_events(event_buffer, &source_context).await;
                            }
                        }
                    }
                }
            },
            _ = buffer_flush.tick() => {
                if let Some(event_buffer) = event_buffer_manager.consume() {
                    dispatch_events(event_buffer, &source_context).await;
                }
            },
            _ = &mut shutdown_handle => {
                debug!("Converter task received shutdown signal.");
                break;
            }
        }
    }

    // Flush any remaining buffered events before exiting.
    if let Some(event_buffer) = event_buffer_manager.consume() {
        dispatch_events(event_buffer, &source_context).await;
    }

    debug!("OTLP resource converter task stopped.");
}