saluki_components/relays/otlp/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_config::GenericConfiguration;
7use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
8use saluki_core::components::ComponentContext;
9use saluki_core::data_model::payload::{GrpcPayload, HttpPayload, Payload, PayloadMetadata, PayloadType};
10use saluki_error::GenericError;
11use saluki_io::net::ListenAddress;
12use serde::Deserialize;
13use stringtheory::MetaString;
14use tokio::select;
15use tokio::sync::mpsc;
16use tracing::{debug, error};
17
18use crate::common::otlp::config::{Receiver, Traces};
19use crate::common::otlp::{build_metrics, Metrics, OtlpHandler, OtlpServerBuilder};
20
21const OTLP_TRACE_SERVICE_PATH: MetaString =
22 MetaString::from_static("/opentelemetry.proto.collector.trace.v1.TraceService/Export");
23
/// Serde default for the `otlp_destination_endpoint` setting.
fn default_otlp_destination_endpoint() -> String {
    String::from("http://localhost:4319")
}
27
28#[derive(Deserialize, Default)]
30pub struct OtlpRelayConfiguration {
31 #[serde(default)]
32 otlp_config: OtlpRelayConfig,
33
34 #[serde(default = "default_otlp_destination_endpoint")]
36 otlp_destination_endpoint: String,
37}
38
39#[derive(Deserialize, Default)]
41pub struct OtlpRelayConfig {
42 #[serde(default)]
43 receiver: Receiver,
44
45 #[serde(default)]
46 traces: Traces,
47}
48
49impl OtlpRelayConfiguration {
50 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
52 config.as_typed().map_err(Into::into)
53 }
54
55 fn http_endpoint(&self) -> ListenAddress {
56 let transport = &self.otlp_config.receiver.protocols.http.transport;
57 let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
58 let address = format!("{}://{}", transport, endpoint);
59 ListenAddress::try_from(address).expect("valid HTTP endpoint")
60 }
61
62 fn grpc_endpoint(&self) -> ListenAddress {
63 let transport = &self.otlp_config.receiver.protocols.grpc.transport;
64 let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
65 let address = format!("{}://{}", transport, endpoint);
66 ListenAddress::try_from(address).expect("valid gRPC endpoint")
67 }
68
69 fn grpc_max_recv_msg_size_bytes(&self) -> usize {
70 (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
71 }
72
73 fn destination_endpoint(&self) -> &str {
74 &self.otlp_destination_endpoint
75 }
76
77 fn trace_destination_endpoint(&self) -> MetaString {
78 format!("localhost:{}", self.otlp_config.traces.internal_port).into()
79 }
80}
81
82impl MemoryBounds for OtlpRelayConfiguration {
83 fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
84}
85
/// The kind of OTLP signal carried by a received payload.
enum OtlpSignalType {
    /// Payload received through the metrics handler.
    Metrics,
    /// Payload received through the logs handler.
    Logs,
    /// Payload received through the traces handler.
    Traces,
}
91
92struct OtlpPayload {
93 signal_type: OtlpSignalType,
94 data: Bytes,
95}
96
97pub struct OtlpRelay {
101 http_endpoint: ListenAddress,
102 grpc_endpoint: ListenAddress,
103 grpc_max_recv_msg_size_bytes: usize,
104 destination_endpoint: String,
105 trace_destination_endpoint: MetaString,
106 metrics: Metrics,
107}
108
109struct RelayHandler {
111 tx: mpsc::Sender<OtlpPayload>,
112}
113
114impl RelayHandler {
115 fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
116 Self { tx }
117 }
118}
119
120#[async_trait]
121impl OtlpHandler for RelayHandler {
122 async fn handle_metrics(&self, body: Bytes) -> Result<(), String> {
123 let otlp_payload = OtlpPayload {
124 signal_type: OtlpSignalType::Metrics,
125 data: body,
126 };
127
128 self.tx
129 .send(otlp_payload)
130 .await
131 .map_err(|_| "Failed to send OTLP metrics payload to dispatcher; channel is closed.".to_string())?;
132 Ok(())
133 }
134
135 async fn handle_logs(&self, body: Bytes) -> Result<(), String> {
136 let otlp_payload = OtlpPayload {
137 signal_type: OtlpSignalType::Logs,
138 data: body,
139 };
140
141 self.tx
142 .send(otlp_payload)
143 .await
144 .map_err(|_| "Failed to send OTLP logs payload to dispatcher; channel is closed.".to_string())?;
145 Ok(())
146 }
147
148 async fn handle_traces(&self, body: Bytes) -> Result<(), String> {
149 let otlp_payload = OtlpPayload {
150 signal_type: OtlpSignalType::Traces,
151 data: body,
152 };
153
154 self.tx
155 .send(otlp_payload)
156 .await
157 .map_err(|_| "Failed to send OTLP traces payload to dispatcher; channel is closed.".to_string())?;
158 Ok(())
159 }
160}
161
162#[async_trait]
163impl RelayBuilder for OtlpRelayConfiguration {
164 fn output_payload_type(&self) -> PayloadType {
165 PayloadType::Http | PayloadType::Grpc
166 }
167
168 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
169 Ok(Box::new(OtlpRelay {
170 http_endpoint: self.http_endpoint(),
171 grpc_endpoint: self.grpc_endpoint(),
172 grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
173 destination_endpoint: self.destination_endpoint().to_string(),
174 trace_destination_endpoint: self.trace_destination_endpoint(),
175 metrics: build_metrics(&context),
176 }))
177 }
178}
179
180#[async_trait]
181impl Relay for OtlpRelay {
182 async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
183 let mut global_shutdown = context.take_shutdown_handle();
184 let mut health = context.take_health_handle();
185 let global_thread_pool = context.topology_context().global_thread_pool().clone();
186 let memory_limiter = context.topology_context().memory_limiter().clone();
187 let dispatcher = context.dispatcher();
188
189 let (payload_tx, mut payload_rx) = mpsc::channel::<OtlpPayload>(1024);
190
191 let handler = RelayHandler::new(payload_tx);
192 let server_builder = OtlpServerBuilder::new(
193 self.http_endpoint.clone(),
194 self.grpc_endpoint.clone(),
195 self.grpc_max_recv_msg_size_bytes,
196 );
197
198 let metrics_arc = Arc::new(self.metrics);
199
200 let (http_shutdown, mut http_error) = server_builder
201 .build(
202 handler,
203 memory_limiter.clone(),
204 global_thread_pool.clone(),
205 metrics_arc.clone(),
206 )
207 .await?;
208
209 health.mark_ready();
210 debug!(
211 http_endpoint = %self.http_endpoint,
212 grpc_endpoint = %self.grpc_endpoint,
213 destination = %self.destination_endpoint,
214 "OTLP relay started."
215 );
216
217 loop {
218 select! {
219 _ = &mut global_shutdown => {
220 debug!("Received shutdown signal.");
221 break
222 },
223 error = &mut http_error => {
224 if let Some(error) = error {
225 debug!(%error, "HTTP server error.");
226 }
227 break;
228 },
229 Some(otlp_payload) = payload_rx.recv() => {
230 let metadata = PayloadMetadata::from_event_count(1);
231 let buffer = otlp_payload.data.into();
232 if matches!(otlp_payload.signal_type, OtlpSignalType::Traces) {
233
234 let grpc_payload = GrpcPayload::new(
235 metadata,
236 self.trace_destination_endpoint.clone(),
237 OTLP_TRACE_SERVICE_PATH,
238 buffer,
239 );
240 let payload = Payload::Grpc(grpc_payload);
241
242 if let Err(e) = dispatcher.dispatch(payload).await {
243 error!(error = %e, "Failed to dispatch gRPC trace payload.");
244 }
245 } else {
246 let path = match otlp_payload.signal_type {
247 OtlpSignalType::Metrics => "/v1/metrics",
248 OtlpSignalType::Logs => "/v1/logs",
249 OtlpSignalType::Traces => "/v1/traces",
250 };
251 let uri = format!("{}{}", self.destination_endpoint, path);
252
253 let request = match http::Request::builder()
254 .method("POST")
255 .uri(&uri)
256 .header("content-type", "application/x-protobuf")
257 .body(buffer)
258 {
259 Ok(req) => req,
260 Err(e) => {
261 error!(error = %e, "Failed to build HTTP request for OTLP payload.");
262 continue;
263 }
264 };
265
266 let http_payload = HttpPayload::new(metadata, request);
267 let payload = Payload::Http(http_payload);
268
269 if let Err(e) = dispatcher.dispatch(payload).await {
270 error!(error = %e, "Failed to dispatch HTTP OTLP payload.");
271 }
272 }
273 },
274 _ = health.live() => continue,
275 }
276 }
277
278 debug!("Stopping OTLP relay...");
279 http_shutdown.shutdown();
280 debug!("OTLP relay stopped.");
281
282 Ok(())
283 }
284}