Skip to main content

saluki_components/forwarders/otlp/
mod.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
6use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
7use otlp_protos::opentelemetry::proto::collector::metrics::v1::metrics_service_client::MetricsServiceClient;
8use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
9use otlp_protos::opentelemetry::proto::collector::trace::v1::{
10    trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
11};
12use prost::Message;
13use saluki_common::buf::FrozenChunkedBytesBuffer;
14use saluki_config::GenericConfiguration;
15use saluki_core::data_model::payload::Payload;
16use saluki_core::{
17    components::{forwarders::*, ComponentContext},
18    data_model::payload::PayloadType,
19};
20use saluki_error::ErrorContext as _;
21use saluki_error::GenericError;
22use stringtheory::MetaString;
23use tokio::select;
24use tonic::transport::Channel;
25use tracing::{debug, error, warn};
26
27use crate::common::otlp::{OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH};
28
/// OTLP forwarder configuration.
///
/// Forwards OTLP metrics and logs to the Core Agent, and traces to the Trace Agent.
#[derive(Clone)]
pub struct OtlpForwarderConfiguration {
    /// Core Agent OTLP gRPC endpoint, used to build the channel shared by the metrics and logs clients.
    ///
    /// When it does not start with `http://` or `https://`, `https://` is prepended at build time.
    core_agent_otlp_grpc_endpoint: String,
    /// Port on `localhost` that the trace client connects to over `http://`.
    ///
    /// Read from the `otlp_config.traces.internal_port` configuration key, defaulting to 5003.
    core_agent_traces_internal_port: u16,
}
37
38impl OtlpForwarderConfiguration {
39    /// Creates a new `OtlpForwarderConfiguration` from the given configuration.
40    pub fn from_configuration(
41        config: &GenericConfiguration, core_agent_otlp_grpc_endpoint: String,
42    ) -> Result<Self, GenericError> {
43        let core_agent_traces_internal_port = config
44            .try_get_typed("otlp_config.traces.internal_port")?
45            .unwrap_or(5003);
46        Ok(Self {
47            core_agent_otlp_grpc_endpoint,
48            core_agent_traces_internal_port,
49        })
50    }
51}
52
53#[async_trait]
54impl ForwarderBuilder for OtlpForwarderConfiguration {
55    fn input_payload_type(&self) -> PayloadType {
56        PayloadType::Grpc
57    }
58
59    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
60        let trace_agent_endpoint = format!("http://localhost:{}", self.core_agent_traces_internal_port);
61        let trace_agent_channel = Channel::from_shared(trace_agent_endpoint.clone())
62            .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
63            .connect_lazy();
64        let trace_agent_client = TraceServiceClient::new(trace_agent_channel);
65
66        let normalized_endpoint = normalize_endpoint(&self.core_agent_otlp_grpc_endpoint);
67        let core_agent_grpc_channel = Channel::from_shared(normalized_endpoint)
68            .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
69            .connect_timeout(Duration::from_secs(5))
70            .connect_lazy();
71
72        let core_agent_metrics_client = MetricsServiceClient::new(core_agent_grpc_channel.clone());
73        let core_agent_logs_client = LogsServiceClient::new(core_agent_grpc_channel);
74
75        Ok(Box::new(OtlpForwarder {
76            trace_agent_client,
77            core_agent_metrics_client,
78            core_agent_logs_client,
79        }))
80    }
81}
82
impl MemoryBounds for OtlpForwarderConfiguration {
    /// Declares the memory bounds of the forwarder built from this configuration.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // The only usage declared is a single `OtlpForwarder` value, registered on the builder's minimum
        // bounds under the label "component struct".
        builder.minimum().with_single_value::<OtlpForwarder>("component struct");
    }
}
88
/// OTLP forwarder.
///
/// Holds one gRPC client per OTLP signal; payloads are routed to a client based on their service path.
struct OtlpForwarder {
    /// Client used to export trace requests.
    trace_agent_client: TraceServiceClient<Channel>,
    /// Client used to export metrics requests.
    core_agent_metrics_client: MetricsServiceClient<Channel>,
    /// Client used to export logs requests.
    core_agent_logs_client: LogsServiceClient<Channel>,
}
94
95#[async_trait]
96impl Forwarder for OtlpForwarder {
97    async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
98        let Self {
99            mut trace_agent_client,
100            mut core_agent_metrics_client,
101            mut core_agent_logs_client,
102        } = *self;
103
104        let mut health = context.take_health_handle();
105
106        health.mark_ready();
107        debug!("OTLP forwarder started.");
108
109        loop {
110            select! {
111                _ = health.live() => continue,
112                maybe_payload = context.payloads().next() => match maybe_payload {
113                    Some(payload) => match payload {
114                        Payload::Grpc(grpc_payload) => {
115                            // Extract the parts of the payload, and make sure we have an OTLP payload, otherwise
116                            // we skip it and move on.
117                            let (_, endpoint, service_path, body) = grpc_payload.into_parts();
118                            match &service_path {
119                                path if *path == OTLP_TRACES_GRPC_SERVICE_PATH => {
120                                    export_traces(&mut trace_agent_client, &endpoint, &service_path, body).await;
121                                }
122                                path if *path == OTLP_METRICS_GRPC_SERVICE_PATH => {
123                                    export_metrics(&mut core_agent_metrics_client, &endpoint, &service_path, body).await;
124                                }
125                                path if *path == OTLP_LOGS_GRPC_SERVICE_PATH => {
126                                    export_logs(&mut core_agent_logs_client, &endpoint, &service_path, body).await;
127                                }
128                                _ => {
129                                    warn!(service_path = %service_path, "Received gRPC payload with unknown service path. Skipping.");
130                                    continue;
131                                }
132                            }
133
134                        },
135                        _ => continue,
136                    },
137                    None => break,
138                },
139            }
140        }
141
142        debug!("OTLP forwarder stopped.");
143
144        Ok(())
145    }
146}
147
148async fn export_traces(
149    trace_agent_client: &mut TraceServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
150    body: FrozenChunkedBytesBuffer,
151) {
152    // Decode the raw request payload into a typed body so we can export it.
153    //
154    // TODO: This is suboptimal since we know the payload should be valid as it was decoded when it
155    // was ingested, and only after that converted to raw bytes. It would be nice to just forward
156    // the bytes as-is without decoding it again here just to satisfy the client interface, but no
157    // such API currently exists.
158    let body = body.into_bytes();
159    let request = match ExportTraceServiceRequest::decode(body) {
160        Ok(req) => req,
161        Err(e) => {
162            error!(error = %e, "Failed to decode trace export request from payload.");
163            return;
164        }
165    };
166
167    match trace_agent_client.export(request).await {
168        Ok(response) => {
169            let resp = response.into_inner();
170            if let Some(partial_success) = resp.partial_success {
171                if partial_success.rejected_spans > 0 {
172                    warn!(
173                        rejected_spans = partial_success.rejected_spans,
174                        error = %partial_success.error_message,
175                        "Trace export partially failed."
176                    );
177                }
178            }
179        }
180        Err(e) => {
181            error!(error = %e, %endpoint, %service_path, "Failed to export traces to Trace Agent.");
182        }
183    }
184}
185
186async fn export_metrics(
187    core_agent_grpc_client: &mut MetricsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
188    body: FrozenChunkedBytesBuffer,
189) {
190    // Decode the raw request payload into a typed body so we can export it.
191    //
192    // TODO: This is suboptimal since we know the payload should be valid as it was decoded when it
193    // was ingested, and only after that converted to raw bytes. It would be nice to just forward
194    // the bytes as-is without decoding it again here just to satisfy the client interface, but no
195    // such API currently exists.
196    let body = body.into_bytes();
197
198    let request = match ExportMetricsServiceRequest::decode(body) {
199        Ok(req) => req,
200        Err(e) => {
201            error!(error = %e, "Failed to decode metrics or logs export request from payload.");
202            return;
203        }
204    };
205
206    match core_agent_grpc_client.export(request).await {
207        Ok(response) => {
208            let resp = response.into_inner();
209            if let Some(partial_success) = resp.partial_success {
210                if partial_success.rejected_data_points > 0 {
211                    warn!(
212                        rejected_data_points = partial_success.rejected_data_points,
213                        error = %partial_success.error_message,
214                        "Metrics export partially failed."
215                    );
216                }
217            }
218        }
219        Err(e) => {
220            error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
221        }
222    }
223}
224
225async fn export_logs(
226    core_agent_grpc_client: &mut LogsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
227    body: FrozenChunkedBytesBuffer,
228) {
229    // Decode the raw request payload into a typed body so we can export it.
230    //
231    // TODO: This is suboptimal since we know the payload should be valid as it was decoded when it
232    // was ingested, and only after that converted to raw bytes. It would be nice to just forward
233    // the bytes as-is without decoding it again here just to satisfy the client interface, but no
234    // such API currently exists.
235    let body = body.into_bytes();
236
237    let request = match ExportLogsServiceRequest::decode(body) {
238        Ok(req) => req,
239        Err(e) => {
240            error!(error = %e, "Failed to decode logs export request from payload.");
241            return;
242        }
243    };
244
245    match core_agent_grpc_client.export(request).await {
246        Ok(response) => {
247            let resp = response.into_inner();
248            if let Some(partial_success) = resp.partial_success {
249                if partial_success.rejected_log_records > 0 {
250                    warn!(
251                        rejected_log_records = partial_success.rejected_log_records,
252                        error = %partial_success.error_message,
253                        "Trace export partially failed."
254                    );
255                }
256            }
257        }
258        Err(e) => {
259            error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
260        }
261    }
262}
263
/// Ensures `endpoint` carries an explicit URI scheme.
///
/// Endpoints that already begin with `http://` or `https://` are returned unchanged; anything else is
/// prefixed with `https://`. The scheme check is case-sensitive.
fn normalize_endpoint(endpoint: &str) -> String {
    const KNOWN_SCHEMES: [&str; 2] = ["http://", "https://"];

    let has_scheme = KNOWN_SCHEMES.iter().any(|scheme| endpoint.starts_with(*scheme));
    if has_scheme {
        endpoint.to_owned()
    } else {
        format!("https://{endpoint}")
    }
}