saluki_components/forwarders/trace_agent/
mod.rs1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use otlp_protos::opentelemetry::proto::collector::trace::v1::{
4 trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
5};
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9 components::{forwarders::*, ComponentContext},
10 data_model::payload::PayloadType,
11};
12use saluki_error::ErrorContext as _;
13use saluki_error::GenericError;
14use serde::Deserialize;
15use tokio::select;
16use tonic::transport::Channel;
17use tracing::{debug, error, warn};
18
19use crate::common::otlp::OTLP_TRACES_GRPC_SERVICE_PATH;
20
21#[derive(Clone, Deserialize, Default)]
25pub struct TraceAgentForwarderConfiguration {
26 #[serde(default)]
27 otlp_config: TraceAgentOtlpConfig,
28}
29
30#[derive(Clone, Deserialize, Default)]
31struct TraceAgentOtlpConfig {
32 #[serde(default)]
33 traces: Traces,
34}
35
36#[derive(Clone, Deserialize, Default)]
37struct Traces {
38 internal_port: u16,
39}
40
41impl TraceAgentForwarderConfiguration {
42 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
44 config.as_typed().map_err(Into::into)
45 }
46}
47
48#[async_trait]
49impl ForwarderBuilder for TraceAgentForwarderConfiguration {
50 fn input_payload_type(&self) -> PayloadType {
51 PayloadType::Grpc
52 }
53
54 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
55 let endpoint = format!("http://localhost:{}", self.otlp_config.traces.internal_port);
56 let channel = Channel::from_shared(endpoint.clone())
57 .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
58 .connect_lazy();
59
60 let client = TraceServiceClient::new(channel);
61
62 Ok(Box::new(TraceAgentForwarder { client, endpoint }))
63 }
64}
65
66impl MemoryBounds for TraceAgentForwarderConfiguration {
67 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
68 builder
69 .minimum()
70 .with_single_value::<TraceAgentForwarder>("component struct");
71 }
72}
73
74struct TraceAgentForwarder {
75 client: TraceServiceClient<Channel>,
76 endpoint: String,
77}
78
79#[async_trait]
80impl Forwarder for TraceAgentForwarder {
81 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
82 let Self { mut client, endpoint } = *self;
83
84 let mut health = context.take_health_handle();
85
86 health.mark_ready();
87 debug!("Trace Agent forwarder started.");
88
89 loop {
90 select! {
91 _ = health.live() => continue,
92 maybe_payload = context.payloads().next() => match maybe_payload {
93 Some(payload) => if let Some(grpc_payload) = payload.try_into_grpc_payload() {
94 let (_, _, service_path, body) = grpc_payload.into_parts();
97 if service_path != OTLP_TRACES_GRPC_SERVICE_PATH {
98 warn!("Received unexpected non-trace gRPC payload in Trace Agent forwarder. Skipping.");
99 continue;
100 }
101
102 let body = body.into_bytes();
109 let request = match ExportTraceServiceRequest::decode(body) {
110 Ok(req) => req,
111 Err(e) => {
112 error!(error = %e, "Failed to decode trace export request from payload.");
113 continue;
114 }
115 };
116
117 match client.export(request).await {
118 Ok(response) => {
119 let resp = response.into_inner();
120 if let Some(partial_success) = resp.partial_success {
121 if partial_success.rejected_spans > 0 {
122 warn!(
123 rejected_spans = partial_success.rejected_spans,
124 error = %partial_success.error_message,
125 "Trace export partially failed."
126 );
127 }
128 }
129 }
130 Err(e) => {
131 error!(error = %e, %endpoint, "Failed to export traces to Trace Agent.");
132 }
133 }
134 },
135 None => break,
136 },
137 }
138 }
139
140 debug!("Trace Agent forwarder stopped.");
141
142 Ok(())
143 }
144}