1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::time::Duration;
4
5use ::metrics::Counter;
6use async_trait::async_trait;
7use axum::body::Bytes;
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::routing::post;
11use axum::Router;
12use bytesize::ByteSize;
13use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, MemoryLimiter};
14use otlp_protos::opentelemetry::proto::collector::logs::v1::logs_service_server::{LogsService, LogsServiceServer};
15use otlp_protos::opentelemetry::proto::collector::logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse};
16use otlp_protos::opentelemetry::proto::collector::metrics::v1::metrics_service_server::{
17 MetricsService, MetricsServiceServer,
18};
19use otlp_protos::opentelemetry::proto::collector::metrics::v1::{
20 ExportMetricsServiceRequest, ExportMetricsServiceResponse,
21};
22use otlp_protos::opentelemetry::proto::logs::v1::ResourceLogs as OtlpResourceLogs;
23use otlp_protos::opentelemetry::proto::metrics::v1::ResourceMetrics as OtlpResourceMetrics;
24use prost::Message;
25use saluki_common::task::HandleExt as _;
26use saluki_config::GenericConfiguration;
27use saluki_context::ContextResolver;
28use saluki_core::observability::ComponentMetricsExt;
29use saluki_core::topology::interconnect::EventBufferManager;
30use saluki_core::topology::shutdown::{DynamicShutdownCoordinator, DynamicShutdownHandle};
31use saluki_core::{
32 components::{
33 sources::{Source, SourceBuilder, SourceContext},
34 ComponentContext,
35 },
36 data_model::event::{Event, EventType},
37 topology::{EventsBuffer, OutputDefinition},
38};
39use saluki_env::WorkloadProvider;
40use saluki_error::{generic_error, GenericError};
41use saluki_io::net::listener::ConnectionOrientedListener;
42use saluki_io::net::server::http::HttpServer;
43use saluki_io::net::util::hyper::TowerToHyperService;
44use saluki_io::net::ListenAddress;
45use saluki_metrics::MetricsBuilder;
46use serde::Deserialize;
47use tokio::select;
48use tokio::sync::mpsc;
49use tokio::time::{interval, MissedTickBehavior};
50use tonic::transport::Server;
51use tonic::{Request, Response, Status};
52use tracing::{debug, error};
53
54mod attributes;
55mod logs;
56mod metrics;
57mod origin;
58mod resolver;
59use self::logs::translator::OtlpLogsTranslator;
60use self::metrics::translator::OtlpMetricsTranslator;
61use self::origin::OtlpOriginTagResolver;
62use self::resolver::build_context_resolver;
63
64const fn default_context_string_interner_size() -> ByteSize {
65 ByteSize::mib(2)
66}
67
/// Default upper bound on the number of cached resolved contexts (500 thousand).
const fn default_cached_contexts_limit() -> usize {
    500 * 1_000
}

/// Default upper bound on the number of cached tag sets (500 thousand).
const fn default_cached_tagsets_limit() -> usize {
    500 * 1_000
}

