Skip to main content

saluki_components/relays/otlp/
mod.rs

1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use facet::Facet;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::buf::FrozenChunkedBytesBuffer;
8use saluki_config::GenericConfiguration;
9use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
10use saluki_core::components::ComponentContext;
11use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
12use saluki_core::topology::OutputDefinition;
13use saluki_error::{ErrorContext as _, GenericError};
14use saluki_io::net::ListenAddress;
15use serde::Deserialize;
16use stringtheory::MetaString;
17use tokio::sync::mpsc;
18use tokio::{pin, select};
19use tracing::{debug, error};
20
21use crate::common::otlp::config::Receiver;
22use crate::common::otlp::{
23    build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
24    OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
25};
26
/// Configuration for the OTLP relay.
///
/// Deserializable from configuration data; every field falls back to its `Default` value when it is
/// absent from the input.
#[derive(Deserialize, Default, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct OtlpRelayConfiguration {
    /// OTLP-specific settings, read from the `otlp_config` key.
    ///
    /// Uses `OtlpRelayConfig::default()` when the key is missing.
    #[serde(default)]
    otlp_config: OtlpRelayConfig,
}
34
/// OTLP configuration for the relay.
///
/// Wraps the shared OTLP receiver settings, from which the relay reads its HTTP and gRPC protocol
/// configuration.
#[derive(Deserialize, Default, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct OtlpRelayConfig {
    /// Receiver settings, read from the `receiver` key.
    ///
    /// Uses `Receiver::default()` when the key is missing.
    #[serde(default)]
    receiver: Receiver,
}
42
43impl OtlpRelayConfiguration {
44    /// Creates a new `OtlpRelayConfiguration` from the given generic configuration.
45    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46        config.as_typed().map_err(Into::into)
47    }
48
49    fn http_endpoint(&self) -> ListenAddress {
50        let transport = &self.otlp_config.receiver.protocols.http.transport;
51        let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
52        let address = format!("{}://{}", transport, endpoint);
53        ListenAddress::try_from(address).expect("valid HTTP endpoint")
54    }
55
56    fn grpc_endpoint(&self) -> ListenAddress {
57        let transport = &self.otlp_config.receiver.protocols.grpc.transport;
58        let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
59        let address = format!("{}://{}", transport, endpoint);
60        ListenAddress::try_from(address).expect("valid gRPC endpoint")
61    }
62
63    fn grpc_max_recv_msg_size_bytes(&self) -> usize {
64        (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
65    }
66}
67
impl MemoryBounds for OtlpRelayConfiguration {
    // The builder is left untouched, so this component declares no memory bounds of its own.
    fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
}
71
72#[async_trait]
73impl RelayBuilder for OtlpRelayConfiguration {
74    fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
75        static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
76            vec![
77                OutputDefinition::named_output("metrics", PayloadType::Grpc),
78                OutputDefinition::named_output("logs", PayloadType::Grpc),
79                OutputDefinition::named_output("traces", PayloadType::Grpc),
80            ]
81        });
82        &OUTPUTS
83    }
84
85    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
86        Ok(Box::new(OtlpRelay {
87            http_endpoint: self.http_endpoint(),
88            grpc_endpoint: self.grpc_endpoint(),
89            grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
90            metrics: build_metrics(&context),
91        }))
92    }
93}
94
/// OTLP relay.
///
/// Receives OTLP metrics, logs, and traces via gRPC and HTTP, outputting payloads for downstream
/// processing.
pub struct OtlpRelay {
    /// Listen address handed to the OTLP server builder for the HTTP server.
    http_endpoint: ListenAddress,
    /// Listen address handed to the OTLP server builder for the gRPC server.
    grpc_endpoint: ListenAddress,
    /// Maximum gRPC receive message size, in bytes.
    grpc_max_recv_msg_size_bytes: usize,
    /// Metrics handed to the OTLP server when it is built.
    metrics: Metrics,
}
104
105#[async_trait]
106impl Relay for OtlpRelay {
107    async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
108        let Self {
109            http_endpoint,
110            grpc_endpoint,
111            grpc_max_recv_msg_size_bytes,
112            metrics,
113        } = *self;
114
115        let global_shutdown = context.take_shutdown_handle();
116        pin!(global_shutdown);
117
118        let mut health = context.take_health_handle();
119        let global_thread_pool = context.topology_context().global_thread_pool().clone();
120        let memory_limiter = context.topology_context().memory_limiter().clone();
121        let dispatcher = context.dispatcher();
122
123        let (payload_tx, mut payload_rx) = mpsc::channel(1024);
124
125        let handler = RelayHandler::new(payload_tx);
126        let server_builder = OtlpServerBuilder::new(
127            http_endpoint.clone(),
128            grpc_endpoint.clone(),
129            grpc_max_recv_msg_size_bytes,
130        );
131
132        let (http_shutdown, mut http_error) = server_builder
133            .build(handler, memory_limiter, global_thread_pool, metrics)
134            .await?;
135
136        health.mark_ready();
137        debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
138
139        loop {
140            select! {
141                _ = &mut global_shutdown => {
142                    debug!("Received shutdown signal.");
143                    break
144                },
145                error = &mut http_error => {
146                    if let Some(error) = error {
147                        debug!(%error, "HTTP server error.");
148                    }
149                    break;
150                },
151                Some(otlp_payload) = payload_rx.recv() => {
152                    let output_name = otlp_payload.signal_type.as_str();
153                    let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
154                    if let Err(e) = dispatcher.dispatch_named(output_name, payload).await {
155                        error!(error = %e, output = output_name, "Failed to dispatch OTLP payload.");
156                    }
157                },
158                _ = health.live() => continue,
159            }
160        }
161
162        debug!("Stopping OTLP relay...");
163
164        http_shutdown.shutdown();
165
166        debug!("OTLP relay stopped.");
167
168        Ok(())
169    }
170}
171
/// The kind of OTLP signal a payload carries.
enum OtlpSignalType {
    /// OTLP metrics.
    Metrics,
    /// OTLP logs.
    Logs,
    /// OTLP traces.
    Traces,
}

