saluki_components/relays/otlp/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_config::GenericConfiguration;
7use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
8use saluki_core::components::ComponentContext;
9use saluki_core::data_model::payload::{GrpcPayload, HttpPayload, Payload, PayloadMetadata, PayloadType};
10use saluki_error::GenericError;
11use saluki_io::net::ListenAddress;
12use serde::Deserialize;
13use stringtheory::MetaString;
14use tokio::select;
15use tokio::sync::mpsc;
16use tracing::{debug, error};
17
18use crate::common::otlp::config::{Receiver, Traces};
19use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
20
21const OTLP_TRACE_SERVICE_PATH: MetaString =
22    MetaString::from_static("/opentelemetry.proto.collector.trace.v1.TraceService/Export");
23
/// Returns the destination endpoint used when `otlp_destination_endpoint` is absent from the
/// configuration.
fn default_otlp_destination_endpoint() -> String {
    String::from("http://localhost:4319")
}
27
28/// Configuration for the OTLP relay.
29#[derive(Deserialize, Default)]
30pub struct OtlpRelayConfiguration {
31    #[serde(default)]
32    otlp_config: OtlpRelayConfig,
33
34    /// The destination endpoint to forward OTLP data to (metrics and logs).
35    #[serde(default = "default_otlp_destination_endpoint")]
36    otlp_destination_endpoint: String,
37}
38
39/// OTLP configuration for the relay.
40#[derive(Deserialize, Default)]
41pub struct OtlpRelayConfig {
42    #[serde(default)]
43    receiver: Receiver,
44
45    #[serde(default)]
46    traces: Traces,
47}
48
49impl OtlpRelayConfiguration {
50    /// Creates a new `OtlpRelayConfiguration` from the given generic configuration.
51    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
52        config.as_typed().map_err(Into::into)
53    }
54
55    fn http_endpoint(&self) -> ListenAddress {
56        let transport = &self.otlp_config.receiver.protocols.http.transport;
57        let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
58        let address = format!("{}://{}", transport, endpoint);
59        ListenAddress::try_from(address).expect("valid HTTP endpoint")
60    }
61
62    fn grpc_endpoint(&self) -> ListenAddress {
63        let transport = &self.otlp_config.receiver.protocols.grpc.transport;
64        let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
65        let address = format!("{}://{}", transport, endpoint);
66        ListenAddress::try_from(address).expect("valid gRPC endpoint")
67    }
68
69    fn grpc_max_recv_msg_size_bytes(&self) -> usize {
70        (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
71    }
72
73    fn destination_endpoint(&self) -> &str {
74        &self.otlp_destination_endpoint
75    }
76
77    fn trace_destination_endpoint(&self) -> MetaString {
78        format!("localhost:{}", self.otlp_config.traces.internal_port).into()
79    }
80}
81
82impl MemoryBounds for OtlpRelayConfiguration {
83    fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
84}
85
/// The kind of OTLP signal carried by a received payload.
enum OtlpSignalType {
    /// Payload received through the metrics handler.
    Metrics,
    /// Payload received through the logs handler.
    Logs,
    /// Payload received through the traces handler.
    Traces,
}
91
92struct OtlpPayload {
93    signal_type: OtlpSignalType,
94    data: Bytes,
95}
96
97/// OTLP relay.
98///
99/// Receives OTLP metrics and logs via gRPC and HTTP, outputting payloads for downstream processing.
100pub struct OtlpRelay {
101    http_endpoint: ListenAddress,
102    grpc_endpoint: ListenAddress,
103    grpc_max_recv_msg_size_bytes: usize,
104    destination_endpoint: String,
105    trace_destination_endpoint: MetaString,
106    metrics: Metrics,
107}
108
109/// Handler that forwards OTLP payloads to a channel for downstream processing.
110struct RelayHandler {
111    tx: mpsc::Sender<OtlpPayload>,
112}
113
114impl RelayHandler {
115    fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
116        Self { tx }
117    }
118}
119
120#[async_trait]
121impl OtlpHandler for RelayHandler {
122    async fn handle_metrics(&self, body: Bytes) -> Result<(), String> {
123        let otlp_payload = OtlpPayload {
124            signal_type: OtlpSignalType::Metrics,
125            data: body,
126        };
127
128        self.tx
129            .send(otlp_payload)
130            .await
131            .map_err(|_| "Failed to send OTLP metrics payload to dispatcher; channel is closed.".to_string())?;
132        Ok(())
133    }
134
135    async fn handle_logs(&self, body: Bytes) -> Result<(), String> {
136        let otlp_payload = OtlpPayload {
137            signal_type: OtlpSignalType::Logs,
138            data: body,
139        };
140
141        self.tx
142            .send(otlp_payload)
143            .await
144            .map_err(|_| "Failed to send OTLP logs payload to dispatcher; channel is closed.".to_string())?;
145        Ok(())
146    }
147
148    async fn handle_traces(&self, body: Bytes) -> Result<(), String> {
149        let otlp_payload = OtlpPayload {
150            signal_type: OtlpSignalType::Traces,
151            data: body,
152        };
153
154        self.tx
155            .send(otlp_payload)
156            .await
157            .map_err(|_| "Failed to send OTLP traces payload to dispatcher; channel is closed.".to_string())?;
158        Ok(())
159    }
160}
161
162#[async_trait]
163impl RelayBuilder for OtlpRelayConfiguration {
164    fn output_payload_type(&self) -> PayloadType {
165        PayloadType::Http | PayloadType::Grpc
166    }
167
168    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
169        Ok(Box::new(OtlpRelay {
170            http_endpoint: self.http_endpoint(),
171            grpc_endpoint: self.grpc_endpoint(),
172            grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
173            destination_endpoint: self.destination_endpoint().to_string(),
174            trace_destination_endpoint: self.trace_destination_endpoint(),
175            metrics: build_metrics(&context),
176        }))
177    }
178}
179
180#[async_trait]
181impl Relay for OtlpRelay {
182    async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
183        let mut global_shutdown = context.take_shutdown_handle();
184        let mut health = context.take_health_handle();
185        let global_thread_pool = context.topology_context().global_thread_pool().clone();
186        let memory_limiter = context.topology_context().memory_limiter().clone();
187        let dispatcher = context.dispatcher();
188
189        let (payload_tx, mut payload_rx) = mpsc::channel::<OtlpPayload>(1024);
190
191        let handler = RelayHandler::new(payload_tx);
192        let server_builder = OtlpServerBuilder::new(
193            self.http_endpoint.clone(),
194            self.grpc_endpoint.clone(),
195            self.grpc_max_recv_msg_size_bytes,
196        );
197
198        let metrics_arc = Arc::new(self.metrics);
199
200        let (http_shutdown, mut http_error) = server_builder
201            .build(
202                handler,
203                memory_limiter.clone(),
204                global_thread_pool.clone(),
205                metrics_arc.clone(),
206            )
207            .await?;
208
209        health.mark_ready();
210        debug!(
211            http_endpoint = %self.http_endpoint,
212            grpc_endpoint = %self.grpc_endpoint,
213            destination = %self.destination_endpoint,
214            "OTLP relay started."
215        );
216
217        loop {
218            select! {
219                _ = &mut global_shutdown => {
220                    debug!("Received shutdown signal.");
221                    break
222                },
223                error = &mut http_error => {
224                    if let Some(error) = error {
225                        debug!(%error, "HTTP server error.");
226                    }
227                    break;
228                },
229                Some(otlp_payload) = payload_rx.recv() => {
230                    let metadata = PayloadMetadata::from_event_count(1);
231                    let buffer = otlp_payload.data.into();
232                    if matches!(otlp_payload.signal_type, OtlpSignalType::Traces) {
233
234                        let grpc_payload = GrpcPayload::new(
235                            metadata,
236                            self.trace_destination_endpoint.clone(),
237                            OTLP_TRACE_SERVICE_PATH,
238                            buffer,
239                        );
240                        let payload = Payload::Grpc(grpc_payload);
241
242                        if let Err(e) = dispatcher.dispatch(payload).await {
243                            error!(error = %e, "Failed to dispatch gRPC trace payload.");
244                        }
245                    } else {
246                        let path = match otlp_payload.signal_type {
247                            OtlpSignalType::Metrics => "/v1/metrics",
248                            OtlpSignalType::Logs => "/v1/logs",
249                            OtlpSignalType::Traces => "/v1/traces",
250                        };
251                        let uri = format!("{}{}", self.destination_endpoint, path);
252
253                        let request = match http::Request::builder()
254                            .method("POST")
255                            .uri(&uri)
256                            .header("content-type", "application/x-protobuf")
257                            .body(buffer)
258                        {
259                            Ok(req) => req,
260                            Err(e) => {
261                                error!(error = %e, "Failed to build HTTP request for OTLP payload.");
262                                continue;
263                            }
264                        };
265
266                        let http_payload = HttpPayload::new(metadata, request);
267                        let payload = Payload::Http(http_payload);
268
269                        if let Err(e) = dispatcher.dispatch(payload).await {
270                            error!(error = %e, "Failed to dispatch HTTP OTLP payload.");
271                        }
272                    }
273                },
274                _ = health.live() => continue,
275            }
276        }
277
278        debug!("Stopping OTLP relay...");
279        http_shutdown.shutdown();
280        debug!("OTLP relay stopped.");
281
282        Ok(())
283    }
284}