/// By default, context resolution may fall back to heap allocations.
const fn default_allow_context_heap_allocations() -> bool {
    let allowed = true;
    allowed
}
79
80#[derive(Deserialize, Default)]
82pub struct OtlpConfiguration {
83 otlp_config: OtlpConfig,
84
85 #[serde(
91 rename = "otlp_string_interner_size",
92 default = "default_context_string_interner_size"
93 )]
94 context_string_interner_bytes: ByteSize,
95
96 #[serde(rename = "otlp_cached_contexts_limit", default = "default_cached_contexts_limit")]
104 cached_contexts_limit: usize,
105
106 #[serde(rename = "otlp_cached_tagsets_limit", default = "default_cached_tagsets_limit")]
114 cached_tagsets_limit: usize,
115
116 #[serde(
126 rename = "otlp_allow_context_heap_allocs",
127 default = "default_allow_context_heap_allocations"
128 )]
129 allow_context_heap_allocations: bool,
130
131 #[serde(skip)]
133 workload_provider: Option<Arc<dyn WorkloadProvider + Send + Sync>>,
134}
135
136#[derive(Deserialize, Debug, Default)]
137pub struct OtlpConfig {
138 #[serde(default)]
139 receiver: Receiver,
140 #[serde(default)]
141 metrics: MetricsConfig,
142 #[serde(default)]
143 logs: LogsConfig,
144}
145
146#[derive(Deserialize, Debug, Default)]
147pub struct Receiver {
148 #[serde(default)]
149 protocols: Protocols,
150}
151
152#[derive(Deserialize, Debug, Default)]
153struct Protocols {
154 #[serde(default)]
155 grpc: GrpcConfig,
156
157 #[serde(default)]
158 http: HttpConfig,
159}
160
161#[derive(Deserialize, Debug)]
162pub struct GrpcConfig {
163 #[serde(default = "default_grpc_endpoint")]
167 pub endpoint: String,
168
169 #[serde(default = "default_transport")]
173 pub transport: String,
174
175 #[serde(default = "default_max_recv_msg_size_mib")]
179 #[serde(rename = "max_recv_msg_size_mib")]
180 pub max_recv_msg_size_mib: u64,
181}
182
183#[derive(Deserialize, Debug)]
184pub struct HttpConfig {
185 #[serde(default = "default_http_endpoint")]
189 pub endpoint: String,
190}
191
192impl Default for GrpcConfig {
193 fn default() -> Self {
194 Self {
195 endpoint: default_grpc_endpoint(),
196 transport: default_transport(),
197 max_recv_msg_size_mib: default_max_recv_msg_size_mib(),
198 }
199 }
200}
201
202impl Default for HttpConfig {
203 fn default() -> Self {
204 Self {
205 endpoint: default_http_endpoint(),
206 }
207 }
208}
209
/// Default OTLP/gRPC listen address: all interfaces, port 4317.
fn default_grpc_endpoint() -> String {
    String::from("0.0.0.0:4317")
}

/// Default OTLP/HTTP listen address: all interfaces, port 4318.
fn default_http_endpoint() -> String {
    String::from("0.0.0.0:4318")
}

/// Default gRPC transport scheme.
fn default_transport() -> String {
    String::from("tcp")
}