impl OtlpSignalType {
    /// Returns the lowercase name of the signal: `"metrics"`, `"logs"`, or `"traces"`.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Metrics => "metrics",
            Self::Logs => "logs",
            Self::Traces => "traces",
        }
    }
}
187
188struct OtlpPayload {
189    signal_type: OtlpSignalType,
190    data: Bytes,
191}
192
193impl OtlpPayload {
194    fn metrics(data: Bytes) -> Self {
195        Self {
196            signal_type: OtlpSignalType::Metrics,
197            data,
198        }
199    }
200
201    fn logs(data: Bytes) -> Self {
202        Self {
203            signal_type: OtlpSignalType::Logs,
204            data,
205        }
206    }
207
208    fn traces(data: Bytes) -> Self {
209        Self {
210            signal_type: OtlpSignalType::Traces,
211            data,
212        }
213    }
214
215    fn into_grpc_payload(self) -> GrpcPayload {
216        let service_path = match self.signal_type {
217            OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
218            OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
219            OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
220        };
221
222        // We provide an empty endpoint because we want any consuming components to fill that in for themselves.
223        GrpcPayload::new(
224            PayloadMetadata::from_event_count(1),
225            MetaString::empty(),
226            service_path,
227            FrozenChunkedBytesBuffer::from(self.data),
228        )
229    }
230}
231
232/// Handler that forwards OTLP payloads to a channel for downstream processing.
233struct RelayHandler {
234    tx: mpsc::Sender<OtlpPayload>,
235}
236
237impl RelayHandler {
238    fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
239        Self { tx }
240    }
241}
242
243#[async_trait]
244impl OtlpHandler for RelayHandler {
245    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
246        self.tx
247            .send(OtlpPayload::metrics(body))
248            .await
249            .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
250    }
251
252    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
253        self.tx
254            .send(OtlpPayload::logs(body))
255            .await
256            .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
257    }
258
259    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
260        self.tx
261            .send(OtlpPayload::traces(body))
262            .await
263            .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
264    }
265}
266
/// Configuration smoke tests for `OtlpRelayConfiguration`.
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::config_registry::structs;
    use datadog_agent_config_testing::run_config_smoke_tests;
    use serde_json::json;

    use super::OtlpRelayConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared configuration smoke-test harness against `OtlpRelayConfiguration`.
    ///
    /// The closure deserializes the configuration handed to it by the harness and panics if that
    /// fails.
    ///
    /// NOTE(review): the meaning of the empty slice and the empty JSON object is defined by
    /// `run_config_smoke_tests` -- confirm against its documentation.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::OTLP_RELAY_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                cfg.as_typed::<OtlpRelayConfiguration>()
                    .expect("OtlpRelayConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }
}