saluki_components/forwarders/cluster_agent/
mod.rs1use async_trait::async_trait;
4use http::{
5 header::AUTHORIZATION,
6 uri::{Authority, Scheme},
7 HeaderName, HeaderValue, Request, Uri,
8};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
10use saluki_common::buf::FrozenChunkedBytesBuffer;
11use saluki_config::GenericConfiguration;
12use saluki_core::{
13 components::{forwarders::*, ComponentContext},
14 data_model::payload::PayloadType,
15 observability::ComponentMetricsExt as _,
16};
17use saluki_error::{generic_error, GenericError};
18use saluki_metrics::MetricsBuilder;
19use stringtheory::MetaString;
20use tokio::select;
21use tracing::debug;
22
23use crate::common::datadog::{
24 config::ForwarderConfiguration,
25 endpoints::ResolvedEndpoint,
26 io::{EndpointRequestMapper, EndpointRequestMapperFactory, TransactionForwarder},
27 telemetry::ComponentTelemetry,
28 transaction::{Metadata, Transaction, TransactionBody},
29 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
30};
31
/// Request path that the Cluster Agent request mapper writes onto every outgoing request URI, and
/// that `get_cluster_agent_endpoint_name` matches against when naming the endpoint.
const CLUSTER_AGENT_SERIES_PATH: &str = "/series";

/// Name of the `dd-api-key` header, which the Cluster Agent request mapper removes from outgoing
/// requests.
static DD_API_KEY_HEADER: HeaderName = HeaderName::from_static("dd-api-key");
35
36pub struct ClusterAgentForwarderConfiguration {
41 forwarder_config: ForwarderConfiguration,
42 auth_header_value: HeaderValue,
43}
44
45impl ClusterAgentForwarderConfiguration {
46 pub fn from_configuration(
48 config: &GenericConfiguration, endpoint_url: String, auth_token: String,
49 ) -> Result<Self, GenericError> {
50 let auth_header_value = bearer_auth_header_value(&auth_token)?;
51 let mut forwarder_config = ForwarderConfiguration::from_configuration(config)?.with_allow_arbitrary_tags(false);
52
53 let endpoint = forwarder_config.endpoint_mut();
54 endpoint.clear_additional_endpoints();
55 endpoint.set_dd_url(endpoint_url);
56 endpoint.set_api_key(auth_token);
57 forwarder_config.clear_opw_metrics_endpoint();
58
59 Ok(Self {
60 forwarder_config,
61 auth_header_value,
62 })
63 }
64}
65
66#[async_trait]
67impl ForwarderBuilder for ClusterAgentForwarderConfiguration {
68 fn input_payload_type(&self) -> PayloadType {
69 PayloadType::Http
70 }
71
72 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
73 let metrics_builder = MetricsBuilder::from_component_context(&context);
74 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
75 let endpoint_request_mapper_factory = cluster_agent_request_mapper_factory(self.auth_header_value.clone());
76 let forwarder = TransactionForwarder::from_config_with_endpoint_request_mapper(
77 context,
78 self.forwarder_config.clone(),
79 None,
80 get_cluster_agent_endpoint_name,
81 telemetry.clone(),
82 metrics_builder,
83 endpoint_request_mapper_factory,
84 )?;
85
86 Ok(Box::new(ClusterAgentForwarder { forwarder }))
87 }
88}
89
90impl MemoryBounds for ClusterAgentForwarderConfiguration {
91 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
92 builder
93 .minimum()
94 .with_single_value::<ClusterAgentForwarder>("component struct")
95 .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
96
97 builder.firm().with_expr(UsageExpr::sum(
98 "in-flight requests",
99 UsageExpr::config(
100 "forwarder_retry_queue_payloads_max_size",
101 self.forwarder_config.retry().queue_max_size_bytes() as usize,
102 ),
103 UsageExpr::product(
104 "high priority queue",
105 UsageExpr::config(
106 "forwarder_high_prio_buffer_size",
107 self.forwarder_config.endpoint_buffer_size(),
108 ),
109 UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
110 ),
111 ));
112 }
113}
114
115pub struct ClusterAgentForwarder {
117 forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
118}
119
120#[async_trait]
121impl Forwarder for ClusterAgentForwarder {
122 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
123 let Self { forwarder } = *self;
124
125 let mut health = context.take_health_handle();
126 let forwarder = forwarder.spawn().await;
127
128 health.mark_ready();
129 debug!("Cluster Agent forwarder started.");
130
131 loop {
132 select! {
133 _ = health.live() => continue,
134 maybe_payload = context.payloads().next() => match maybe_payload {
135 Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
136 let (payload_meta, request) = http_payload.into_parts();
137 let transaction_meta = Metadata::from_event_and_data_point_count(
138 payload_meta.event_count(),
139 payload_meta.data_point_count(),
140 );
141 let transaction = Transaction::from_original(transaction_meta, request);
142
143 forwarder.send_transaction(transaction).await?;
144 }
145 None => break,
146 },
147 }
148 }
149
150 forwarder.shutdown().await;
151
152 debug!("Cluster Agent forwarder stopped.");
153
154 Ok(())
155 }
156}
157
158fn bearer_auth_header_value(auth_token: &str) -> Result<HeaderValue, GenericError> {
159 let raw_value = format!("Bearer {auth_token}");
160 HeaderValue::from_str(&raw_value)
161 .map_err(|_| generic_error!("cluster_agent.auth_token contains characters that are invalid in HTTP headers."))
162}
163
164fn cluster_agent_request_mapper_factory<B>(auth_header_value: HeaderValue) -> EndpointRequestMapperFactory<B>
165where
166 B: 'static,
167{
168 std::sync::Arc::new(move |endpoint| cluster_agent_request_mapper(endpoint, auth_header_value.clone()))
169}
170
171fn cluster_agent_request_mapper<B>(
172 endpoint: ResolvedEndpoint, auth_header_value: HeaderValue,
173) -> EndpointRequestMapper<B> {
174 let new_uri_authority = Authority::try_from(endpoint.endpoint().authority())
175 .expect("should not fail to construct new endpoint authority");
176 let new_uri_scheme =
177 Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme");
178
179 Box::new(move |mut request: Request<TransactionBody<B>>| {
180 let new_uri = Uri::builder()
181 .scheme(new_uri_scheme.clone())
182 .authority(new_uri_authority.clone())
183 .path_and_query(CLUSTER_AGENT_SERIES_PATH)
184 .build()
185 .expect("should not fail to construct Cluster Agent URI");
186 *request.uri_mut() = new_uri;
187 request.headers_mut().remove(&DD_API_KEY_HEADER);
188 request.headers_mut().insert(AUTHORIZATION, auth_header_value.clone());
189
190 request
191 })
192}
193
194fn get_cluster_agent_endpoint_name(uri: &Uri) -> Option<MetaString> {
195 match uri.path() {
196 CLUSTER_AGENT_SERIES_PATH => Some(MetaString::from_static("cluster_agent_series")),
197 _ => None,
198 }
199}
200
#[cfg(test)]
mod tests {
    use http::Method;
    use saluki_config::ConfigurationLoader;
    use serde_json::json;

