// saluki_components/common/datadog/endpoints.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    sync::LazyLock,
5};
6
7use http::uri::Authority;
8use regex::Regex;
9use saluki_config::GenericConfiguration;
10use saluki_error::{ErrorContext as _, GenericError};
11use saluki_metadata;
12use serde::Deserialize;
13use serde_with::{serde_as, DisplayFromStr, OneOrMany, PickFirst};
14use snafu::{ResultExt, Snafu};
15use tracing::debug;
16use url::Url;
17
18static DD_URL_REGEX: LazyLock<Regex> =
19    LazyLock::new(|| Regex::new(r"^app(\.mrf)?(\.[a-z]{2}\d)?\.(datad(oghq|0g)\.(com|eu)|ddog-gov\.com)$").unwrap());
20
/// The Datadog site used when no site is configured.
pub const DEFAULT_SITE: &str = "datadoghq.com";

/// Returns an owned copy of [`DEFAULT_SITE`], for use as a serde default value.
fn default_site() -> String {
    String::from(DEFAULT_SITE)
}
26
27/// Error type for invalid endpoints.
28#[derive(Debug, Snafu)]
29#[snafu(context(suffix(false)))]
30pub(crate) enum EndpointError {
31    Parse { source: url::ParseError, endpoint: String },
32}
33
34#[serde_as]
35#[derive(Clone, Debug, Default, Deserialize)]
36struct APIKeys(#[serde_as(as = "OneOrMany<_>")] Vec<String>);
37
38#[derive(Clone, Debug, Default, Deserialize)]
39struct MappedAPIKeys(HashMap<String, APIKeys>);
40
41impl MappedAPIKeys {
42    fn mappings(&self) -> impl Iterator<Item = (&str, &APIKeys)> {
43        self.0.iter().map(|(k, v)| (k.as_str(), v))
44    }
45}
46
47impl FromStr for MappedAPIKeys {
48    type Err = serde_json::Error;
49
50    fn from_str(s: &str) -> Result<Self, Self::Err> {
51        let inner = serde_json::from_str(s)?;
52        Ok(Self(inner))
53    }
54}
55
56/// A set of additional API endpoints to forward metrics to.
57///
58/// Each endpoint can be associated with multiple API keys. Requests will be forwarded to each unique endpoint/API key pair.
59#[serde_as]
60#[derive(Clone, Debug, Default, Deserialize)]
61pub(crate) struct AdditionalEndpoints(#[serde_as(as = "PickFirst<(DisplayFromStr, _)>")] MappedAPIKeys);
62
63impl AdditionalEndpoints {
64    /// Returns the resolved endpoints from the additional endpoint configuration.
65    ///
66    /// This will generate a `ResolvedEndpoint` for each unique endpoint/API key pair.
67    ///
68    /// # Errors
69    ///
70    /// If any of the additional endpoints are not valid URLs, or a valid URL could not be constructed after applying
71    /// the necessary normalization / modifications, an error will be returned.
72    pub fn resolved_endpoints(&self) -> Result<Vec<ResolvedEndpoint>, EndpointError> {
73        let mut resolved = Vec::new();
74
75        for (raw_endpoint, api_keys) in self.0.mappings() {
76            let endpoint = parse_and_normalize_endpoint(raw_endpoint)?;
77            let logs_authority = compute_logs_authority(&endpoint);
78            let traces_authority = compute_traces_authority(&endpoint);
79
80            // With our fully parsed and versioned endpoint, we'll now create a resolved version for each associated API
81            // key attached to it.
82            let mut seen = HashSet::new();
83            for api_key in &api_keys.0 {
84                // Filter out empty or duplicate API keys for this endpoint.
85                let trimmed_api_key = api_key.trim();
86                if trimmed_api_key.is_empty() || seen.contains(trimmed_api_key) {
87                    continue;
88                }
89
90                seen.insert(trimmed_api_key);
91                resolved.push(ResolvedEndpoint {
92                    endpoint: endpoint.clone(),
93                    api_key: trimmed_api_key.to_string(),
94                    config: None,
95                    logs_authority: logs_authority.clone(),
96                    traces_authority: traces_authority.clone(),
97                });
98            }
99        }
100
101        Ok(resolved)
102    }
103}
104
105/// Endpoint configuration for sending payloads to the Datadog platform.
106#[derive(Clone, Deserialize)]
107pub struct EndpointConfiguration {
108    /// The API key to use.
109    api_key: String,
110
111    /// The site to send metrics to.
112    ///
113    /// This is the base domain for the Datadog site in which the API key originates from. This will generally be a
114    /// portion of the domain used to access the Datadog UI, such as `datadoghq.com` or `us5.datadoghq.com`.
115    ///
116    /// Defaults to `datadoghq.com`.
117    #[serde(default = "default_site")]
118    site: String,
119
120    /// The full URL base to send metrics to.
121    ///
122    /// This takes precedence over `site`, and is not altered in any way. This can be useful to specifying the exact
123    /// endpoint used, such as when looking to change the scheme (for example, `http` vs `https`) or specifying a custom port,
124    /// which are both useful when proxying traffic to an intermediate destination before forwarding to Datadog.
125    ///
126    /// Defaults to unset.
127    #[serde(default)]
128    dd_url: Option<String>,
129
130    /// Enables sending data to multiple endpoints and/or with multiple API keys via dual shipping.
131    ///
132    /// Defaults to empty.
133    #[serde(default)]
134    additional_endpoints: AdditionalEndpoints,
135}
136
137impl EndpointConfiguration {
138    /// Sets the full URL base to send metrics to.
139    pub fn set_dd_url(&mut self, url: String) {
140        self.dd_url = Some(url);
141    }
142
143    /// Sets the API key to use.
144    pub fn set_api_key(&mut self, api_key: String) {
145        self.api_key = api_key;
146    }
147
148    /// Clears all additional endpoints.
149    pub fn clear_additional_endpoints(&mut self) {
150        self.additional_endpoints = AdditionalEndpoints::default();
151    }
152
153    /// Builds the resolved endpoints from the endpoint configuration.
154    ///
155    /// This will generate a `ResolvedEndpoint` for each unique endpoint/API key pair, which includes the "primary"
156    /// endpoint defined by `site`/`dd_url` and any additional endpoints defined in `additional_endpoints`.
157    ///
158    /// # Errors
159    ///
160    /// If any of the additional endpoints are not valid URLs, or a valid URL could not be constructed after applying
161    /// the necessary normalization / modifications to a particular endpoint, an error will be returned.
162    pub fn build_resolved_endpoints(
163        &self, configuration: Option<GenericConfiguration>,
164    ) -> Result<Vec<ResolvedEndpoint>, GenericError> {
165        let primary_endpoint = calculate_resolved_endpoint(self.dd_url.as_deref(), &self.site, &self.api_key)
166            .error_context("Failed parsing/resolving the primary destination endpoint.")?
167            .with_configuration(configuration);
168
169        let additional_endpoints = self
170            .additional_endpoints
171            .resolved_endpoints()
172            .error_context("Failed parsing/resolving the additional destination endpoints.")?;
173
174        let mut endpoints = additional_endpoints;
175        endpoints.insert(0, primary_endpoint);
176
177        Ok(endpoints)
178    }
179}
180
181/// A single API endpoint and its associated API key.
182///
183/// An endpoint is defined as a unique, fully-qualified domain name that metrics will be sent to, such as
184/// `https://app.datadoghq.com`.
185#[derive(Clone, Debug)]
186pub struct ResolvedEndpoint {
187    endpoint: Url,
188    api_key: String,
189    config: Option<GenericConfiguration>,
190    /// Pre-computed logs intake authority (for example, `agent-http-intake.logs.datadoghq.com`).
191    /// This is derived from the endpoint host when it contains `.agent.` marker.
192    logs_authority: Option<Authority>,
193    /// Pre-computed traces intake authority (for example, `trace.agent.datadoghq.com`).
194    /// This is derived from the endpoint host when it contains `.agent.` marker.
195    traces_authority: Option<Authority>,
196}
197
198impl ResolvedEndpoint {
199    /// Creates a new `ResolvedEndpoint` instance from the given endpoint and API key, normalizing and modifying the
200    /// endpoint as necessary.
201    ///
202    /// # Errors
203    ///
204    /// If the given endpoint is not a valid URL, or a valid URL could not be constructed after applying the necessary
205    /// normalization / modifications, an error will be returned.
206    fn from_raw_endpoint(raw_endpoint: &str, api_key: &str) -> Result<Self, EndpointError> {
207        let endpoint = parse_and_normalize_endpoint(raw_endpoint)?;
208        let logs_authority = compute_logs_authority(&endpoint);
209        let traces_authority = compute_traces_authority(&endpoint);
210        Ok(Self {
211            endpoint,
212            api_key: api_key.to_string(),
213            config: None,
214            logs_authority,
215            traces_authority,
216        })
217    }
218
219    /// Creates a new  `ResolvedEndpoint` instance from an existing `ResolvedEndpoint`, adding an optional `GenericConfiguration` which can be used to fetch the up-to-date API key.
220    pub fn with_configuration(self, config: Option<GenericConfiguration>) -> Self {
221        Self {
222            endpoint: self.endpoint,
223            api_key: self.api_key,
224            config,
225            logs_authority: self.logs_authority,
226            traces_authority: self.traces_authority,
227        }
228    }
229
230    /// Returns the endpoint of the resolver.
231    pub fn endpoint(&self) -> &Url {
232        &self.endpoint
233    }
234
235    /// Returns the API key associated with the endpoint.
236    ///
237    /// If a [`GenericConfiguration`] has been configured, the API key will be queried from the configuration and
238    /// stored if it has been updated since the last time `api_key` was called.
239    pub fn api_key(&mut self) -> &str {
240        if let Some(config) = &self.config {
241            match config.try_get_typed::<String>("api_key") {
242                Ok(Some(api_key)) => {
243                    if !api_key.is_empty() && self.api_key != api_key {
244                        debug!(endpoint = %self.endpoint, "Refreshed API key.");
245                        self.api_key = api_key;
246                    }
247                }
248                Ok(None) | Err(_) => {
249                    debug!("Failed to retrieve API key from remote source (missing or wrong type). Continuing with last known valid API key.");
250                }
251            }
252        }
253        self.api_key.as_str()
254    }
255
256    /// Returns the API key associated with the endpoint without refreshing it.
257    pub fn cached_api_key(&self) -> &str {
258        self.api_key.as_str()
259    }
260
261    /// Returns the pre-computed logs intake authority, if available.
262    ///
263    /// This authority is derived from the endpoint host when it contains the `.agent.` marker,
264    /// and is used for routing log payloads to the appropriate logs intake host.
265    pub fn logs_authority(&self) -> Option<&Authority> {
266        self.logs_authority.as_ref()
267    }
268
269    /// Returns the pre-computed traces intake authority, if available.
270    pub fn traces_authority(&self) -> Option<&Authority> {
271        self.traces_authority.as_ref()
272    }
273}
274
275fn parse_and_normalize_endpoint(raw_endpoint: &str) -> Result<Url, EndpointError> {
276    // Start out by parsing the given domain/endpoint, which means ensuring first that it has a scheme.
277    //
278    // If no scheme is present, we assume HTTPS.
279    let raw_endpoint = if !raw_endpoint.starts_with("http://") && !raw_endpoint.starts_with("https://") {
280        format!("https://{}", raw_endpoint)
281    } else {
282        raw_endpoint.to_string()
283    };
284
285    let endpoint = Url::parse(&raw_endpoint).context(Parse { endpoint: raw_endpoint })?;
286
287    // With our valid endpoint URL, we'll optionally prefix it with a subdomain that represents the data plane version,
288    // which differentiates the traffic between different versions of the data plane application.
289    //
290    // This prefixing only occurs for official Datadog API endpoints.
291    add_data_plane_version_prefix(endpoint)
292}
293
294/// Returns a specialized domain prefix based on the versioning of the current application.
295///
296/// This generates a prefix that is similar in format to the one generated by Datadog Agent for determining the endpoint
297/// to send metrics to.
298fn get_data_plane_version_prefix() -> String {
299    let app_details = saluki_metadata::get_app_details();
300    let version = app_details.version();
301    format!(
302        "{}-{}-{}-{}.agent",
303        version.major(),
304        version.minor(),
305        version.patch(),
306        app_details.identifier(),
307    )
308}
309
310/// Prefixes the given API endpoint with the version of the data plane process.
311///
312/// If the given API endpoint does not include a scheme, `https` is assumed. As well, if the endpoint does not represent
313/// an official Datadog API endpoint, it will not be modified.
314///
315/// # Errors
316///
317/// If the given API endpoint cannot be parsed as a valid URL, an error will be returned.
318fn add_data_plane_version_prefix(mut endpoint: Url) -> Result<Url, EndpointError> {
319    let new_host = match endpoint.host_str() {
320        Some(host) => {
321            // Do not update non-official Datadog URLs.
322            if !DD_URL_REGEX.is_match(host) {
323                debug!("Configured endpoint '{}' appears to be a non-Datadog endpoint. Utilizing endpoint without modification.", host);
324                return Ok(endpoint);
325            }
326
327            // We expect to be getting a domain that has at least one subdomain portion (i.e., `app.datadoghq.com`) if
328            // not more. We're aiming to simply replace the leftmost subdomain portion with the version prefix.
329            let leftmost_segment = host.split('.').next().unwrap_or("");
330            let versioned_segment = get_data_plane_version_prefix();
331            host.replacen(leftmost_segment, &versioned_segment, 1)
332        }
333        None => {
334            return Err(EndpointError::Parse {
335                source: url::ParseError::EmptyHost,
336                endpoint: endpoint.to_string(),
337            })
338        }
339    };
340
341    // Update the host with the prefixed version.
342    if let Err(e) = endpoint.set_host(Some(new_host.as_str())) {
343        return Err(EndpointError::Parse {
344            source: e,
345            endpoint: endpoint.to_string(),
346        });
347    }
348
349    Ok(endpoint)
350}
351
352/// Calculates the correct API endpoint to use based on the given override URL and site settings.
353///
354/// # Errors
355///
356/// If an override URL is provided and cannot be parsed, or if a valid endpoint cannot be constructed from the given
357/// site, an error will be returned.
358fn calculate_resolved_endpoint(
359    override_url: Option<&str>, site: &str, api_key: &str,
360) -> Result<ResolvedEndpoint, EndpointError> {
361    let raw_endpoint = match override_url {
362        // If an override URL is provided, use it directly.
363        Some(url) => url.to_string(),
364        None => {
365            // When using the site, we'll provide the default US-based site if the site value is empty.
366            //
367            // We also do a little bit of prefixing to get it in the right shape before creating the resolved endpoint.
368            let base_domain = if site.is_empty() { DEFAULT_SITE } else { site };
369            format!("app.{}", base_domain)
370        }
371    };
372
373    ResolvedEndpoint::from_raw_endpoint(&raw_endpoint, api_key)
374}
375
376/// Computes the logs intake authority from a resolved endpoint URL.
377///
378/// If the endpoint host contains the `.agent.` marker (for example, `7-52-0-adp.agent.datadoghq.com`),
379/// this extracts the site suffix and constructs the logs intake host in the form
380/// `agent-http-intake.logs.{site}`.
381///
382/// Returns `None` if the host doesn't contain the marker or if the authority cannot be parsed.
383fn compute_logs_authority(endpoint: &Url) -> Option<Authority> {
384    const AGENT_HOST_MARKER: &str = ".agent.";
385
386    let host = endpoint.host_str()?;
387    let idx = host.find(AGENT_HOST_MARKER)?;
388    let site = &host[idx + AGENT_HOST_MARKER.len()..];
389    let logs_host = format!("agent-http-intake.logs.{}", site);
390
391    Authority::from_str(&logs_host).ok()
392}
393
394/// Computes the traces intake authority from a resolved endpoint URL.
395/// Returns `None` if the host doesn't contain the marker or if the authority cannot be parsed.
396fn compute_traces_authority(endpoint: &Url) -> Option<Authority> {
397    const AGENT_HOST_MARKER: &str = ".agent.";
398
399    let host = endpoint.host_str()?;
400    let idx = host.find(AGENT_HOST_MARKER)?;
401    let site = &host[idx + AGENT_HOST_MARKER.len()..];
402    let traces_host = format!("trace.agent.{}", site);
403
404    Authority::from_str(&traces_host).ok()
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Flattens additional endpoints into sorted `endpoint:api_key` strings for order-independent comparison.
    fn additional_endpoints_to_sorted_strings(endpoints: &AdditionalEndpoints) -> Vec<String> {
        let mut flattened = endpoints
            .0
            .mappings()
            .flat_map(|(domain, api_keys)| api_keys.0.iter().map(move |api_key| format!("{}:{}", domain, api_key)))
            .collect::<Vec<String>>();
        flattened.sort();
        flattened
    }

    #[test]
    fn deser_additional_endpoints_json_direct_mapping() {
        let raw_input = r#""{\"app.datadoghq.com\":\"fake-api-key-1\",\"app.datadoghq.eu\":\"fake-api-key-2\"}""#;

        let result = serde_yaml::from_str::<AdditionalEndpoints>(raw_input)
            .expect("should not fail to deserialize AdditionalEndpoints from JSON string");

        let expected = vec!["app.datadoghq.com:fake-api-key-1", "app.datadoghq.eu:fake-api-key-2"];
        let actual = additional_endpoints_to_sorted_strings(&result);
        assert_eq!(expected, actual);
    }

    #[test]
    fn deser_additional_endpoints_json_multiple_api_keys() {
        let raw_input = r#""{\"app.datadoghq.com\":[\"fake-api-key-1a\",\"fake-api-key-1b\"],\"app.datadoghq.eu\":[\"fake-api-key-2a\",\"fake-api-key-2b\"]}""#;

        let result = serde_yaml::from_str::<AdditionalEndpoints>(raw_input)
            .expect("should not fail to deserialize AdditionalEndpoints from JSON string");

        let expected = vec![
            "app.datadoghq.com:fake-api-key-1a",
            "app.datadoghq.com:fake-api-key-1b",
            "app.datadoghq.eu:fake-api-key-2a",
            "app.datadoghq.eu:fake-api-key-2b",
        ];
        let actual = additional_endpoints_to_sorted_strings(&result);
        assert_eq!(expected, actual);
    }

    #[test]
    fn deser_additional_endpoints_direct_mapping() {
        let raw_input = "app.datadoghq.com: fake-api-key-1\napp.datadoghq.eu: fake-api-key-2";

        let result = serde_yaml::from_str::<AdditionalEndpoints>(raw_input)
            .expect("should not fail to deserialize AdditionalEndpoints from YAML string");

        let expected = vec!["app.datadoghq.com:fake-api-key-1", "app.datadoghq.eu:fake-api-key-2"];
        let actual = additional_endpoints_to_sorted_strings(&result);
        assert_eq!(expected, actual);
    }

    #[test]
    fn deser_additional_endpoints_multiple_api_keys() {
        let raw_input = "app.datadoghq.com:\n  - fake-api-key-1a\n  - fake-api-key-1b\napp.datadoghq.eu:\n  - fake-api-key-2a\n  - fake-api-key-2b";

        let result = serde_yaml::from_str::<AdditionalEndpoints>(raw_input)
            .expect("should not fail to deserialize AdditionalEndpoints from YAML string");

        let expected = vec![
            "app.datadoghq.com:fake-api-key-1a",
            "app.datadoghq.com:fake-api-key-1b",
            "app.datadoghq.eu:fake-api-key-2a",
            "app.datadoghq.eu:fake-api-key-2b",
        ];
        let actual = additional_endpoints_to_sorted_strings(&result);
        assert_eq!(expected, actual);
    }

    #[test]
    fn resolved_endpoints_skip_empty_and_duplicate_api_keys() {
        // The second key only differs from the first by surrounding whitespace, and the third is empty.
        let raw_input = "app.datadoghq.com:\n  - fake-api-key-1\n  - \" fake-api-key-1 \"\n  - \"\"\n  - fake-api-key-2";

        let endpoints = serde_yaml::from_str::<AdditionalEndpoints>(raw_input)
            .expect("should not fail to deserialize AdditionalEndpoints from YAML string");
        let resolved = endpoints
            .resolved_endpoints()
            .expect("should not fail to resolve additional endpoints");

        let mut api_keys = resolved
            .iter()
            .map(|endpoint| endpoint.cached_api_key().to_string())
            .collect::<Vec<_>>();
        api_keys.sort();
        assert_eq!(vec!["fake-api-key-1", "fake-api-key-2"], api_keys);
    }

    #[test]
    fn add_version_prefix() {
        let input_urls = [
            "https://app.datadoghq.com",     // US
            "https://app.datadoghq.eu",      // EU
            "app.ddog-gov.com",              // Gov
            "app.us2.datadoghq.com",         // Additional Site
            "https://app.xx9.datadoghq.com", // Arbitrary site
        ];
        let expected_hosts = [
            "datadoghq.com",
            "datadoghq.eu",
            "ddog-gov.com",
            "us2.datadoghq.com",
            "xx9.datadoghq.com",
        ]
        .iter()
        .map(|s| format!("{}.{}", get_data_plane_version_prefix(), s))
        .collect::<Vec<_>>();

        for (input_url, expected_host) in input_urls.iter().zip(expected_hosts) {
            let resolved =
                ResolvedEndpoint::from_raw_endpoint(input_url, "fake_api_key").expect("error resolving endpoint");
            assert_eq!(
                expected_host,
                resolved.endpoint().host_str().expect("error getting host")
            );
        }
    }

    #[test]
    fn skip_version_prefix() {
        let input_urls = [
            "https://custom.datadoghq.com",       // Custom
            "https://custom.agent.datadoghq.com", // Custom with 'agent' subdomain
            "https://app.custom.datadoghq.com",   // Custom
            "https://app.datadoghq.internal",     // Custom top-level domain
            "https://app.myproxy.com",            // Proxy
        ];
        let expected_hosts = [
            "custom.datadoghq.com",
            "custom.agent.datadoghq.com",
            "app.custom.datadoghq.com",
            "app.datadoghq.internal",
            "app.myproxy.com",
        ];

        for (input_url, expected_host) in input_urls.iter().zip(expected_hosts) {
            let resolved =
                ResolvedEndpoint::from_raw_endpoint(input_url, "fake_api_key").expect("error resolving endpoint");
            assert_eq!(
                expected_host,
                resolved.endpoint().host_str().expect("error getting host")
            );
        }
    }

    #[test]
    fn compute_intake_authorities() {
        let agent_endpoint = Url::parse("https://7-52-0-adp.agent.datadoghq.eu").expect("error parsing URL");
        assert_eq!(
            "agent-http-intake.logs.datadoghq.eu",
            compute_logs_authority(&agent_endpoint)
                .expect("logs authority should be derived")
                .as_str()
        );
        assert_eq!(
            "trace.agent.datadoghq.eu",
            compute_traces_authority(&agent_endpoint)
                .expect("traces authority should be derived")
                .as_str()
        );

        let proxy_endpoint = Url::parse("https://app.myproxy.com").expect("error parsing URL");
        assert!(compute_logs_authority(&proxy_endpoint).is_none());
        assert!(compute_traces_authority(&proxy_endpoint).is_none());
    }

    #[test]
    fn calculate_api_endpoint_no_override_no_site() {
        let prefix = get_data_plane_version_prefix();
        let expected_endpoint = format!("https://{}.{}/", prefix, DEFAULT_SITE);

        let resolved = calculate_resolved_endpoint(None, "", "").expect("error calculating default API endpoint");
        assert_eq!(expected_endpoint, resolved.endpoint().to_string());
    }

    #[test]
    fn calculate_api_endpoint_no_override() {
        let site = "us3.datadoghq.com";
        let prefix = get_data_plane_version_prefix();
        let expected_endpoint = format!("https://{}.{}/", prefix, site);

        let resolved = calculate_resolved_endpoint(None, site, "").expect("error calculating custom API endpoint");
        assert_eq!(expected_endpoint, resolved.endpoint().to_string());
    }

    #[test]
    fn calculate_api_endpoint_no_site() {
        let override_url = "https://dogpound.io/";
        let expected_endpoint = override_url;

        let resolved =
            calculate_resolved_endpoint(Some(override_url), "", "").expect("error calculating override API endpoint");
        assert_eq!(expected_endpoint, resolved.endpoint().to_string());
    }

    #[test]
    fn calculate_api_endpoint_override_and_site() {
        let override_url = "https://dogpound.io/";
        let expected_endpoint = override_url;

        let resolved = calculate_resolved_endpoint(Some(override_url), "us3.datadoghq.com", "")
            .expect("error calculating override API endpoint");
        assert_eq!(expected_endpoint, resolved.endpoint().to_string());
    }
}