saluki_components/sources/otlp/
mod.rs1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
9use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
11use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
12use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
13use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
14use prost::Message;
15use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
16use saluki_common::sync::shutdown::{ShutdownCoordinator, ShutdownHandle};
17use saluki_common::task::HandleExt as _;
18use saluki_config::GenericConfiguration;
19use saluki_context::ContextResolver;
20use saluki_core::topology::interconnect::BufferedDispatcher;
21use saluki_core::{
22 components::{
23 sources::{Source, SourceBuilder, SourceContext},
24 ComponentContext,
25 },
26 data_model::event::EventType,
27 topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::ErrorContext as _;
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::ListenAddress;
33use serde::Deserialize;
34use tokio::pin;
35use tokio::select;
36use tokio::sync::mpsc;
37use tokio::time::{interval, MissedTickBehavior};
38use tracing::{debug, error};
39
40use crate::common::otlp::config::OtlpConfig;
41use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
42
43mod logs;
44mod metrics;
45mod resolver;
46use self::logs::translator::OtlpLogsTranslator;
47use self::metrics::translator::OtlpMetricsTranslator;
48use self::resolver::build_context_resolver;
49use crate::common::otlp::origin::OtlpOriginTagResolver;
50use crate::common::otlp::traces::translator::OtlpTracesTranslator;
51
52const fn default_context_string_interner_size() -> ByteSize {
53 ByteSize::mib(2)
54}
55
/// Serde default for `otlp_cached_contexts_limit`: 500,000 cached contexts.
const fn default_cached_contexts_limit() -> usize {
    const DEFAULT_CONTEXTS_LIMIT: usize = 500_000;
    DEFAULT_CONTEXTS_LIMIT
}
59
/// Serde default for `otlp_cached_tagsets_limit`: 500,000 cached tagsets.
const fn default_cached_tagsets_limit() -> usize {
    const DEFAULT_TAGSETS_LIMIT: usize = 500_000;
    DEFAULT_TAGSETS_LIMIT
}
63
/// Serde default for `otlp_allow_context_heap_allocs`: heap allocations are allowed.
const fn default_allow_context_heap_allocations() -> bool {
    const ALLOW_BY_DEFAULT: bool = true;
    ALLOW_BY_DEFAULT
}
67
68#[derive(Deserialize, Default)]
70#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
71#[cfg_attr(test, derive_where(PartialEq))]
72pub struct OtlpConfiguration {
73 otlp_config: OtlpConfig,
74
75 #[serde(
81 rename = "otlp_string_interner_size",
82 default = "default_context_string_interner_size"
83 )]
84 context_string_interner_bytes: ByteSize,
85
86 #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
94 cached_contexts_limit: usize,
95
96 #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
104 cached_tagsets_limit: usize,
105
106 #[serde(
116 rename = "otlp_allow_context_heap_allocs",
117 default = "default_allow_context_heap_allocations"
118 )]
119 allow_context_heap_allocations: bool,
120
121 #[serde(skip)]
123 #[cfg_attr(test, derive_where(skip))]
124 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
125}
126
127impl OtlpConfiguration {
128 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
130 let mut cfg: Self = config.as_typed()?;
131 cfg.otlp_config.traces.apply_env_overrides(config)?;
132 Ok(cfg)
133 }
134
135 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
141 where
142 W: WorkloadProvider + Send + Sync + 'static,
143 {
144 self.workload_provider = Some(Arc::new(workload_provider));
145 self
146 }
147}
148
149#[async_trait]
150impl SourceBuilder for OtlpConfiguration {
151 fn outputs(&self) -> &[OutputDefinition<EventType>] {
152 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
153 vec![
154 OutputDefinition::named_output("metrics", EventType::Metric),
155 OutputDefinition::named_output("logs", EventType::Log),
156 OutputDefinition::named_output("traces", EventType::Trace),
157 ]
158 });
159
160 &OUTPUTS
161 }
162
163 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
164 if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
165 return Err(generic_error!(
166 "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
167 ));
168 }
169
170 let grpc_listen_str = format!(
171 "{}://{}",
172 self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
173 );
174 let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
175 .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
176
177 if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
179 return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
180 }
181
182 let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
183 generic_error!(
184 "Invalid HTTP endpoint address '{}': {}",
185 self.otlp_config.receiver.protocols.http.endpoint,
186 e
187 )
188 })?;
189
190 let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
191
192 let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
193 let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default()
194 .with_remapping(true)
195 .with_quantiles(true);
196 let traces_interner_size =
197 std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
198 .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
199 let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
200 let grpc_max_recv_msg_size_bytes =
201 self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
202 let metrics = build_metrics(&context);
203
204 Ok(Box::new(Otlp {
205 context_resolver,
206 origin_tag_resolver: maybe_origin_tags_resolver,
207 grpc_endpoint,
208 http_endpoint: ListenAddress::Tcp(http_socket_addr),
209 grpc_max_recv_msg_size_bytes,
210 metrics_translator_config,
211 traces_translator,
212 metrics,
213 }))
214 }
215}
216
impl MemoryBounds for OtlpConfiguration {
    /// Declares the source's minimum memory bounds: a single `Otlp` value ("source struct") and a
    /// single `SourceHandler` value ("source handler").
    ///
    /// NOTE(review): nothing is declared here for the 1024-entry converter channel, the context
    /// resolver caches, or the translators. Confirm they are accounted for elsewhere.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}
225
226pub struct Otlp {
227 context_resolver: ContextResolver,
228 origin_tag_resolver: Option<OtlpOriginTagResolver>,
229 grpc_endpoint: ListenAddress,
230 http_endpoint: ListenAddress,
231 grpc_max_recv_msg_size_bytes: usize,
232 metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
233 traces_translator: OtlpTracesTranslator,
234 metrics: Metrics, }
236
237#[async_trait]
238impl Source for Otlp {
239 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
240 let Self {
241 context_resolver,
242 origin_tag_resolver,
243 grpc_endpoint,
244 http_endpoint,
245 grpc_max_recv_msg_size_bytes,
246 metrics_translator_config,
247 traces_translator,
248 metrics,
249 } = *self;
250
251 let global_shutdown = context.take_shutdown_handle();
252 pin!(global_shutdown);
253
254 let mut health = context.take_health_handle();
255 let memory_limiter = context.topology_context().memory_limiter();
256
257 let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
259
260 let mut converter_shutdown_coordinator = ShutdownCoordinator::default();
261
262 let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver)?;
263
264 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
265
266 thread_pool_handle.spawn_traced_named(
268 "otlp-resource-converter",
269 run_converter(
270 rx,
271 context.clone(),
272 origin_tag_resolver,
273 converter_shutdown_coordinator.register(),
274 metrics_translator,
275 metrics.clone(),
276 traces_translator,
277 ),
278 );
279
280 let handler = SourceHandler::new(tx);
281 let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);
282
283 let (http_shutdown, mut http_error) = server_builder
284 .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
285 .await?;
286
287 health.mark_ready();
288 debug!("OTLP source started.");
289
290 loop {
292 select! {
293 _ = &mut global_shutdown => {
294 debug!("Received shutdown signal.");
295 break
296 },
297 error = &mut http_error => {
298 if let Some(error) = error {
299 debug!(%error, "HTTP server error.");
300 }
301 break;
302 },
303 _ = health.live() => continue,
304 }
305 }
306
307 debug!("Stopping OTLP source...");
308
309 http_shutdown.shutdown();
310 converter_shutdown_coordinator.shutdown_and_wait().await;
311
312 debug!("OTLP source stopped.");
313
314 Ok(())
315 }
316}
317
318enum OtlpResource {
319 Metrics(OtlpResourceMetrics),
320 Logs(OtlpResourceLogs),
321 Traces(OtlpResourceSpans),
322}
323
324struct SourceHandler {
326 tx: mpsc::Sender<OtlpResource>,
327}
328
329impl SourceHandler {
330 fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
331 Self { tx }
332 }
333}
334
335#[async_trait]
336impl OtlpHandler for SourceHandler {
337 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
338 let request =
339 ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;
340
341 for resource_metrics in request.resource_metrics {
342 self.tx
343 .send(OtlpResource::Metrics(resource_metrics))
344 .await
345 .error_context("Failed to send resource metrics to converter: channel is closed.")?;
346 }
347 Ok(())
348 }
349
350 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
351 let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;
352
353 for resource_logs in request.resource_logs {
354 self.tx
355 .send(OtlpResource::Logs(resource_logs))
356 .await
357 .error_context("Failed to send resource logs to converter: channel is closed.")?;
358 }
359 Ok(())
360 }
361
362 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
363 let request =
364 ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;
365
366 for resource_spans in request.resource_spans {
367 self.tx
368 .send(OtlpResource::Traces(resource_spans))
369 .await
370 .error_context("Failed to send resource spans to converter: channel is closed.")?;
371 }
372 Ok(())
373 }
374}
375
376async fn run_converter(
377 mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
378 origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: ShutdownHandle,
379 mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, mut traces_translator: OtlpTracesTranslator,
380) {
381 pin!(shutdown_handle);
382
383 debug!("OTLP resource converter task started.");
384
385 let mut buffer_flush = interval(Duration::from_millis(100));
388 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
389
390 let mut metrics_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
391 let mut logs_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
392 let mut traces_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
393
394 loop {
395 select! {
396 Some(otlp_resource) = receiver.recv() => {
397 match otlp_resource {
398 OtlpResource::Metrics(resource_metrics) => {
399 match metrics_translator.translate_metrics(resource_metrics, &metrics) {
400 Ok(events) => {
401 for event in events {
402 let dispatcher = metrics_dispatcher.get_or_insert_with(|| {
403 source_context
404 .dispatcher()
405 .buffered_named("metrics")
406 .expect("metrics output should exist")
407 });
408 if let Err(e) = dispatcher.push(event).await {
409 error!(error = %e, "Failed to dispatch metric event.");
410 }
411 }
412 }
413 Err(e) => {
414 error!(error = %e, "Failed to handle resource metrics.");
415 }
416 }
417 }
418 OtlpResource::Logs(resource_logs) => {
419 let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
420 for log_event in translator {
421 metrics.logs_received().increment(1);
422
423 let dispatcher = logs_dispatcher.get_or_insert_with(|| {
424 source_context
425 .dispatcher()
426 .buffered_named("logs")
427 .expect("logs output should exist")
428 });
429 if let Err(e) = dispatcher.push(log_event).await {
430 error!(error = %e, "Failed to dispatch log event.");
431 }
432 }
433 }
434 OtlpResource::Traces(resource_spans) => {
435 for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
436 let dispatcher = traces_dispatcher.get_or_insert_with(|| {
437 source_context
438 .dispatcher()
439 .buffered_named("traces")
440 .expect("traces output should exist")
441 });
442 if let Err(e) = dispatcher.push(trace_event).await {
443 error!(error = %e, "Failed to dispatch trace event.");
444 }
445 }
446 }
447 }
448 },
449 _ = buffer_flush.tick() => {
450 if let Some(dispatcher) = metrics_dispatcher.take() {
451 if let Err(e) = dispatcher.flush().await {
452 error!(error = %e, "Failed to flush metric events.");
453 }
454 }
455 if let Some(dispatcher) = logs_dispatcher.take() {
456 if let Err(e) = dispatcher.flush().await {
457 error!(error = %e, "Failed to flush log events.");
458 }
459 }
460 if let Some(dispatcher) = traces_dispatcher.take() {
461 if let Err(e) = dispatcher.flush().await {
462 error!(error = %e, "Failed to flush trace events.");
463 }
464 }
465 },
466 _ = &mut shutdown_handle => {
467 debug!("Converter task received shutdown signal.");
468 break;
469 }
470 }
471 }
472
473 if let Some(dispatcher) = metrics_dispatcher.take() {
474 if let Err(e) = dispatcher.flush().await {
475 error!(error = %e, "Failed to flush metric events.");
476 }
477 }
478 if let Some(dispatcher) = logs_dispatcher.take() {
479 if let Err(e) = dispatcher.flush().await {
480 error!(error = %e, "Failed to flush log events.");
481 }
482 }
483 if let Some(dispatcher) = traces_dispatcher.take() {
484 if let Err(e) = dispatcher.flush().await {
485 error!(error = %e, "Failed to flush trace events.");
486 }
487 }
488
489 debug!("OTLP resource converter task stopped.");
490}
491
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::config_registry::structs;
    use datadog_agent_config_testing::run_config_smoke_tests;
    use serde_json::json;

    use super::OtlpConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Drives `OtlpConfiguration::from_configuration` through the shared config smoke-test harness
    /// (`run_config_smoke_tests`) for `structs::OTLP_CONFIGURATION`.
    #[tokio::test]
    async fn smoke_test() {
        // Base document handed to the harness: just an empty `otlp_config` section.
        let base_config = json!({ "otlp_config": {} });

        run_config_smoke_tests(
            structs::OTLP_CONFIGURATION,
            &[],
            base_config,
            |cfg| OtlpConfiguration::from_configuration(&cfg).expect("OtlpConfiguration should deserialize"),
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
    }
}