saluki_components/sources/otlp/
mod.rs1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
11use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
12use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
13use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
14use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
15use prost::Message;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::ContextResolver;
19use saluki_core::topology::interconnect::BufferedDispatcher;
20use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
21use saluki_core::{
22 components::{
23 sources::{Source, SourceBuilder, SourceContext},
24 ComponentContext,
25 },
26 data_model::event::EventType,
27 topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::ErrorContext as _;
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::ListenAddress;
33use serde::Deserialize;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::time::{interval, MissedTickBehavior};
37use tracing::{debug, error};
38
39use crate::common::otlp::config::OtlpConfig;
40use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
41
42mod logs;
43mod metrics;
44mod resolver;
45use self::logs::translator::OtlpLogsTranslator;
46use self::metrics::translator::OtlpMetricsTranslator;
47use self::resolver::build_context_resolver;
48use crate::common::otlp::origin::OtlpOriginTagResolver;
49use crate::common::otlp::traces::translator::OtlpTracesTranslator;
50
51const fn default_context_string_interner_size() -> ByteSize {
52 ByteSize::mib(2)
53}
54
/// Serde default for the `otlp_cached_contexts_limit` setting.
const fn default_cached_contexts_limit() -> usize {
    const DEFAULT_LIMIT: usize = 500_000;
    DEFAULT_LIMIT
}
58
/// Serde default for the `otlp_cached_tagsets_limit` setting.
const fn default_cached_tagsets_limit() -> usize {
    const DEFAULT_LIMIT: usize = 500_000;
    DEFAULT_LIMIT
}
62
/// Serde default for the `otlp_allow_context_heap_allocs` setting.
const fn default_allow_context_heap_allocations() -> bool {
    const ALLOWED_BY_DEFAULT: bool = true;
    ALLOWED_BY_DEFAULT
}
66
67#[derive(Deserialize, Default)]
69#[cfg_attr(test, derive(derive_where::DeriveWhere, serde::Serialize))]
70#[cfg_attr(test, derive_where(PartialEq))]
71pub struct OtlpConfiguration {
72 otlp_config: OtlpConfig,
73
74 #[serde(
80 rename = "otlp_string_interner_size",
81 default = "default_context_string_interner_size"
82 )]
83 context_string_interner_bytes: ByteSize,
84
85 #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
93 cached_contexts_limit: usize,
94
95 #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
103 cached_tagsets_limit: usize,
104
105 #[serde(
115 rename = "otlp_allow_context_heap_allocs",
116 default = "default_allow_context_heap_allocations"
117 )]
118 allow_context_heap_allocations: bool,
119
120 #[serde(skip)]
122 #[cfg_attr(test, derive_where(skip))]
123 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
124}
125
126impl OtlpConfiguration {
127 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
129 let mut cfg: Self = config.as_typed()?;
130 cfg.otlp_config.traces.apply_env_overrides(config)?;
131 Ok(cfg)
132 }
133
134 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
140 where
141 W: WorkloadProvider + Send + Sync + 'static,
142 {
143 self.workload_provider = Some(Arc::new(workload_provider));
144 self
145 }
146}
147
148#[async_trait]
149impl SourceBuilder for OtlpConfiguration {
150 fn outputs(&self) -> &[OutputDefinition<EventType>] {
151 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
152 vec![
153 OutputDefinition::named_output("metrics", EventType::Metric),
154 OutputDefinition::named_output("logs", EventType::Log),
155 OutputDefinition::named_output("traces", EventType::Trace),
156 ]
157 });
158
159 &OUTPUTS
160 }
161
162 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
163 if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
164 return Err(generic_error!(
165 "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
166 ));
167 }
168
169 let grpc_listen_str = format!(
170 "{}://{}",
171 self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
172 );
173 let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
174 .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
175
176 if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
178 return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
179 }
180
181 let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
182 generic_error!(
183 "Invalid HTTP endpoint address '{}': {}",
184 self.otlp_config.receiver.protocols.http.endpoint,
185 e
186 )
187 })?;
188
189 let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
190
191 let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
192 let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default()
193 .with_remapping(true)
194 .with_quantiles(true);
195 let traces_interner_size =
196 std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
197 .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
198 let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
199 let grpc_max_recv_msg_size_bytes =
200 self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
201 let metrics = build_metrics(&context);
202
203 Ok(Box::new(Otlp {
204 context_resolver,
205 origin_tag_resolver: maybe_origin_tags_resolver,
206 grpc_endpoint,
207 http_endpoint: ListenAddress::Tcp(http_socket_addr),
208 grpc_max_recv_msg_size_bytes,
209 metrics_translator_config,
210 traces_translator,
211 metrics,
212 }))
213 }
214}
215
216impl MemoryBounds for OtlpConfiguration {
217 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
218 builder
219 .minimum()
220 .with_single_value::<Otlp>("source struct")
221 .with_single_value::<SourceHandler>("source handler");
222 }
223}
224
225pub struct Otlp {
226 context_resolver: ContextResolver,
227 origin_tag_resolver: Option<OtlpOriginTagResolver>,
228 grpc_endpoint: ListenAddress,
229 http_endpoint: ListenAddress,
230 grpc_max_recv_msg_size_bytes: usize,
231 metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
232 traces_translator: OtlpTracesTranslator,
233 metrics: Metrics, }
235
236#[async_trait]
237impl Source for Otlp {
238 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
239 let Self {
240 context_resolver,
241 origin_tag_resolver,
242 grpc_endpoint,
243 http_endpoint,
244 grpc_max_recv_msg_size_bytes,
245 metrics_translator_config,
246 traces_translator,
247 metrics,
248 } = *self;
249
250 let mut global_shutdown = context.take_shutdown_handle();
251 let mut health = context.take_health_handle();
252 let memory_limiter = context.topology_context().memory_limiter();
253
254 let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
256
257 let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();
258
259 let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver)?;
260
261 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
262
263 thread_pool_handle.spawn_traced_named(
265 "otlp-resource-converter",
266 run_converter(
267 rx,
268 context.clone(),
269 origin_tag_resolver,
270 converter_shutdown_coordinator.register(),
271 metrics_translator,
272 metrics.clone(),
273 traces_translator,
274 ),
275 );
276
277 let handler = SourceHandler::new(tx);
278 let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);
279
280 let (http_shutdown, mut http_error) = server_builder
281 .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
282 .await?;
283
284 health.mark_ready();
285 debug!("OTLP source started.");
286
287 loop {
289 select! {
290 _ = &mut global_shutdown => {
291 debug!("Received shutdown signal.");
292 break
293 },
294 error = &mut http_error => {
295 if let Some(error) = error {
296 debug!(%error, "HTTP server error.");
297 }
298 break;
299 },
300 _ = health.live() => continue,
301 }
302 }
303
304 debug!("Stopping OTLP source...");
305
306 http_shutdown.shutdown();
307 converter_shutdown_coordinator.shutdown().await;
308
309 debug!("OTLP source stopped.");
310
311 Ok(())
312 }
313}
314
315enum OtlpResource {
316 Metrics(OtlpResourceMetrics),
317 Logs(OtlpResourceLogs),
318 Traces(OtlpResourceSpans),
319}
320
321struct SourceHandler {
323 tx: mpsc::Sender<OtlpResource>,
324}
325
326impl SourceHandler {
327 fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
328 Self { tx }
329 }
330}
331
332#[async_trait]
333impl OtlpHandler for SourceHandler {
334 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
335 let request =
336 ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;
337
338 for resource_metrics in request.resource_metrics {
339 self.tx
340 .send(OtlpResource::Metrics(resource_metrics))
341 .await
342 .error_context("Failed to send resource metrics to converter: channel is closed.")?;
343 }
344 Ok(())
345 }
346
347 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
348 let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;
349
350 for resource_logs in request.resource_logs {
351 self.tx
352 .send(OtlpResource::Logs(resource_logs))
353 .await
354 .error_context("Failed to send resource logs to converter: channel is closed.")?;
355 }
356 Ok(())
357 }
358
359 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
360 let request =
361 ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;
362
363 for resource_spans in request.resource_spans {
364 self.tx
365 .send(OtlpResource::Traces(resource_spans))
366 .await
367 .error_context("Failed to send resource spans to converter: channel is closed.")?;
368 }
369 Ok(())
370 }
371}
372
373async fn run_converter(
374 mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
375 origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
376 mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, mut traces_translator: OtlpTracesTranslator,
377) {
378 tokio::pin!(shutdown_handle);
379 debug!("OTLP resource converter task started.");
380
381 let mut buffer_flush = interval(Duration::from_millis(100));
384 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
385
386 let mut metrics_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
387 let mut logs_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
388 let mut traces_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
389
390 loop {
391 select! {
392 Some(otlp_resource) = receiver.recv() => {
393 match otlp_resource {
394 OtlpResource::Metrics(resource_metrics) => {
395 match metrics_translator.translate_metrics(resource_metrics, &metrics) {
396 Ok(events) => {
397 for event in events {
398 let dispatcher = metrics_dispatcher.get_or_insert_with(|| {
399 source_context
400 .dispatcher()
401 .buffered_named("metrics")
402 .expect("metrics output should exist")
403 });
404 if let Err(e) = dispatcher.push(event).await {
405 error!(error = %e, "Failed to dispatch metric event.");
406 }
407 }
408 }
409 Err(e) => {
410 error!(error = %e, "Failed to handle resource metrics.");
411 }
412 }
413 }
414 OtlpResource::Logs(resource_logs) => {
415 let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
416 for log_event in translator {
417 metrics.logs_received().increment(1);
418
419 let dispatcher = logs_dispatcher.get_or_insert_with(|| {
420 source_context
421 .dispatcher()
422 .buffered_named("logs")
423 .expect("logs output should exist")
424 });
425 if let Err(e) = dispatcher.push(log_event).await {
426 error!(error = %e, "Failed to dispatch log event.");
427 }
428 }
429 }
430 OtlpResource::Traces(resource_spans) => {
431 for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
432 let dispatcher = traces_dispatcher.get_or_insert_with(|| {
433 source_context
434 .dispatcher()
435 .buffered_named("traces")
436 .expect("traces output should exist")
437 });
438 if let Err(e) = dispatcher.push(trace_event).await {
439 error!(error = %e, "Failed to dispatch trace event.");
440 }
441 }
442 }
443 }
444 },
445 _ = buffer_flush.tick() => {
446 if let Some(dispatcher) = metrics_dispatcher.take() {
447 if let Err(e) = dispatcher.flush().await {
448 error!(error = %e, "Failed to flush metric events.");
449 }
450 }
451 if let Some(dispatcher) = logs_dispatcher.take() {
452 if let Err(e) = dispatcher.flush().await {
453 error!(error = %e, "Failed to flush log events.");
454 }
455 }
456 if let Some(dispatcher) = traces_dispatcher.take() {
457 if let Err(e) = dispatcher.flush().await {
458 error!(error = %e, "Failed to flush trace events.");
459 }
460 }
461 },
462 _ = &mut shutdown_handle => {
463 debug!("Converter task received shutdown signal.");
464 break;
465 }
466 }
467 }
468
469 if let Some(dispatcher) = metrics_dispatcher.take() {
470 if let Err(e) = dispatcher.flush().await {
471 error!(error = %e, "Failed to flush metric events.");
472 }
473 }
474 if let Some(dispatcher) = logs_dispatcher.take() {
475 if let Err(e) = dispatcher.flush().await {
476 error!(error = %e, "Failed to flush log events.");
477 }
478 }
479 if let Some(dispatcher) = traces_dispatcher.take() {
480 if let Err(e) = dispatcher.flush().await {
481 error!(error = %e, "Failed to flush trace events.");
482 }
483 }
484
485 debug!("OTLP resource converter task stopped.");
486}
487
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::OtlpConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Runs the shared configuration smoke tests against `OtlpConfiguration`, starting from a
    /// configuration that only contains an empty `otlp_config` object.
    #[tokio::test]
    async fn smoke_test() {
        let base_config = json!({ "otlp_config": {} });

        run_config_smoke_tests(structs::OTLP_CONFIGURATION, &[], base_config, |cfg| {
            OtlpConfiguration::from_configuration(&cfg).expect("OtlpConfiguration should deserialize")
        })
        .await
    }
}