saluki_components/sources/otlp/mod.rs

1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use ::metrics::Counter;
6use async_trait::async_trait;
7use axum::body::Bytes;
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::routing::post;
11use axum::Router;
12use bytesize::ByteSize;
13use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, MemoryLimiter};
14use otlp_protos::opentelemetry::proto::collector::logs::v1::logs_service_server::{LogsService, LogsServiceServer};
15use otlp_protos::opentelemetry::proto::collector::logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse};
16use otlp_protos::opentelemetry::proto::collector::metrics::v1::metrics_service_server::{
17    MetricsService, MetricsServiceServer,
18};
19use otlp_protos::opentelemetry::proto::collector::metrics::v1::{
20    ExportMetricsServiceRequest, ExportMetricsServiceResponse,
21};
22use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
23use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
24use prost::Message;
25use saluki_common::task::HandleExt as _;
26use saluki_config::GenericConfiguration;
27use saluki_context::ContextResolver;
28use saluki_core::observability::ComponentMetricsExt;
29use saluki_core::topology::interconnect::EventBufferManager;
30use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
31use saluki_core::{
32    components::{
33        sources::{Source, SourceBuilder, SourceContext},
34        ComponentContext,
35    },
36    data_model::event::{Event, EventType},
37    topology::{EventsBuffer, OutputDefinition},
38};
39use saluki_env::WorkloadProvider;
40use saluki_error::{generic_error, GenericError};
41use saluki_io::net::listener::ConnectionOrientedListener;
42use saluki_io::net::server::http::HttpServer;
43use saluki_io::net::util::hyper::TowerToHyperService;
44use saluki_io::net::ListenAddress;
45use saluki_metrics::MetricsBuilder;
46use serde::Deserialize;
47use tokio::select;
48use tokio::sync::mpsc;
49use tokio::time::{interval, MissedTickBehavior};
50use tonic::transport::Server;
51use tonic::{Request, Response, Status};
52use tracing::{debug, error};
53
54mod attributes;
55mod logs;
56mod metrics;
57mod origin;
58mod resolver;
59use self::logs::translator::OtlpLogsTranslator;
60use self::metrics::translator::OtlpMetricsTranslator;
61use self::origin::OtlpOriginTagResolver;
62use self::resolver::build_context_resolver;
63
64const fn default_context_string_interner_size() -> ByteSize {
65    ByteSize::mib(2)
66}
67
/// Shared default for both the cached-contexts and cached-tagsets limits.
const DEFAULT_CACHED_ENTRIES_LIMIT: usize = 500_000;

/// Default value for `otlp_cached_contexts_limit`.
const fn default_cached_contexts_limit() -> usize {
    DEFAULT_CACHED_ENTRIES_LIMIT
}

/// Default value for `otlp_cached_tagsets_limit`.
const fn default_cached_tagsets_limit() -> usize {
    DEFAULT_CACHED_ENTRIES_LIMIT
}

