Skip to main content

saluki_components/relays/otlp/
mod.rs

1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use facet::Facet;
6use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::buf::FrozenChunkedBytesBuffer;
8use saluki_config::GenericConfiguration;
9use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
10use saluki_core::components::ComponentContext;
11use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
12use saluki_core::topology::OutputDefinition;
13use saluki_error::{ErrorContext as _, GenericError};
14use saluki_io::net::ListenAddress;
15use serde::Deserialize;
16use stringtheory::MetaString;
17use tokio::select;
18use tokio::sync::mpsc;
19use tracing::{debug, error};
20
21use crate::common::otlp::config::Receiver;
22use crate::common::otlp::{
23    build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
24    OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
25};
26
27/// Configuration for the OTLP relay.
28#[derive(Deserialize, Default, Facet)]
29#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
30pub struct OtlpRelayConfiguration {
31    #[serde(default)]
32    otlp_config: OtlpRelayConfig,
33}
34
35/// OTLP configuration for the relay.
36#[derive(Deserialize, Default, Facet)]
37#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
38pub struct OtlpRelayConfig {
39    #[serde(default)]
40    receiver: Receiver,
41}
42
43impl OtlpRelayConfiguration {
44    /// Creates a new `OtlpRelayConfiguration` from the given generic configuration.
45    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46        config.as_typed().map_err(Into::into)
47    }
48
49    fn http_endpoint(&self) -> ListenAddress {
50        let transport = &self.otlp_config.receiver.protocols.http.transport;
51        let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
52        let address = format!("{}://{}", transport, endpoint);
53        ListenAddress::try_from(address).expect("valid HTTP endpoint")
54    }
55
56    fn grpc_endpoint(&self) -> ListenAddress {
57        let transport = &self.otlp_config.receiver.protocols.grpc.transport;
58        let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
59        let address = format!("{}://{}", transport, endpoint);
60        ListenAddress::try_from(address).expect("valid gRPC endpoint")
61    }
62
63    fn grpc_max_recv_msg_size_bytes(&self) -> usize {
64        (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
65    }
66}
67
68impl MemoryBounds for OtlpRelayConfiguration {
69    fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
70}
71
72#[async_trait]
73impl RelayBuilder for OtlpRelayConfiguration {
74    fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
75        static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
76            vec![
77                OutputDefinition::named_output("metrics", PayloadType::Grpc),
78                OutputDefinition::named_output("logs", PayloadType::Grpc),
79                OutputDefinition::named_output("traces", PayloadType::Grpc),
80            ]
81        });
82        &OUTPUTS
83    }
84
85    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
86        Ok(Box::new(OtlpRelay {
87            http_endpoint: self.http_endpoint(),
88            grpc_endpoint: self.grpc_endpoint(),
89            grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
90            metrics: build_metrics(&context),
91        }))
92    }
93}
94
95/// OTLP relay.
96///
97/// Receives OTLP metrics and logs via gRPC and HTTP, outputting payloads for downstream processing.
98pub struct OtlpRelay {
99    http_endpoint: ListenAddress,
100    grpc_endpoint: ListenAddress,
101    grpc_max_recv_msg_size_bytes: usize,
102    metrics: Metrics,
103}
104
105#[async_trait]
106impl Relay for OtlpRelay {
107    async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
108        let Self {
109            http_endpoint,
110            grpc_endpoint,
111            grpc_max_recv_msg_size_bytes,
112            metrics,
113        } = *self;
114
115        let mut global_shutdown = context.take_shutdown_handle();
116        let mut health = context.take_health_handle();
117        let global_thread_pool = context.topology_context().global_thread_pool().clone();
118        let memory_limiter = context.topology_context().memory_limiter().clone();
119        let dispatcher = context.dispatcher();
120
121        let (payload_tx, mut payload_rx) = mpsc::channel(1024);
122
123        let handler = RelayHandler::new(payload_tx);
124        let server_builder = OtlpServerBuilder::new(
125            http_endpoint.clone(),
126            grpc_endpoint.clone(),
127            grpc_max_recv_msg_size_bytes,
128        );
129
130        let (http_shutdown, mut http_error) = server_builder
131            .build(handler, memory_limiter, global_thread_pool, metrics)
132            .await?;
133
134        health.mark_ready();
135        debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
136
137        loop {
138            select! {
139                _ = &mut global_shutdown => {
140                    debug!("Received shutdown signal.");
141                    break
142                },
143                error = &mut http_error => {
144                    if let Some(error) = error {
145                        debug!(%error, "HTTP server error.");
146                    }
147                    break;
148                },
149                Some(otlp_payload) = payload_rx.recv() => {
150                    let output_name = otlp_payload.signal_type.as_str();
151                    let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
152                    if let Err(e) = dispatcher.dispatch_named(output_name, payload).await {
153                        error!(error = %e, output = output_name, "Failed to dispatch OTLP payload.");
154                    }
155                },
156                _ = health.live() => continue,
157            }
158        }
159
160        debug!("Stopping OTLP relay...");
161
162        http_shutdown.shutdown();
163
164        debug!("OTLP relay stopped.");
165
166        Ok(())
167    }
168}
169
/// The kind of OTLP signal carried by a payload.
enum OtlpSignalType {
    /// OTLP metrics.
    Metrics,
    /// OTLP logs.
    Logs,
    /// OTLP traces.
    Traces,
}

