saluki_components/forwarders/otlp/
mod.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use otlp_protos::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
6use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
7use otlp_protos::opentelemetry::proto::collector::metrics::v1::metrics_service_client::MetricsServiceClient;
8use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
9use otlp_protos::opentelemetry::proto::collector::trace::v1::{
10 trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
11};
12use prost::Message;
13use saluki_common::buf::FrozenChunkedBytesBuffer;
14use saluki_config::GenericConfiguration;
15use saluki_core::data_model::payload::Payload;
16use saluki_core::{
17 components::{forwarders::*, ComponentContext},
18 data_model::payload::PayloadType,
19};
20use saluki_error::ErrorContext as _;
21use saluki_error::GenericError;
22use stringtheory::MetaString;
23use tokio::select;
24use tonic::transport::Channel;
25use tracing::{debug, error, warn};
26
27use crate::common::otlp::{OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH};
28
/// Configuration for the OTLP forwarder.
///
/// Carries the two destinations the forwarder is built against: a gRPC endpoint used by the metrics and logs
/// clients, and a local port used by the traces client.
#[derive(Clone)]
pub struct OtlpForwarderConfiguration {
    /// Endpoint handed to the metrics and logs gRPC clients; a scheme is prepended at build time if it has none.
    core_agent_otlp_grpc_endpoint: String,

