saluki_components/sources/otlp/
mod.rs1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use axum::body::Bytes;
7use bytesize::ByteSize;
8use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
9use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
10use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
11use otlp_protos::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
12use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
13use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
14use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans as OtlpResourceSpans;
15use prost::Message;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::ContextResolver;
19use saluki_core::topology::interconnect::BufferedDispatcher;
20use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
21use saluki_core::{
22 components::{
23 sources::{Source, SourceBuilder, SourceContext},
24 ComponentContext,
25 },
26 data_model::event::EventType,
27 topology::{EventsBuffer, OutputDefinition},
28};
29use saluki_env::WorkloadProvider;
30use saluki_error::ErrorContext as _;
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::ListenAddress;
33use serde::Deserialize;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::time::{interval, MissedTickBehavior};
37use tracing::{debug, error};
38
39use crate::common::otlp::config::OtlpConfig;
40use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
41
42mod logs;
43mod metrics;
44mod resolver;
45use self::logs::translator::OtlpLogsTranslator;
46use self::metrics::translator::OtlpMetricsTranslator;
47use self::resolver::build_context_resolver;
48use crate::common::otlp::origin::OtlpOriginTagResolver;
49use crate::common::otlp::traces::translator::OtlpTracesTranslator;
50
51const fn default_context_string_interner_size() -> ByteSize {
52 ByteSize::mib(2)
53}
54
/// Default value for `otlp_cached_contexts_limit`: half a million contexts.
const fn default_cached_contexts_limit() -> usize {
    5 * 100_000
}
58
/// Default value for `otlp_cached_tagsets_limit`: half a million tag sets.
const fn default_cached_tagsets_limit() -> usize {
    5 * 100_000
}
62
/// Default value for `otlp_allow_context_heap_allocs`: heap allocations are permitted.
const fn default_allow_context_heap_allocations() -> bool {
    // Permissive by default; operators can opt out through configuration.
    true
}
66
67#[derive(Deserialize, Default)]
69pub struct OtlpConfiguration {
70 otlp_config: OtlpConfig,
71
72 #[serde(
78 rename = "otlp_string_interner_size",
79 default = "default_context_string_interner_size"
80 )]
81 context_string_interner_bytes: ByteSize,
82
83 #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
91 cached_contexts_limit: usize,
92
93 #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
101 cached_tagsets_limit: usize,
102
103 #[serde(
113 rename = "otlp_allow_context_heap_allocs",
114 default = "default_allow_context_heap_allocations"
115 )]
116 allow_context_heap_allocations: bool,
117
118 #[serde(skip)]
120 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
121}
122
123impl OtlpConfiguration {
124 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
126 Ok(config.as_typed()?)
127 }
128
129 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
135 where
136 W: WorkloadProvider + Send + Sync + 'static,
137 {
138 self.workload_provider = Some(Arc::new(workload_provider));
139 self
140 }
141}
142
143#[async_trait]
144impl SourceBuilder for OtlpConfiguration {
145 fn outputs(&self) -> &[OutputDefinition<EventType>] {
146 static OUTPUTS: LazyLock<Vec<OutputDefinition<EventType>>> = LazyLock::new(|| {
147 vec![
148 OutputDefinition::named_output("metrics", EventType::Metric),
149 OutputDefinition::named_output("logs", EventType::Log),
150 OutputDefinition::named_output("traces", EventType::Trace),
151 ]
152 });
153
154 &OUTPUTS
155 }
156
157 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
158 if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled && !self.otlp_config.traces.enabled {
159 return Err(generic_error!(
160 "OTLP metrics, logs and traces support is disabled. Please enable at least one of them."
161 ));
162 }
163
164 let grpc_listen_str = format!(
165 "{}://{}",
166 self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
167 );
168 let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
169 .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
170
171 if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
173 return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
174 }
175
176 let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
177 generic_error!(
178 "Invalid HTTP endpoint address '{}': {}",
179 self.otlp_config.receiver.protocols.http.endpoint,
180 e
181 )
182 })?;
183
184 let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
185
186 let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
187 let metrics_translator_config = metrics::config::OtlpMetricsTranslatorConfig::default().with_remapping(true);
188 let traces_interner_size =
189 std::num::NonZeroUsize::new(self.otlp_config.traces.string_interner_bytes.as_u64() as usize)
190 .ok_or_else(|| generic_error!("otlp_config.traces.string_interner_size must be greater than 0"))?;
191 let traces_translator = OtlpTracesTranslator::new(self.otlp_config.traces.clone(), traces_interner_size);
192 let grpc_max_recv_msg_size_bytes =
193 self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
194 let metrics = build_metrics(&context);
195
196 Ok(Box::new(Otlp {
197 context_resolver,
198 origin_tag_resolver: maybe_origin_tags_resolver,
199 grpc_endpoint,
200 http_endpoint: ListenAddress::Tcp(http_socket_addr),
201 grpc_max_recv_msg_size_bytes,
202 metrics_translator_config,
203 traces_translator,
204 metrics,
205 }))
206 }
207}
208
impl MemoryBounds for OtlpConfiguration {
    /// Declares the source's minimum memory bounds by registering `Otlp` ("source struct") and `SourceHandler`
    /// ("source handler") through `with_single_value`.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // NOTE(review): only these two structs are declared here. The 1024-slot resource channel created in
        // `Otlp::run` and the context resolver's interner/caches are not listed. Confirm whether they are
        // accounted for elsewhere.
        builder
            .minimum()
            .with_single_value::<Otlp>("source struct")
            .with_single_value::<SourceHandler>("source handler");
    }
}
217
218pub struct Otlp {
219 context_resolver: ContextResolver,
220 origin_tag_resolver: Option<OtlpOriginTagResolver>,
221 grpc_endpoint: ListenAddress,
222 http_endpoint: ListenAddress,
223 grpc_max_recv_msg_size_bytes: usize,
224 metrics_translator_config: metrics::config::OtlpMetricsTranslatorConfig,
225 traces_translator: OtlpTracesTranslator,
226 metrics: Metrics, }
228
229#[async_trait]
230impl Source for Otlp {
231 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
232 let Self {
233 context_resolver,
234 origin_tag_resolver,
235 grpc_endpoint,
236 http_endpoint,
237 grpc_max_recv_msg_size_bytes,
238 metrics_translator_config,
239 traces_translator,
240 metrics,
241 } = *self;
242
243 let mut global_shutdown = context.take_shutdown_handle();
244 let mut health = context.take_health_handle();
245 let memory_limiter = context.topology_context().memory_limiter();
246
247 let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
249
250 let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();
251
252 let metrics_translator = OtlpMetricsTranslator::new(metrics_translator_config, context_resolver);
253
254 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
255
256 thread_pool_handle.spawn_traced_named(
258 "otlp-resource-converter",
259 run_converter(
260 rx,
261 context.clone(),
262 origin_tag_resolver,
263 converter_shutdown_coordinator.register(),
264 metrics_translator,
265 metrics.clone(),
266 traces_translator,
267 ),
268 );
269
270 let handler = SourceHandler::new(tx);
271 let server_builder = OtlpServerBuilder::new(http_endpoint, grpc_endpoint, grpc_max_recv_msg_size_bytes);
272
273 let (http_shutdown, mut http_error) = server_builder
274 .build(handler, memory_limiter.clone(), thread_pool_handle, metrics)
275 .await?;
276
277 health.mark_ready();
278 debug!("OTLP source started.");
279
280 loop {
282 select! {
283 _ = &mut global_shutdown => {
284 debug!("Received shutdown signal.");
285 break
286 },
287 error = &mut http_error => {
288 if let Some(error) = error {
289 debug!(%error, "HTTP server error.");
290 }
291 break;
292 },
293 _ = health.live() => continue,
294 }
295 }
296
297 debug!("Stopping OTLP source...");
298
299 http_shutdown.shutdown();
300 converter_shutdown_coordinator.shutdown().await;
301
302 debug!("OTLP source stopped.");
303
304 Ok(())
305 }
306}
307
308enum OtlpResource {
309 Metrics(OtlpResourceMetrics),
310 Logs(OtlpResourceLogs),
311 Traces(OtlpResourceSpans),
312}
313
314struct SourceHandler {
316 tx: mpsc::Sender<OtlpResource>,
317}
318
319impl SourceHandler {
320 fn new(tx: mpsc::Sender<OtlpResource>) -> Self {
321 Self { tx }
322 }
323}
324
325#[async_trait]
326impl OtlpHandler for SourceHandler {
327 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
328 let request =
329 ExportMetricsServiceRequest::decode(body).error_context("Failed to decode metrics export request.")?;
330
331 for resource_metrics in request.resource_metrics {
332 self.tx
333 .send(OtlpResource::Metrics(resource_metrics))
334 .await
335 .error_context("Failed to send resource metrics to converter: channel is closed.")?;
336 }
337 Ok(())
338 }
339
340 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
341 let request = ExportLogsServiceRequest::decode(body).error_context("Failed to decode logs export request.")?;
342
343 for resource_logs in request.resource_logs {
344 self.tx
345 .send(OtlpResource::Logs(resource_logs))
346 .await
347 .error_context("Failed to send resource logs to converter: channel is closed.")?;
348 }
349 Ok(())
350 }
351
352 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
353 let request =
354 ExportTraceServiceRequest::decode(body).error_context("Failed to decode trace export request.")?;
355
356 for resource_spans in request.resource_spans {
357 self.tx
358 .send(OtlpResource::Traces(resource_spans))
359 .await
360 .error_context("Failed to send resource spans to converter: channel is closed.")?;
361 }
362 Ok(())
363 }
364}
365
366async fn run_converter(
367 mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
368 origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
369 mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics, mut traces_translator: OtlpTracesTranslator,
370) {
371 tokio::pin!(shutdown_handle);
372 debug!("OTLP resource converter task started.");
373
374 let mut buffer_flush = interval(Duration::from_millis(100));
377 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
378
379 let mut metrics_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
380 let mut logs_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
381 let mut traces_dispatcher: Option<BufferedDispatcher<'_, EventsBuffer>> = None;
382
383 loop {
384 select! {
385 Some(otlp_resource) = receiver.recv() => {
386 match otlp_resource {
387 OtlpResource::Metrics(resource_metrics) => {
388 match metrics_translator.translate_metrics(resource_metrics, &metrics) {
389 Ok(events) => {
390 for event in events {
391 let dispatcher = metrics_dispatcher.get_or_insert_with(|| {
392 source_context
393 .dispatcher()
394 .buffered_named("metrics")
395 .expect("metrics output should exist")
396 });
397 if let Err(e) = dispatcher.push(event).await {
398 error!(error = %e, "Failed to dispatch metric event.");
399 }
400 }
401 }
402 Err(e) => {
403 error!(error = %e, "Failed to handle resource metrics.");
404 }
405 }
406 }
407 OtlpResource::Logs(resource_logs) => {
408 let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
409 for log_event in translator {
410 metrics.logs_received().increment(1);
411
412 let dispatcher = logs_dispatcher.get_or_insert_with(|| {
413 source_context
414 .dispatcher()
415 .buffered_named("logs")
416 .expect("logs output should exist")
417 });
418 if let Err(e) = dispatcher.push(log_event).await {
419 error!(error = %e, "Failed to dispatch log event.");
420 }
421 }
422 }
423 OtlpResource::Traces(resource_spans) => {
424 for trace_event in traces_translator.translate_spans(resource_spans, &metrics) {
425 let dispatcher = traces_dispatcher.get_or_insert_with(|| {
426 source_context
427 .dispatcher()
428 .buffered_named("traces")
429 .expect("traces output should exist")
430 });
431 if let Err(e) = dispatcher.push(trace_event).await {
432 error!(error = %e, "Failed to dispatch trace event.");
433 }
434 }
435 }
436 }
437 },
438 _ = buffer_flush.tick() => {
439 if let Some(dispatcher) = metrics_dispatcher.take() {
440 if let Err(e) = dispatcher.flush().await {
441 error!(error = %e, "Failed to flush metric events.");
442 }
443 }
444 if let Some(dispatcher) = logs_dispatcher.take() {
445 if let Err(e) = dispatcher.flush().await {
446 error!(error = %e, "Failed to flush log events.");
447 }
448 }
449 if let Some(dispatcher) = traces_dispatcher.take() {
450 if let Err(e) = dispatcher.flush().await {
451 error!(error = %e, "Failed to flush trace events.");
452 }
453 }
454 },
455 _ = &mut shutdown_handle => {
456 debug!("Converter task received shutdown signal.");
457 break;
458 }
459 }
460 }
461
462 if let Some(dispatcher) = metrics_dispatcher.take() {
463 if let Err(e) = dispatcher.flush().await {
464 error!(error = %e, "Failed to flush metric events.");
465 }
466 }
467 if let Some(dispatcher) = logs_dispatcher.take() {
468 if let Err(e) = dispatcher.flush().await {
469 error!(error = %e, "Failed to flush log events.");
470 }
471 }
472 if let Some(dispatcher) = traces_dispatcher.take() {
473 if let Err(e) = dispatcher.flush().await {
474 error!(error = %e, "Failed to flush trace events.");
475 }
476 }
477
478 debug!("OTLP resource converter task stopped.");
479}