Skip to main content

saluki_components/forwarders/cluster_agent/
mod.rs

1//! Cluster Agent forwarder.
2
3use async_trait::async_trait;
4use http::{
5    header::AUTHORIZATION,
6    uri::{Authority, Scheme},
7    HeaderName, HeaderValue, Request, Uri,
8};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::buf::FrozenChunkedBytesBuffer;
11use saluki_config::GenericConfiguration;
12use saluki_core::{
13    components::{forwarders::*, ComponentContext},
14    data_model::payload::PayloadType,
15    observability::ComponentMetricsExt as _,
16};
17use saluki_error::{generic_error, GenericError};
18use saluki_metrics::MetricsBuilder;
19use stringtheory::MetaString;
20use tokio::select;
21use tracing::debug;
22
23use crate::common::datadog::{
24    config::ForwarderConfiguration,
25    endpoints::ResolvedEndpoint,
26    io::{EndpointRequestMapper, EndpointRequestMapperFactory, TransactionForwarder},
27    telemetry::ComponentTelemetry,
28    transaction::{Metadata, Transaction, TransactionBody},
29    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
30};
31
32const CLUSTER_AGENT_SERIES_PATH: &str = "/series";
33
34static DD_API_KEY_HEADER: HeaderName = HeaderName::from_static("dd-api-key");
35
36/// Cluster Agent forwarder configuration.
37///
38/// This forwarder sends encoded metric series payloads to the Cluster Agent `/series` endpoint using bearer-token
39/// authorization. It intentionally does not use Datadog intake endpoint configuration or `DD-Api-Key` auth.
40pub struct ClusterAgentForwarderConfiguration {
41    forwarder_config: ForwarderConfiguration,
42    auth_header_value: HeaderValue,
43}
44
45impl ClusterAgentForwarderConfiguration {
46    /// Creates a new `ClusterAgentForwarderConfiguration` from the given Cluster Agent endpoint and bearer token.
47    pub fn from_configuration(
48        config: &GenericConfiguration, endpoint_url: String, auth_token: String,
49    ) -> Result<Self, GenericError> {
50        let auth_header_value = bearer_auth_header_value(&auth_token)?;
51        let mut forwarder_config = ForwarderConfiguration::from_configuration(config)?.with_allow_arbitrary_tags(false);
52
53        let endpoint = forwarder_config.endpoint_mut();
54        endpoint.clear_additional_endpoints();
55        endpoint.set_dd_url(endpoint_url);
56        endpoint.set_api_key(auth_token);
57        forwarder_config.clear_opw_metrics_endpoint();
58
59        Ok(Self {
60            forwarder_config,
61            auth_header_value,
62        })
63    }
64}
65
66#[async_trait]
67impl ForwarderBuilder for ClusterAgentForwarderConfiguration {
68    fn input_payload_type(&self) -> PayloadType {
69        PayloadType::Http
70    }
71
72    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
73        let metrics_builder = MetricsBuilder::from_component_context(&context);
74        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
75        let endpoint_request_mapper_factory = cluster_agent_request_mapper_factory(self.auth_header_value.clone());
76        let forwarder = TransactionForwarder::from_config_with_endpoint_request_mapper(
77            context,
78            self.forwarder_config.clone(),
79            None,
80            get_cluster_agent_endpoint_name,
81            telemetry.clone(),
82            metrics_builder,
83            endpoint_request_mapper_factory,
84        )?;
85
86        Ok(Box::new(ClusterAgentForwarder { forwarder }))
87    }
88}
89
90impl MemoryBounds for ClusterAgentForwarderConfiguration {
91    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
92        builder
93            .minimum()
94            .with_single_value::<ClusterAgentForwarder>("component struct")
95            .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
96
97        builder.firm().with_expr(UsageExpr::sum(
98            "in-flight requests",
99            UsageExpr::config(
100                "forwarder_retry_queue_payloads_max_size",
101                self.forwarder_config.retry().queue_max_size_bytes() as usize,
102            ),
103            UsageExpr::product(
104                "high priority queue",
105                UsageExpr::config(
106                    "forwarder_high_prio_buffer_size",
107                    self.forwarder_config.endpoint_buffer_size(),
108                ),
109                UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
110            ),
111        ));
112    }
113}
114
115/// Cluster Agent forwarder.
116pub struct ClusterAgentForwarder {
117    forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
118}
119
120#[async_trait]
121impl Forwarder for ClusterAgentForwarder {
122    async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
123        let Self { forwarder } = *self;
124
125        let mut health = context.take_health_handle();
126        let forwarder = forwarder.spawn().await;
127
128        health.mark_ready();
129        debug!("Cluster Agent forwarder started.");
130
131        loop {
132            select! {
133                _ = health.live() => continue,
134                maybe_payload = context.payloads().next() => match maybe_payload {
135                    Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
136                        let (payload_meta, request) = http_payload.into_parts();
137                        let transaction_meta = Metadata::from_event_and_data_point_count(
138                            payload_meta.event_count(),
139                            payload_meta.data_point_count(),
140                        );
141                        let transaction = Transaction::from_original(transaction_meta, request);
142
143                        forwarder.send_transaction(transaction).await?;
144                    }
145                    None => break,
146                },
147            }
148        }
149
150        forwarder.shutdown().await;
151
152        debug!("Cluster Agent forwarder stopped.");
153
154        Ok(())
155    }
156}
157
158fn bearer_auth_header_value(auth_token: &str) -> Result<HeaderValue, GenericError> {
159    let raw_value = format!("Bearer {auth_token}");
160    HeaderValue::from_str(&raw_value)
161        .map_err(|_| generic_error!("cluster_agent.auth_token contains characters that are invalid in HTTP headers."))
162}
163
164fn cluster_agent_request_mapper_factory<B>(auth_header_value: HeaderValue) -> EndpointRequestMapperFactory<B>
165where
166    B: 'static,
167{
168    std::sync::Arc::new(move |endpoint| cluster_agent_request_mapper(endpoint, auth_header_value.clone()))
169}
170
171fn cluster_agent_request_mapper<B>(
172    endpoint: ResolvedEndpoint, auth_header_value: HeaderValue,
173) -> EndpointRequestMapper<B> {
174    let new_uri_authority = Authority::try_from(endpoint.endpoint().authority())
175        .expect("should not fail to construct new endpoint authority");
176    let new_uri_scheme =
177        Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme");
178
179    Box::new(move |mut request: Request<TransactionBody<B>>| {
180        let new_uri = Uri::builder()
181            .scheme(new_uri_scheme.clone())
182            .authority(new_uri_authority.clone())
183            .path_and_query(CLUSTER_AGENT_SERIES_PATH)
184            .build()
185            .expect("should not fail to construct Cluster Agent URI");
186        *request.uri_mut() = new_uri;
187        request.headers_mut().remove(&DD_API_KEY_HEADER);
188        request.headers_mut().insert(AUTHORIZATION, auth_header_value.clone());
189
190        request
191    })
192}
193
194fn get_cluster_agent_endpoint_name(uri: &Uri) -> Option<MetaString> {
195    match uri.path() {
196        CLUSTER_AGENT_SERIES_PATH => Some(MetaString::from_static("cluster_agent_series")),
197        _ => None,
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use http::Method;
204    use saluki_config::ConfigurationLoader;
205    use serde_json::json;
206
207    use super::*;
208    use crate::common::datadog::endpoints::EndpointRoute;
209
210    #[test]
211    fn request_mapper_targets_cluster_agent_series_endpoint_with_bearer_auth() {
212        let auth_header_value = bearer_auth_header_value("secret-token").expect("auth header should be valid");
213        let endpoint = ResolvedEndpoint::from_raw_endpoint("https://cluster-agent.example.com:5005", "secret-token")
214            .expect("endpoint should resolve");
215        let mut mapper = cluster_agent_request_mapper::<()>(endpoint, auth_header_value);
216        let request = Request::builder()
217            .method(Method::POST)
218            .uri("/api/v2/series")
219            .header("dd-api-key", "primary-api-key")
220            .body(TransactionBody::<()>::Rehydrated(None))
221            .expect("request should build");
222
223        let request = mapper(request);
224
225        assert_eq!(
226            request.uri().to_string(),
227            "https://cluster-agent.example.com:5005/series"
228        );
229        assert_eq!(request.headers().get(AUTHORIZATION).unwrap(), "Bearer secret-token");
230        assert!(request.headers().get("dd-api-key").is_none());
231    }
232
233    #[test]
234    fn bearer_auth_header_rejects_invalid_token() {
235        assert!(bearer_auth_header_value("bad\ntoken").is_err());
236    }
237
238    #[tokio::test]
239    async fn configuration_uses_only_cluster_agent_endpoint() {
240        let (config, _) = ConfigurationLoader::for_tests(
241            Some(json!({
242                "api_key": "primary-api-key",
243                "dd_url": "https://app.datadoghq.com",
244                "additional_endpoints": {
245                    "https://additional.example.com": ["additional-api-key"]
246                },
247                "observability_pipelines_worker": {
248                    "metrics": {
249                        "enabled": true,
250                        "url": "https://opw.example.com"
251                    }
252                }
253            })),
254            None,
255            false,
256        )
257        .await;
258
259        let config = ClusterAgentForwarderConfiguration::from_configuration(
260            &config,
261            "https://cluster-agent.example.com".to_string(),
262            "secret-token".to_string(),
263        )
264        .expect("Cluster Agent forwarder configuration should parse");
265        let endpoints = config
266            .forwarder_config
267            .build_routable_endpoints(None)
268            .expect("endpoint should resolve");
269
270        assert_eq!(endpoints.len(), 1);
271        assert_eq!(endpoints[0].route(), EndpointRoute::Primary);
272        assert_eq!(
273            endpoints[0].endpoint().endpoint().as_str(),
274            "https://cluster-agent.example.com/"
275        );
276        assert_eq!(endpoints[0].endpoint().cached_api_key(), "secret-token");
277    }
278}