saluki_components/sources/otlp/
mod.rs1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
11use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
12use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
13use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
14use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
15use prost::Message;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::ContextResolver;
19use saluki_core::topology::interconnect::EventBufferManager;
20use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
21use saluki_core::{
22 components::{
23 sources::{Source, SourceBuilder, SourceContext},
24 ComponentContext,
25 },
26 data_model::event::{Event, EventType},
27 topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::ErrorContext as _;
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::ListenAddress;
33use serde::Deserialize;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::time::{interval, MissedTickBehavior};
37use tracing::{debug, error};
38
39use crate::common::otlp::config::OtlpConfig;
40use crate::common::otlp::config::TracesConfig;
41use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
42
43mod logs;
44mod metrics;
45mod resolver;
46use self::logs::translator::OtlpLogsTranslator;
47use self::metrics::translator::OtlpMetricsTranslator;
48use self::resolver::build_context_resolver;
49use crate::common::otlp::origin::OtlpOriginTagResolver;
50use crate::common::otlp::traces::translator::OtlpTracesTranslator;
51
52const fn default_context_string_interner_size() -> ByteSize {
53 ByteSize::mib(2)
54}
55
/// Serde default for the `otlp_cached_contexts_limit` setting: 500,000 entries.
const fn default_cached_contexts_limit() -> usize {
    const DEFAULT_LIMIT: usize = 500_000;

    DEFAULT_LIMIT
}
59
/// Serde default for the `otlp_cached_tagsets_limit` setting: 500,000 entries.
const fn default_cached_tagsets_limit() -> usize {
    const DEFAULT_LIMIT: usize = 500_000;

    DEFAULT_LIMIT
}
63
/// Serde default for the `otlp_allow_context_heap_allocs` setting: enabled.
const fn default_allow_context_heap_allocations() -> bool {
    const ALLOW_BY_DEFAULT: bool = true;

    ALLOW_BY_DEFAULT
}
67
68#[derive(Deserialize, Default)]
70pub struct OtlpConfiguration {
71 otlp_config: OtlpConfig,
72
73 #[serde(
79 rename = "otlp_string_interner_size",
80 default = "default_context_string_interner_size"
81 )]
82 context_string_interner_bytes: ByteSize,
83
84 #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
92 cached_contexts_limit: usize,
93
94 #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
102 cached_tagsets_limit: usize,
103
104 #[serde(
114 rename = "otlp_allow_context_heap_allocs",
115 default = "default_allow_context_heap_allocations"
116 )]
117 allow_context_heap_allocations: bool,
118
119 #[serde(skip)]
121 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
122}
123
124impl OtlpConfiguration {
125 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
127 Ok(config.as_typed()?)
128 }
129
130 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
136 where
137 W: WorkloadProvider + Send + Sync + 'static,
138 {
139 self.workload_provider = Some(Arc::new(workload_provider));
140 self
141 }
142}
143
144#[async_trait]
145impl SourceBuilder for OtlpConfiguration {
146 fn outputs(&self) -> &[OutputDefinition<EventType>] {
147 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
148 vec![
149 OutputDefinition::named_output("metrics", EventType::Metric),
150 OutputDefinition::named_output("logs", EventType::Log),
151 OutputDefinition::named_output("traces", EventType::Trace),
152 ]
153 });
154
155 &OUTPUTS
156 }
157
158 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
159 if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
160 return Err(generic_error!(
161 "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
162 ));
163 }
164
165 let grpc_listen_str = format!(
166 "{}://{}",
167 self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
168 );
169 let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
170 .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
171
172 if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
174 return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
175 }
176
177 let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
178 generic_error!(
179 "Invalid HTTP endpoint address '{}': {}",
180 self.otlp_config.receiver.protocols.http.endpoint,
181 e
182 )
183 })?;
184
185 let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
186
187 let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
188 let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default().with_remapping(true);
189 let traces_config = self.otlp_config.traces.clone();
190 let grpc_max_recv_msg_size_bytes =
191 self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
192 let metrics = build_metrics(&context);
193
194 Ok(Box::new(Otlp {
195 context_resolver,
196 origin_tag_resolver: maybe_origin_tags_resolver,
197 grpc_endpoint,
198 http_endpoint: ListenAddress::Tcp(http_socket_addr),
199 grpc_max_recv_msg_size_bytes,
200 metrics_translator_config,
201 traces_config,
202 metrics,
203 }))
204 }
205}
206
207impl MemoryBounds for OtlpConfiguration {
208 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
209 builder
210 .minimum()
211 .with_single_value::<Otlp>("source struct")
212 .with_single_value::<SourceHandler>("source handler");
213 }
214}
215
216pub struct Otlp {
217 context_resolver: ContextResolver,
218 origin_tag_resolver: Option<OtlpOriginTagResolver>,
219 grpc_endpoint: ListenAddress,
220 http_endpoint: ListenAddress,
221 grpc_max_recv_msg_size_bytes: usize,
222 metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
223 traces_config: TracesConfig,
224 metrics: Metrics, }
226
227#[async_trait]
228impl Source for Otlp {
229 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
230 let Self {
231 context_resolver,
232 origin_tag_resolver,
233 grpc_endpoint,
234 http_endpoint,
235 grpc_max_recv_msg_size_bytes,
236 metrics_translator_config,
237 traces_config,
238 metrics,
239 } = *self;
240
241 let mut global_shutdown = context.take_shutdown_handle();
242 let mut health = context.take_health_handle();
243 let memory_limiter = context.topology_context().memory_limiter();
244
245 let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
247
248 let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();
249
250 let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver);
251
252 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
253
254 thread_pool_handle.spawn_traced_named(
256 "otlp-resource-converter",
257 run_converter(
258 rx,
259 context.clone(),
260 origin_tag_resolver,
261 converter_shutdown_coordinator.register(),
262 metrics_translator,
263 metrics.clone(),
264 traces_config,
265 ),
266 );
267
268 let handler = SourceHandler::new(tx);
269 let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);
270
271 let (http_shutdown, mut http_error) = server_builder
272 .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
273 .await?;
274
275 health.mark_ready();
276 debug!("OTLP source started.");
277
278 loop {
280 select! {
281 _ = &mut global_shutdown => {
282 debug!("Received shutdown signal.");
283 break
284 },
285 error = &mut http_error => {
286 if let Some(error) = error {
287 debug!(%error, "HTTP server error.");
288 }
289 break;
290 },
291 _ = health.live() => continue,
292 }
293 }
294
295 debug!("Stopping OTLP source...");
296
297 http_shutdown.shutdown();
298 converter_shutdown_coordinator.shutdown().await;
299
300 debug!("OTLP source stopped.");
301
302 Ok(())
303 }
304}
305
306enum OtlpResource {
307 Metrics(OtlpResourceMetrics),
308 Logs(OtlpResourceLogs),
309 Traces(OtlpResourceSpans),
310}
311
312struct SourceHandler {
314 tx: mpsc::Sender<OtlpResource>,
315}
316
317impl SourceHandler {
318 fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
319 Self { tx }
320 }
321}
322
323#[async_trait]
324impl OtlpHandler for SourceHandler {
325 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
326 let request =
327 ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;
328
329 for resource_metrics in request.resource_metrics {
330 self.tx
331 .send(OtlpResource::Metrics(resource_metrics))
332 .await
333 .error_context("Failed to send resource metrics to converter: channel is closed.")?;
334 }
335 Ok(())
336 }
337
338 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
339 let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;
340
341 for resource_logs in request.resource_logs {
342 self.tx
343 .send(OtlpResource::Logs(resource_logs))
344 .await
345 .error_context("Failed to send resource logs to converter: channel is closed.")?;
346 }
347 Ok(())
348 }
349
350 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
351 let request =
352 ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;
353
354 for resource_spans in request.resource_spans {
355 self.tx
356 .send(OtlpResource::Traces(resource_spans))
357 .await
358 .error_context("Failed to send resource spans to converter: channel is closed.")?;
359 }
360 Ok(())
361 }
362}
363
364async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
365 if events.is_empty() {
366 return;
367 }
368
369 if events.has_event_type(EventType::Trace) {
370 let mut buffered_dispatcher = source_context
371 .dispatcher()
372 .buffered_named("traces")
373 .expect("traces output should exist");
374 for trace_event in events.extract(Event::is_trace) {
375 if let Err(e) = buffered_dispatcher.push(trace_event).await {
376 error!(error = %e, "Failed to dispatch trace(s).");
377 }
378 }
379 if let Err(e) = buffered_dispatcher.flush().await {
380 error!(error = %e, "Failed to flush trace(s).");
381 }
382 }
383
384 if events.has_event_type(EventType::Log) {
385 let mut buffered_dispatcher = source_context
386 .dispatcher()
387 .buffered_named("logs")
388 .expect("logs output should exist");
389
390 for log_event in events.extract(Event::is_log) {
391 if let Err(e) = buffered_dispatcher.push(log_event).await {
392 error!(error = %e, "Failed to dispatch log(s).");
393 }
394 }
395
396 if let Err(e) = buffered_dispatcher.flush().await {
397 error!(error = %e, "Failed to flush log(s).");
398 }
399 }
400
401 let len = events.len();
402 if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
403 error!(error = %e, "Failed to dispatch metric events.");
404 } else {
405 debug!(events_len = len, "Dispatched metric events.");
406 }
407}
408
409async fn run_converter(
410 mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
411 origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
412 mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, traces_config: TracesConfig,
413) {
414 tokio::pin!(shutdown_handle);
415 debug!("OTLP resource converter task started.");
416
417 let mut buffer_flush = interval(Duration::from_millis(100));
420 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
421
422 let mut event_buffer_manager = EventBufferManager::default();
423
424 let traces_translator = OtlpTracesTranslator::new(traces_config);
425
426 loop {
427 select! {
428 Some(otlp_resource) = receiver.recv() => {
429 match otlp_resource {
430 OtlpResource::Metrics(resource_metrics) => {
431 match metrics_translator.map_metrics(resource_metrics, &metrics) {
432 Ok(events) => {
433 for event in events {
434 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
435 dispatch_events(event_buffer, &source_context).await;
436 }
437 }
438 }
439 Err(e) => {
440 error!(error = %e, "Failed to handle resource metrics.");
441 }
442 }
443 }
444 OtlpResource::Logs(resource_logs) => {
445 let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
446 for log_event in translator {
447 metrics.logs_received().increment(1);
448
449 if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
450 dispatch_events(event_buffer, &source_context).await;
451 }
452 }
453 }
454 OtlpResource::Traces(resource_spans) => {
455 let trace_events =
456 traces_translator.translate_resource_spans(resource_spans, &metrics);
457 for trace_event in trace_events {
458 if let Some(event_buffer) = event_buffer_manager.try_push(trace_event) {
459 dispatch_events(event_buffer, &source_context).await;
460 }
461 }
462 }
463 }
464 },
465 _ = buffer_flush.tick() => {
466 if let Some(event_buffer) = event_buffer_manager.consume() {
467 dispatch_events(event_buffer, &source_context).await;
468 }
469 },
470 _ = &mut shutdown_handle => {
471 debug!("Converter task received shutdown signal.");
472 break;
473 }
474 }
475 }
476
477 if let Some(event_buffer) = event_buffer_manager.consume() {
478 dispatch_events(event_buffer, &source_context).await;
479 }
480
481 debug!("OTLP resource converter task stopped.");
482}