saluki_components/sources/otlp/
mod.rs1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
11use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
12use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
13use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
14use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
15use prost::Message;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::ContextResolver;
19use saluki_core::topology::interconnect::EventBufferManager;
20use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
21use saluki_core::{
22 components::{
23 sources::{Source, SourceBuilder, SourceContext},
24 ComponentContext,
25 },
26 data_model::event::{Event, EventType},
27 topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::{generic_error, GenericError};
31use saluki_io::net::ListenAddress;
32use serde::Deserialize;
33use tokio::select;
34use tokio::sync::mpsc;
35use tokio::time::{interval, MissedTickBehavior};
36use tracing::{debug, error};
37
38use crate::common::otlp::config::Receiver;
39use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
40
41mod attributes;
42mod logs;
43mod metrics;
44mod origin;
45mod resolver;
46use self::logs::translator::OtlpLogsTranslator;
47use self::metrics::translator::OtlpMetricsTranslator;
48use self::origin::OtlpOriginTagResolver;
49use self::resolver::build_context_resolver;
50
51const fn default_context_string_interner_size() -> ByteSize {
52 ByteSize::mib(2)
53}
54
/// Default cap on the number of cached contexts, used when `otlp_cached_contexts_limit` is not set.
const fn default_cached_contexts_limit() -> usize {
    const DEFAULT_CACHED_CONTEXTS_LIMIT: usize = 500_000;
    DEFAULT_CACHED_CONTEXTS_LIMIT
}
58
/// Default cap on the number of cached tagsets, used when `otlp_cached_tagsets_limit` is not set.
const fn default_cached_tagsets_limit() -> usize {
    const DEFAULT_CACHED_TAGSETS_LIMIT: usize = 500_000;
    DEFAULT_CACHED_TAGSETS_LIMIT
}
62
/// Whether context heap allocations are permitted when `otlp_allow_context_heap_allocs` is not set (they are).
const fn default_allow_context_heap_allocations() -> bool {
    const ALLOW_BY_DEFAULT: bool = true;
    ALLOW_BY_DEFAULT
}
66
67#[derive(Deserialize, Default)]
69pub struct OtlpConfiguration {
70 otlp_config: OtlpConfig,
71
72 #[serde(
78 rename = "otlp_string_interner_size",
79 default = "default_context_string_interner_size"
80 )]
81 context_string_interner_bytes: ByteSize,
82
83 #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
91 cached_contexts_limit: usize,
92
93 #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
101 cached_tagsets_limit: usize,
102
103 #[serde(
113 rename = "otlp_allow_context_heap_allocs",
114 default = "default_allow_context_heap_allocations"
115 )]
116 allow_context_heap_allocations: bool,
117
118 #[serde(skip)]
120 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
121}
122
123#[derive(Deserialize, Debug, Default)]
124pub struct OtlpConfig {
125 #[serde(default)]
126 receiver: Receiver,
127 #[serde(default)]
128 metrics: MetricsConfig,
129 #[serde(default)]
130 logs: LogsConfig,
131 #[serde(default)]
132 traces: TracesConfig,
133}
134
135enum OtlpResource {
136 Metrics(OtlpResourceMetrics),
137 Logs(OtlpResourceLogs),
138 Traces(OtlpResourceSpans),
139}
140
141struct SourceHandler {
143 tx: mpsc::Sender<OtlpResource>,
144}
145
146impl SourceHandler {
147 fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
148 Self { tx }
149 }
150}
151
152#[async_trait]
153impl OtlpHandler for SourceHandler {
154 async fn handle_metrics(&self, body: Bytes) -> Result<(), String> {
155 match ExportMetricsServiceRequest::decode(body) {
156 Ok(request) => {
157 for resource_metrics in request.resource_metrics {
158 if self.tx.send(OtlpResource::Metrics(resource_metrics)).await.is_err() {
159 error!("Failed to send resource metrics to converter; channel is closed.");
160 return Err("Internal processing channel closed.".to_string());
161 }
162 }
163 Ok(())
164 }
165 Err(e) => {
166 error!(error = %e, "Failed to decode OTLP protobuf request.");
167 Err("Bad Request: Invalid protobuf.".to_string())
168 }
169 }
170 }
171
172 async fn handle_logs(&self, body: Bytes) -> Result<(), String> {
173 match ExportLogsServiceRequest::decode(body) {
174 Ok(request) => {
175 for resource_logs in request.resource_logs {
176 if self.tx.send(OtlpResource::Logs(resource_logs)).await.is_err() {
177 error!("Failed to send resource logs to converter; channel is closed.");
178 return Err("Internal processing channel closed.".to_string());
179 }
180 }
181 Ok(())
182 }
183 Err(e) => {
184 error!(error = %e, "Failed to decode OTLP protobuf request.");
185 Err("Bad Request: Invalid protobuf.".to_string())
186 }
187 }
188 }
189
190 async fn handle_traces(&self, body: Bytes) -> Result<(), String> {
191 match ExportTraceServiceRequest::decode(body) {
192 Ok(request) => {
193 for resource_spans in request.resource_spans {
194 if self.tx.send(OtlpResource::Traces(resource_spans)).await.is_err() {
195 error!("Failed to send resource spans to converter; channel is closed.");
196 return Err("Internal processing channel closed.".to_string());
197 }
198 }
199 Ok(())
200 }
201 Err(e) => {
202 error!(error = %e, "Failed to decode OTLP protobuf request.");
203 Err("Bad Request: Invalid protobuf.".to_string())
204 }
205 }
206 }
207}
208
209#[derive(Deserialize, Debug)]
210pub struct LogsConfig {
211 #[serde(default = "default_logs_enabled")]
215 pub enabled: bool,
216}
217
/// Serde default for `LogsConfig::enabled`: logs are enabled unless configured otherwise.
const fn default_logs_enabled() -> bool {
    true
}
221
222impl Default for LogsConfig {
223 fn default() -> Self {
224 Self {
225 enabled: default_logs_enabled(),
226 }
227 }
228}
229
230#[derive(Deserialize, Debug)]
231pub struct MetricsConfig {
232 #[serde(default = "default_metrics_enabled")]
236 pub enabled: bool,
237}
238
/// Serde default for `MetricsConfig::enabled`: metrics are enabled unless configured otherwise.
const fn default_metrics_enabled() -> bool {
    true
}
242
243impl Default for MetricsConfig {
244 fn default() -> Self {
245 Self {
246 enabled: default_metrics_enabled(),
247 }
248 }
249}
250
251#[derive(Deserialize, Debug)]
252pub struct TracesConfig {
253 #[serde(default = "default_traces_enabled")]
257 pub enabled: bool,
258}
259
/// Serde default for `TracesConfig::enabled`: traces are enabled unless configured otherwise.
const fn default_traces_enabled() -> bool {
    true
}
263
264impl Default for TracesConfig {
265 fn default() -> Self {
266 Self {
267 enabled: default_traces_enabled(),
268 }
269 }
270}
271impl OtlpConfiguration {
272 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
274 Ok(config.as_typed()?)
275 }
276
277 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
283 where
284 W: WorkloadProvider + Send + Sync + 'static,
285 {
286 self.workload_provider = Some(Arc::new(workload_provider));
287 self
288 }
289}
290
291#[async_trait]
292impl SourceBuilder for OtlpConfiguration {
293 fn outputs(&self) -> &[OutputDefinition] {
294 static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
295 vec![
296 OutputDefinition::named_output("metrics", EventType::Metric),
297 OutputDefinition::named_output("logs", EventType::Log),
298 OutputDefinition::named_output("traces", EventType::Trace),
299 ]
300 });
301
302 &OUTPUTS
303 }
304
305 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
306 if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
307 return Err(generic_error!(
308 "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
309 ));
310 }
311
312 let grpc_listen_str = format!(
313 "{}://{}",
314 self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
315 );
316 let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
317 .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
318
319 if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
321 return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
322 }
323
324 let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
325 generic_error!(
326 "Invalid HTTP endpoint address '{}': {}",
327 self.otlp_config.receiver.protocols.http.endpoint,
328 e
329 )
330 })?;
331
332 let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
333
334 let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
335 let translator_config = metrics::config::OtlpTranslatorConfig::default().with_remapping(true);
336 let grpc_max_recv_msg_size_bytes =
337 self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
338 let metrics = build_metrics(&context);
339
340 Ok(Box::new(Otlp {
341 context_resolver,
342 origin_tag_resolver: maybe_origin_tags_resolver,
343 grpc_endpoint,
344 http_endpoint: ListenAddress::Tcp(http_socket_addr),
345 grpc_max_recv_msg_size_bytes,
346 translator_config,
347 metrics,
348 }))
349 }
350}
351
352impl MemoryBounds for OtlpConfiguration {
353 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
354 builder
355 .minimum()
356 .with_single_value::<Otlp>("source struct")
357 .with_single_value::<SourceHandler>("source handler");
358 }
359}
360
361pub struct Otlp {
362 context_resolver: ContextResolver,
363 origin_tag_resolver: Option<OtlpOriginTagResolver>,
364 grpc_endpoint: ListenAddress,
365 http_endpoint: ListenAddress,
366 grpc_max_recv_msg_size_bytes: usize,
367 translator_config: metrics::config::OtlpTranslatorConfig,
368 metrics: Metrics, }
370
371#[async_trait]
372impl Source for Otlp {
373 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
374 let mut global_shutdown = context.take_shutdown_handle();
375 let mut health = context.take_health_handle();
376 let memory_limiter = context.topology_context().memory_limiter();
377
378 let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
380
381 let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();
382
383 let metrics_translator = OtlpMetricsTranslator::new(self.translator_config, self.context_resolver);
384
385 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
386
387 let metrics_arc = Arc::new(self.metrics);
388
389 thread_pool_handle.spawn_traced_named(
391 "otlp-resource-converter",
392 run_converter(
393 rx,
394 context.clone(),
395 self.origin_tag_resolver,
396 converter_shutdown_coordinator.register(),
397 metrics_translator,
398 metrics_arc.clone(),
399 ),
400 );
401
402 let handler = SourceHandler::new(tx);
403 let server_builder = OtlpServerBuilder::new(
404 self.http_endpoint,
405 self.grpc_endpoint,
406 self.grpc_max_recv_msg_size_bytes,
407 );
408
409 let (http_shutdown, mut http_error) = server_builder
410 .build(handler, memory_limiter.clone(), thread_pool_handle, metrics_arc.clone())
411 .await?;
412
413 health.mark_ready();
414 debug!("OTLP source started.");
415
416 loop {
418 select! {
419 _ = &mut global_shutdown => {
420 debug!("Received shutdown signal.");
421 break
422 },
423 error = &mut http_error => {
424 if let Some(error) = error {
425 debug!(%error, "HTTP server error.");
426 }
427 break;
428 },
429 _ = health.live() => continue,
430 }
431 }
432
433 debug!("Stopping OTLP source...");
434
435 http_shutdown.shutdown();
436 converter_shutdown_coordinator.shutdown().await;
437
438 debug!("OTLP source stopped.");
439
440 Ok(())
441 }
442}
443
444async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
445 if events.is_empty() {
446 return;
447 }
448
449 if events.has_event_type(EventType::Trace) {
450 let mut buffered_dispatcher = source_context
451 .dispatcher()
452 .buffered_named("traces")
453 .expect("traces output should exist");
454 for trace_event in events.extract(Event::is_trace) {
455 if let Err(e) = buffered_dispatcher.push(trace_event).await {
456 error!(error = %e, "Failed to dispatch trace(s).");
457 }
458 }
459 if let Err(e) = buffered_dispatcher.flush().await {
460 error!(error = %e, "Failed to flush trace(s).");
461 }
462 }
463
464 if events.has_event_type(EventType::Log) {
465 let mut buffered_dispatcher = source_context
466 .dispatcher()
467 .buffered_named("logs")
468 .expect("logs output should exist");
469
470 for log_event in events.extract(Event::is_log) {
471 if let Err(e) = buffered_dispatcher.push(log_event).await {
472 error!(error = %e, "Failed to dispatch log(s).");
473 }
474 }
475
476 if let Err(e) = buffered_dispatcher.flush().await {
477 error!(error = %e, "Failed to flush log(s).");
478 }
479 }
480
481 let len = events.len();
482 if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
483 error!(error = %e, "Failed to dispatch metric events.");
484 } else {
485 debug!(events_len = len, "Dispatched metric events.");
486 }
487}
488
489async fn run_converter(
490 mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
491 origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
492 mut metrics_translator: OtlpMetricsTranslator, metrics: Arc<Metrics>,
493) {
494 tokio::pin!(shutdown_handle);
495 debug!("OTLP resource converter task started.");
496
497 let mut buffer_flush = interval(Duration::from_millis(100));
500 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
501
502 let mut event_buffer_manager = EventBufferManager::default();
503
504 loop {
505 select! {
506 Some(otlp_resource) = receiver.recv() => {
507 match otlp_resource {
508 OtlpResource::Metrics(resource_metrics) => {
509 match metrics_translator.map_metrics(resource_metrics, &metrics) {
510 Ok(events) => {
511 for event in events {
512 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
513 dispatch_events(event_buffer, &source_context).await;
514 }
515 }
516 }
517 Err(e) => {
518 error!(error = %e, "Failed to handle resource metrics.");
519 }
520 }
521 }
522 OtlpResource::Logs(resource_logs) => {
523 let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
524 for log_event in translator {
525 metrics.logs_received().increment(1);
526
527 if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
528 dispatch_events(event_buffer, &source_context).await;
529 }
530 }
531 }
532 OtlpResource::Traces(_resource_spans) => {
533 }
535 }
536 },
537 _ = buffer_flush.tick() => {
538 if let Some(event_buffer) = event_buffer_manager.consume() {
539 dispatch_events(event_buffer, &source_context).await;
540 }
541 },
542 _ = &mut shutdown_handle => {
543 debug!("Converter task received shutdown signal.");
544 break;
545 }
546 }
547 }
548
549 if let Some(event_buffer) = event_buffer_manager.consume() {
550 dispatch_events(event_buffer, &source_context).await;
551 }
552
553 debug!("OTLP resource converter task stopped.");
554}