saluki_components/relays/otlp/
mod.rs1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use facet::Facet;
6use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::buf::FrozenChunkedBytesBuffer;
8use saluki_config::GenericConfiguration;
9use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
10use saluki_core::components::ComponentContext;
11use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
12use saluki_core::topology::OutputDefinition;
13use saluki_error::{ErrorContext as _, GenericError};
14use saluki_io::net::ListenAddress;
15use serde::Deserialize;
16use stringtheory::MetaString;
17use tokio::select;
18use tokio::sync::mpsc;
19use tracing::{debug, error};
20
21use crate::common::otlp::config::Receiver;
22use crate::common::otlp::{
23 build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
24 OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
25};
26
27#[derive(Deserialize, Default, Facet)]
29#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
30pub struct OtlpRelayConfiguration {
31 #[serde(default)]
32 otlp_config: OtlpRelayConfig,
33}
34
35#[derive(Deserialize, Default, Facet)]
37#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
38pub struct OtlpRelayConfig {
39 #[serde(default)]
40 receiver: Receiver,
41}
42
43impl OtlpRelayConfiguration {
44 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46 config.as_typed().map_err(Into::into)
47 }
48
49 fn http_endpoint(&self) -> ListenAddress {
50 let transport = &self.otlp_config.receiver.protocols.http.transport;
51 let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
52 let address = format!("{}://{}", transport, endpoint);
53 ListenAddress::try_from(address).expect("valid HTTP endpoint")
54 }
55
56 fn grpc_endpoint(&self) -> ListenAddress {
57 let transport = &self.otlp_config.receiver.protocols.grpc.transport;
58 let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
59 let address = format!("{}://{}", transport, endpoint);
60 ListenAddress::try_from(address).expect("valid gRPC endpoint")
61 }
62
63 fn grpc_max_recv_msg_size_bytes(&self) -> usize {
64 (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
65 }
66}
67
68impl MemoryBounds for OtlpRelayConfiguration {
69 fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
70}
71
72#[async_trait]
73impl RelayBuilder for OtlpRelayConfiguration {
74 fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
75 static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
76 vec![
77 OutputDefinition::named_output("metrics", PayloadType::Grpc),
78 OutputDefinition::named_output("logs", PayloadType::Grpc),
79 OutputDefinition::named_output("traces", PayloadType::Grpc),
80 ]
81 });
82 &OUTPUTS
83 }
84
85 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
86 Ok(Box::new(OtlpRelay {
87 http_endpoint: self.http_endpoint(),
88 grpc_endpoint: self.grpc_endpoint(),
89 grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
90 metrics: build_metrics(&context),
91 }))
92 }
93}
94
95pub struct OtlpRelay {
99 http_endpoint: ListenAddress,
100 grpc_endpoint: ListenAddress,
101 grpc_max_recv_msg_size_bytes: usize,
102 metrics: Metrics,
103}
104
105#[async_trait]
106impl Relay for OtlpRelay {
107 async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
108 let Self {
109 http_endpoint,
110 grpc_endpoint,
111 grpc_max_recv_msg_size_bytes,
112 metrics,
113 } = *self;
114
115 let mut global_shutdown = context.take_shutdown_handle();
116 let mut health = context.take_health_handle();
117 let global_thread_pool = context.topology_context().global_thread_pool().clone();
118 let memory_limiter = context.topology_context().memory_limiter().clone();
119 let dispatcher = context.dispatcher();
120
121 let (payload_tx, mut payload_rx) = mpsc::channel(1024);
122
123 let handler = RelayHandler::new(payload_tx);
124 let server_builder = OtlpServerBuilder::new(
125 http_endpoint.clone(),
126 grpc_endpoint.clone(),
127 grpc_max_recv_msg_size_bytes,
128 );
129
130 let (http_shutdown, mut http_error) = server_builder
131 .build(handler, memory_limiter, global_thread_pool, metrics)
132 .await?;
133
134 health.mark_ready();
135 debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
136
137 loop {
138 select! {
139 _ = &mut global_shutdown => {
140 debug!("Received shutdown signal.");
141 break
142 },
143 error = &mut http_error => {
144 if let Some(error) = error {
145 debug!(%error, "HTTP server error.");
146 }
147 break;
148 },
149 Some(otlp_payload) = payload_rx.recv() => {
150 let output_name = otlp_payload.signal_type.as_str();
151 let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
152 if let Err(e) = dispatcher.dispatch_named(output_name, payload).await {
153 error!(error = %e, output = output_name, "Failed to dispatch OTLP payload.");
154 }
155 },
156 _ = health.live() => continue,
157 }
158 }
159
160 debug!("Stopping OTLP relay...");
161
162 http_shutdown.shutdown();
163
164 debug!("OTLP relay stopped.");
165
166 Ok(())
167 }
168}
169
/// The kind of OTLP signal carried by a payload.
enum OtlpSignalType {
    /// Metrics signal.
    Metrics,
    /// Logs signal.
    Logs,
    /// Traces signal.
    Traces,
}

impl OtlpSignalType {
    /// Returns the lowercase name of the signal: `"metrics"`, `"logs"`, or `"traces"`.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Metrics => "metrics",
            Self::Logs => "logs",
            Self::Traces => "traces",
        }
    }
}
185
186struct OtlpPayload {
187 signal_type: OtlpSignalType,
188 data: Bytes,
189}
190
191impl OtlpPayload {
192 fn metrics(data: Bytes) -> Self {
193 Self {
194 signal_type: OtlpSignalType::Metrics,
195 data,
196 }
197 }
198
199 fn logs(data: Bytes) -> Self {
200 Self {
201 signal_type: OtlpSignalType::Logs,
202 data,
203 }
204 }
205
206 fn traces(data: Bytes) -> Self {
207 Self {
208 signal_type: OtlpSignalType::Traces,
209 data,
210 }
211 }
212
213 fn into_grpc_payload(self) -> GrpcPayload {
214 let service_path = match self.signal_type {
215 OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
216 OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
217 OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
218 };
219
220 GrpcPayload::new(
222 PayloadMetadata::from_event_count(1),
223 MetaString::empty(),
224 service_path,
225 FrozenChunkedBytesBuffer::from(self.data),
226 )
227 }
228}
229
230struct RelayHandler {
232 tx: mpsc::Sender<OtlpPayload>,
233}
234
235impl RelayHandler {
236 fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
237 Self { tx }
238 }
239}
240
241#[async_trait]
242impl OtlpHandler for RelayHandler {
243 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
244 self.tx
245 .send(OtlpPayload::metrics(body))
246 .await
247 .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
248 }
249
250 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
251 self.tx
252 .send(OtlpPayload::logs(body))
253 .await
254 .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
255 }
256
257 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
258 self.tx
259 .send(OtlpPayload::traces(body))
260 .await
261 .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
262 }
263}
264
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::OtlpRelayConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests for `OtlpRelayConfiguration`, starting from an empty JSON object
    /// and an empty slice as the second argument.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::OTLP_RELAY_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                cfg.as_typed::<OtlpRelayConfiguration>()
                    .expect("OtlpRelayConfiguration should deserialize")
            },
        )
        .await
    }
}