saluki_components/relays/otlp/
mod.rs1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_common::buf::FrozenChunkedBytesBuffer;
7use saluki_config::GenericConfiguration;
8use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
9use saluki_core::components::ComponentContext;
10use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
11use saluki_core::topology::OutputDefinition;
12use saluki_error::{ErrorContext as _, GenericError};
13use saluki_io::net::ListenAddress;
14use serde::Deserialize;
15use stringtheory::MetaString;
16use tokio::select;
17use tokio::sync::mpsc;
18use tracing::{debug, error};
19
20use crate::common::otlp::config::Receiver;
21use crate::common::otlp::{
22 build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
23 OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
24};
25
26#[derive(Deserialize, Default)]
28pub struct OtlpRelayConfiguration {
29 #[serde(default)]
30 otlp_config: OtlpRelayConfig,
31}
32
33#[derive(Deserialize, Default)]
35pub struct OtlpRelayConfig {
36 #[serde(default)]
37 receiver: Receiver,
38}
39
40impl OtlpRelayConfiguration {
41 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43 config.as_typed().map_err(Into::into)
44 }
45
46 fn http_endpoint(&self) -> ListenAddress {
47 let transport = &self.otlp_config.receiver.protocols.http.transport;
48 let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
49 let address = format!("{}://{}", transport, endpoint);
50 ListenAddress::try_from(address).expect("valid HTTP endpoint")
51 }
52
53 fn grpc_endpoint(&self) -> ListenAddress {
54 let transport = &self.otlp_config.receiver.protocols.grpc.transport;
55 let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
56 let address = format!("{}://{}", transport, endpoint);
57 ListenAddress::try_from(address).expect("valid gRPC endpoint")
58 }
59
60 fn grpc_max_recv_msg_size_bytes(&self) -> usize {
61 (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
62 }
63}
64
65impl MemoryBounds for OtlpRelayConfiguration {
66 fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
67}
68
69#[async_trait]
70impl RelayBuilder for OtlpRelayConfiguration {
71 fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
72 static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
73 vec![
74 OutputDefinition::named_output("metrics", PayloadType::Grpc),
75 OutputDefinition::named_output("logs", PayloadType::Grpc),
76 OutputDefinition::named_output("traces", PayloadType::Grpc),
77 ]
78 });
79 &OUTPUTS
80 }
81
82 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
83 Ok(Box::new(OtlpRelay {
84 http_endpoint: self.http_endpoint(),
85 grpc_endpoint: self.grpc_endpoint(),
86 grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
87 metrics: build_metrics(&context),
88 }))
89 }
90}
91
92pub struct OtlpRelay {
96 http_endpoint: ListenAddress,
97 grpc_endpoint: ListenAddress,
98 grpc_max_recv_msg_size_bytes: usize,
99 metrics: Metrics,
100}
101
102#[async_trait]
103impl Relay for OtlpRelay {
104 async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
105 let Self {
106 http_endpoint,
107 grpc_endpoint,
108 grpc_max_recv_msg_size_bytes,
109 metrics,
110 } = *self;
111
112 let mut global_shutdown = context.take_shutdown_handle();
113 let mut health = context.take_health_handle();
114 let global_thread_pool = context.topology_context().global_thread_pool().clone();
115 let memory_limiter = context.topology_context().memory_limiter().clone();
116 let dispatcher = context.dispatcher();
117
118 let (payload_tx, mut payload_rx) = mpsc::channel(1024);
119
120 let handler = RelayHandler::new(payload_tx);
121 let server_builder = OtlpServerBuilder::new(
122 http_endpoint.clone(),
123 grpc_endpoint.clone(),
124 grpc_max_recv_msg_size_bytes,
125 );
126
127 let (http_shutdown, mut http_error) = server_builder
128 .build(handler, memory_limiter, global_thread_pool, metrics)
129 .await?;
130
131 health.mark_ready();
132 debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
133
134 loop {
135 select! {
136 _ = &mut global_shutdown => {
137 debug!("Received shutdown signal.");
138 break
139 },
140 error = &mut http_error => {
141 if let Some(error) = error {
142 debug!(%error, "HTTP server error.");
143 }
144 break;
145 },
146 Some(otlp_payload) = payload_rx.recv() => {
147 let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
148 if let Err(e) = dispatcher.dispatch(payload).await {
149 error!(error = %e, "Failed to dispatch OTLP payload.");
150 }
151 },
152 _ = health.live() => continue,
153 }
154 }
155
156 debug!("Stopping OTLP relay...");
157
158 http_shutdown.shutdown();
159
160 debug!("OTLP relay stopped.");
161
162 Ok(())
163 }
164}
165
166enum OtlpSignalType {
167 Metrics,
168 Logs,
169 Traces,
170}
171
172struct OtlpPayload {
173 signal_type: OtlpSignalType,
174 data: Bytes,
175}
176
177impl OtlpPayload {
178 fn metrics(data: Bytes) -> Self {
179 Self {
180 signal_type: OtlpSignalType::Metrics,
181 data,
182 }
183 }
184
185 fn logs(data: Bytes) -> Self {
186 Self {
187 signal_type: OtlpSignalType::Logs,
188 data,
189 }
190 }
191
192 fn traces(data: Bytes) -> Self {
193 Self {
194 signal_type: OtlpSignalType::Traces,
195 data,
196 }
197 }
198
199 fn into_grpc_payload(self) -> GrpcPayload {
200 let service_path = match self.signal_type {
201 OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
202 OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
203 OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
204 };
205
206 GrpcPayload::new(
208 PayloadMetadata::from_event_count(1),
209 MetaString::empty(),
210 service_path,
211 FrozenChunkedBytesBuffer::from(self.data),
212 )
213 }
214}
215
216struct RelayHandler {
218 tx: mpsc::Sender<OtlpPayload>,
219}
220
221impl RelayHandler {
222 fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
223 Self { tx }
224 }
225}
226
227#[async_trait]
228impl OtlpHandler for RelayHandler {
229 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
230 self.tx
231 .send(OtlpPayload::metrics(body))
232 .await
233 .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
234 }
235
236 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
237 self.tx
238 .send(OtlpPayload::logs(body))
239 .await
240 .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
241 }
242
243 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
244 self.tx
245 .send(OtlpPayload::traces(body))
246 .await
247 .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
248 }
249}