/// Default value for `otlp_allow_context_heap_allocs`: heap allocations are permitted.
const fn default_allow_context_heap_allocations() -> bool {
    true
}
79
80/// Configuration for the OTLP source.
81#[derive(Deserialize, Default)]
82pub struct OtlpConfiguration {
83    otlp_config: OtlpConfig,
84
85    /// Total size of the string interner used for contexts.
86    ///
87    /// This controls the amount of memory that can be used to intern metric names and tags. If the interner is full,
88    /// metrics with contexts that have not already been resolved may or may not be dropped, depending on the value of
89    /// `allow_context_heap_allocations`.
90    #[serde(
91        rename = "otlp_string_interner_size",
92        default = "default_context_string_interner_size"
93    )]
94    context_string_interner_bytes: ByteSize,
95
96    /// The maximum number of cached contexts to allow.
97    ///
98    /// This is the maximum number of resolved contexts that can be cached at any given time. This limit does not affect
99    /// the total number of contexts that can be _alive_ at any given time, which is dependent on the interner capacity
100    /// and whether or not heap allocations are allowed.
101    ///
102    /// Defaults to 500,000.
103    #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
104    cached_contexts_limit: usize,
105
106    /// The maximum number of cached tagsets to allow.
107    ///
108    /// This is the maximum number of resolved tagsets that can be cached at any given time. This limit does not affect
109    /// the total number of tagsets that can be _alive_ at any given time, which is dependent on the interner capacity
110    /// and whether or not heap allocations are allowed.
111    ///
112    /// Defaults to 500,000.
113    #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
114    cached_tagsets_limit: usize,
115
116    /// Whether or not to allow heap allocations when resolving contexts.
117    ///
118    /// When resolving contexts during parsing, the metric name and tags are interned to reduce memory usage. The
119    /// interner has a fixed size, however, which means some strings can fail to be interned if the interner is full.
120    /// When set to `true`, we allow these strings to be allocated on the heap like normal, but this can lead to
121    /// increased (unbounded) memory usage. When set to `false`, if the metric name and all of its tags cannot be
122    /// interned, the metric is skipped.
123    ///
124    /// Defaults to `true`.
125    #[serde(
126        rename = "otlp_allow_context_heap_allocs",
127        default = "default_allow_context_heap_allocations"
128    )]
129    allow_context_heap_allocations: bool,
130
131    /// Workload provider to utilize for origin detection/enrichment.
132    #[serde(skip)]
133    workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
134}
135
136#[derive(Deserialize, Debug, Default)]
137pub struct OtlpConfig {
138    #[serde(default)]
139    receiver: Receiver,
140    #[serde(default)]
141    metrics: MetricsConfig,
142    #[serde(default)]
143    logs: LogsConfig,
144}
145
146#[derive(Deserialize, Debug, Default)]
147pub struct Receiver {
148    #[serde(default)]
149    protocols: Protocols,
150}
151
152#[derive(Deserialize, Debug, Default)]
153struct Protocols {
154    #[serde(default)]
155    grpc: GrpcConfig,
156
157    #[serde(default)]
158    http: HttpConfig,
159}
160
161#[derive(Deserialize, Debug)]
162pub struct GrpcConfig {
163    /// The endpoint to bind the OTLP/gRPC server to.
164    ///
165    /// Defaults to 0.0.0.0:4317.
166    #[serde(default = "default_grpc_endpoint")]
167    pub endpoint: String,
168
169    /// The transport to use for the OTLP/gRPC server.
170    ///
171    /// Defaults to "tcp".
172    #[serde(default = "default_transport")]
173    pub transport: String,
174
175    /// The maximum size (in MiB) of messages accepted by the OTLP/gRPC endpoint.
176    ///
177    /// Defaults to 4 MiB.
178    #[serde(default = "default_max_recv_msg_size_mib")]
179    #[serde(rename = "max_recv_msg_size_mib")]
180    pub max_recv_msg_size_mib: u64,
181}
182
183#[derive(Deserialize, Debug)]
184pub struct HttpConfig {
185    /// The OTLP/HTTP listener endpoint.
186    ///
187    /// Defaults to 0.0.0.0:4318.
188    #[serde(default = "default_http_endpoint")]
189    pub endpoint: String,
190}
191
192impl Default for GrpcConfig {
193    fn default() -> Self {
194        Self {
195            endpoint: default_grpc_endpoint(),
196            transport: default_transport(),
197            max_recv_msg_size_mib: default_max_recv_msg_size_mib(),
198        }
199    }
200}
201
202impl Default for HttpConfig {
203    fn default() -> Self {
204        Self {
205            endpoint: default_http_endpoint(),
206        }
207    }
208}
209
/// Default OTLP/gRPC bind address: the unspecified IPv4 address on port 4317.
fn default_grpc_endpoint() -> String {
    String::from("0.0.0.0:4317")
}

/// Default OTLP/HTTP bind address: the unspecified IPv4 address on port 4318.
fn default_http_endpoint() -> String {
    String::from("0.0.0.0:4318")
}

/// Default OTLP/gRPC transport.
fn default_transport() -> String {
    String::from("tcp")
}