impl OtlpSignalType {
    /// Returns the lowercase name of the signal type: `"metrics"`, `"logs"`, or `"traces"`.
    ///
    /// These names match the named outputs the relay dispatches to.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Metrics => "metrics",
            Self::Logs => "logs",
            Self::Traces => "traces",
        }
    }
}
185
186struct OtlpPayload {
187    signal_type: OtlpSignalType,
188    data: Bytes,
189}
190
191impl OtlpPayload {
192    fn metrics(data: Bytes) -> Self {
193        Self {
194            signal_type: OtlpSignalType::Metrics,
195            data,
196        }
197    }
198
199    fn logs(data: Bytes) -> Self {
200        Self {
201            signal_type: OtlpSignalType::Logs,
202            data,
203        }
204    }
205
206    fn traces(data: Bytes) -> Self {
207        Self {
208            signal_type: OtlpSignalType::Traces,
209            data,
210        }
211    }
212
213    fn into_grpc_payload(self) -> GrpcPayload {
214        let service_path = match self.signal_type {
215            OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
216            OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
217            OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
218        };
219
220        // We provide an empty endpoint because we want any consuming components to fill that in for themselves.
221        GrpcPayload::new(
222            PayloadMetadata::from_event_count(1),
223            MetaString::empty(),
224            service_path,
225            FrozenChunkedBytesBuffer::from(self.data),
226        )
227    }
228}
229
230/// Handler that forwards OTLP payloads to a channel for downstream processing.
231struct RelayHandler {
232    tx: mpsc::Sender<OtlpPayload>,
233}
234
235impl RelayHandler {
236    fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
237        Self { tx }
238    }
239}
240
241#[async_trait]
242impl OtlpHandler for RelayHandler {
243    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
244        self.tx
245            .send(OtlpPayload::metrics(body))
246            .await
247            .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
248    }
249
250    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
251        self.tx
252            .send(OtlpPayload::logs(body))
253            .await
254            .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
255    }
256
257    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
258        self.tx
259            .send(OtlpPayload::traces(body))
260            .await
261            .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
262    }
263}
264
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::OtlpRelayConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Runs the shared configuration smoke-test harness for `OtlpRelayConfiguration`.
    ///
    /// The closure is how the harness obtains a typed configuration; it fails the test if deserialization fails.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::OTLP_RELAY_CONFIGURATION,
            &[],
            json!({}),
            |config| {
                config
                    .as_typed::<OtlpRelayConfiguration>()
                    .expect("OtlpRelayConfiguration should deserialize")
            },
        )
        .await;
    }
}