saluki_components/forwarders/trace_agent/
mod.rs1use async_trait::async_trait;
2use bytes::Buf;
3use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
4use otlp_protos::opentelemetry::proto::collector::trace::v1::{
5 trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
6};
7use prost::Message;
8use saluki_config::GenericConfiguration;
9use saluki_core::{
10 components::{forwarders::*, ComponentContext},
11 data_model::payload::PayloadType,
12};
13use saluki_error::ErrorContext as _;
14use saluki_error::GenericError;
15use serde::Deserialize;
16use tokio::select;
17use tonic::transport::Channel;
18use tracing::{debug, error, warn};
19
20use crate::common::otlp::config::Traces;
21
22#[derive(Clone, Deserialize, Default)]
24pub struct TraceAgentForwarderConfiguration {
25 #[serde(default)]
26 otlp_config: TraceAgentOtlpConfig,
27}
28
29#[derive(Clone, Deserialize, Default)]
30struct TraceAgentOtlpConfig {
31 #[serde(default)]
32 traces: Traces,
33}
34
35impl TraceAgentForwarderConfiguration {
36 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
38 config.as_typed().map_err(Into::into)
39 }
40}
41
42#[async_trait]
43impl ForwarderBuilder for TraceAgentForwarderConfiguration {
44 fn input_payload_type(&self) -> PayloadType {
45 PayloadType::Grpc
46 }
47
48 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
49 let endpoint = format!("http://localhost:{}", self.otlp_config.traces.internal_port);
50 let channel = Channel::from_shared(endpoint)
51 .error_context("Invalid gRPC endpoint")?
52 .connect_lazy();
53
54 let client = TraceServiceClient::new(channel);
55
56 Ok(Box::new(TraceAgentForwarder { client }))
57 }
58}
59
60impl MemoryBounds for TraceAgentForwarderConfiguration {
61 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
62 builder
63 .minimum()
64 .with_single_value::<TraceAgentForwarder>("component struct");
65 }
66}
67
68pub struct TraceAgentForwarder {
70 client: TraceServiceClient<Channel>,
71}
72
73#[async_trait]
74impl Forwarder for TraceAgentForwarder {
75 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
76 let mut health = context.take_health_handle();
77
78 health.mark_ready();
79 debug!("Trace-agent forwarder started.");
80
81 loop {
82 select! {
83 _ = health.live() => continue,
84 maybe_payload = context.payloads().next() => match maybe_payload {
85 Some(payload) => if let Some(grpc_payload) = payload.try_into_grpc_payload() {
86 let (_, endpoint, _, mut body) = grpc_payload.into_parts();
87
88 let remaining = body.remaining();
89 let body_bytes = body.copy_to_bytes(remaining);
90 let request = match ExportTraceServiceRequest::decode(body_bytes) {
91 Ok(req) => req,
92 Err(e) => {
93 error!(error = %e, "Failed to decode ExportTraceServiceRequest");
94 continue;
95 }
96 };
97
98 match self.client.export(request).await {
99 Ok(response) => {
100 let resp = response.into_inner();
101 if let Some(partial_success) = resp.partial_success {
102 if partial_success.rejected_spans > 0 {
103 warn!(
104 rejected_spans = partial_success.rejected_spans,
105 error_message = %partial_success.error_message,
106 "Trace export partially failed"
107 );
108 }
109 }
110 }
111 Err(e) => {
112 error!(error = %e, endpoint = %endpoint, "Failed to export traces to trace-agent");
113 }
114 }
115 },
116 None => break,
117 },
118 }
119 }
120
121 debug!("Trace-agent forwarder stopped.");
122 Ok(())
123 }
124}