saluki_components/config/
cluster_agent.rs1use std::net::IpAddr;
4
5use saluki_config::GenericConfiguration;
6use saluki_error::GenericError;
7
8const DEFAULT_CLUSTER_AGENT_KUBERNETES_SERVICE_NAME: &str = "datadog-cluster-agent";
9
10#[derive(Clone, Debug, Eq, PartialEq)]
12pub struct ClusterAgentConfiguration {
13 enabled: bool,
14 url: Option<String>,
15 kubernetes_service_name: Option<String>,
16 auth_token: Option<String>,
17}
18
19impl ClusterAgentConfiguration {
20 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
22 Ok(Self {
23 enabled: config.try_get_typed("cluster_agent.enabled")?.unwrap_or(false),
24 url: get_non_empty_string(config, "cluster_agent.url")?,
25 kubernetes_service_name: get_trimmed_string(config, "cluster_agent.kubernetes_service_name")?,
26 auth_token: get_non_empty_string(config, "cluster_agent.auth_token")?,
27 })
28 }
29
30 pub fn endpoint_and_token(&self) -> Option<(String, String)> {
32 self.endpoint_and_token_with_env(|key| std::env::var(key).ok())
33 }
34
35 fn endpoint_and_token_with_env<F>(&self, env_lookup: F) -> Option<(String, String)>
36 where
37 F: Fn(&str) -> Option<String>,
38 {
39 if !self.enabled {
40 return None;
41 }
42
43 let endpoint = self.resolve_endpoint(env_lookup)?;
44
45 Some((endpoint, self.auth_token.clone()?))
46 }
47
48 fn resolve_endpoint<F>(&self, env_lookup: F) -> Option<String>
49 where
50 F: Fn(&str) -> Option<String>,
51 {
52 if let Some(url) = self.url.as_deref() {
53 return normalize_cluster_agent_url(url);
54 }
55
56 let service_name = self
57 .kubernetes_service_name
58 .as_deref()
59 .unwrap_or(DEFAULT_CLUSTER_AGENT_KUBERNETES_SERVICE_NAME);
60 if service_name.is_empty() {
61 return None;
62 }
63
64 resolve_kubernetes_service_endpoint(service_name, env_lookup)
65 }
66}
67
68fn get_non_empty_string(config: &GenericConfiguration, key: &str) -> Result<Option<String>, GenericError> {
69 Ok(get_trimmed_string(config, key)?.filter(|value| !value.is_empty()))
70}
71
72fn get_trimmed_string(config: &GenericConfiguration, key: &str) -> Result<Option<String>, GenericError> {
73 Ok(config
74 .try_get_typed::<String>(key)?
75 .map(|value| value.trim().to_string()))
76}
77
78fn normalize_cluster_agent_url(url: &str) -> Option<String> {
79 let normalized = if url.contains("://") {
80 url.to_string()
81 } else {
82 format!("https://{url}")
83 };
84
85 let parsed = url::Url::parse(&normalized).ok()?;
86 if parsed.scheme() == "https" && parsed.host_str().is_some() {
87 Some(normalized)
88 } else {
89 None
90 }
91}
92
93fn resolve_kubernetes_service_endpoint<F>(service_name: &str, env_lookup: F) -> Option<String>
94where
95 F: Fn(&str) -> Option<String>,
96{
97 let env_prefix = service_name.to_uppercase().replace('-', "_");
98 let host_env = format!("{env_prefix}_SERVICE_HOST");
99 let port_env = format!("{env_prefix}_SERVICE_PORT");
100
101 let host = env_lookup(&host_env)?.trim().to_string();
102 let port = env_lookup(&port_env)?.trim().to_string();
103 if host.is_empty() || port.is_empty() {
104 return None;
105 }
106
107 normalize_cluster_agent_url(&join_host_port(&host, &port))
108}
109
110fn join_host_port(host: &str, port: &str) -> String {
111 match host.parse::<IpAddr>() {
112 Ok(IpAddr::V6(_)) => format!("[{host}]:{port}"),
113 _ => format!("{host}:{port}"),
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use saluki_config::ConfigurationLoader;
120 use serde_json::json;
121
122 use super::*;
123
124 async fn cluster_agent_config_from(value: serde_json::Value) -> ClusterAgentConfiguration {
125 let (config, _) = ConfigurationLoader::for_tests(Some(value), None, false).await;
126 ClusterAgentConfiguration::from_configuration(&config).expect("Cluster Agent configuration should deserialize")
127 }
128
129 #[tokio::test]
130 async fn endpoint_and_token_requires_enabled_cluster_agent() {
131 let config = cluster_agent_config_from(json!({
132 "cluster_agent": {
133 "enabled": false,
134 "url": "https://cluster-agent.example.com",
135 "auth_token": "secret-token"
136 }
137 }))
138 .await;
139
140 assert_eq!(config.endpoint_and_token_with_env(env_lookup(&[])), None);
141 }
142
143 #[tokio::test]
144 async fn endpoint_and_token_requires_resolvable_endpoint() {
145 let config = cluster_agent_config_from(json!({
146 "cluster_agent": {
147 "enabled": true,
148 "auth_token": "secret-token"
149 }
150 }))
151 .await;
152
153 assert_eq!(config.endpoint_and_token_with_env(env_lookup(&[])), None);
154 }
155
156 #[tokio::test]
157 async fn endpoint_and_token_requires_https_url() {
158 let config = cluster_agent_config_from(json!({
159 "cluster_agent": {
160 "enabled": true,
161 "url": "http://cluster-agent.example.com",
162 "auth_token": "secret-token"
163 }
164 }))
165 .await;
166
167 assert_eq!(config.endpoint_and_token(), None);
168 }
169
170 #[tokio::test]
171 async fn endpoint_and_token_adds_https_scheme_to_url() {
172 let config = cluster_agent_config_from(json!({
173 "cluster_agent": {
174 "enabled": true,
175 "url": "cluster-agent.example.com:5005",
176 "auth_token": "secret-token"
177 }
178 }))
179 .await;
180
181 assert_eq!(
182 config.endpoint_and_token(),
183 Some((
184 "https://cluster-agent.example.com:5005".to_string(),
185 "secret-token".to_string()
186 ))
187 );
188 }
189
190 #[tokio::test]
191 async fn endpoint_and_token_requires_non_empty_token() {
192 let config = cluster_agent_config_from(json!({
193 "cluster_agent": {
194 "enabled": true,
195 "url": "https://cluster-agent.example.com",
196 "auth_token": " "
197 }
198 }))
199 .await;
200
201 assert_eq!(config.endpoint_and_token(), None);
202 }
203
204 #[tokio::test]
205 async fn endpoint_and_token_returns_https_url_and_token() {
206 let config = cluster_agent_config_from(json!({
207 "cluster_agent": {
208 "enabled": true,
209 "url": " https://cluster-agent.example.com ",
210 "auth_token": " secret-token "
211 }
212 }))
213 .await;
214
215 assert_eq!(
216 config.endpoint_and_token(),
217 Some((
218 "https://cluster-agent.example.com".to_string(),
219 "secret-token".to_string()
220 ))
221 );
222 }
223
224 #[tokio::test]
225 async fn endpoint_and_token_resolves_default_kubernetes_service_env() {
226 let config = cluster_agent_config_from(json!({
227 "cluster_agent": {
228 "enabled": true,
229 "auth_token": "secret-token"
230 }
231 }))
232 .await;
233
234 assert_eq!(
235 config.endpoint_and_token_with_env(env_lookup(&[
236 ("DATADOG_CLUSTER_AGENT_SERVICE_HOST", "127.0.0.1"),
237 ("DATADOG_CLUSTER_AGENT_SERVICE_PORT", "443"),
238 ])),
239 Some(("https://127.0.0.1:443".to_string(), "secret-token".to_string()))
240 );
241 }
242
243 #[tokio::test]
244 async fn endpoint_and_token_resolves_configured_kubernetes_service_env() {
245 let config = cluster_agent_config_from(json!({
246 "cluster_agent": {
247 "enabled": true,
248 "kubernetes_service_name": "custom-cluster-agent",
249 "auth_token": "secret-token"
250 }
251 }))
252 .await;
253
254 assert_eq!(
255 config.endpoint_and_token_with_env(env_lookup(&[
256 ("CUSTOM_CLUSTER_AGENT_SERVICE_HOST", "10.0.0.7"),
257 ("CUSTOM_CLUSTER_AGENT_SERVICE_PORT", "5005"),
258 ])),
259 Some(("https://10.0.0.7:5005".to_string(), "secret-token".to_string()))
260 );
261 }
262
263 #[tokio::test]
264 async fn endpoint_and_token_wraps_kubernetes_service_ipv6_host() {
265 let config = cluster_agent_config_from(json!({
266 "cluster_agent": {
267 "enabled": true,
268 "auth_token": "secret-token"
269 }
270 }))
271 .await;
272
273 assert_eq!(
274 config.endpoint_and_token_with_env(env_lookup(&[
275 ("DATADOG_CLUSTER_AGENT_SERVICE_HOST", "fd38:552b:2959::4f4a"),
276 ("DATADOG_CLUSTER_AGENT_SERVICE_PORT", "5005"),
277 ])),
278 Some((
279 "https://[fd38:552b:2959::4f4a]:5005".to_string(),
280 "secret-token".to_string()
281 ))
282 );
283 }
284
285 #[tokio::test]
286 async fn endpoint_and_token_prefers_configured_url_over_kubernetes_service_env() {
287 let config = cluster_agent_config_from(json!({
288 "cluster_agent": {
289 "enabled": true,
290 "url": "https://configured-cluster-agent.example.com",
291 "kubernetes_service_name": "custom-cluster-agent",
292 "auth_token": "secret-token"
293 }
294 }))
295 .await;
296
297 assert_eq!(
298 config.endpoint_and_token_with_env(env_lookup(&[
299 ("CUSTOM_CLUSTER_AGENT_SERVICE_HOST", "10.0.0.7"),
300 ("CUSTOM_CLUSTER_AGENT_SERVICE_PORT", "5005"),
301 ])),
302 Some((
303 "https://configured-cluster-agent.example.com".to_string(),
304 "secret-token".to_string()
305 ))
306 );
307 }
308
309 fn env_lookup<'a>(entries: &'a [(&'a str, &'a str)]) -> impl Fn(&str) -> Option<String> + 'a {
310 move |key| {
311 entries
312 .iter()
313 .find_map(|(entry_key, entry_value)| (*entry_key == key).then(|| (*entry_value).to_string()))
314 }
315 }
316}