saluki_components/relays/otlp/
mod.rs1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_common::buf::FrozenChunkedBytesBuffer;
7use saluki_config::GenericConfiguration;
8use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
9use saluki_core::components::ComponentContext;
10use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
11use saluki_core::topology::OutputDefinition;
12use saluki_error::{ErrorContext as _, GenericError};
13use saluki_io::net::ListenAddress;
14use serde::Deserialize;
15use stringtheory::MetaString;
16use tokio::select;
17use tokio::sync::mpsc;
18use tracing::{debug, error};
19
20use crate::common::otlp::config::Receiver;
21use crate::common::otlp::{
22 build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
23 OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
24};
25
26#[derive(Deserialize, Default)]
28pub struct OtlpRelayConfiguration {
29 #[serde(default)]
30 otlp_config: OtlpRelayConfig,
31}
32
33#[derive(Deserialize, Default)]
35pub struct OtlpRelayConfig {
36 #[serde(default)]
37 receiver: Receiver,
38}
39
40impl OtlpRelayConfiguration {
41 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43 config.as_typed().map_err(Into::into)
44 }
45
46 fn http_endpoint(&self) -> ListenAddress {
47 let transport = &self.otlp_config.receiver.protocols.http.transport;
48 let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
49 let address = format!("{}://{}", transport, endpoint);
50 ListenAddress::try_from(address).expect("valid HTTP endpoint")
51 }
52
53 fn grpc_endpoint(&self) -> ListenAddress {
54 let transport = &self.otlp_config.receiver.protocols.grpc.transport;
55 let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
56 let address = format!("{}://{}", transport, endpoint);
57 ListenAddress::try_from(address).expect("valid gRPC endpoint")
58 }
59
60 fn grpc_max_recv_msg_size_bytes(&self) -> usize {
61 (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
62 }
63}
64
65impl MemoryBounds for OtlpRelayConfiguration {
66 fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
67}
68
69#[async_trait]
70impl RelayBuilder for OtlpRelayConfiguration {
71 fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
72 static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
73 vec![
74 OutputDefinition::named_output("metrics", PayloadType::Grpc),
75 OutputDefinition::named_output("logs", PayloadType::Grpc),
76 OutputDefinition::named_output("traces", PayloadType::Grpc),
77 ]
78 });
79 &OUTPUTS
80 }
81
82 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
83 Ok(Box::new(OtlpRelay {
84 http_endpoint: self.http_endpoint(),
85 grpc_endpoint: self.grpc_endpoint(),
86 grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
87 metrics: build_metrics(&context),
88 }))
89 }
90}
91
92pub struct OtlpRelay {
96 http_endpoint: ListenAddress,
97 grpc_endpoint: ListenAddress,
98 grpc_max_recv_msg_size_bytes: usize,
99 metrics: Metrics,
100}
101
102#[async_trait]
103impl Relay for OtlpRelay {
104 async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
105 let Self {
106 http_endpoint,
107 grpc_endpoint,
108 grpc_max_recv_msg_size_bytes,
109 metrics,
110 } = *self;
111
112 let mut global_shutdown = context.take_shutdown_handle();
113 let mut health = context.take_health_handle();
114 let global_thread_pool = context.topology_context().global_thread_pool().clone();
115 let memory_limiter = context.topology_context().memory_limiter().clone();
116 let dispatcher = context.dispatcher();
117
118 let (payload_tx, mut payload_rx) = mpsc::channel(1024);
119
120 let handler = RelayHandler::new(payload_tx);
121 let server_builder = OtlpServerBuilder::new(
122 http_endpoint.clone(),
123 grpc_endpoint.clone(),
124 grpc_max_recv_msg_size_bytes,
125 );
126
127 let (http_shutdown, mut http_error) = server_builder
128 .build(handler, memory_limiter, global_thread_pool, metrics)
129 .await?;
130
131 health.mark_ready();
132 debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
133
134 loop {
135 select! {
136 _ = &mut global_shutdown => {
137 debug!("Received shutdown signal.");
138 break
139 },
140 error = &mut http_error => {
141 if let Some(error) = error {
142 debug!(%error, "HTTP server error.");
143 }
144 break;
145 },
146 Some(otlp_payload) = payload_rx.recv() => {
147 let output_name = otlp_payload.signal_type.as_str();
148 let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
149 if let Err(e) = dispatcher.dispatch_named(output_name, payload).await {
150 error!(error = %e, output = output_name, "Failed to dispatch OTLP payload.");
151 }
152 },
153 _ = health.live() => continue,
154 }
155 }
156
157 debug!("Stopping OTLP relay...");
158
159 http_shutdown.shutdown();
160
161 debug!("OTLP relay stopped.");
162
163 Ok(())
164 }
165}
166
/// The kind of OTLP signal a payload carries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OtlpSignalType {
    /// Metrics payload.
    Metrics,
    /// Logs payload.
    Logs,
    /// Traces payload.
    Traces,
}

impl OtlpSignalType {
    /// Returns the lowercase name of the signal (`"metrics"`, `"logs"` or `"traces"`).
    ///
    /// These strings match the relay's named outputs, so the result can be used directly as the
    /// output name when dispatching.
    const fn as_str(&self) -> &'static str {
        match self {
            Self::Metrics => "metrics",
            Self::Logs => "logs",
            Self::Traces => "traces",
        }
    }
}
182
183struct OtlpPayload {
184 signal_type: OtlpSignalType,
185 data: Bytes,
186}
187
188impl OtlpPayload {
189 fn metrics(data: Bytes) -> Self {
190 Self {
191 signal_type: OtlpSignalType::Metrics,
192 data,
193 }
194 }
195
196 fn logs(data: Bytes) -> Self {
197 Self {
198 signal_type: OtlpSignalType::Logs,
199 data,
200 }
201 }
202
203 fn traces(data: Bytes) -> Self {
204 Self {
205 signal_type: OtlpSignalType::Traces,
206 data,
207 }
208 }
209
210 fn into_grpc_payload(self) -> GrpcPayload {
211 let service_path = match self.signal_type {
212 OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
213 OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
214 OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
215 };
216
217 GrpcPayload::new(
219 PayloadMetadata::from_event_count(1),
220 MetaString::empty(),
221 service_path,
222 FrozenChunkedBytesBuffer::from(self.data),
223 )
224 }
225}
226
227struct RelayHandler {
229 tx: mpsc::Sender<OtlpPayload>,
230}
231
232impl RelayHandler {
233 fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
234 Self { tx }
235 }
236}
237
238#[async_trait]
239impl OtlpHandler for RelayHandler {
240 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
241 self.tx
242 .send(OtlpPayload::metrics(body))
243 .await
244 .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
245 }
246
247 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
248 self.tx
249 .send(OtlpPayload::logs(body))
250 .await
251 .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
252 }
253
254 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
255 self.tx
256 .send(OtlpPayload::traces(body))
257 .await
258 .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
259 }
260}