saluki_components/forwarders/trace_agent/
mod.rs

1use async_trait::async_trait;
2use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
3use otlp_protos::opentelemetry::proto::collector::trace::v1::{
4    trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
5};
6use prost::Message;
7use saluki_config::GenericConfiguration;
8use saluki_core::{
9    components::{forwarders::*, ComponentContext},
10    data_model::payload::PayloadType,
11};
12use saluki_error::ErrorContext as _;
13use saluki_error::GenericError;
14use serde::Deserialize;
15use tokio::select;
16use tonic::transport::Channel;
17use tracing::{debug, error, warn};
18
19use crate::common::otlp::OTLP_TRACES_GRPC_SERVICE_PATH;
20
21/// Trace Agent forwarder.
22///
23/// Forwards OTLP trace payloads to the Trace Agent.
24#[derive(Clone, Deserialize, Default)]
25pub struct TraceAgentForwarderConfiguration {
26    #[serde(default)]
27    otlp_config: TraceAgentOtlpConfig,
28}
29
30#[derive(Clone, Deserialize, Default)]
31struct TraceAgentOtlpConfig {
32    #[serde(default)]
33    traces: Traces,
34}
35
36#[derive(Clone, Deserialize, Default)]
37struct Traces {
38    internal_port: u16,
39}
40
41impl TraceAgentForwarderConfiguration {
42    /// Creates a new `TraceAgentForwarderConfiguration` from the given configuration.
43    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
44        config.as_typed().map_err(Into::into)
45    }
46}
47
48#[async_trait]
49impl ForwarderBuilder for TraceAgentForwarderConfiguration {
50    fn input_payload_type(&self) -> PayloadType {
51        PayloadType::Grpc
52    }
53
54    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
55        let endpoint = format!("http://localhost:{}", self.otlp_config.traces.internal_port);
56        let channel = Channel::from_shared(endpoint.clone())
57            .error_context("Failed to construct gRPC channel due to an invalid endpoint.")?
58            .connect_lazy();
59
60        let client = TraceServiceClient::new(channel);
61
62        Ok(Box::new(TraceAgentForwarder { client, endpoint }))
63    }
64}
65
66impl MemoryBounds for TraceAgentForwarderConfiguration {
67    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
68        builder
69            .minimum()
70            .with_single_value::<TraceAgentForwarder>("component struct");
71    }
72}
73
74struct TraceAgentForwarder {
75    client: TraceServiceClient<Channel>,
76    endpoint: String,
77}
78
79#[async_trait]
80impl Forwarder for TraceAgentForwarder {
81    async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
82        let Self { mut client, endpoint } = *self;
83
84        let mut health = context.take_health_handle();
85
86        health.mark_ready();
87        debug!("Trace Agent forwarder started.");
88
89        loop {
90            select! {
91                _ = health.live() => continue,
92                maybe_payload = context.payloads().next() => match maybe_payload {
93                    Some(payload) => if let Some(grpc_payload) = payload.try_into_grpc_payload() {
94                        // Extract the parts of the payload, and make sure we have an OTLP trace payload, otherwise
95                        // we skip it and move on.
96                        let (_, _, service_path, body) = grpc_payload.into_parts();
97                        if service_path != OTLP_TRACES_GRPC_SERVICE_PATH {
98                            warn!("Received unexpected non-trace gRPC payload in Trace Agent forwarder. Skipping.");
99                            continue;
100                        }
101
102                        // Decode the raw request payload into a typed body so we can export it.
103                        //
104                        // TODO: This is suboptimal since we know the payload should be valid as it was decoded when it
105                        // was ingested, and only after that converted to raw bytes. It would be nice to just forward
106                        // the bytes as-is without decoding it again here just to satisfy the client interface, but no
107                        // such API currently exists.
108                        let body = body.into_bytes();
109                        let request = match ExportTraceServiceRequest::decode(body) {
110                            Ok(req) => req,
111                            Err(e) => {
112                                error!(error = %e, "Failed to decode trace export request from payload.");
113                                continue;
114                            }
115                        };
116
117                        match client.export(request).await {
118                            Ok(response) => {
119                                let resp = response.into_inner();
120                                if let Some(partial_success) = resp.partial_success {
121                                    if partial_success.rejected_spans > 0 {
122                                        warn!(
123                                            rejected_spans = partial_success.rejected_spans,
124                                            error = %partial_success.error_message,
125                                            "Trace export partially failed."
126                                        );
127                                    }
128                                }
129                            }
130                            Err(e) => {
131                                error!(error = %e, %endpoint, "Failed to export traces to Trace Agent.");
132                            }
133                        }
134                    },
135                    None => break,
136                },
137            }
138        }
139
140        debug!("Trace Agent forwarder stopped.");
141
142        Ok(())
143    }
144}