saluki_components/forwarders/otlp/
mod.rs

1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use otlp_protos::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
4use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
5use otlp_protos::opentelemetry::proto::collector::metrics::v1::metrics_service_client::MetricsServiceClient;
6use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
7use otlp_protos::opentelemetry::proto::collector::trace::v1::{
8    trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
9};
10use prost::Message;
11use saluki_common::buf::FrozenChunkedBytesBuffer;
12use saluki_config::GenericConfiguration;
13use saluki_core::data_model::payload::Payload;
14use saluki_core::{
15    components::{forwarders::*, ComponentContext},
16    data_model::payload::PayloadType,
17};
18use saluki_error::ErrorContext as _;
19use saluki_error::GenericError;
20use stringtheory::MetaString;
21use tokio::select;
22use tonic::transport::Channel;
23use tracing::{debug, error, warn};
24
25use crate::common::otlp::{OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH};
26
/// OTLP forwarder configuration.
///
/// Forwards OTLP metrics and logs to the Core Agent, and traces to the Trace Agent.
#[derive(Clone, Debug)]
pub struct OtlpForwarderConfiguration {
    /// gRPC endpoint used for OTLP metrics and logs. A scheme is prepended at build time when
    /// the value does not already carry one.
    core_agent_otlp_grpc_endpoint: String,
    /// Local port that OTLP traces are forwarded to (`http://localhost:<port>`).
    core_agent_traces_internal_port: u16,
}
35
36impl OtlpForwarderConfiguration {
37    /// Creates a new `OtlpForwarderConfiguration` from the given configuration.
38    pub fn from_configuration(
39        config: &GenericConfiguration, core_agent_otlp_grpc_endpoint: String,
40    ) -> Result<Self, GenericError> {
41        let core_agent_traces_internal_port = config
42            .try_get_typed("otlp_config.traces.internal_port")?
43            .unwrap_or(5003);
44        Ok(Self {
45            core_agent_otlp_grpc_endpoint,
46            core_agent_traces_internal_port,
47        })
48    }
49}
50
51#[async_trait]
52impl ForwarderBuilder for OtlpForwarderConfiguration {
53    fn input_payload_type(&self) -> PayloadType {
54        PayloadType::Grpc
55    }
56
57    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
58        let trace_agent_endpoint = format!("http://localhost:{}", self.core_agent_traces_internal_port);
59        let trace_agent_channel = Channel::from_shared(trace_agent_endpoint.clone())
60            .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
61            .connect_lazy();
62        let trace_agent_client = TraceServiceClient::new(trace_agent_channel);
63
64        let normalized_endpoint = normalize_endpoint(&self.core_agent_otlp_grpc_endpoint);
65        let core_agent_grpc_channel = Channel::from_shared(normalized_endpoint)
66            .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
67            .connect_lazy();
68        let core_agent_metrics_client = MetricsServiceClient::new(core_agent_grpc_channel.clone());
69        let core_agent_logs_client = LogsServiceClient::new(core_agent_grpc_channel);
70
71        Ok(Box::new(OtlpForwarder {
72            trace_agent_client,
73            core_agent_metrics_client,
74            core_agent_logs_client,
75        }))
76    }
77}
78
79impl MemoryBounds for OtlpForwarderConfiguration {
80    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
81        builder.minimum().with_single_value::<OtlpForwarder>("component struct");
82    }
83}
84
85struct OtlpForwarder {
86    trace_agent_client: TraceServiceClient<Channel>,
87    core_agent_metrics_client: MetricsServiceClient<Channel>,
88    core_agent_logs_client: LogsServiceClient<Channel>,
89}
90
91#[async_trait]
92impl Forwarder for OtlpForwarder {
93    async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
94        let Self {
95            mut trace_agent_client,
96            mut core_agent_metrics_client,
97            mut core_agent_logs_client,
98        } = *self;
99
100        let mut health = context.take_health_handle();
101
102        health.mark_ready();
103        debug!("OTLP forwarder started.");
104
105        loop {
106            select! {
107                _ = health.live() => continue,
108                maybe_payload = context.payloads().next() => match maybe_payload {
109                    Some(payload) => match payload {
110                        Payload::Grpc(grpc_payload) => {
111                            // Extract the parts of the payload, and make sure we have an OTLP payload, otherwise
112                            // we skip it and move on.
113                            let (_, endpoint, service_path, body) = grpc_payload.into_parts();
114                            match &service_path {
115                                path if *path == OTLP_TRACES_GRPC_SERVICE_PATH => {
116                                    export_traces(&mut trace_agent_client, &endpoint, &service_path, body).await;
117                                }
118                                path if *path == OTLP_METRICS_GRPC_SERVICE_PATH => {
119                                    export_metrics(&mut core_agent_metrics_client, &endpoint, &service_path, body).await;
120                                }
121                                path if *path == OTLP_LOGS_GRPC_SERVICE_PATH => {
122                                    export_logs(&mut core_agent_logs_client, &endpoint, &service_path, body).await;
123                                }
124                                _ => {
125                                    warn!(service_path = %service_path, "Received gRPC payload with unknown service path. Skipping.");
126                                    continue;
127                                }
128                            }
129
130                        },
131                        _ => continue,
132                    },
133                    None => break,
134                },
135            }
136        }
137
138        debug!("OTLP forwarder stopped.");
139
140        Ok(())
141    }
142}
143
144async fn export_traces(
145    trace_agent_client: &mut TraceServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
146    body: FrozenChunkedBytesBuffer,
147) {
148    // Decode the raw request payload into a typed body so we can export it.
149    //
150    // TODO: This is suboptimal since we know the payload should be valid as it was decoded when it
151    // was ingested, and only after that converted to raw bytes. It would be nice to just forward
152    // the bytes as-is without decoding it again here just to satisfy the client interface, but no
153    // such API currently exists.
154    let body = body.into_bytes();
155    let request = match ExportTraceServiceRequest::decode(body) {
156        Ok(req) => req,
157        Err(e) => {
158            error!(error = %e, "Failed to decode trace export request from payload.");
159            return;
160        }
161    };
162
163    match trace_agent_client.export(request).await {
164        Ok(response) => {
165            let resp = response.into_inner();
166            if let Some(partial_success) = resp.partial_success {
167                if partial_success.rejected_spans > 0 {
168                    warn!(
169                        rejected_spans = partial_success.rejected_spans,
170                        error = %partial_success.error_message,
171                        "Trace export partially failed."
172                    );
173                }
174            }
175        }
176        Err(e) => {
177            error!(error = %e, %endpoint, %service_path, "Failed to export traces to Trace Agent.");
178        }
179    }
180}
181
182async fn export_metrics(
183    core_agent_grpc_client: &mut MetricsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
184    body: FrozenChunkedBytesBuffer,
185) {
186    // Decode the raw request payload into a typed body so we can export it.
187    //
188    // TODO: This is suboptimal since we know the payload should be valid as it was decoded when it
189    // was ingested, and only after that converted to raw bytes. It would be nice to just forward
190    // the bytes as-is without decoding it again here just to satisfy the client interface, but no
191    // such API currently exists.
192    let body = body.into_bytes();
193
194    let request = match ExportMetricsServiceRequest::decode(body) {
195        Ok(req) => req,
196        Err(e) => {
197            error!(error = %e, "Failed to decode metrics or logs export request from payload.");
198            return;
199        }
200    };
201
202    match core_agent_grpc_client.export(request).await {
203        Ok(response) => {
204            let resp = response.into_inner();
205            if let Some(partial_success) = resp.partial_success {
206                if partial_success.rejected_data_points > 0 {
207                    warn!(
208                        rejected_data_points = partial_success.rejected_data_points,
209                        error = %partial_success.error_message,
210                        "Metrics export partially failed."
211                    );
212                }
213            }
214        }
215        Err(e) => {
216            error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
217        }
218    }
219}
220
221async fn export_logs(
222    core_agent_grpc_client: &mut LogsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
223    body: FrozenChunkedBytesBuffer,
224) {
225    // Decode the raw request payload into a typed body so we can export it.
226    //
227    // TODO: This is suboptimal since we know the payload should be valid as it was decoded when it
228    // was ingested, and only after that converted to raw bytes. It would be nice to just forward
229    // the bytes as-is without decoding it again here just to satisfy the client interface, but no
230    // such API currently exists.
231    let body = body.into_bytes();
232
233    let request = match ExportLogsServiceRequest::decode(body) {
234        Ok(req) => req,
235        Err(e) => {
236            error!(error = %e, "Failed to decode logs export request from payload.");
237            return;
238        }
239    };
240
241    match core_agent_grpc_client.export(request).await {
242        Ok(response) => {
243            let resp = response.into_inner();
244            if let Some(partial_success) = resp.partial_success {
245                if partial_success.rejected_log_records > 0 {
246                    warn!(
247                        rejected_log_records = partial_success.rejected_log_records,
248                        error = %partial_success.error_message,
249                        "Trace export partially failed."
250                    );
251                }
252            }
253        }
254        Err(e) => {
255            error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
256        }
257    }
258}
259
/// Ensures `endpoint` carries a URI scheme.
///
/// Endpoints that already start with `http://` or `https://` (case-sensitive) are returned
/// unchanged; anything else is prefixed with `https://`.
fn normalize_endpoint(endpoint: &str) -> String {
    const KNOWN_SCHEMES: [&str; 2] = ["http://", "https://"];

    let has_scheme = KNOWN_SCHEMES.iter().any(|&scheme| endpoint.starts_with(scheme));
    if !has_scheme {
        return format!("https://{endpoint}");
    }

    endpoint.to_owned()
}