/// Default maximum decoded gRPC message size, in MiB.
fn default_max_recv_msg_size_mib() -> u64 {
    const DEFAULT_MIB: u64 = 4;
    DEFAULT_MIB
}
225
226enum OtlpResource {
227 Metrics(OtlpResourceMetrics),
228 Logs(OtlpResourceLogs),
229}
230
231#[derive(Deserialize, Debug)]
232pub struct LogsConfig {
233 #[serde(default = "default_logs_enabled")]
237 pub enabled: bool,
238}
239
240fn default_logs_enabled() -> bool {
241 true
242}
243
244impl Default for LogsConfig {
245 fn default() -> Self {
246 Self {
247 enabled: default_logs_enabled(),
248 }
249 }
250}
251
252#[derive(Deserialize, Debug)]
253pub struct MetricsConfig {
254 #[serde(default = "default_metrics_enabled")]
258 pub enabled: bool,
259}
260
261fn default_metrics_enabled() -> bool {
262 true
263}
264
265impl Default for MetricsConfig {
266 fn default() -> Self {
267 Self {
268 enabled: default_metrics_enabled(),
269 }
270 }
271}
272
273impl OtlpConfiguration {
274 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
276 Ok(config.as_typed()?)
277 }
278
279 pub fn with_workload_provider<W>(mut self, workload_provider: W) -> Self
285 where
286 W: WorkloadProvider + Send + Sync + 'static,
287 {
288 self.workload_provider = Some(Arc::new(workload_provider));
289 self
290 }
291}
292
293pub struct Metrics {
294 metrics_received: Counter,
295 logs_received: Counter,
296}
297
298impl Metrics {
299 fn metrics_received(&self) -> &Counter {
300 &self.metrics_received
301 }
302
303 fn logs_received(&self) -> &Counter {
304 &self.logs_received
305 }
306
307 #[cfg(test)]
309 pub fn for_tests() -> Self {
310 Metrics {
311 metrics_received: Counter::noop(),
312 logs_received: Counter::noop(),
313 }
314 }
315}
316
317fn build_metrics(component_context: &ComponentContext) -> Metrics {
318 let builder = MetricsBuilder::from_component_context(component_context);
319
320 Metrics {
321 metrics_received: builder
322 .register_debug_counter_with_tags("component_events_received_total", [("message_type", "otlp_metrics")]),
323 logs_received: builder
324 .register_debug_counter_with_tags("component_events_received_total", [("message_type", "otlp_logs")]),
325 }
326}
327
328#[async_trait]
329impl SourceBuilder for OtlpConfiguration {
330 fn outputs(&self) -> &[OutputDefinition] {
331 static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
332 vec![
333 OutputDefinition::named_output("metrics", EventType::Metric),
334 OutputDefinition::named_output("logs", EventType::Log),
335 ]
336 });
337
338 &OUTPUTS
339 }
340
341 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Source + Send>, GenericError> {
342 if !self.otlp_config.metrics.enabled && !self.otlp_config.logs.enabled {
343 return Err(generic_error!(
344 "OTLP metrics and logs support is disabled. Please enable at least one of them."
345 ));
346 }
347
348 let grpc_listen_str = format!(
349 "{}://{}",
350 self.otlp_config.receiver.protocols.grpc.transport, self.otlp_config.receiver.protocols.grpc.endpoint
351 );
352 let grpc_endpoint = ListenAddress::try_from(grpc_listen_str.as_str())
353 .map_err(|e| generic_error!("Invalid gRPC endpoint address '{}': {}", grpc_listen_str, e))?;
354
355 if !matches!(grpc_endpoint, ListenAddress::Tcp(_)) {
357 return Err(generic_error!("Only 'tcp' transport is supported for OTLP gRPC"));
358 }
359
360 let http_socket_addr = self.otlp_config.receiver.protocols.http.endpoint.parse().map_err(|e| {
361 generic_error!(
362 "Invalid HTTP endpoint address '{}': {}",
363 self.otlp_config.receiver.protocols.http.endpoint,
364 e
365 )
366 })?;
367
368 let maybe_origin_tags_resolver = self.workload_provider.clone().map(OtlpOriginTagResolver::new);
369
370 let context_resolver = build_context_resolver(self, &context, maybe_origin_tags_resolver.clone())?;
371 let translator_config = metrics::config::OtlpTranslatorConfig::default().with_remapping(true);
372 let grpc_max_recv_msg_size_bytes =
373 self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib as usize * 1024 * 1024;
374 let metrics = build_metrics(&context);
375
376 Ok(Box::new(Otlp {
377 context_resolver,
378 origin_tag_resolver: maybe_origin_tags_resolver,
379 grpc_endpoint,
380 http_endpoint: ListenAddress::Tcp(http_socket_addr),
381 grpc_max_recv_msg_size_bytes,
382 translator_config,
383 metrics,
384 }))
385 }
386}
387
388impl MemoryBounds for OtlpConfiguration {
389 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
390 builder
391 .minimum()
392 .with_single_value::<Otlp>("source struct")
393 .with_single_value::<GrpcService>("gRPC service");
394 }
395}
396
397pub struct Otlp {
398 context_resolver: ContextResolver,
399 origin_tag_resolver: Option<OtlpOriginTagResolver>,
400 grpc_endpoint: ListenAddress,
401 http_endpoint: ListenAddress,
402 grpc_max_recv_msg_size_bytes: usize,
403 translator_config: metrics::config::OtlpTranslatorConfig,
404 metrics: Metrics, }
406
407#[async_trait]
408impl Source for Otlp {
409 async fn run(self: Box<Self>, mut context: SourceContext) -> Result<(), GenericError> {
410 let mut global_shutdown = context.take_shutdown_handle();
411 let mut health = context.take_health_handle();
412 let memory_limiter = context.topology_context().memory_limiter();
413
414 let (tx, rx) = mpsc::channel::<OtlpResource>(1024);
416
417 let mut converter_shutdown_coordinator = DynamicShutdownCoordinator::default();
418
419 let metrics_translator = OtlpMetricsTranslator::new(self.translator_config, self.context_resolver);
420
421 let thread_pool_handle = context.topology_context().global_thread_pool().clone();
422
423 thread_pool_handle.spawn_traced_named(
425 "otlp-resource-converter",
426 run_converter(
427 rx,
428 context.clone(),
429 self.origin_tag_resolver,
430 converter_shutdown_coordinator.register(),
431 metrics_translator,
432 self.metrics,
433 ),
434 );
435
436 let grpc_metrics_server = MetricsServiceServer::new(GrpcService::new(tx.clone(), memory_limiter.clone()))
438 .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);
439 let grpc_logs_server = LogsServiceServer::new(GrpcService::new(tx.clone(), memory_limiter.clone()))
440 .max_decoding_message_size(self.grpc_max_recv_msg_size_bytes);
441 let grpc_server = Server::builder()
442 .add_service(grpc_metrics_server)
443 .add_service(grpc_logs_server);
444
445 let grpc_socket_addr = self
446 .grpc_endpoint
447 .as_local_connect_addr()
448 .ok_or_else(|| generic_error!("OTLP gRPC endpoint is not a local TCP address."))?;
449 thread_pool_handle.spawn_traced_named("otlp-grpc-server", grpc_server.serve(grpc_socket_addr));
450 debug!(endpoint = %self.grpc_endpoint, "OTLP gRPC server started.");
451
452 let service = TowerToHyperService::new(
454 Router::new()
455 .route("/v1/metrics", post(http_metric_handler))
456 .route("/v1/logs", post(http_logs_handler))
457 .with_state((tx, memory_limiter.clone())),
458 );
459 let http_listener = ConnectionOrientedListener::from_listen_address(self.http_endpoint)
460 .await
461 .map_err(|e| generic_error!("Failed to create OTLP HTTP listener: {}", e))?;
462 let http_server = HttpServer::from_listener(http_listener, service);
463 let (http_shutdown, mut http_error) = http_server.listen();
464
465 health.mark_ready();
466 debug!("OTLP source started.");
467
468 loop {
470 select! {
471 _ = &mut global_shutdown => {
472 debug!("Received shutdown signal.");
473 break
474 },
475 error = &mut http_error => {
476 if let Some(error) = error {
477 debug!(%error, "HTTP server error.");
478 }
479 break;
480 },
481 _ = health.live() => continue,
482 }
483 }
484
485 debug!("Stopping OTLP source...");
486
487 http_shutdown.shutdown();
488 converter_shutdown_coordinator.shutdown().await;
489
490 debug!("OTLP source stopped.");
491
492 Ok(())
493 }
494}
495
496async fn http_logs_handler(
497 State((tx, memory_limiter)): State<(mpsc::Sender<OtlpResource>, MemoryLimiter)>, body: Bytes,
498) -> (StatusCode, &'static str) {
499 memory_limiter.wait_for_capacity().await;
500 match ExportLogsServiceRequest::decode(body) {
501 Ok(request) => {
502 for resource_logs in request.resource_logs {
503 if tx.send(OtlpResource::Logs(resource_logs)).await.is_err() {
504 error!("Failed to send resource logs to converter; channel is closed.");
505 return (StatusCode::INTERNAL_SERVER_ERROR, "Internal processing channel closed.");
506 }
507 }
508 (StatusCode::OK, "OK")
509 }
510 Err(e) => {
511 error!(error = %e, "Failed to decode OTLP protobuf request.");
512 (StatusCode::BAD_REQUEST, "Bad Request: Invalid protobuf.")
513 }
514 }
515}
516
517async fn http_metric_handler(
518 State((tx, memory_limiter)): State<(mpsc::Sender<OtlpResource>, MemoryLimiter)>, body: Bytes,
519) -> (StatusCode, &'static str) {
520 memory_limiter.wait_for_capacity().await;
521 match ExportMetricsServiceRequest::decode(body) {
522 Ok(request) => {
523 for resource_metrics in request.resource_metrics {
524 if tx.send(OtlpResource::Metrics(resource_metrics)).await.is_err() {
525 error!("Failed to send resource metrics to converter; channel is closed.");
526 return (StatusCode::INTERNAL_SERVER_ERROR, "Internal processing channel closed.");
527 }
528 }
529 (StatusCode::OK, "OK")
530 }
531 Err(e) => {
532 error!(error = %e, "Failed to decode OTLP protobuf request.");
533 (StatusCode::BAD_REQUEST, "Bad Request: Invalid protobuf.")
534 }
535 }
536}
537
538struct GrpcService {
539 sender: mpsc::Sender<OtlpResource>,
540 memory_limiter: MemoryLimiter,
541}
542
543impl GrpcService {
544 fn new(sender: mpsc::Sender<OtlpResource>, memory_limiter: MemoryLimiter) -> Self {
545 Self { sender, memory_limiter }
546 }
547}
548
549#[async_trait]
550impl MetricsService for GrpcService {
551 async fn export(
552 &self, request: Request<ExportMetricsServiceRequest>,
553 ) -> Result<Response<ExportMetricsServiceResponse>, Status> {
554 self.memory_limiter.wait_for_capacity().await;
555 let request = request.into_inner();
556
557 for resource_metrics in request.resource_metrics {
558 if self.sender.send(OtlpResource::Metrics(resource_metrics)).await.is_err() {
559 error!("Failed to send resource metrics to converter; channel is closed.");
560 return Err(Status::internal("Internal processing channel closed."));
561 }
562 }
563
564 Ok(Response::new(ExportMetricsServiceResponse { partial_success: None }))
565 }
566}
567
568#[async_trait]
569impl LogsService for GrpcService {
570 async fn export(
571 &self, request: Request<ExportLogsServiceRequest>,
572 ) -> Result<Response<ExportLogsServiceResponse>, Status> {
573 self.memory_limiter.wait_for_capacity().await;
574 let request = request.into_inner();
575
576 for resource_logs in request.resource_logs {
577 if self.sender.send(OtlpResource::Logs(resource_logs)).await.is_err() {
578 error!("Failed to send resource logs to converter; channel is closed.");
579 return Err(Status::internal("Internal processing channel closed."));
580 }
581 }
582
583 Ok(Response::new(ExportLogsServiceResponse { partial_success: None }))
584 }
585}
586
587async fn dispatch_events(mut events: EventsBuffer, source_context: &SourceContext) {
588 if events.is_empty() {
589 return;
590 }
591
592 if events.has_event_type(EventType::Log) {
593 let mut buffered_dispatcher = source_context
594 .dispatcher()
595 .buffered_named("logs")
596 .expect("logs output should exist");
597
598 for log_event in events.extract(Event::is_log) {
599 if let Err(e) = buffered_dispatcher.push(log_event).await {
600 error!(error = %e, "Failed to dispatch log(s).");
601 }
602 }
603
604 if let Err(e) = buffered_dispatcher.flush().await {
605 error!(error = %e, "Failed to flush log(s).");
606 }
607 }
608
609 let len = events.len();
610 if let Err(e) = source_context.dispatcher().dispatch_named("metrics", events).await {
611 error!(error = %e, "Failed to dispatch metric events.");
612 } else {
613 debug!(events_len = len, "Dispatched metric events.");
614 }
615}
616
617async fn run_converter(
618 mut receiver: mpsc::Receiver<OtlpResource>, source_context: SourceContext,
619 origin_tag_resolver: Option<OtlpOriginTagResolver>, shutdown_handle: DynamicShutdownHandle,
620 mut metrics_translator: OtlpMetricsTranslator, metrics: Metrics,
621) {
622 tokio::pin!(shutdown_handle);
623 debug!("OTLP resource converter task started.");
624
625 let mut buffer_flush = interval(Duration::from_millis(100));
628 buffer_flush.set_missed_tick_behavior(MissedTickBehavior::Delay);
629
630 let mut event_buffer_manager = EventBufferManager::default();
631
632 loop {
633 select! {
634 Some(otlp_resource) = receiver.recv() => {
635 match otlp_resource {
636 OtlpResource::Metrics(resource_metrics) => {
637 match metrics_translator.map_metrics(resource_metrics, &metrics) {
638 Ok(events) => {
639 for event in events {
640 if let Some(event_buffer) = event_buffer_manager.try_push(event) {
641 dispatch_events(event_buffer, &source_context).await;
642 }
643 }
644 }
645 Err(e) => {
646 error!(error = %e, "Failed to handle resource metrics.");
647 }
648 }
649 }
650 OtlpResource::Logs(resource_logs) => {
651 let translator = OtlpLogsTranslator::from_resource_logs(resource_logs, origin_tag_resolver.as_ref());
652 for log_event in translator {
653 metrics.logs_received().increment(1);
654
655 if let Some(event_buffer) = event_buffer_manager.try_push(log_event) {
656 dispatch_events(event_buffer, &source_context).await;
657 }
658 }
659 }
660 }
661 },
662 _ = buffer_flush.tick() => {
663 if let Some(event_buffer) = event_buffer_manager.consume() {
664 dispatch_events(event_buffer, &source_context).await;
665 }
666 },
667 _ = &mut shutdown_handle => {
668 debug!("Converter task received shutdown signal.");
669 break;
670 }
671 }
672 }
673
674 if let Some(event_buffer) = event_buffer_manager.consume() {
675 dispatch_events(event_buffer, &source_context).await;
676 }
677
678 debug!("OTLP resource converter task stopped.");
679}