saluki_components/common/datadog/
endpoints.rs1use std::{
2 collections::{HashMap, HashSet},
3 str::FromStr,
4 sync::LazyLock,
5};
6
7use http::uri::Authority;
8use regex::Regex;
9use saluki_config::GenericConfiguration;
10use saluki_error::{ErrorContext as _, GenericError};
11use saluki_metadata;
12use serde::Deserialize;
13use serde_with::{serde_as, DisplayFromStr, OneOrMany, PickFirst};
14use snafu::{ResultExt, Snafu};
15use tracing::debug;
16use url::Url;
17
18static DD_URL_REGEX: LazyLock<Regex> =
19 LazyLock::new(|| Regex::new(r"^app(\.mrf)?(\.[a-z]{2}\d)?\.(datad(oghq|0g)\.(com|eu)|ddog-gov\.com)$").unwrap());
20
/// Site used when the configuration does not specify one.
pub const DEFAULT_SITE: &str = "datadoghq.com";

/// Returns [`DEFAULT_SITE`] as an owned string.
///
/// Used as the serde default for the `site` field of `EndpointConfiguration`.
fn default_site() -> String {
    String::from(DEFAULT_SITE)
}
26
27#[derive(Debug, Snafu)]
29#[snafu(context(suffix(false)))]
30pub(crate) enum EndpointError {
31 Parse { source: url::ParseError, endpoint: String },
32}
33
34#[serde_as]
35#[derive(Clone, Debug, Default, Deserialize)]
36struct APIKeys(#[serde_as(as = "OneOrMany<_>")] Vec<String>);
37
38#[derive(Clone, Debug, Default, Deserialize)]
39struct MappedAPIKeys(HashMap<String, APIKeys>);
40
41impl MappedAPIKeys {
42 fn mappings(&self) -> impl Iterator<Item = (&str, &APIKeys)> {
43 self.0.iter().map(|(k, v)| (k.as_str(), v))
44 }
45}
46
47impl FromStr for MappedAPIKeys {
48 type Err = serde_json::Error;
49
50 fn from_str(s: &str) -> Result<Self, Self::Err> {
51 let inner = serde_json::from_str(s)?;
52 Ok(Self(inner))
53 }
54}
55
56#[serde_as]
60#[derive(Clone, Debug, Default, Deserialize)]
61pub(crate) struct AdditionalEndpoints(#[serde_as(as = "PickFirst<(DisplayFromStr, _)>")] MappedAPIKeys);
62
63impl AdditionalEndpoints {
64 pub fn resolved_endpoints(&self) -> Result<Vec<ResolvedEndpoint>, EndpointError> {
73 let mut resolved = Vec::new();
74
75 for (raw_endpoint, api_keys) in self.0.mappings() {
76 let endpoint = parse_and_normalize_endpoint(raw_endpoint)?;
77 let logs_authority = compute_logs_authority(&endpoint);
78 let traces_authority = compute_traces_authority(&endpoint);
79
80 let mut seen = HashSet::new();
83 for api_key in &api_keys.0 {
84 let trimmed_api_key = api_key.trim();
86 if trimmed_api_key.is_empty() || seen.contains(trimmed_api_key) {
87 continue;
88 }
89
90 seen.insert(trimmed_api_key);
91 resolved.push(ResolvedEndpoint {
92 endpoint: endpoint.clone(),
93 api_key: trimmed_api_key.to_string(),
94 config: None,
95 logs_authority: logs_authority.clone(),
96 traces_authority: traces_authority.clone(),
97 });
98 }
99 }
100
101 Ok(resolved)
102 }
103}
104
105#[derive(Clone, Deserialize)]
107pub struct EndpointConfiguration {
108 api_key: String,
110
111 #[serde(default = "default_site")]
118 site: String,
119
120 #[serde(default)]
128 dd_url: Option<String>,
129
130 #[serde(default)]
134 additional_endpoints: AdditionalEndpoints,
135}
136
137impl EndpointConfiguration {
138 pub fn set_dd_url(&mut self, url: String) {
140 self.dd_url = Some(url);
141 }
142
143 pub fn set_api_key(&mut self, api_key: String) {
145 self.api_key = api_key;
146 }
147
148 pub fn clear_additional_endpoints(&mut self) {
150 self.additional_endpoints = AdditionalEndpoints::default();
151 }
152
153 pub fn build_resolved_endpoints(
163 &self, configuration: Option<GenericConfiguration>,
164 ) -> Result<Vec<ResolvedEndpoint>, GenericError> {
165 let primary_endpoint = calculate_resolved_endpoint(self.dd_url.as_deref(), &self.site, &self.api_key)
166 .error_context("Failed parsing/resolving the primary destination endpoint.")?
167 .with_configuration(configuration);
168
169 let additional_endpoints = self
170 .additional_endpoints
171 .resolved_endpoints()
172 .error_context("Failed parsing/resolving the additional destination endpoints.")?;
173
174 let mut endpoints = additional_endpoints;
175 endpoints.insert(0, primary_endpoint);
176
177 Ok(endpoints)
178 }
179}
180
181#[derive(Clone, Debug)]
186pub struct ResolvedEndpoint {
187 endpoint: Url,
188 api_key: String,
189 config: Option<GenericConfiguration>,
190 logs_authority: Option<Authority>,
193 traces_authority: Option<Authority>,
196}
197
198impl ResolvedEndpoint {
199 fn from_raw_endpoint(raw_endpoint: &str, api_key: &str) -> Result<Self, EndpointError> {
207 let endpoint = parse_and_normalize_endpoint(raw_endpoint)?;
208 let logs_authority = compute_logs_authority(&endpoint);
209 let traces_authority = compute_traces_authority(&endpoint);
210 Ok(Self {
211 endpoint,
212 api_key: api_key.to_string(),
213 config: None,
214 logs_authority,
215 traces_authority,
216 })
217 }
218
219 pub fn with_configuration(self, config: Option<GenericConfiguration>) -> Self {
221 Self {
222 endpoint: self.endpoint,
223 api_key: self.api_key,
224 config,
225 logs_authority: self.logs_authority,
226 traces_authority: self.traces_authority,
227 }
228 }
229
230 pub fn endpoint(&self) -> &Url {
232 &self.endpoint
233 }
234
235 pub fn api_key(&mut self) -> &str {
240 if let Some(config) = &self.config {
241 match config.try_get_typed::<String>("api_key") {
242 Ok(Some(api_key)) => {
243 if !api_key.is_empty() && self.api_key != api_key {
244 debug!(endpoint = %self.endpoint, "Refreshed API key.");
245 self.api_key = api_key;
246 }
247 }
248 Ok(None) | Err(_) => {
249 debug!("Failed to retrieve API key from remote source (missing or wrong type). Continuing with last known valid API key.");
250 }
251 }
252 }
253 self.api_key.as_str()
254 }
255
256 pub fn cached_api_key(&self) -> &str {
258 self.api_key.as_str()
259 }
260
261 pub fn logs_authority(&self) -> Option<&Authority> {
266 self.logs_authority.as_ref()
267 }
268
269 pub fn traces_authority(&self) -> Option<&Authority> {
271 self.traces_authority.as_ref()
272 }
273}
274
275fn parse_and_normalize_endpoint(raw_endpoint: &str) -> Result<Url, EndpointError> {
276 let raw_endpoint = if !raw_endpoint.starts_with("http://") && !raw_endpoint.starts_with("https://") {
280 format!("https://{}", raw_endpoint)
281 } else {
282 raw_endpoint.to_string()
283 };
284
285 let endpoint = Url::parse(&raw_endpoint).context(Parse { endpoint: raw_endpoint })?;
286
287 add_data_plane_version_prefix(endpoint)
292}
293
294fn get_data_plane_version_prefix() -> String {
299 let app_details = saluki_metadata::get_app_details();
300 let version = app_details.version();
301 format!(
302 "{}-{}-{}-{}.agent",
303 version.major(),
304 version.minor(),
305 version.patch(),
306 app_details.identifier(),
307 )
308}
309
310fn add_data_plane_version_prefix(mut endpoint: Url) -> Result<Url, EndpointError> {
319 let new_host = match endpoint.host_str() {
320 Some(host) => {
321 if !DD_URL_REGEX.is_match(host) {
323 debug!("Configured endpoint '{}' appears to be a non-Datadog endpoint. Utilizing endpoint without modification.", host);
324 return Ok(endpoint);
325 }
326
327 let leftmost_segment = host.split('.').next().unwrap_or("");
330 let versioned_segment = get_data_plane_version_prefix();
331 host.replacen(leftmost_segment, &versioned_segment, 1)
332 }
333 None => {
334 return Err(EndpointError::Parse {
335 source: url::ParseError::EmptyHost,
336 endpoint: endpoint.to_string(),
337 })
338 }
339 };
340
341 if let Err(e) = endpoint.set_host(Some(new_host.as_str())) {
343 return Err(EndpointError::Parse {
344 source: e,
345 endpoint: endpoint.to_string(),
346 });
347 }
348
349 Ok(endpoint)
350}
351
352fn calculate_resolved_endpoint(
359 override_url: Option<&str>, site: &str, api_key: &str,
360) -> Result<ResolvedEndpoint, EndpointError> {
361 let raw_endpoint = match override_url {
362 Some(url) => url.to_string(),
364 None => {
365 let base_domain = if site.is_empty() { DEFAULT_SITE } else { site };
369 format!("app.{}", base_domain)
370 }
371 };
372
373 ResolvedEndpoint::from_raw_endpoint(&raw_endpoint, api_key)
374}
375
376fn compute_logs_authority(endpoint: &Url) -> Option<Authority> {
384 const AGENT_HOST_MARKER: &str = ".agent.";
385
386 let host = endpoint.host_str()?;
387 let idx = host.find(AGENT_HOST_MARKER)?;
388 let site = &host[idx + AGENT_HOST_MARKER.len()..];
389 let logs_host = format!("agent-http-intake.logs.{}", site);
390
391 Authority::from_str(&logs_host).ok()
392}
393
394fn compute_traces_authority(endpoint: &Url) -> Option<Authority> {
397 const AGENT_HOST_MARKER: &str = ".agent.";
398
399 let host = endpoint.host_str()?;
400 let idx = host.find(AGENT_HOST_MARKER)?;
401 let site = &host[idx + AGENT_HOST_MARKER.len()..];
402 let traces_host = format!("trace.agent.{}", site);
403
404 Authority::from_str(&traces_host).ok()
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Flattens the endpoints into sorted `endpoint:api_key` strings for stable comparison.
    fn additional_endpoints_to_sorted_strings(endpoints: &AdditionalEndpoints) -> Vec<String> {
        let mut pairs = Vec::new();
        for (domain, api_keys) in endpoints.0.mappings() {
            for api_key in &api_keys.0 {
                pairs.push(format!("{}:{}", domain, api_key));
            }
        }
        pairs.sort();
        pairs
    }

    /// Deserializes `raw_input` and checks the flattened result against `expected`.
    fn assert_deserializes_to(raw_input: &str, failure_message: &str, expected: &[&str]) {
        let result = serde_yaml::from_str::<AdditionalEndpoints>(raw_input).expect(failure_message);

        let actual = additional_endpoints_to_sorted_strings(&result);
        assert_eq!(expected.to_vec(), actual);
    }

    /// Resolves `raw_endpoint` with a dummy API key and returns the resulting host.
    fn resolved_host(raw_endpoint: &str) -> String {
        let resolved =
            ResolvedEndpoint::from_raw_endpoint(raw_endpoint, "fake_api_key").expect("error resolving endpoint");
        resolved
            .endpoint()
            .host_str()
            .expect("error getting host")
            .to_owned()
    }

    #[test]
    fn deser_additional_endpoints_json_direct_mapping() {
        // JSON document embedded in a string, one key per endpoint.
        assert_deserializes_to(
            r#""{\"app.datadoghq.com\":\"fake-api-key-1\",\"app.datadoghq.eu\":\"fake-api-key-2\"}""#,
            "should not fail to deserialize AdditionalEndpoints from JSON string",
            &["app.datadoghq.com:fake-api-key-1", "app.datadoghq.eu:fake-api-key-2"],
        );
    }

    #[test]
    fn deser_additional_endpoints_json_multiple_api_keys() {
        // JSON document embedded in a string, several keys per endpoint.
        assert_deserializes_to(
            r#""{\"app.datadoghq.com\":[\"fake-api-key-1a\",\"fake-api-key-1b\"],\"app.datadoghq.eu\":[\"fake-api-key-2a\",\"fake-api-key-2b\"]}""#,
            "should not fail to deserialize AdditionalEndpoints from JSON string",
            &[
                "app.datadoghq.com:fake-api-key-1a",
                "app.datadoghq.com:fake-api-key-1b",
                "app.datadoghq.eu:fake-api-key-2a",
                "app.datadoghq.eu:fake-api-key-2b",
            ],
        );
    }

    #[test]
    fn deser_additional_endpoints_direct_mapping() {
        // Native mapping, one key per endpoint.
        assert_deserializes_to(
            "app.datadoghq.com: fake-api-key-1\napp.datadoghq.eu: fake-api-key-2",
            "should not fail to deserialize AdditionalEndpoints from YAML string",
            &["app.datadoghq.com:fake-api-key-1", "app.datadoghq.eu:fake-api-key-2"],
        );
    }

    #[test]
    fn deser_additional_endpoints_multiple_api_keys() {
        // Native mapping, several keys per endpoint.
        assert_deserializes_to(
            "app.datadoghq.com:\n - fake-api-key-1a\n - fake-api-key-1b\napp.datadoghq.eu:\n - fake-api-key-2a\n - fake-api-key-2b",
            "should not fail to deserialize AdditionalEndpoints from YAML string",
            &[
                "app.datadoghq.com:fake-api-key-1a",
                "app.datadoghq.com:fake-api-key-1b",
                "app.datadoghq.eu:fake-api-key-2a",
                "app.datadoghq.eu:fake-api-key-2b",
            ],
        );
    }

    #[test]
    fn add_version_prefix() {
        // (input endpoint, host expected after the versioned prefix)
        let cases = [
            ("https://app.datadoghq.com", "datadoghq.com"),
            ("https://app.datadoghq.eu", "datadoghq.eu"),
            ("app.ddog-gov.com", "ddog-gov.com"),
            ("app.us2.datadoghq.com", "us2.datadoghq.com"),
            ("https://app.xx9.datadoghq.com", "xx9.datadoghq.com"),
        ];

        for (input_url, site) in cases {
            let expected_host = format!("{}.{}", get_data_plane_version_prefix(), site);
            assert_eq!(expected_host, resolved_host(input_url));
        }
    }

    #[test]
    fn skip_version_prefix() {
        // (input endpoint, host expected to be left untouched)
        let cases = [
            ("https://custom.datadoghq.com", "custom.datadoghq.com"),
            ("https://custom.agent.datadoghq.com", "custom.agent.datadoghq.com"),
            ("https://app.custom.datadoghq.com", "app.custom.datadoghq.com"),
            ("https://app.datadoghq.internal", "app.datadoghq.internal"),
            ("https://app.myproxy.com", "app.myproxy.com"),
        ];

        for (input_url, expected_host) in cases {
            assert_eq!(expected_host, resolved_host(input_url));
        }
    }

    #[test]
    fn calculate_api_endpoint_no_override_no_site() {
        let expected_endpoint = format!("https://{}.{}/", get_data_plane_version_prefix(), DEFAULT_SITE);

        let resolved = calculate_resolved_endpoint(None, "", "").expect("error calculating default API endpoint");
        assert_eq!(expected_endpoint, resolved.endpoint().to_string());
    }

    #[test]
    fn calculate_api_endpoint_no_override() {
        let site = "us3.datadoghq.com";
        let expected_endpoint = format!("https://{}.{}/", get_data_plane_version_prefix(), site);

        let resolved =
            calculate_resolved_endpoint(None, "us3.datadoghq.com", "").expect("error calculating custom API endpoint");
        assert_eq!(expected_endpoint, resolved.endpoint().to_string());
    }

    #[test]
    fn calculate_api_endpoint_no_site() {
        let override_url = "https://dogpound.io/";

        let resolved =
            calculate_resolved_endpoint(Some(override_url), "", "").expect("error calculating override API endpoint");
        assert_eq!(override_url, resolved.endpoint().to_string());
    }

    #[test]
    fn calculate_api_endpoint_override_and_site() {
        let override_url = "https://dogpound.io/";

        let resolved = calculate_resolved_endpoint(Some(override_url), "us3.datadoghq.com", "")
            .expect("error calculating override API endpoint");
        assert_eq!(override_url, resolved.endpoint().to_string());
    }
}