    /// Port on `localhost` that the traces gRPC client is pointed at.
    core_agent_traces_internal_port: u16,
}
37
38impl OtlpForwarderConfiguration {
39 pub fn from_configuration(
41 config: &GenericConfiguration, core_agent_otlp_grpc_endpoint: String,
42 ) -> Result<Self, GenericError> {
43 let core_agent_traces_internal_port = config
44 .try_get_typed("otlp_config.traces.internal_port")?
45 .unwrap_or(5003);
46 Ok(Self {
47 core_agent_otlp_grpc_endpoint,
48 core_agent_traces_internal_port,
49 })
50 }
51}
52
53#[async_trait]
54impl ForwarderBuilder for OtlpForwarderConfiguration {
55 fn input_payload_type(&self) -> PayloadType {
56 PayloadType::Grpc
57 }
58
59 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
60 let trace_agent_endpoint = format!("http://localhost:{}", self.core_agent_traces_internal_port);
61 let trace_agent_channel = Channel::from_shared(trace_agent_endpoint.clone())
62 .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
63 .connect_lazy();
64 let trace_agent_client = TraceServiceClient::new(trace_agent_channel);
65
66 let normalized_endpoint = normalize_endpoint(&self.core_agent_otlp_grpc_endpoint);
67 let core_agent_grpc_channel = Channel::from_shared(normalized_endpoint)
68 .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
69 .connect_timeout(Duration::from_secs(5))
70 .connect_lazy();
71
72 let core_agent_metrics_client = MetricsServiceClient::new(core_agent_grpc_channel.clone());
73 let core_agent_logs_client = LogsServiceClient::new(core_agent_grpc_channel);
74
75 Ok(Box::new(OtlpForwarder {
76 trace_agent_client,
77 core_agent_metrics_client,
78 core_agent_logs_client,
79 }))
80 }
81}
82
83impl MemoryBounds for OtlpForwarderConfiguration {
84 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
85 builder.minimum().with_single_value::<OtlpForwarder>("component struct");
86 }
87}
88
89struct OtlpForwarder {
90 trace_agent_client: TraceServiceClient<Channel>,
91 core_agent_metrics_client: MetricsServiceClient<Channel>,
92 core_agent_logs_client: LogsServiceClient<Channel>,
93}
94
95#[async_trait]
96impl Forwarder for OtlpForwarder {
97 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
98 let Self {
99 mut trace_agent_client,
100 mut core_agent_metrics_client,
101 mut core_agent_logs_client,
102 } = *self;
103
104 let mut health = context.take_health_handle();
105
106 health.mark_ready();
107 debug!("OTLP forwarder started.");
108
109 loop {
110 select! {
111 _ = health.live() => continue,
112 maybe_payload = context.payloads().next() => match maybe_payload {
113 Some(payload) => match payload {
114 Payload::Grpc(grpc_payload) => {
115 let (_, endpoint, service_path, body) = grpc_payload.into_parts();
118 match &service_path {
119 path if *path == OTLP_TRACES_GRPC_SERVICE_PATH => {
120 export_traces(&mut trace_agent_client, &endpoint, &service_path, body).await;
121 }
122 path if *path == OTLP_METRICS_GRPC_SERVICE_PATH => {
123 export_metrics(&mut core_agent_metrics_client, &endpoint, &service_path, body).await;
124 }
125 path if *path == OTLP_LOGS_GRPC_SERVICE_PATH => {
126 export_logs(&mut core_agent_logs_client, &endpoint, &service_path, body).await;
127 }
128 _ => {
129 warn!(service_path = %service_path, "Received gRPC payload with unknown service path. Skipping.");
130 continue;
131 }
132 }
133
134 },
135 _ => continue,
136 },
137 None => break,
138 },
139 }
140 }
141
142 debug!("OTLP forwarder stopped.");
143
144 Ok(())
145 }
146}
147
148async fn export_traces(
149 trace_agent_client: &mut TraceServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
150 body: FrozenChunkedBytesBuffer,
151) {
152 let body = body.into_bytes();
159 let request = match ExportTraceServiceRequest::decode(body) {
160 Ok(req) => req,
161 Err(e) => {
162 error!(error = %e, "Failed to decode trace export request from payload.");
163 return;
164 }
165 };
166
167 match trace_agent_client.export(request).await {
168 Ok(response) => {
169 let resp = response.into_inner();
170 if let Some(partial_success) = resp.partial_success {
171 if partial_success.rejected_spans > 0 {
172 warn!(
173 rejected_spans = partial_success.rejected_spans,
174 error = %partial_success.error_message,
175 "Trace export partially failed."
176 );
177 }
178 }
179 }
180 Err(e) => {
181 error!(error = %e, %endpoint, %service_path, "Failed to export traces to Trace Agent.");
182 }
183 }
184}
185
186async fn export_metrics(
187 core_agent_grpc_client: &mut MetricsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
188 body: FrozenChunkedBytesBuffer,
189) {
190 let body = body.into_bytes();
197
198 let request = match ExportMetricsServiceRequest::decode(body) {
199 Ok(req) => req,
200 Err(e) => {
201 error!(error = %e, "Failed to decode metrics or logs export request from payload.");
202 return;
203 }
204 };
205
206 match core_agent_grpc_client.export(request).await {
207 Ok(response) => {
208 let resp = response.into_inner();
209 if let Some(partial_success) = resp.partial_success {
210 if partial_success.rejected_data_points > 0 {
211 warn!(
212 rejected_data_points = partial_success.rejected_data_points,
213 error = %partial_success.error_message,
214 "Metrics export partially failed."
215 );
216 }
217 }
218 }
219 Err(e) => {
220 error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
221 }
222 }
223}
224
225async fn export_logs(
226 core_agent_grpc_client: &mut LogsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
227 body: FrozenChunkedBytesBuffer,
228) {
229 let body = body.into_bytes();
236
237 let request = match ExportLogsServiceRequest::decode(body) {
238 Ok(req) => req,
239 Err(e) => {
240 error!(error = %e, "Failed to decode logs export request from payload.");
241 return;
242 }
243 };
244
245 match core_agent_grpc_client.export(request).await {
246 Ok(response) => {
247 let resp = response.into_inner();
248 if let Some(partial_success) = resp.partial_success {
249 if partial_success.rejected_log_records > 0 {
250 warn!(
251 rejected_log_records = partial_success.rejected_log_records,
252 error = %partial_success.error_message,
253 "Trace export partially failed."
254 );
255 }
256 }
257 }
258 Err(e) => {
259 error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
260 }
261 }
262}
263
/// Ensures `endpoint` carries a URI scheme.
///
/// Endpoints already starting with `http://` or `https://` are returned unchanged; anything else gets `https://`
/// prepended.
// NOTE(review): scheme-less endpoints default to `https://`, while the trace channel in this file uses plain
// `http://` — confirm that TLS is the intended default here.
fn normalize_endpoint(endpoint: &str) -> String {
    let has_scheme = ["http://", "https://"]
        .iter()
        .any(|&scheme| endpoint.starts_with(scheme));

    if has_scheme {
        endpoint.to_owned()
    } else {
        format!("https://{}", endpoint)
    }
}