    use super::*;
    use crate::common::datadog::endpoints::EndpointRoute;

    /// The mapper must rewrite the URI to the series endpoint, drop the API key header, and attach the
    /// bearer token.
    #[test]
    fn request_mapper_targets_cluster_agent_series_endpoint_with_bearer_auth() {
        let bearer_value = bearer_auth_header_value("secret-token").expect("auth header should be valid");
        let resolved = ResolvedEndpoint::from_raw_endpoint("https://cluster-agent.example.com:5005", "secret-token")
            .expect("endpoint should resolve");
        let mut mapper = cluster_agent_request_mapper::<()>(resolved, bearer_value);

        let original = Request::builder()
            .method(Method::POST)
            .uri("/api/v2/series")
            .header("dd-api-key", "primary-api-key")
            .body(TransactionBody::<()>::Rehydrated(None))
            .expect("request should build");

        let mapped = mapper(original);

        let mapped_uri = mapped.uri().to_string();
        assert_eq!(mapped_uri, "https://cluster-agent.example.com:5005/series");

        let mapped_headers = mapped.headers();
        assert_eq!(mapped_headers.get(AUTHORIZATION).unwrap(), "Bearer secret-token");
        assert!(mapped_headers.get("dd-api-key").is_none());
    }

    /// A token containing a newline cannot be used as a header value.
    #[test]
    fn bearer_auth_header_rejects_invalid_token() {
        let result = bearer_auth_header_value("bad\ntoken");
        assert!(result.is_err());
    }

    /// Additional and OPW endpoints present in the source configuration must not survive; only the
    /// Cluster Agent endpoint remains.
    #[tokio::test]
    async fn configuration_uses_only_cluster_agent_endpoint() {
        let raw_config = json!({
            "api_key": "primary-api-key",
            "dd_url": "https://app.datadoghq.com",
            "additional_endpoints": {
                "https://additional.example.com": ["additional-api-key"]
            },
            "observability_pipelines_worker": {
                "metrics": {
                    "enabled": true,
                    "url": "https://opw.example.com"
                }
            }
        });
        let (generic_config, _) = ConfigurationLoader::for_tests(Some(raw_config), None, false).await;

        let forwarder = ClusterAgentForwarderConfiguration::from_configuration(
            &generic_config,
            "https://cluster-agent.example.com".to_string(),
            "secret-token".to_string(),
        )
        .expect("Cluster Agent forwarder configuration should parse");

        let routable = forwarder
            .forwarder_config
            .build_routable_endpoints(None)
            .expect("endpoint should resolve");

        assert_eq!(routable.len(), 1);

        let only_endpoint = &routable[0];
        assert_eq!(only_endpoint.route(), EndpointRoute::Primary);
        assert_eq!(
            only_endpoint.endpoint().endpoint().as_str(),
            "https://cluster-agent.example.com/"
        );
        assert_eq!(only_endpoint.endpoint().cached_api_key(), "secret-token");
    }
}