saluki_components/forwarders/otlp/
mod.rs1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use otlp_protos::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
4use otlp_protos::opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest;
5use otlp_protos::opentelemetry::proto::collector::metrics::v1::metrics_service_client::MetricsServiceClient;
6use otlp_protos::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
7use otlp_protos::opentelemetry::proto::collector::trace::v1::{
8 trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
9};
10use prost::Message;
11use saluki_common::buf::FrozenChunkedBytesBuffer;
12use saluki_config::GenericConfiguration;
13use saluki_core::data_model::payload::Payload;
14use saluki_core::{
15 components::{forwarders::*, ComponentContext},
16 data_model::payload::PayloadType,
17};
18use saluki_error::ErrorContext as _;
19use saluki_error::GenericError;
20use stringtheory::MetaString;
21use tokio::select;
22use tonic::transport::Channel;
23use tracing::{debug, error, warn};
24
25use crate::common::otlp::{OTLP_LOGS_GRPC_SERVICE_PATH, OTLP_METRICS_GRPC_SERVICE_PATH, OTLP_TRACES_GRPC_SERVICE_PATH};
26
27#[derive(Clone)]
31pub struct OtlpForwarderConfiguration {
32 core_agent_otlp_grpc_endpoint: String,
33 core_agent_traces_internal_port: u16,
34}
35
36impl OtlpForwarderConfiguration {
37 pub fn from_configuration(
39 config: &GenericConfiguration, core_agent_otlp_grpc_endpoint: String,
40 ) -> Result<Self, GenericError> {
41 let core_agent_traces_internal_port = config
42 .try_get_typed("otlp_config.traces.internal_port")?
43 .unwrap_or(5003);
44 Ok(Self {
45 core_agent_otlp_grpc_endpoint,
46 core_agent_traces_internal_port,
47 })
48 }
49}
50
51#[async_trait]
52impl ForwarderBuilder for OtlpForwarderConfiguration {
53 fn input_payload_type(&self) -> PayloadType {
54 PayloadType::Grpc
55 }
56
57 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
58 let trace_agent_endpoint = format!("http://localhost:{}", self.core_agent_traces_internal_port);
59 let trace_agent_channel = Channel::from_shared(trace_agent_endpoint.clone())
60 .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
61 .connect_lazy();
62 let trace_agent_client = TraceServiceClient::new(trace_agent_channel);
63
64 let normalized_endpoint = normalize_endpoint(&self.core_agent_otlp_grpc_endpoint);
65 let core_agent_grpc_channel = Channel::from_shared(normalized_endpoint)
66 .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
67 .connect_lazy();
68 let core_agent_metrics_client = MetricsServiceClient::new(core_agent_grpc_channel.clone());
69 let core_agent_logs_client = LogsServiceClient::new(core_agent_grpc_channel);
70
71 Ok(Box::new(OtlpForwarder {
72 trace_agent_client,
73 core_agent_metrics_client,
74 core_agent_logs_client,
75 }))
76 }
77}
78
79impl MemoryBounds for OtlpForwarderConfiguration {
80 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
81 builder.minimum().with_single_value::<OtlpForwarder>("component struct");
82 }
83}
84
85struct OtlpForwarder {
86 trace_agent_client: TraceServiceClient<Channel>,
87 core_agent_metrics_client: MetricsServiceClient<Channel>,
88 core_agent_logs_client: LogsServiceClient<Channel>,
89}
90
91#[async_trait]
92impl Forwarder for OtlpForwarder {
93 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
94 let Self {
95 mut trace_agent_client,
96 mut core_agent_metrics_client,
97 mut core_agent_logs_client,
98 } = *self;
99
100 let mut health = context.take_health_handle();
101
102 health.mark_ready();
103 debug!("OTLP forwarder started.");
104
105 loop {
106 select! {
107 _ = health.live() => continue,
108 maybe_payload = context.payloads().next() => match maybe_payload {
109 Some(payload) => match payload {
110 Payload::Grpc(grpc_payload) => {
111 let (_, endpoint, service_path, body) = grpc_payload.into_parts();
114 match &service_path {
115 path if *path == OTLP_TRACES_GRPC_SERVICE_PATH => {
116 export_traces(&mut trace_agent_client, &endpoint, &service_path, body).await;
117 }
118 path if *path == OTLP_METRICS_GRPC_SERVICE_PATH => {
119 export_metrics(&mut core_agent_metrics_client, &endpoint, &service_path, body).await;
120 }
121 path if *path == OTLP_LOGS_GRPC_SERVICE_PATH => {
122 export_logs(&mut core_agent_logs_client, &endpoint, &service_path, body).await;
123 }
124 _ => {
125 warn!(service_path = %service_path, "Received gRPC payload with unknown service path. Skipping.");
126 continue;
127 }
128 }
129
130 },
131 _ => continue,
132 },
133 None => break,
134 },
135 }
136 }
137
138 debug!("OTLP forwarder stopped.");
139
140 Ok(())
141 }
142}
143
144async fn export_traces(
145 trace_agent_client: &mut TraceServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
146 body: FrozenChunkedBytesBuffer,
147) {
148 let body = body.into_bytes();
155 let request = match ExportTraceServiceRequest::decode(body) {
156 Ok(req) => req,
157 Err(e) => {
158 error!(error = %e, "Failed to decode trace export request from payload.");
159 return;
160 }
161 };
162
163 match trace_agent_client.export(request).await {
164 Ok(response) => {
165 let resp = response.into_inner();
166 if let Some(partial_success) = resp.partial_success {
167 if partial_success.rejected_spans > 0 {
168 warn!(
169 rejected_spans = partial_success.rejected_spans,
170 error = %partial_success.error_message,
171 "Trace export partially failed."
172 );
173 }
174 }
175 }
176 Err(e) => {
177 error!(error = %e, %endpoint, %service_path, "Failed to export traces to Trace Agent.");
178 }
179 }
180}
181
182async fn export_metrics(
183 core_agent_grpc_client: &mut MetricsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
184 body: FrozenChunkedBytesBuffer,
185) {
186 let body = body.into_bytes();
193
194 let request = match ExportMetricsServiceRequest::decode(body) {
195 Ok(req) => req,
196 Err(e) => {
197 error!(error = %e, "Failed to decode metrics or logs export request from payload.");
198 return;
199 }
200 };
201
202 match core_agent_grpc_client.export(request).await {
203 Ok(response) => {
204 let resp = response.into_inner();
205 if let Some(partial_success) = resp.partial_success {
206 if partial_success.rejected_data_points > 0 {
207 warn!(
208 rejected_data_points = partial_success.rejected_data_points,
209 error = %partial_success.error_message,
210 "Metrics export partially failed."
211 );
212 }
213 }
214 }
215 Err(e) => {
216 error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
217 }
218 }
219}
220
221async fn export_logs(
222 core_agent_grpc_client: &mut LogsServiceClient<Channel>, endpoint: &MetaString, service_path: &MetaString,
223 body: FrozenChunkedBytesBuffer,
224) {
225 let body = body.into_bytes();
232
233 let request = match ExportLogsServiceRequest::decode(body) {
234 Ok(req) => req,
235 Err(e) => {
236 error!(error = %e, "Failed to decode logs export request from payload.");
237 return;
238 }
239 };
240
241 match core_agent_grpc_client.export(request).await {
242 Ok(response) => {
243 let resp = response.into_inner();
244 if let Some(partial_success) = resp.partial_success {
245 if partial_success.rejected_log_records > 0 {
246 warn!(
247 rejected_log_records = partial_success.rejected_log_records,
248 error = %partial_success.error_message,
249 "Trace export partially failed."
250 );
251 }
252 }
253 }
254 Err(e) => {
255 error!(error = %e, %endpoint, %service_path, "Failed to export metrics to Core Agent.");
256 }
257 }
258}
259
260fn normalize_endpoint(endpoint: &str) -> String {
261 if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
262 endpoint.to_string()
263 } else {
264 format!("https://{}", endpoint)
265 }
266}