saluki_components/relays/otlp/
mod.rs

1use std::sync::LazyLock;
2
3use async_trait::async_trait;
4use axum::body::Bytes;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_common::buf::FrozenChunkedBytesBuffer;
7use saluki_config::GenericConfiguration;
8use saluki_core::components::relays::{Relay, RelayBuilder, RelayContext};
9use saluki_core::components::ComponentContext;
10use saluki_core::data_model::payload::{GrpcPayload, Payload, PayloadMetadata, PayloadType};
11use saluki_core::topology::OutputDefinition;
12use saluki_error::{ErrorContext as _, GenericError};
13use saluki_io::net::ListenAddress;
14use serde::Deserialize;
15use stringtheory::MetaString;
16use tokio::select;
17use tokio::sync::mpsc;
18use tracing::{debug, error};
19
20use crate::common::otlp::config::Receiver;
21use crate::common::otlp::{
22    build_metrics, Metrics, OtlpHandler, OtlpServerBuilder, OTLP_LOGS_GRPC_SERVICE_PATH,
23    OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH,
24};
25
26/// Configuration for the OTLP relay.
27#[derive(Deserialize, Default)]
28pub struct OtlpRelayConfiguration {
29    #[serde(default)]
30    otlp_config: OtlpRelayConfig,
31}
32
33/// OTLP configuration for the relay.
34#[derive(Deserialize, Default)]
35pub struct OtlpRelayConfig {
36    #[serde(default)]
37    receiver: Receiver,
38}
39
40impl OtlpRelayConfiguration {
41    /// Creates a new `OtlpRelayConfiguration` from the given generic configuration.
42    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43        config.as_typed().map_err(Into::into)
44    }
45
46    fn http_endpoint(&self) -> ListenAddress {
47        let transport = &self.otlp_config.receiver.protocols.http.transport;
48        let endpoint = &self.otlp_config.receiver.protocols.http.endpoint;
49        let address = format!("{}://{}", transport, endpoint);
50        ListenAddress::try_from(address).expect("valid HTTP endpoint")
51    }
52
53    fn grpc_endpoint(&self) -> ListenAddress {
54        let transport = &self.otlp_config.receiver.protocols.grpc.transport;
55        let endpoint = &self.otlp_config.receiver.protocols.grpc.endpoint;
56        let address = format!("{}://{}", transport, endpoint);
57        ListenAddress::try_from(address).expect("valid gRPC endpoint")
58    }
59
60    fn grpc_max_recv_msg_size_bytes(&self) -> usize {
61        (self.otlp_config.receiver.protocols.grpc.max_recv_msg_size_mib * 1024 * 1024) as usize
62    }
63}
64
65impl MemoryBounds for OtlpRelayConfiguration {
66    fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {}
67}
68
69#[async_trait]
70impl RelayBuilder for OtlpRelayConfiguration {
71    fn outputs(&self) -> &[OutputDefinition<PayloadType>] {
72        static OUTPUTS: LazyLock<Vec<OutputDefinition<PayloadType>>> = LazyLock::new(|| {
73            vec![
74                OutputDefinition::named_output("metrics", PayloadType::Grpc),
75                OutputDefinition::named_output("logs", PayloadType::Grpc),
76                OutputDefinition::named_output("traces", PayloadType::Grpc),
77            ]
78        });
79        &OUTPUTS
80    }
81
82    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Relay + Send>, GenericError> {
83        Ok(Box::new(OtlpRelay {
84            http_endpoint: self.http_endpoint(),
85            grpc_endpoint: self.grpc_endpoint(),
86            grpc_max_recv_msg_size_bytes: self.grpc_max_recv_msg_size_bytes(),
87            metrics: build_metrics(&context),
88        }))
89    }
90}
91
92/// OTLP relay.
93///
94/// Receives OTLP metrics and logs via gRPC and HTTP, outputting payloads for downstream processing.
95pub struct OtlpRelay {
96    http_endpoint: ListenAddress,
97    grpc_endpoint: ListenAddress,
98    grpc_max_recv_msg_size_bytes: usize,
99    metrics: Metrics,
100}
101
102#[async_trait]
103impl Relay for OtlpRelay {
104    async fn run(self: Box<Self>, mut context: RelayContext) -> Result<(), GenericError> {
105        let Self {
106            http_endpoint,
107            grpc_endpoint,
108            grpc_max_recv_msg_size_bytes,
109            metrics,
110        } = *self;
111
112        let mut global_shutdown = context.take_shutdown_handle();
113        let mut health = context.take_health_handle();
114        let global_thread_pool = context.topology_context().global_thread_pool().clone();
115        let memory_limiter = context.topology_context().memory_limiter().clone();
116        let dispatcher = context.dispatcher();
117
118        let (payload_tx, mut payload_rx) = mpsc::channel(1024);
119
120        let handler = RelayHandler::new(payload_tx);
121        let server_builder = OtlpServerBuilder::new(
122            http_endpoint.clone(),
123            grpc_endpoint.clone(),
124            grpc_max_recv_msg_size_bytes,
125        );
126
127        let (http_shutdown, mut http_error) = server_builder
128            .build(handler, memory_limiter, global_thread_pool, metrics)
129            .await?;
130
131        health.mark_ready();
132        debug!(%http_endpoint, %grpc_endpoint, "OTLP relay started.");
133
134        loop {
135            select! {
136                _ = &mut global_shutdown => {
137                    debug!("Received shutdown signal.");
138                    break
139                },
140                error = &mut http_error => {
141                    if let Some(error) = error {
142                        debug!(%error, "HTTP server error.");
143                    }
144                    break;
145                },
146                Some(otlp_payload) = payload_rx.recv() => {
147                    let output_name = otlp_payload.signal_type.as_str();
148                    let payload = Payload::Grpc(otlp_payload.into_grpc_payload());
149                    if let Err(e) = dispatcher.dispatch_named(output_name, payload).await {
150                        error!(error = %e, output = output_name, "Failed to dispatch OTLP payload.");
151                    }
152                },
153                _ = health.live() => continue,
154            }
155        }
156
157        debug!("Stopping OTLP relay...");
158
159        http_shutdown.shutdown();
160
161        debug!("OTLP relay stopped.");
162
163        Ok(())
164    }
165}
166
167enum OtlpSignalType {
168    Metrics,
169    Logs,
170    Traces,
171}
172
173impl OtlpSignalType {
174    fn as_str(&self) -> &'static str {
175        match self {
176            OtlpSignalType::Metrics => "metrics",
177            OtlpSignalType::Logs => "logs",
178            OtlpSignalType::Traces => "traces",
179        }
180    }
181}
182
183struct OtlpPayload {
184    signal_type: OtlpSignalType,
185    data: Bytes,
186}
187
188impl OtlpPayload {
189    fn metrics(data: Bytes) -> Self {
190        Self {
191            signal_type: OtlpSignalType::Metrics,
192            data,
193        }
194    }
195
196    fn logs(data: Bytes) -> Self {
197        Self {
198            signal_type: OtlpSignalType::Logs,
199            data,
200        }
201    }
202
203    fn traces(data: Bytes) -> Self {
204        Self {
205            signal_type: OtlpSignalType::Traces,
206            data,
207        }
208    }
209
210    fn into_grpc_payload(self) -> GrpcPayload {
211        let service_path = match self.signal_type {
212            OtlpSignalType::Metrics => OTLP_METRICS_GRPC_SERVICE_PATH,
213            OtlpSignalType::Logs => OTLP_LOGS_GRPC_SERVICE_PATH,
214            OtlpSignalType::Traces => OTLP_TRACES_GRPC_SERVICE_PATH,
215        };
216
217        // We provide an empty endpoint because we want any consuming components to fill that in for themselves.
218        GrpcPayload::new(
219            PayloadMetadata::from_event_count(1),
220            MetaString::empty(),
221            service_path,
222            FrozenChunkedBytesBuffer::from(self.data),
223        )
224    }
225}
226
227/// Handler that forwards OTLP payloads to a channel for downstream processing.
228struct RelayHandler {
229    tx: mpsc::Sender<OtlpPayload>,
230}
231
232impl RelayHandler {
233    fn new(tx: mpsc::Sender<OtlpPayload>) -> Self {
234        Self { tx }
235    }
236}
237
238#[async_trait]
239impl OtlpHandler for RelayHandler {
240    async fn handle_metrics(&self, body: Bytes) -> Result<(), GenericError> {
241        self.tx
242            .send(OtlpPayload::metrics(body))
243            .await
244            .error_context("Failed to send OTLP metrics payload to relay dispatcher: channel closed.")
245    }
246
247    async fn handle_logs(&self, body: Bytes) -> Result<(), GenericError> {
248        self.tx
249            .send(OtlpPayload::logs(body))
250            .await
251            .error_context("Failed to send OTLP logs payload to relay dispatcher: channel closed.")
252    }
253
254    async fn handle_traces(&self, body: Bytes) -> Result<(), GenericError> {
255        self.tx
256            .send(OtlpPayload::traces(body))
257            .await
258            .error_context("Failed to send OTLP traces payload to relay dispatcher: channel closed.")
259    }
260}