saluki_components/sources/otlp/mod.rs

use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use async_trait::async_trait;
use axum::body::Bytes;
use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
use prost::Message;
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_context::ContextResolver;
use saluki_core::topology::interconnect::EventBufferManager;
use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
use saluki_core::{
    components::{
        sources::{Source, SourceBuilder, SourceContext},
        ComponentContext,
    },
    data_model::event::{Event, EventType},
    topology::{EventsBuffer, OutputDefinition},
};
use saluki_env::WorkloadProvider;
use saluki_error::{generic_error, GenericError};
use saluki_io::net::ListenAddress;
use serde::Deserialize;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error};

use crate::common::otlp::config::Receiver;
use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};

mod attributes;
mod logs;
mod metrics;
mod origin;
mod resolver;
use self::logs::translator::OtlpLogsTranslator;
use self::metrics::translator::OtlpMetricsTranslator;
use self::origin::OtlpOriginTagResolver;
use self::resolver::build_context_resolver;

const fn default_context_string_interner_size() -> ByteSize {
    ByteSize::mib(2)
}

const fn default_cached_contexts_limit() -> usize {
    500_000
}

const fn default_cached_tagsets_limit() -> usize {
    500_000
}

const fn default_allow_context_heap_allocations() -> bool {
    true
}

/// Configuration for the OTLP source.
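///
/// # Example configuration
///
/// A minimal sketch of how the tunables below might appear in the configuration file (YAML is
/// assumed for illustration; key names match the `serde` renames on each field, and the values
/// shown are the documented defaults — the exact value syntax depends on the deserializer):
///
/// ```yaml
/// otlp_string_interner_size: "2MiB"
/// otlp_cached_contexts_limit: 500000
/// otlp_cached_tagsets_limit: 500000
/// otlp_allow_context_heap_allocs: true
/// ```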
#[derive(Deserialize, Default)]
pub struct OtlpConfiguration {
    otlp_config: OtlpConfig,

    /// Total size of the string interner used for contexts.
    ///
    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
    /// `allow_context_heap_allocations`.
    #[serde(
        rename = "otlp_string_interner_size",
        default = "default_context_string_interner_size"
    )]
    context_string_interner_bytes: ByteSize,

    /// The maximum number of cached contexts to allow.
    ///
    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
    cached_contexts_limit: usize,

    /// The maximum number of cached tagsets to allow.
    ///
    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
    /// and whether or not heap allocations are allowed.
    ///
    /// Defaults to 500,000.
    #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
    cached_tagsets_limit: usize,

    /// Whether or not to allow heap allocations when resolving contexts.
    ///
    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
    /// interned, the metric is skipped.
    ///
    /// Defaults to `true`.
    #[serde(
        rename = "otlp_allow_context_heap_allocs",
        default = "default_allow_context_heap_allocations"
    )]
    allow_context_heap_allocations: bool,

    /// Workload provider to utilize for origin detection/enrichment.
    #[serde(skip)]
    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
}

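/// OTLP receiver configuration: protocol settings for the gRPC/HTTP servers plus per-signal
/// (metrics, logs, traces) enablement.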
#[derive(Deserialize, Debug, Default)]
pub struct OtlpConfig {
    #[serde(default)]
    receiver: Receiver,
    #[serde(default)]
    metrics: MetricsConfig,
    #[serde(default)]
    logs: LogsConfig,
    #[serde(default)]
    traces: TracesConfig,
}

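/// A decoded OTLP payload forwarded from the server handlers to the converter task.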
enum OtlpResource {
    Metrics(OtlpResourceMetrics),
    Logs(OtlpResourceLogs),
    Traces(OtlpResourceSpans),
}

/// Handler that decodes OTLP bytes and sends resources to the converter.
struct SourceHandler {
    tx: mpsc::Sender<OtlpResource>,
}

impl SourceHandler {
    fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
        Self { tx }
    }
}

#[async_trait]
impl OtlpHandler for SourceHandler {
    async fn handle_metrics(&self, body: Bytes) -> Result<(), String> {
        match ExportMetricsServiceRequest::decode(body) {
            Ok(request) => {
                for resource_metrics in request.resource_metrics {
                    if self.tx.send(OtlpResource::Metrics(resource_metrics)).await.is_err() {
                        error!("Failed to send resource metrics to converter; channel is closed.");
                        return Err("Internal processing channel closed.".to_string());
                    }
                }
                Ok(())
            }
            Err(e) => {
                error!(error = %e, "Failed to decode OTLP protobuf request.");
                Err("Bad Request: Invalid protobuf.".to_string())
            }
        }
    }

    async fn handle_logs(&self, body: Bytes) -> Result<(), String> {
        match ExportLogsServiceRequest::decode(body) {
            Ok(request) => {
                for resource_logs in request.resource_logs {
                    if self.tx.send(OtlpResource::Logs(resource_logs)).await.is_err() {
                        error!("Failed to send resource logs to converter; channel is closed.");
                        return Err("Internal processing channel closed.".to_string());
                    }
                }
                Ok(())
            }
            Err(e) => {
                error!(error = %e, "Failed to decode OTLP protobuf request.");
                Err("Bad Request: Invalid protobuf.".to_string())
            }
        }
    }

    async fn handle_traces(&self, body: Bytes) -> Result<(), String> {
        match ExportTraceServiceRequest::decode(body) {
            Ok(request) => {
                for resource_spans in request.resource_spans {
                    if self.tx.send(OtlpResource::Traces(resource_spans)).await.is_err() {
                        error!("Failed to send resource spans to converter; channel is closed.");
                        return Err("Internal processing channel closed.".to_string());
                    }
                }
                Ok(())
            }
            Err(e) => {
                error!(error = %e, "Failed to decode OTLP protobuf request.");
                Err("Bad Request: Invalid protobuf.".to_string())
            }
        }
    }
}

#[derive(Deserialize, Debug)]
pub struct LogsConfig {
    /// Whether to enable OTLP logs support.
    ///
    /// Defaults to true.
    #[serde(default = "default_logs_enabled")]
    pub enabled: bool,
}

fn default_logs_enabled() -> bool {
    true
}

impl Default for LogsConfig {
    fn default() -> Self {
        Self {
            enabled: default_logs_enabled(),
        }
    }
}

#[derive(Deserialize, Debug)]
pub struct MetricsConfig {
    /// Whether to enable OTLP metrics support.
    ///
    /// Defaults to true.
    #[serde(default = "default_metrics_enabled")]
    pub enabled: bool,
}

fn default_metrics_enabled() -> bool {
    true
}

impl Default for MetricsConfig {
    fn default() -> Self {
        Self {
            enabled: default_metrics_enabled(),
        }
    }
}

#[derive(Deserialize, Debug)]
pub struct TracesConfig {
    /// Whether to enable OTLP traces support.
    ///
    /// Defaults to true.
    #[serde(default = "default_traces_enabled")]
    pub enabled: bool,
}

fn default_traces_enabled() -> bool {
    true
}

impl Default for TracesConfig {
    fn default() -> Self {
        Self {
            enabled: default_traces_enabled(),
        }
    }
}

impl OtlpConfiguration {
    /// Creates a new `OtlpConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }

    /// Sets the workload provider to use for configuring origin detection/enrichment.
    ///
    /// A workload provider must be set, otherwise origin detection/enrichment will not be enabled.
    ///
    /// Defaults to unset.
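    ///
    /// # Example
    ///
    /// ```ignore
    /// // Sketch only: `config` is a `GenericConfiguration` and `workload_provider` is any
    /// // (hypothetical) type implementing `WorkloadProvider + Send + Sync + 'static`.
    /// let otlp_config = OtlpConfiguration::from_configuration(&config)?
    ///     .with_workload_provider(workload_provider);
    /// ```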
    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
    where
        W: WorkloadProvider + Send + Sync + 'static,
    {
        self.workload_provider = Some(Arc::new(workload_provider));
        self
    }
}

#[async_trait]
impl SourceBuilder for OtlpConfiguration {
    fn outputs(&self) -> &[OutputDefinition] {
        static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
            vec![
                OutputDefinition::named_output("metrics", EventType::Metric),
                OutputDefinition::named_output("logs", EventType::Log),
                OutputDefinition::named_output("traces", EventType::Trace),
            ]
        });

        &OUTPUTS
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
        if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
            return Err(generic_error!(
                "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
            ));
        }

        let grpc_listen_str = format!(
            "{}://{}",
            self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
        );
        let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
            .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;

        // Enforce the current limitation that we only support TCP for gRPC.
        if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
            return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
        }

        let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
            generic_error!(
                "Invalid HTTP endpoint address '{}': {}",
                self.otlp_config.receiver.protocols.http.endpoint,
                e
            )
        })?;

        let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);

        let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
        let translator_config = metrics::config::OtlpTranslatorConfig::default().with_remapping(true);
        let grpc_max_recv_msg_size_bytes =
            self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
        let metrics = build_metrics(&context);

        Ok(Box::new(Otlp {
            context_resolver,
            origin_tag_resolver: maybe_origin_tags_resolver,
            grpc_endpoint,
            http_endpoint: ListenAddress::Tcp(http_socket_addr),
            grpc_max_recv_msg_size_bytes,
            translator_config,
            metrics,
        }))
    }
}

impl MemoryBounds for OtlpConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}

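/// OTLP source.
///
/// Listens for OTLP payloads over gRPC and HTTP, translates them into Saluki events, and forwards
/// them to the configured downstream outputs.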
pub struct Otlp {
    context_resolver: ContextResolver,
    origin_tag_resolver: Option<OtlpOriginTagResolver>,
    grpc_endpoint: ListenAddress,
    http_endpoint: ListenAddress,
    grpc_max_recv_msg_size_bytes: usize,
    translator_config: metrics::config::OtlpTranslatorConfig,
    metrics: Metrics, // Internal telemetry metrics, not Datadog-native metrics.
}

#[async_trait]
impl Source for Otlp {
    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
        let mut global_shutdown = context.take_shutdown_handle();
        let mut health = context.take_health_handle();
        let memory_limiter = context.topology_context().memory_limiter();

        // Create the internal channel for decoupling the servers from the converter.
        let (tx, rx) = mpsc::channel::<OtlpResource>(1024);

        let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();

        let metrics_translator = OtlpMetricsTranslator::new(self.translator_config, self.context_resolver);

        let thread_pool_handle = context.topology_context().global_thread_pool().clone();

        let metrics_arc = Arc::new(self.metrics);

        // Spawn the converter task. This task is shared by both servers.
        thread_pool_handle.spawn_traced_named(
            "otlp-resource-converter",
            run_converter(
                rx,
                context.clone(),
                self.origin_tag_resolver,
                converter_shutdown_coordinator.register(),
                metrics_translator,
                metrics_arc.clone(),
            ),
        );

        let handler = SourceHandler::new(tx);
        let server_builder = OtlpServerBuilder::new(
            self.http_endpoint,
            self.grpc_endpoint,
            self.grpc_max_recv_msg_size_bytes,
        );

        let (http_shutdown, mut http_error) = server_builder
            .build(handler, memory_limiter.clone(), thread_pool_handle, metrics_arc.clone())
            .await?;

        health.mark_ready();
        debug!("OTLP source started.");

        // Wait for the global shutdown signal, then notify the converter to shut down.
        loop {
            select! {
                _ = &mut global_shutdown => {
                    debug!("Received shutdown signal.");
                    break
                },
                error = &mut http_error => {
                    if let Some(error) = error {
                        debug!(%error, "HTTP server error.");
                    }
                    break;
                },
                _ = health.live() => continue,
            }
        }

        debug!("Stopping OTLP source...");

        http_shutdown.shutdown();
        converter_shutdown_coordinator.shutdown().await;

        debug!("OTLP source stopped.");

        Ok(())
    }
}

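/// Dispatches buffered events to their respective named outputs.
///
/// Trace and log events are extracted and sent to the "traces" and "logs" outputs, while the
/// remaining (metric) events are dispatched to the "metrics" output.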
async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
    if events.is_empty() {
        return;
    }

    if events.has_event_type(EventType::Trace) {
        let mut buffered_dispatcher = source_context
            .dispatcher()
            .buffered_named("traces")
            .expect("traces output should exist");
        for trace_event in events.extract(Event::is_trace) {
            if let Err(e) = buffered_dispatcher.push(trace_event).await {
                error!(error = %e, "Failed to dispatch trace(s).");
            }
        }
        if let Err(e) = buffered_dispatcher.flush().await {
            error!(error = %e, "Failed to flush trace(s).");
        }
    }

    if events.has_event_type(EventType::Log) {
        let mut buffered_dispatcher = source_context
            .dispatcher()
            .buffered_named("logs")
            .expect("logs output should exist");

        for log_event in events.extract(Event::is_log) {
            if let Err(e) = buffered_dispatcher.push(log_event).await {
                error!(error = %e, "Failed to dispatch log(s).");
            }
        }

        if let Err(e) = buffered_dispatcher.flush().await {
            error!(error = %e, "Failed to flush log(s).");
        }
    }

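    // Anything left in the buffer at this point is metric events, which are dispatched to the
    // "metrics" output in a single batch.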
    let len = events.len();
    if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
        error!(error = %e, "Failed to dispatch metric events.");
    } else {
        debug!(events_len = len, "Dispatched metric events.");
    }
}

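/// Runs the converter task, which receives decoded OTLP resources from the server handlers,
/// translates them into Saluki events, and dispatches them in batches.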
async fn run_converter(
    mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
    origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
    mut metrics_translator: OtlpMetricsTranslator, metrics: Arc<Metrics>,
) {
    tokio::pin!(shutdown_handle);
    debug!("OTLP resource converter task started.");

    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
    // we're otherwise idle and not receiving payloads from the client.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut event_buffer_manager = EventBufferManager::default();

    loop {
        select! {
            Some(otlp_resource) = receiver.recv() => {
                match otlp_resource {
                    OtlpResource::Metrics(resource_metrics) => {
                        match metrics_translator.map_metrics(resource_metrics, &metrics) {
                            Ok(events) => {
                                for event in events {
                                    if let Some(event_buffer) = event_buffer_manager.try_push(event) {
                                        dispatch_events(event_buffer, &source_context).await;
                                    }
                                }
                            }
                            Err(e) => {
                                error!(error = %e, "Failed to handle resource metrics.");
                            }
                        }
                    }
                    OtlpResource::Logs(resource_logs) => {
                        let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
                        for log_event in translator {
                            metrics.logs_received().increment(1);

                            if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
                                dispatch_events(event_buffer, &source_context).await;
                            }
                        }
                    }
                    OtlpResource::Traces(_resource_spans) => {
                        // TODO: Implement traces translation.
                    }
                }
            },
            _ = buffer_flush.tick() => {
                if let Some(event_buffer) = event_buffer_manager.consume() {
                    dispatch_events(event_buffer, &source_context).await;
                }
            },
            _ = &mut shutdown_handle => {
                debug!("Converter task received shutdown signal.");
                break;
            }
        }
    }

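    // Flush any events still buffered before the task exits.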
    if let Some(event_buffer) = event_buffer_manager.consume() {
        dispatch_events(event_buffer, &source_context).await;
    }

    debug!("OTLP resource converter task stopped.");
}
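
// A minimal sketch of a sanity test for the per-signal defaults defined above. It only exercises
// types local to this module and is illustrative rather than part of the original source.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_signal_configs_are_enabled() {
        // All three signals should be enabled by default.
        let config = OtlpConfig::default();
        assert!(config.metrics.enabled);
        assert!(config.logs.enabled);
        assert!(config.traces.enabled);
    }
}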