/// Default maximum OTLP/gRPC message size accepted, in MiB.
fn default_max_recv_msg_size_mib() -> u64 {
    const DEFAULT_MIB: u64 = 4;
    DEFAULT_MIB
}
225
/// A single OTLP resource entry sent from the gRPC/HTTP servers to the converter task over the internal channel.
///
/// The servers split each export request and send one `OtlpResource` per resource entry.
enum OtlpResource {
    /// One `ResourceMetrics` entry taken from an `ExportMetricsServiceRequest`.
    Metrics(OtlpResourceMetrics),
    /// One `ResourceLogs` entry taken from an `ExportLogsServiceRequest`.
    Logs(OtlpResourceLogs),
}
230
231#[derive(Deserialize, Debug)]
232pub struct LogsConfig {
233    /// Whether to enable OTLP logs support.
234    ///
235    /// Defaults to true.
236    #[serde(default = "default_logs_enabled")]
237    pub enabled: bool,
238}
239
240fn default_logs_enabled() -> bool {
241    true
242}
243
244impl Default for LogsConfig {
245    fn default() -> Self {
246        Self {
247            enabled: default_logs_enabled(),
248        }
249    }
250}
251
252#[derive(Deserialize, Debug)]
253pub struct MetricsConfig {
254    /// Whether to enable OTLP metrics support.
255    ///
256    /// Defaults to true.
257    #[serde(default = "default_metrics_enabled")]
258    pub enabled: bool,
259}
260
261fn default_metrics_enabled() -> bool {
262    true
263}
264
265impl Default for MetricsConfig {
266    fn default() -> Self {
267        Self {
268            enabled: default_metrics_enabled(),
269        }
270    }
271}
272
273impl OtlpConfiguration {
274    /// Creates a new `OTLPConfiguration` from the given configuration.
275    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
276        Ok(config.as_typed()?)
277    }
278
279    /// Sets the workload provider to use for configuring origin detection/enrichment.
280    ///
281    /// A workload provider must be set otherwise origin detection/enrichment will not be enabled.
282    ///
283    /// Defaults to unset.
284    pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
285    where
286        W: WorkloadProvider + Send + Sync + 'static,
287    {
288        self.workload_provider = Some(Arc::new(workload_provider));
289        self
290    }
291}
292
293pub struct Metrics {
294    metrics_received: Counter,
295    logs_received: Counter,
296}
297
298impl Metrics {
299    fn metrics_received(&self) -> &Counter {
300        &self.metrics_received
301    }
302
303    fn logs_received(&self) -> &Counter {
304        &self.logs_received
305    }
306
307    /// Test-only helper to construct a `Metrics` instance.
308    #[cfg(test)]
309    pub fn for_tests() -> Self {
310        Metrics {
311            metrics_received: Counter::noop(),
312            logs_received: Counter::noop(),
313        }
314    }
315}
316
317fn build_metrics(component_context: &ComponentContext) -> Metrics {
318    let builder = MetricsBuilder::from_component_context(component_context);
319
320    Metrics {
321        metrics_received: builder
322            .register_debug_counter_with_tags("component_events_received_total", [("message_type", "otlp_metrics")]),
323        logs_received: builder
324            .register_debug_counter_with_tags("component_events_received_total", [("message_type", "otlp_logs")]),
325    }
326}
327
328#[async_trait]
329impl SourceBuilder for OtlpConfiguration {
330    fn outputs(&self) -> &[OutputDefinition] {
331        static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
332            vec![
333                OutputDefinition::named_output("metrics", EventType::Metric),
334                OutputDefinition::named_output("logs", EventType::Log),
335            ]
336        });
337
338        &OUTPUTS
339    }
340
341    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
342        if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled {
343            return Err(generic_error!(
344                "OTLP metrics and logs support is disabled. Please enable at least one of them."
345            ));
346        }
347
348        let grpc_listen_str = format!(
349            "{}://{}",
350            self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
351        );
352        let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
353            .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
354
355        // Enforce the current limitation that we only support TCP for gRPC.
356        if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
357            return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
358        }
359
360        let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
361            generic_error!(
362                "Invalid HTTP endpoint address '{}': {}",
363                self.otlp_config.receiver.protocols.http.endpoint,
364                e
365            )
366        })?;
367
368        let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
369
370        let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
371        let translator_config = metrics::config::OtlpTranslatorConfig::default().with_remapping(true);
372        let grpc_max_recv_msg_size_bytes =
373            self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
374        let metrics = build_metrics(&context);
375
376        Ok(Box::new(Otlp {
377            context_resolver,
378            origin_tag_resolver: maybe_origin_tags_resolver,
379            grpc_endpoint,
380            http_endpoint: ListenAddress::Tcp(http_socket_addr),
381            grpc_max_recv_msg_size_bytes,
382            translator_config,
383            metrics,
384        }))
385    }
386}
387
/// Memory bounds for the OTLP source.
///
/// Registers a single `Otlp` value ("source struct") and a single `GrpcService` value ("gRPC service") under the
/// source's minimum bounds.
///
/// NOTE(review): `Source::run` constructs two `GrpcService` values (one each for the metrics and logs services), but
/// only one is accounted for here. Confirm whether that is intentional.
impl MemoryBounds for OtlpConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<GrpcService>("gRPC service");
    }
}
396
/// The built OTLP source: receives OTLP metrics and logs over gRPC and HTTP and emits them on the `metrics` and
/// `logs` outputs.
///
/// Constructed by `SourceBuilder::build` for `OtlpConfiguration` and consumed by `Source::run`.
pub struct Otlp {
    /// Context resolver handed to the metrics translator.
    context_resolver: ContextResolver,
    /// Origin tag resolver passed to the converter task for log translation. `None` when no workload provider was
    /// configured.
    origin_tag_resolver: Option<OtlpOriginTagResolver>,
    /// Listen address for the OTLP/gRPC server (checked to be TCP during build).
    grpc_endpoint: ListenAddress,
    /// Listen address for the OTLP/HTTP server (always constructed as TCP during build).
    http_endpoint: ListenAddress,
    /// Maximum decoded message size for the gRPC services, in bytes.
    grpc_max_recv_msg_size_bytes: usize,
    /// Configuration for the OTLP metrics translator.
    translator_config: metrics::config::OtlpTranslatorConfig,
    metrics: Metrics, // Telemetry metrics, not DD native metrics.
}
406
407#[async_trait]
408impl Source for Otlp {
409    async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
410        let mut global_shutdown = context.take_shutdown_handle();
411        let mut health = context.take_health_handle();
412        let memory_limiter = context.topology_context().memory_limiter();
413
414        // Create the internal channel for decoupling the servers from the converter.
415        let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
416
417        let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();
418
419        let metrics_translator = OtlpMetricsTranslator::new(self.translator_config, self.context_resolver);
420
421        let thread_pool_handle = context.topology_context().global_thread_pool().clone();
422
423        // Spawn the converter task. This task is shared by both servers.
424        thread_pool_handle.spawn_traced_named(
425            "otlp-resource-converter",
426            run_converter(
427                rx,
428                context.clone(),
429                self.origin_tag_resolver,
430                converter_shutdown_coordinator.register(),
431                metrics_translator,
432                self.metrics,
433            ),
434        );
435
436        // Create and spawn the gRPC server.
437        let grpc_metrics_server = MetricsServiceServer::new(GrpcService::new(tx.clone(), memory_limiter.clone()))
438            .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);
439        let grpc_logs_server = LogsServiceServer::new(GrpcService::new(tx.clone(), memory_limiter.clone()))
440            .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);
441        let grpc_server = Server::builder()
442            .add_service(grpc_metrics_server)
443            .add_service(grpc_logs_server);
444
445        let grpc_socket_addr = self
446            .grpc_endpoint
447            .as_local_connect_addr()
448            .ok_or_else(|| generic_error!("OTLP gRPC endpoint is not a local TCP address."))?;
449        thread_pool_handle.spawn_traced_named("otlp-grpc-server", grpc_server.serve(grpc_socket_addr));
450        debug!(endpoint = %self.grpc_endpoint, "OTLP gRPC server started.");
451
452        // Create and spawn the HTTP server.
453        let service = TowerToHyperService::new(
454            Router::new()
455                .route("/v1/metrics", post(http_metric_handler))
456                .route("/v1/logs", post(http_logs_handler))
457                .with_state((tx, memory_limiter.clone())),
458        );
459        let http_listener = ConnectionOrientedListener::from_listen_address(self.http_endpoint)
460            .await
461            .map_err(|e| generic_error!("Failed to create OTLP HTTP listener: {}", e))?;
462        let http_server = HttpServer::from_listener(http_listener, service);
463        let (http_shutdown, mut http_error) = http_server.listen();
464
465        health.mark_ready();
466        debug!("OTLP source started.");
467
468        // Wait for the global shutdown signal, then notify converter to shutdown.
469        loop {
470            select! {
471                _ = &mut global_shutdown => {
472                    debug!("Received shutdown signal.");
473                    break
474                },
475                error = &mut http_error => {
476                    if let Some(error) = error {
477                        debug!(%error, "HTTP server error.");
478                    }
479                    break;
480                },
481                _ = health.live() => continue,
482            }
483        }
484
485        debug!("Stopping OTLP source...");
486
487        http_shutdown.shutdown();
488        converter_shutdown_coordinator.shutdown().await;
489
490        debug!("OTLP source stopped.");
491
492        Ok(())
493    }
494}
495
496async fn http_logs_handler(
497    State((tx, memory_limiter)): State<(mpsc::Sender<OtlpResource>, MemoryLimiter)>, body: Bytes,
498) -> (StatusCode, &'static str) {
499    memory_limiter.wait_for_capacity().await;
500    match ExportLogsServiceRequest::decode(body) {
501        Ok(request) => {
502            for resource_logs in request.resource_logs {
503                if tx.send(OtlpResource::Logs(resource_logs)).await.is_err() {
504                    error!("Failed to send resource logs to converter; channel is closed.");
505                    return (StatusCode::INTERNAL_SERVER_ERROR, "Internal processing channel closed.");
506                }
507            }
508            (StatusCode::OK, "OK")
509        }
510        Err(e) => {
511            error!(error = %e, "Failed to decode OTLP protobuf request.");
512            (StatusCode::BAD_REQUEST, "Bad Request: Invalid protobuf.")
513        }
514    }
515}
516
517async fn http_metric_handler(
518    State((tx, memory_limiter)): State<(mpsc::Sender<OtlpResource>, MemoryLimiter)>, body: Bytes,
519) -> (StatusCode, &'static str) {
520    memory_limiter.wait_for_capacity().await;
521    match ExportMetricsServiceRequest::decode(body) {
522        Ok(request) => {
523            for resource_metrics in request.resource_metrics {
524                if tx.send(OtlpResource::Metrics(resource_metrics)).await.is_err() {
525                    error!("Failed to send resource metrics to converter; channel is closed.");
526                    return (StatusCode::INTERNAL_SERVER_ERROR, "Internal processing channel closed.");
527                }
528            }
529            (StatusCode::OK, "OK")
530        }
531        Err(e) => {
532            error!(error = %e, "Failed to decode OTLP protobuf request.");
533            (StatusCode::BAD_REQUEST, "Bad Request: Invalid protobuf.")
534        }
535    }
536}
537
538struct GrpcService {
539    sender: mpsc::Sender<OtlpResource>,
540    memory_limiter: MemoryLimiter,
541}
542
543impl GrpcService {
544    fn new(sender: mpsc::Sender<OtlpResource>, memory_limiter: MemoryLimiter) -> Self {
545        Self { sender, memory_limiter }
546    }
547}
548
549#[async_trait]
550impl MetricsService for GrpcService {
551    async fn export(
552        &self, request: Request<ExportMetricsServiceRequest>,
553    ) -> Result<Response<ExportMetricsServiceResponse>, Status> {
554        self.memory_limiter.wait_for_capacity().await;
555        let request = request.into_inner();
556
557        for resource_metrics in request.resource_metrics {
558            if self.sender.send(OtlpResource::Metrics(resource_metrics)).await.is_err() {
559                error!("Failed to send resource metrics to converter; channel is closed.");
560                return Err(Status::internal("Internal processing channel closed."));
561            }
562        }
563
564        Ok(Response::new(ExportMetricsServiceResponse { partial_success: None }))
565    }
566}
567
568#[async_trait]
569impl LogsService for GrpcService {
570    async fn export(
571        &self, request: Request<ExportLogsServiceRequest>,
572    ) -> Result<Response<ExportLogsServiceResponse>, Status> {
573        self.memory_limiter.wait_for_capacity().await;
574        let request = request.into_inner();
575
576        for resource_logs in request.resource_logs {
577            if self.sender.send(OtlpResource::Logs(resource_logs)).await.is_err() {
578                error!("Failed to send resource logs to converter; channel is closed.");
579                return Err(Status::internal("Internal processing channel closed."));
580            }
581        }
582
583        Ok(Response::new(ExportLogsServiceResponse { partial_success: None }))
584    }
585}
586
587async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
588    if events.is_empty() {
589        return;
590    }
591
592    if events.has_event_type(EventType::Log) {
593        let mut buffered_dispatcher = source_context
594            .dispatcher()
595            .buffered_named("logs")
596            .expect("logs output should exist");
597
598        for log_event in events.extract(Event::is_log) {
599            if let Err(e) = buffered_dispatcher.push(log_event).await {
600                error!(error = %e, "Failed to dispatch log(s).");
601            }
602        }
603
604        if let Err(e) = buffered_dispatcher.flush().await {
605            error!(error = %e, "Failed to flush log(s).");
606        }
607    }
608
609    let len = events.len();
610    if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
611        error!(error = %e, "Failed to dispatch metric events.");
612    } else {
613        debug!(events_len = len, "Dispatched metric events.");
614    }
615}
616
/// Converter task shared by the gRPC and HTTP servers.
///
/// Receives `OtlpResource` values from the servers and translates them into events:
/// - metrics via `metrics_translator`;
/// - logs via `OtlpLogsTranslator`, with `origin_tag_resolver`.
///
/// Events are accumulated in an `EventBufferManager`, and a buffer is dispatched whenever `try_push` hands one back.
/// A 100ms ticker dispatches partially filled buffers while the task is idle. When `shutdown_handle` resolves, the
/// loop exits and any still-buffered events are dispatched before the task returns.
///
/// NOTE(review): resources still queued in `receiver` when shutdown fires are not drained.
async fn run_converter(
    mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
    origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
    mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics,
) {
    tokio::pin!(shutdown_handle);
    debug!("OTLP resource converter task started.");

    // Set a buffer flush interval of 100ms, which will ensure we always flush buffered events at least every 100ms if
    // we're otherwise idle and not receiving packets from the client.
    let mut buffer_flush = interval(Duration::from_millis(100));
    buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut event_buffer_manager = EventBufferManager::default();

    loop {
        select! {
            // Once every sender is gone, `recv()` returns `None`. The pattern then fails to match, and `select!`
            // disables this branch for that call, so the task keeps waiting on the flush timer and shutdown signal.
            Some(otlp_resource) = receiver.recv() => {
                match otlp_resource {
                    OtlpResource::Metrics(resource_metrics) => {
                        // Translation failures are logged and the resource is skipped.
                        match metrics_translator.map_metrics(resource_metrics, &metrics) {
                            Ok(events) => {
                                for event in events {
                                    // `try_push` returns a buffer when one is ready to be dispatched (presumably when
                                    // the current buffer is full — NOTE(review): confirm against EventBufferManager).
                                    if let Some(event_buffer) = event_buffer_manager.try_push(event) {
                                        dispatch_events(event_buffer, &source_context).await;
                                    }
                                }
                            }
                            Err(e) => {
                                error!(error = %e, "Failed to handle resource metrics.");
                            }
                        }
                    }
                    OtlpResource::Logs(resource_logs) => {
                        let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
                        for log_event in translator {
                            metrics.logs_received().increment(1);

                            if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
                                dispatch_events(event_buffer, &source_context).await;
                            }
                        }
                    }
                }
            },
            _ = buffer_flush.tick() => {
                // Periodically dispatch whatever has been buffered so far, even if the buffer is not full.
                if let Some(event_buffer) = event_buffer_manager.consume() {
                    dispatch_events(event_buffer, &source_context).await;
                }
            },
            _ = &mut shutdown_handle => {
                debug!("Converter task received shutdown signal.");
                break;
            }
        }
    }

    // Dispatch any events still buffered before exiting.
    if let Some(event_buffer) = event_buffer_manager.consume() {
        dispatch_events(event_buffer, &source_context).await;
    }

    debug!("OTLP resource converter task stopped.");
}