saluki_components/relays/otlp/
mod.rs1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use facet::Facet;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::buf::FrozenChunkedBytesBuffer;
8use saluki_config::GenericConfiguration;
9use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
10use saluki_core::components::ComponentContext;
11use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
12use saluki_core::topology::OutputDefinition;
13use saluki_error::{ErrorContext as _, GenericError};
14use saluki_io::net::ListenAddress;
15use serde::Deserialize;
16use stringtheory::MetaString;
17use tokio::sync::mpsc;
18use tokio::{pin, select};
19use tracing::{debug, error};
20
21use crate::common::otlp::config::Receiver;
22use crate::common::otlp::{
23 build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
24 OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
25};
26
27#[derive(Deserialize, Default, Facet)]
29#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
30pub struct OtlpRelayConfiguration {
31 #[serde(default)]
32 otlp_config: OtlpRelayConfig,
33}
34
35#[derive(Deserialize, Default, Facet)]
37#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
38pub struct OtlpRelayConfig {
39 #[serde(default)]
40 receiver: Receiver,
41}
42
43impl OtlpRelayConfiguration {
44 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46 config.as_typed().map_err(Into::into)
47 }
48
49 fn http_endpoint(&self) -> ListenAddress {
50 let transport = &self.otlp_config.receiver.protocols.http.transport;
51 let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
52 let address = format!("{}://{}", transport, endpoint);
53 ListenAddress::try_from(address).expect("valid HTTP endpoint")
54 }
55
56 fn grpc_endpoint(&self) -> ListenAddress {
57 let transport = &self.otlp_config.receiver.protocols.grpc.transport;
58 let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
59 let address = format!("{}://{}", transport, endpoint);
60 ListenAddress::try_from(address).expect("valid gRPC endpoint")
61 }
62
63 fn grpc_max_recv_msg_size_bytes(&self) -> usize {
64 (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
65 }
66}
67
68impl MemoryBounds for OtlpRelayConfiguration {
69 fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
70}
71
72#[async_trait]
73impl RelayBuilder for OtlpRelayConfiguration {
74 fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
75 static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
76 vec![
77 OutputDefinition::named_output("metrics", PayloadType::Grpc),
78 OutputDefinition::named_output("logs", PayloadType::Grpc),
79 OutputDefinition::named_output("traces", PayloadType::Grpc),
80 ]
81 });
82 &OUTPUTS
83 }
84
85 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
86 Ok(Box::new(OtlpRelay {
87 http_endpoint: self.http_endpoint(),
88 grpc_endpoint: self.grpc_endpoint(),
89 grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
90 metrics: build_metrics(&context),
91 }))
92 }
93}
94
95pub struct OtlpRelay {
99 http_endpoint: ListenAddress,
100 grpc_endpoint: ListenAddress,
101 grpc_max_recv_msg_size_bytes: usize,
102 metrics: Metrics,
103}
104
105#[async_trait]
106impl Relay for OtlpRelay {
107 async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
108 let Self {
109 http_endpoint,
110 grpc_endpoint,
111 grpc_max_recv_msg_size_bytes,
112 metrics,
113 } = *self;
114
115 let global_shutdown = context.take_shutdown_handle();
116 pin!(global_shutdown);
117
118 let mut health = context.take_health_handle();
119 let global_thread_pool = context.topology_context().global_thread_pool().clone();
120 let memory_limiter = context.topology_context().memory_limiter().clone();
121 let dispatcher = context.dispatcher();
122
123 let (payload_tx, mut payload_rx) = mpsc::channel(1024);
124
125 let handler = RelayHandler::new(payload_tx);
126 let server_builder = OtlpServerBuilder::new(
127 http_endpoint.clone(),
128 grpc_endpoint.clone(),
129 grpc_max_recv_msg_size_bytes,
130 );
131
132 let (http_shutdown, mut http_error) = server_builder
133 .build(handler, memory_limiter, global_thread_pool, metrics)
134 .await?;
135
136 health.mark_ready();
137 debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
138
139 loop {
140 select! {
141 _ = &mut global_shutdown => {
142 debug!("Received shutdown signal.");
143 break
144 },
145 error = &mut http_error => {
146 if let Some(error) = error {
147 debug!(%error, "HTTP server error.");
148 }
149 break;
150 },
151 Some(otlp_payload) = payload_rx.recv() => {
152 let output_name = otlp_payload.signal_type.as_str();
153 let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
154 if let Err(e) = dispatcher.dispatch_named(output_name, payload).await {
155 error!(error = %e, output = output_name, "Failed to dispatch OTLP payload.");
156 }
157 },
158 _ = health.live() => continue,
159 }
160 }
161
162 debug!("Stopping OTLP relay...");
163
164 http_shutdown.shutdown();
165
166 debug!("OTLP relay stopped.");
167
168 Ok(())
169 }
170}
171
/// The kind of OTLP signal a payload was received for.
enum OtlpSignalType {
    /// OTLP metrics.
    Metrics,
    /// OTLP logs.
    Logs,
    /// OTLP traces.
    Traces,
}

impl OtlpSignalType {
    /// Returns the lowercase name of the signal: `"metrics"`, `"logs"` or `"traces"`.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Metrics => "metrics",
            Self::Logs => "logs",
            Self::Traces => "traces",
        }
    }
}
187
188struct OtlpPayload {
189 signal_type: OtlpSignalType,
190 data: Bytes,
191}
192
193impl OtlpPayload {
194 fn metrics(data: Bytes) -> Self {
195 Self {
196 signal_type: OtlpSignalType::Metrics,
197 data,
198 }
199 }
200
201 fn logs(data: Bytes) -> Self {
202 Self {
203 signal_type: OtlpSignalType::Logs,
204 data,
205 }
206 }
207
208 fn traces(data: Bytes) -> Self {
209 Self {
210 signal_type: OtlpSignalType::Traces,
211 data,
212 }
213 }
214
215 fn into_grpc_payload(self) -> GrpcPayload {
216 let service_path = match self.signal_type {
217 OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
218 OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
219 OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
220 };
221
222 GrpcPayload::new(
224 PayloadMetadata::from_event_count(1),
225 MetaString::empty(),
226 service_path,
227 FrozenChunkedBytesBuffer::from(self.data),
228 )
229 }
230}
231
232struct RelayHandler {
234 tx: mpsc::Sender<OtlpPayload>,
235}
236
237impl RelayHandler {
238 fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
239 Self { tx }
240 }
241}
242
243#[async_trait]
244impl OtlpHandler for RelayHandler {
245 async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
246 self.tx
247 .send(OtlpPayload::metrics(body))
248 .await
249 .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
250 }
251
252 async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
253 self.tx
254 .send(OtlpPayload::logs(body))
255 .await
256 .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
257 }
258
259 async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
260 self.tx
261 .send(OtlpPayload::traces(body))
262 .await
263 .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
264 }
265}
266
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::{config_registry::structs, run_config_smoke_tests};
    use serde_json::json;

    use super::OtlpRelayConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared configuration smoke tests against `OtlpRelayConfiguration`, starting from
    /// an empty JSON object and deserializing through `as_typed`.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::OTLP_RELAY_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                cfg.as_typed::<OtlpRelayConfiguration>()
                    .expect("OtlpRelayConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }
}