saluki_components/relays/otlp/
mod.rs

1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_common::buf::FrozenChunkedBytesBuffer;
7use saluki_config::GenericConfiguration;
8use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
9use saluki_core::components::ComponentContext;
10use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
11use saluki_core::topology::OutputDefinition;
12use saluki_error::{ErrorContext as _, GenericError};
13use saluki_io::net::ListenAddress;
14use serde::Deserialize;
15use stringtheory::MetaString;
16use tokio::select;
17use tokio::sync::mpsc;
18use tracing::{debug, error};
19
20use crate::common::otlp::config::Receiver;
21use crate::common::otlp::{
22    build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
23    OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
24};
25
26/// Configuration for the OTLP relay.
27#[derive(Deserialize, Default)]
28pub struct OtlpRelayConfiguration {
29    #[serde(default)]
30    otlp_config: OtlpRelayConfig,
31}
32
33/// OTLP configuration for the relay.
34#[derive(Deserialize, Default)]
35pub struct OtlpRelayConfig {
36    #[serde(default)]
37    receiver: Receiver,
38}
39
40impl OtlpRelayConfiguration {
41    /// Creates a new `OtlpRelayConfiguration` from the given generic configuration.
42    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43        config.as_typed().map_err(Into::into)
44    }
45
46    fn http_endpoint(&self) -> ListenAddress {
47        let transport = &self.otlp_config.receiver.protocols.http.transport;
48        let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
49        let address = format!("{}://{}", transport, endpoint);
50        ListenAddress::try_from(address).expect("valid HTTP endpoint")
51    }
52
53    fn grpc_endpoint(&self) -> ListenAddress {
54        let transport = &self.otlp_config.receiver.protocols.grpc.transport;
55        let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
56        let address = format!("{}://{}", transport, endpoint);
57        ListenAddress::try_from(address).expect("valid gRPC endpoint")
58    }
59
60    fn grpc_max_recv_msg_size_bytes(&self) -> usize {
61        (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
62    }
63}
64
65impl MemoryBounds for OtlpRelayConfiguration {
66    fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
67}
68
69#[async_trait]
70impl RelayBuilder for OtlpRelayConfiguration {
71    fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
72        static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
73            vec![
74                OutputDefinition::named_output("metrics", PayloadType::Grpc),
75                OutputDefinition::named_output("logs", PayloadType::Grpc),
76                OutputDefinition::named_output("traces", PayloadType::Grpc),
77            ]
78        });
79        &OUTPUTS
80    }
81
82    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
83        Ok(Box::new(OtlpRelay {
84            http_endpoint: self.http_endpoint(),
85            grpc_endpoint: self.grpc_endpoint(),
86            grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
87            metrics: build_metrics(&context),
88        }))
89    }
90}
91
92/// OTLP relay.
93///
94/// Receives OTLP metrics and logs via gRPC and HTTP, outputting payloads for downstream processing.
95pub struct OtlpRelay {
96    http_endpoint: ListenAddress,
97    grpc_endpoint: ListenAddress,
98    grpc_max_recv_msg_size_bytes: usize,
99    metrics: Metrics,
100}
101
102#[async_trait]
103impl Relay for OtlpRelay {
104    async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
105        let Self {
106            http_endpoint,
107            grpc_endpoint,
108            grpc_max_recv_msg_size_bytes,
109            metrics,
110        } = *self;
111
112        let mut global_shutdown = context.take_shutdown_handle();
113        let mut health = context.take_health_handle();
114        let global_thread_pool = context.topology_context().global_thread_pool().clone();
115        let memory_limiter = context.topology_context().memory_limiter().clone();
116        let dispatcher = context.dispatcher();
117
118        let (payload_tx, mut payload_rx) = mpsc::channel(1024);
119
120        let handler = RelayHandler::new(payload_tx);
121        let server_builder = OtlpServerBuilder::new(
122            http_endpoint.clone(),
123            grpc_endpoint.clone(),
124            grpc_max_recv_msg_size_bytes,
125        );
126
127        let (http_shutdown, mut http_error) = server_builder
128            .build(handler, memory_limiter, global_thread_pool, metrics)
129            .await?;
130
131        health.mark_ready();
132        debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
133
134        loop {
135            select! {
136                _ = &mut global_shutdown => {
137                    debug!("Received shutdown signal.");
138                    break
139                },
140                error = &mut http_error => {
141                    if let Some(error) = error {
142                        debug!(%error, "HTTP server error.");
143                    }
144                    break;
145                },
146                Some(otlp_payload) = payload_rx.recv() => {
147                    let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
148                    if let Err(e) = dispatcher.dispatch(payload).await {
149                        error!(error = %e, "Failed to dispatch OTLP payload.");
150                    }
151                },
152                _ = health.live() => continue,
153            }
154        }
155
156        debug!("Stopping OTLP relay...");
157
158        http_shutdown.shutdown();
159
160        debug!("OTLP relay stopped.");
161
162        Ok(())
163    }
164}
165
166enum OtlpSignalType {
167    Metrics,
168    Logs,
169    Traces,
170}
171
172struct OtlpPayload {
173    signal_type: OtlpSignalType,
174    data: Bytes,
175}
176
177impl OtlpPayload {
178    fn metrics(data: Bytes) -> Self {
179        Self {
180            signal_type: OtlpSignalType::Metrics,
181            data,
182        }
183    }
184
185    fn logs(data: Bytes) -> Self {
186        Self {
187            signal_type: OtlpSignalType::Logs,
188            data,
189        }
190    }
191
192    fn traces(data: Bytes) -> Self {
193        Self {
194            signal_type: OtlpSignalType::Traces,
195            data,
196        }
197    }
198
199    fn into_grpc_payload(self) -> GrpcPayload {
200        let service_path = match self.signal_type {
201            OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
202            OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
203            OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
204        };
205
206        // We provide an empty endpoint because we want any consuming components to fill that in for themselves.
207        GrpcPayload::new(
208            PayloadMetadata::from_event_count(1),
209            MetaString::empty(),
210            service_path,
211            FrozenChunkedBytesBuffer::from(self.data),
212        )
213    }
214}
215
216/// Handler that forwards OTLP payloads to a channel for downstream processing.
217struct RelayHandler {
218    tx: mpsc::Sender<OtlpPayload>,
219}
220
221impl RelayHandler {
222    fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
223        Self { tx }
224    }
225}
226
227#[async_trait]
228impl OtlpHandler for RelayHandler {
229    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
230        self.tx
231            .send(OtlpPayload::metrics(body))
232            .await
233            .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
234    }
235
236    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
237        self.tx
238            .send(OtlpPayload::logs(body))
239            .await
240            .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
241    }
242
243    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
244        self.tx
245            .send(OtlpPayload::traces(body))
246            .await
247            .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
248    }
249}