saluki_components/forwarders/trace_agent/
mod.rs

1use async_trait::async_trait;
2use bytes::Buf;
3use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
4use otlp_protos::opentelemetry::proto::collector::trace::v1::{
5    trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
6};
7use prost::Message;
8use saluki_config::GenericConfiguration;
9use saluki_core::{
10    components::{forwarders::*, ComponentContext},
11    data_model::payload::PayloadType,
12};
13use saluki_error::ErrorContext as _;
14use saluki_error::GenericError;
15use serde::Deserialize;
16use tokio::select;
17use tonic::transport::Channel;
18use tracing::{debug, error, warn};
19
20use crate::common::otlp::config::Traces;
21
22/// Configuration for the trace-agent forwarder.
23#[derive(Clone, Deserialize, Default)]
24pub struct TraceAgentForwarderConfiguration {
25    #[serde(default)]
26    otlp_config: TraceAgentOtlpConfig,
27}
28
29#[derive(Clone, Deserialize, Default)]
30struct TraceAgentOtlpConfig {
31    #[serde(default)]
32    traces: Traces,
33}
34
35impl TraceAgentForwarderConfiguration {
36    /// Creates a new `TraceAgentForwarderConfiguration` from the given configuration.
37    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
38        config.as_typed().map_err(Into::into)
39    }
40}
41
42#[async_trait]
43impl ForwarderBuilder for TraceAgentForwarderConfiguration {
44    fn input_payload_type(&self) -> PayloadType {
45        PayloadType::Grpc
46    }
47
48    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
49        let endpoint = format!("http://localhost:{}", self.otlp_config.traces.internal_port);
50        let channel = Channel::from_shared(endpoint)
51            .error_context("Invalid gRPC endpoint")?
52            .connect_lazy();
53
54        let client = TraceServiceClient::new(channel);
55
56        Ok(Box::new(TraceAgentForwarder { client }))
57    }
58}
59
60impl MemoryBounds for TraceAgentForwarderConfiguration {
61    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
62        builder
63            .minimum()
64            .with_single_value::<TraceAgentForwarder>("component struct");
65    }
66}
67
68/// Trace-agent forwarder.
69pub struct TraceAgentForwarder {
70    client: TraceServiceClient<Channel>,
71}
72
73#[async_trait]
74impl Forwarder for TraceAgentForwarder {
75    async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
76        let mut health = context.take_health_handle();
77
78        health.mark_ready();
79        debug!("Trace-agent forwarder started.");
80
81        loop {
82            select! {
83                _ = health.live() => continue,
84                maybe_payload = context.payloads().next() => match maybe_payload {
85                    Some(payload) => if let Some(grpc_payload) = payload.try_into_grpc_payload() {
86                        let (_, endpoint, _, mut body) = grpc_payload.into_parts();
87
88                        let remaining = body.remaining();
89                        let body_bytes = body.copy_to_bytes(remaining);
90                        let request = match ExportTraceServiceRequest::decode(body_bytes) {
91                            Ok(req) => req,
92                            Err(e) => {
93                                error!(error = %e, "Failed to decode ExportTraceServiceRequest");
94                                continue;
95                            }
96                        };
97
98                        match self.client.export(request).await {
99                            Ok(response) => {
100                                let resp = response.into_inner();
101                                if let Some(partial_success) = resp.partial_success {
102                                    if partial_success.rejected_spans > 0 {
103                                        warn!(
104                                            rejected_spans = partial_success.rejected_spans,
105                                            error_message = %partial_success.error_message,
106                                            "Trace export partially failed"
107                                        );
108                                    }
109                                }
110                            }
111                            Err(e) => {
112                                error!(error = %e, endpoint = %endpoint, "Failed to export traces to trace-agent");
113                            }
114                        }
115                    },
116                    None => break,
117                },
118            }
119        }
120
121        debug!("Trace-agent forwarder stopped.");
122        Ok(())
123    }
124}