saluki_components/sources/otlp/
mod.rs1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
11use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
12use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
13use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
14use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
15use prost::Message;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::ContextResolver;
19use saluki_core::topology::interconnect::EventBufferManager;
20use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
21use saluki_core::{
22 components::{
23 sources::{Source, SourceBuilder, SourceContext},
24 ComponentContext,
25 },
26 data_model::event::{Event, EventType},
27 topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::ErrorContext as _;
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::ListenAddress;
33use serde::Deserialize;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::time::{interval, MissedTickBehavior};
37use tracing::{debug, error};
38
39use crate::common::otlp::config::OtlpConfig;
40use crate::common::otlp::config::TracesConfig;
41use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
42
43mod attributes;
44mod logs;
45mod metrics;
46mod origin;
47mod resolver;
48pub mod traces;
49use self::logs::translator::OtlpLogsTranslator;
50use self::metrics::translator::OtlpMetricsTranslator;
51use self::origin::OtlpOriginTagResolver;
52use self::resolver::build_context_resolver;
53use self::traces::translator::OtlpTracesTranslator;
54
55const fn default_context_string_interner_size() -> ByteSize {
56 ByteSize::mib(2)
57}
58
/// Default for `otlp_cached_contexts_limit` when the key is absent from the configuration.
const fn default_cached_contexts_limit() -> usize {
    500 * 1_000
}
62
/// Default for `otlp_cached_tagsets_limit` when the key is absent from the configuration.
const fn default_cached_tagsets_limit() -> usize {
    500 * 1_000
}
66
/// Default for `otlp_allow_context_heap_allocs` when the key is absent from the configuration:
/// heap allocations are allowed.
const fn default_allow_context_heap_allocations() -> bool {
    let allowed = true;
    allowed
}
70
71#[derive(Deserialize, Default)]
73pub struct OtlpConfiguration {
74 otlp_config: OtlpConfig,
75
76 #[serde(
82 rename = "otlp_string_interner_size",
83 default = "default_context_string_interner_size"
84 )]
85 context_string_interner_bytes: ByteSize,
86
87 #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
95 cached_contexts_limit: usize,
96
97 #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
105 cached_tagsets_limit: usize,
106
107 #[serde(
117 rename = "otlp_allow_context_heap_allocs",
118 default = "default_allow_context_heap_allocations"
119 )]
120 allow_context_heap_allocations: bool,
121
122 #[serde(skip)]
124 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
125}
126
127impl OtlpConfiguration {
128 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
130 Ok(config.as_typed()?)
131 }
132
133 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
139 where
140 W: WorkloadProvider + Send + Sync + 'static,
141 {
142 self.workload_provider = Some(Arc::new(workload_provider));
143 self
144 }
145}
146
147#[async_trait]
148impl SourceBuilder for OtlpConfiguration {
149 fn outputs(&self) -> &[OutputDefinition<EventType>] {
150 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
151 vec![
152 OutputDefinition::named_output("metrics", EventType::Metric),
153 OutputDefinition::named_output("logs", EventType::Log),
154 OutputDefinition::named_output("traces", EventType::Trace),
155 ]
156 });
157
158 &OUTPUTS
159 }
160
161 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
162 if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
163 return Err(generic_error!(
164 "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
165 ));
166 }
167
168 let grpc_listen_str = format!(
169 "{}://{}",
170 self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
171 );
172 let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
173 .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
174
175 if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
177 return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
178 }
179
180 let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
181 generic_error!(
182 "Invalid HTTP endpoint address '{}': {}",
183 self.otlp_config.receiver.protocols.http.endpoint,
184 e
185 )
186 })?;
187
188 let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
189
190 let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
191 let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default().with_remapping(true);
192 let traces_config = self.otlp_config.traces.clone();
193 let grpc_max_recv_msg_size_bytes =
194 self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
195 let metrics = build_metrics(&context);
196
197 Ok(Box::new(Otlp {
198 context_resolver,
199 origin_tag_resolver: maybe_origin_tags_resolver,
200 grpc_endpoint,
201 http_endpoint: ListenAddress::Tcp(http_socket_addr),
202 grpc_max_recv_msg_size_bytes,
203 metrics_translator_config,
204 traces_config,
205 metrics,
206 }))
207 }
208}
209
impl MemoryBounds for OtlpConfiguration {
    /// Declares the minimum memory bound for this source: one `Otlp` value ("source struct") and
    /// one `SourceHandler` value ("source handler").
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}
218
219pub struct Otlp {
220 context_resolver: ContextResolver,
221 origin_tag_resolver: Option<OtlpOriginTagResolver>,
222 grpc_endpoint: ListenAddress,
223 http_endpoint: ListenAddress,
224 grpc_max_recv_msg_size_bytes: usize,
225 metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
226 traces_config: TracesConfig,
227 metrics: Metrics, }
229
230#[async_trait]
231impl Source for Otlp {
232 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
233 let Self {
234 context_resolver,
235 origin_tag_resolver,
236 grpc_endpoint,
237 http_endpoint,
238 grpc_max_recv_msg_size_bytes,
239 metrics_translator_config,
240 traces_config,
241 metrics,
242 } = *self;
243
244 let mut global_shutdown = context.take_shutdown_handle();
245 let mut health = context.take_health_handle();
246 let memory_limiter = context.topology_context().memory_limiter();
247
248 let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
250
251 let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();
252
253 let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver);
254
255 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
256
257 thread_pool_handle.spawn_traced_named(
259 "otlp-resource-converter",
260 run_converter(
261 rx,
262 context.clone(),
263 origin_tag_resolver,
264 converter_shutdown_coordinator.register(),
265 metrics_translator,
266 metrics.clone(),
267 traces_config,
268 ),
269 );
270
271 let handler = SourceHandler::new(tx);
272 let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);
273
274 let (http_shutdown, mut http_error) = server_builder
275 .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
276 .await?;
277
278 health.mark_ready();
279 debug!("OTLP source started.");
280
281 loop {
283 select! {
284 _ = &mut global_shutdown => {
285 debug!("Received shutdown signal.");
286 break
287 },
288 error = &mut http_error => {
289 if let Some(error) = error {
290 debug!(%error, "HTTP server error.");
291 }
292 break;
293 },
294 _ = health.live() => continue,
295 }
296 }
297
298 debug!("Stopping OTLP source...");
299
300 http_shutdown.shutdown();
301 converter_shutdown_coordinator.shutdown().await;
302
303 debug!("OTLP source stopped.");
304
305 Ok(())
306 }
307}
308
309enum OtlpResource {
310 Metrics(OtlpResourceMetrics),
311 Logs(OtlpResourceLogs),
312 Traces(OtlpResourceSpans),
313}
314
315struct SourceHandler {
317 tx: mpsc::Sender<OtlpResource>,
318}
319
320impl SourceHandler {
321 fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
322 Self { tx }
323 }
324}
325
326#[async_trait]
327impl OtlpHandler for SourceHandler {
328 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
329 let request =
330 ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;
331
332 for resource_metrics in request.resource_metrics {
333 self.tx
334 .send(OtlpResource::Metrics(resource_metrics))
335 .await
336 .error_context("Failed to send resource metrics to converter: channel is closed.")?;
337 }
338 Ok(())
339 }
340
341 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
342 let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;
343
344 for resource_logs in request.resource_logs {
345 self.tx
346 .send(OtlpResource::Logs(resource_logs))
347 .await
348 .error_context("Failed to send resource logs to converter: channel is closed.")?;
349 }
350 Ok(())
351 }
352
353 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
354 let request =
355 ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;
356
357 for resource_spans in request.resource_spans {
358 self.tx
359 .send(OtlpResource::Traces(resource_spans))
360 .await
361 .error_context("Failed to send resource spans to converter: channel is closed.")?;
362 }
363 Ok(())
364 }
365}
366
367async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
368 if events.is_empty() {
369 return;
370 }
371
372 if events.has_event_type(EventType::Trace) {
373 let mut buffered_dispatcher = source_context
374 .dispatcher()
375 .buffered_named("traces")
376 .expect("traces output should exist");
377 for trace_event in events.extract(Event::is_trace) {
378 if let Err(e) = buffered_dispatcher.push(trace_event).await {
379 error!(error = %e, "Failed to dispatch trace(s).");
380 }
381 }
382 if let Err(e) = buffered_dispatcher.flush().await {
383 error!(error = %e, "Failed to flush trace(s).");
384 }
385 }
386
387 if events.has_event_type(EventType::Log) {
388 let mut buffered_dispatcher = source_context
389 .dispatcher()
390 .buffered_named("logs")
391 .expect("logs output should exist");
392
393 for log_event in events.extract(Event::is_log) {
394 if let Err(e) = buffered_dispatcher.push(log_event).await {
395 error!(error = %e, "Failed to dispatch log(s).");
396 }
397 }
398
399 if let Err(e) = buffered_dispatcher.flush().await {
400 error!(error = %e, "Failed to flush log(s).");
401 }
402 }
403
404 let len = events.len();
405 if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
406 error!(error = %e, "Failed to dispatch metric events.");
407 } else {
408 debug!(events_len = len, "Dispatched metric events.");
409 }
410}
411
412async fn run_converter(
413 mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
414 origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
415 mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, traces_config: TracesConfig,
416) {
417 tokio::pin!(shutdown_handle);
418 debug!("OTLP resource converter task started.");
419
420 let mut buffer_flush = interval(Duration::from_millis(100));
423 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
424
425 let mut event_buffer_manager = EventBufferManager::default();
426
427 let traces_translator = OtlpTracesTranslator::new(traces_config);
428
429 loop {
430 select! {
431 Some(otlp_resource) = receiver.recv() => {
432 match otlp_resource {
433 OtlpResource::Metrics(resource_metrics) => {
434 match metrics_translator.map_metrics(resource_metrics, &metrics) {
435 Ok(events) => {
436 for event in events {
437 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
438 dispatch_events(event_buffer, &source_context).await;
439 }
440 }
441 }
442 Err(e) => {
443 error!(error = %e, "Failed to handle resource metrics.");
444 }
445 }
446 }
447 OtlpResource::Logs(resource_logs) => {
448 let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
449 for log_event in translator {
450 metrics.logs_received().increment(1);
451
452 if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
453 dispatch_events(event_buffer, &source_context).await;
454 }
455 }
456 }
457 OtlpResource::Traces(resource_spans) => {
458 let trace_events =
459 traces_translator.translate_resource_spans(resource_spans, &metrics);
460 for trace_event in trace_events {
461 if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
462 dispatch_events(event_buffer, &source_context).await;
463 }
464 }
465 }
466 }
467 },
468 _ = buffer_flush.tick() => {
469 if let Some(event_buffer) = event_buffer_manager.consume() {
470 dispatch_events(event_buffer, &source_context).await;
471 }
472 },
473 _ = &mut shutdown_handle => {
474 debug!("Converter task received shutdown signal.");
475 break;
476 }
477 }
478 }
479
480 if let Some(event_buffer) = event_buffer_manager.consume() {
481 dispatch_events(event_buffer, &source_context).await;
482 }
483
484 debug!("OTLP resource converter task stopped.");
485}