Skip to main content

saluki_components/config/
cluster_agent.rs

1//! Cluster Agent configuration.
2
3use std::net::IpAddr;
4
5use saluki_config::GenericConfiguration;
6use saluki_error::GenericError;
7
8const DEFAULT_CLUSTER_AGENT_KUBERNETES_SERVICE_NAME: &str = "datadog-cluster-agent";
9
10/// Cluster Agent forwarding configuration.
11#[derive(Clone, Debug, Eq, PartialEq)]
12pub struct ClusterAgentConfiguration {
13    enabled: bool,
14    url: Option<String>,
15    kubernetes_service_name: Option<String>,
16    auth_token: Option<String>,
17}
18
19impl ClusterAgentConfiguration {
20    /// Creates a new `ClusterAgentConfiguration` from the given configuration.
21    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
22        Ok(Self {
23            enabled: config.try_get_typed("cluster_agent.enabled")?.unwrap_or(false),
24            url: get_non_empty_string(config, "cluster_agent.url")?,
25            kubernetes_service_name: get_trimmed_string(config, "cluster_agent.kubernetes_service_name")?,
26            auth_token: get_non_empty_string(config, "cluster_agent.auth_token")?,
27        })
28    }
29
30    /// Returns the Cluster Agent HTTPS endpoint and bearer token when forwarding can be configured.
31    pub fn endpoint_and_token(&self) -> Option<(String, String)> {
32        self.endpoint_and_token_with_env(|key| std::env::var(key).ok())
33    }
34
35    fn endpoint_and_token_with_env<F>(&self, env_lookup: F) -> Option<(String, String)>
36    where
37        F: Fn(&str) -> Option<String>,
38    {
39        if !self.enabled {
40            return None;
41        }
42
43        let endpoint = self.resolve_endpoint(env_lookup)?;
44
45        Some((endpoint, self.auth_token.clone()?))
46    }
47
48    fn resolve_endpoint<F>(&self, env_lookup: F) -> Option<String>
49    where
50        F: Fn(&str) -> Option<String>,
51    {
52        if let Some(url) = self.url.as_deref() {
53            return normalize_cluster_agent_url(url);
54        }
55
56        let service_name = self
57            .kubernetes_service_name
58            .as_deref()
59            .unwrap_or(DEFAULT_CLUSTER_AGENT_KUBERNETES_SERVICE_NAME);
60        if service_name.is_empty() {
61            return None;
62        }
63
64        resolve_kubernetes_service_endpoint(service_name, env_lookup)
65    }
66}
67
68fn get_non_empty_string(config: &GenericConfiguration, key: &str) -> Result<Option<String>, GenericError> {
69    Ok(get_trimmed_string(config, key)?.filter(|value| !value.is_empty()))
70}
71
72fn get_trimmed_string(config: &GenericConfiguration, key: &str) -> Result<Option<String>, GenericError> {
73    Ok(config
74        .try_get_typed::<String>(key)?
75        .map(|value| value.trim().to_string()))
76}
77
78fn normalize_cluster_agent_url(url: &str) -> Option<String> {
79    let normalized = if url.contains("://") {
80        url.to_string()
81    } else {
82        format!("https://{url}")
83    };
84
85    let parsed = url::Url::parse(&normalized).ok()?;
86    if parsed.scheme() == "https" && parsed.host_str().is_some() {
87        Some(normalized)
88    } else {
89        None
90    }
91}
92
93fn resolve_kubernetes_service_endpoint<F>(service_name: &str, env_lookup: F) -> Option<String>
94where
95    F: Fn(&str) -> Option<String>,
96{
97    let env_prefix = service_name.to_uppercase().replace('-', "_");
98    let host_env = format!("{env_prefix}_SERVICE_HOST");
99    let port_env = format!("{env_prefix}_SERVICE_PORT");
100
101    let host = env_lookup(&host_env)?.trim().to_string();
102    let port = env_lookup(&port_env)?.trim().to_string();
103    if host.is_empty() || port.is_empty() {
104        return None;
105    }
106
107    normalize_cluster_agent_url(&join_host_port(&host, &port))
108}
109
110fn join_host_port(host: &str, port: &str) -> String {
111    match host.parse::<IpAddr>() {
112        Ok(IpAddr::V6(_)) => format!("[{host}]:{port}"),
113        _ => format!("{host}:{port}"),
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use saluki_config::ConfigurationLoader;
120    use serde_json::json;
121
122    use super::*;
123
124    async fn cluster_agent_config_from(value: serde_json::Value) -> ClusterAgentConfiguration {
125        let (config, _) = ConfigurationLoader::for_tests(Some(value), None, false).await;
126        ClusterAgentConfiguration::from_configuration(&config).expect("Cluster Agent configuration should deserialize")
127    }
128
129    #[tokio::test]
130    async fn endpoint_and_token_requires_enabled_cluster_agent() {
131        let config = cluster_agent_config_from(json!({
132            "cluster_agent": {
133                "enabled": false,
134                "url": "https://cluster-agent.example.com",
135                "auth_token": "secret-token"
136            }
137        }))
138        .await;
139
140        assert_eq!(config.endpoint_and_token_with_env(env_lookup(&[])), None);
141    }
142
143    #[tokio::test]
144    async fn endpoint_and_token_requires_resolvable_endpoint() {
145        let config = cluster_agent_config_from(json!({
146            "cluster_agent": {
147                "enabled": true,
148                "auth_token": "secret-token"
149            }
150        }))
151        .await;
152
153        assert_eq!(config.endpoint_and_token_with_env(env_lookup(&[])), None);
154    }
155
156    #[tokio::test]
157    async fn endpoint_and_token_requires_https_url() {
158        let config = cluster_agent_config_from(json!({
159            "cluster_agent": {
160                "enabled": true,
161                "url": "http://cluster-agent.example.com",
162                "auth_token": "secret-token"
163            }
164        }))
165        .await;
166
167        assert_eq!(config.endpoint_and_token(), None);
168    }
169
170    #[tokio::test]
171    async fn endpoint_and_token_adds_https_scheme_to_url() {
172        let config = cluster_agent_config_from(json!({
173            "cluster_agent": {
174                "enabled": true,
175                "url": "cluster-agent.example.com:5005",
176                "auth_token": "secret-token"
177            }
178        }))
179        .await;
180
181        assert_eq!(
182            config.endpoint_and_token(),
183            Some((
184                "https://cluster-agent.example.com:5005".to_string(),
185                "secret-token".to_string()
186            ))
187        );
188    }
189
190    #[tokio::test]
191    async fn endpoint_and_token_requires_non_empty_token() {
192        let config = cluster_agent_config_from(json!({
193            "cluster_agent": {
194                "enabled": true,
195                "url": "https://cluster-agent.example.com",
196                "auth_token": "  "
197            }
198        }))
199        .await;
200
201        assert_eq!(config.endpoint_and_token(), None);
202    }
203
204    #[tokio::test]
205    async fn endpoint_and_token_returns_https_url_and_token() {
206        let config = cluster_agent_config_from(json!({
207            "cluster_agent": {
208                "enabled": true,
209                "url": " https://cluster-agent.example.com ",
210                "auth_token": " secret-token "
211            }
212        }))
213        .await;
214
215        assert_eq!(
216            config.endpoint_and_token(),
217            Some((
218                "https://cluster-agent.example.com".to_string(),
219                "secret-token".to_string()
220            ))
221        );
222    }
223
224    #[tokio::test]
225    async fn endpoint_and_token_resolves_default_kubernetes_service_env() {
226        let config = cluster_agent_config_from(json!({
227            "cluster_agent": {
228                "enabled": true,
229                "auth_token": "secret-token"
230            }
231        }))
232        .await;
233
234        assert_eq!(
235            config.endpoint_and_token_with_env(env_lookup(&[
236                ("DATADOG_CLUSTER_AGENT_SERVICE_HOST", "127.0.0.1"),
237                ("DATADOG_CLUSTER_AGENT_SERVICE_PORT", "443"),
238            ])),
239            Some(("https://127.0.0.1:443".to_string(), "secret-token".to_string()))
240        );
241    }
242
243    #[tokio::test]
244    async fn endpoint_and_token_resolves_configured_kubernetes_service_env() {
245        let config = cluster_agent_config_from(json!({
246            "cluster_agent": {
247                "enabled": true,
248                "kubernetes_service_name": "custom-cluster-agent",
249                "auth_token": "secret-token"
250            }
251        }))
252        .await;
253
254        assert_eq!(
255            config.endpoint_and_token_with_env(env_lookup(&[
256                ("CUSTOM_CLUSTER_AGENT_SERVICE_HOST", "10.0.0.7"),
257                ("CUSTOM_CLUSTER_AGENT_SERVICE_PORT", "5005"),
258            ])),
259            Some(("https://10.0.0.7:5005".to_string(), "secret-token".to_string()))
260        );
261    }
262
263    #[tokio::test]
264    async fn endpoint_and_token_wraps_kubernetes_service_ipv6_host() {
265        let config = cluster_agent_config_from(json!({
266            "cluster_agent": {
267                "enabled": true,
268                "auth_token": "secret-token"
269            }
270        }))
271        .await;
272
273        assert_eq!(
274            config.endpoint_and_token_with_env(env_lookup(&[
275                ("DATADOG_CLUSTER_AGENT_SERVICE_HOST", "fd38:552b:2959::4f4a"),
276                ("DATADOG_CLUSTER_AGENT_SERVICE_PORT", "5005"),
277            ])),
278            Some((
279                "https://[fd38:552b:2959::4f4a]:5005".to_string(),
280                "secret-token".to_string()
281            ))
282        );
283    }
284
285    #[tokio::test]
286    async fn endpoint_and_token_prefers_configured_url_over_kubernetes_service_env() {
287        let config = cluster_agent_config_from(json!({
288            "cluster_agent": {
289                "enabled": true,
290                "url": "https://configured-cluster-agent.example.com",
291                "kubernetes_service_name": "custom-cluster-agent",
292                "auth_token": "secret-token"
293            }
294        }))
295        .await;
296
297        assert_eq!(
298            config.endpoint_and_token_with_env(env_lookup(&[
299                ("CUSTOM_CLUSTER_AGENT_SERVICE_HOST", "10.0.0.7"),
300                ("CUSTOM_CLUSTER_AGENT_SERVICE_PORT", "5005"),
301            ])),
302            Some((
303                "https://configured-cluster-agent.example.com".to_string(),
304                "secret-token".to_string()
305            ))
306        );
307    }
308
309    fn env_lookup<'a>(entries: &'a [(&'a str, &'a str)]) -> impl Fn(&str) -> Option<String> + 'a {
310        move |key| {
311            entries
312                .iter()
313                .find_map(|(entry_key, entry_value)| (*entry_key == key).then(|| (*entry_value).to_string()))
314        }
315    }
316}