Skip to main content

saluki_components/forwarders/datadog/
mod.rs

1use async_trait::async_trait;
2use http::Uri;
3use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
4use saluki_common::buf::FrozenChunkedBytesBuffer;
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7    components::{forwarders::*, ComponentContext},
8    data_model::payload::PayloadType,
9    observability::ComponentMetricsExt as _,
10};
11use saluki_error::GenericError;
12use saluki_metrics::MetricsBuilder;
13use stringtheory::MetaString;
14use tokio::select;
15use tracing::debug;
16
17use crate::common::datadog::{
18    config::ForwarderConfiguration,
19    io::TransactionForwarder,
20    telemetry::ComponentTelemetry,
21    transaction::{Metadata, Transaction},
22    validation::ValidationReadiness,
23    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
24};
25
26/// Datadog forwarder.
27///
28/// Forwards Datadog-specific payloads to the Datadog platform. Handles the standard Datadog Agent configuration,
29/// in terms of specifying additional endpoints, adding the necessary HTTP request headers for authentication,
30/// identification, and more.
31pub struct DatadogConfiguration {
32    /// Forwarder configuration settings.
33    ///
34    /// See [`ForwarderConfiguration`] for more information about the available settings.
35    forwarder_config: ForwarderConfiguration,
36
37    configuration: Option<GenericConfiguration>,
38}
39
40impl DatadogConfiguration {
41    /// Creates a new `DatadogConfiguration` from the given configuration.
42    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43        let forwarder_config = ForwarderConfiguration::from_configuration(config)?;
44        Ok(Self {
45            forwarder_config,
46            configuration: Some(config.clone()),
47        })
48    }
49
50    /// Overrides the default endpoint and refreshes its API key from the given config path.
51    ///
52    /// This is for override endpoints whose API key does not refresh from the top-level `api_key`
53    /// config path, such as Multi-Region Failover.
54    pub fn with_endpoint_override_and_api_key_refresh_config_path(
55        mut self, dd_url: String, api_key: String, api_key_refresh_config_path: &'static str,
56    ) -> Self {
57        self.apply_endpoint_override(dd_url, api_key, api_key_refresh_config_path);
58
59        self
60    }
61
62    fn apply_endpoint_override(&mut self, dd_url: String, api_key: String, api_key_refresh_config_path: &'static str) {
63        // Clear any existing additional endpoints, and set the new DD URL and API key.
64        //
65        // This ensures that the only endpoint we'll send to is this one.
66        let endpoint = self.forwarder_config.endpoint_mut();
67        endpoint.clear_additional_endpoints();
68        endpoint.set_dd_url(dd_url);
69        endpoint.set_api_key(api_key);
70        endpoint.set_api_key_refresh_config_path(api_key_refresh_config_path);
71        self.forwarder_config.clear_opw_metrics_endpoint();
72    }
73}
74
75#[async_trait]
76impl ForwarderBuilder for DatadogConfiguration {
77    fn input_payload_type(&self) -> PayloadType {
78        PayloadType::Http
79    }
80
81    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
82        let metrics_builder = MetricsBuilder::from_component_context(&context);
83        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
84        let forwarder = TransactionForwarder::from_config(
85            context,
86            self.forwarder_config.clone(),
87            self.configuration.clone(),
88            get_dd_endpoint_name,
89            telemetry.clone(),
90            metrics_builder,
91        )?;
92
93        Ok(Box::new(Datadog { forwarder }))
94    }
95}
96
97impl MemoryBounds for DatadogConfiguration {
98    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
99        builder
100            .minimum()
101            .with_single_value::<Datadog>("component struct")
102            .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
103
104        builder
105            .firm()
106            // TODO: This is a little wonky because we're accounting for the firm bound portion of connected encoders here, as this
107            // is the only place where we can calculate how many requests we'll hold on to in memory, which is what ultimately influences
108            // the firm usage.
109            //
110            // We're also cheating by knowing what the largest possible payload is that we'll potentially see, based on the limits on the
111            // Datadog encoders. This won't necessarily hold up for future sources/encoders, but is good enough for now.
112            .with_expr(UsageExpr::sum(
113                "in-flight requests",
114                UsageExpr::config(
115                    "forwarder_retry_queue_payloads_max_size",
116                    self.forwarder_config.retry().queue_max_size_bytes() as usize,
117                ),
118                UsageExpr::product(
119                    "high priority queue",
120                    UsageExpr::config(
121                        "forwarder_high_prio_buffer_size",
122                        self.forwarder_config.endpoint_buffer_size(),
123                    ),
124                    // TODO: The default compressed size limit just so happens to be the biggest one we currently default with on our side,
125                    // but it's not clear that this will always be the case.
126                    UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
127                ),
128            ));
129    }
130}
131
132pub struct Datadog {
133    forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
134}
135
136#[async_trait]
137impl Forwarder for Datadog {
138    async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
139        let Self { forwarder } = *self;
140
141        let mut health = context.take_health_handle();
142
143        let mut validation = forwarder.api_key_validator().spawn();
144
145        // Spawn our forwarder task to handle sending requests.
146        let forwarder = forwarder.spawn().await;
147
148        debug!("Datadog forwarder started.");
149
150        loop {
151            select! {
152                _ = health.live() => continue,
153                readiness = validation.wait_for_change() => match readiness {
154                    ValidationReadiness::Ready => health.mark_ready(),
155                    ValidationReadiness::NotReady => health.mark_not_ready(),
156                },
157                maybe_payload = context.payloads().next() => match maybe_payload {
158                    Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
159                        let (payload_meta, request) = http_payload.into_parts();
160                        let transaction_meta = Metadata::from_event_and_data_point_count(
161                            payload_meta.event_count(),
162                            payload_meta.data_point_count(),
163                        );
164                        let transaction = Transaction::from_original(transaction_meta, request);
165
166                        forwarder.send_transaction(transaction).await?;
167                    }
168                    None => break,
169                },
170            }
171        }
172
173        // Shutdown the forwarder gracefully.
174        validation.abort();
175        forwarder.shutdown().await;
176
177        debug!("Datadog forwarder stopped.");
178
179        Ok(())
180    }
181}
182
183fn get_dd_endpoint_name(uri: &Uri) -> Option<MetaString> {
184    match uri.path() {
185        "/api/v2/logs" => Some(MetaString::from_static("logs_v2")),
186        "/api/v1/series" => Some(MetaString::from_static("series_v1")),
187        "/api/v2/series" => Some(MetaString::from_static("series_v2")),
188        "/api/beta/sketches" => Some(MetaString::from_static("sketches_v2")),
189        "/api/v1/check_run" => Some(MetaString::from_static("check_run_v1")),
190        "/api/v1/events_batch" => Some(MetaString::from_static("events_batch_v1")),
191        "/api/v0.2/traces" => Some(MetaString::from_static("traces_v0.2")),
192        _ => None,
193    }
194}
195
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader};
    use serde_json::json;

    use super::*;

    /// Checks that an overridden endpoint picks up API key rotations from the MRF config path, even when the
    /// top-level `api_key` is rotated as well.
    #[tokio::test]
    async fn endpoint_override_refreshes_from_mrf_api_key() {
        let initial_values = json!({
            "api_key": "primary-api-key",
            "multi_region_failover": {
                "api_key": "mrf-api-key"
            }
        });
        let (generic_config, maybe_sender) = ConfigurationLoader::for_tests(Some(initial_values), None, true).await;
        let sender = maybe_sender.expect("dynamic sender should exist");

        // Send an initial (empty) dynamic snapshot, then wait for the configuration to report ready.
        sender
            .send(ConfigUpdate::Snapshot(json!({})))
            .await
            .expect("initial dynamic snapshot should be sent");
        generic_config.ready().await;

        let config = DatadogConfiguration::from_configuration(&generic_config)
            .expect("DatadogConfiguration should parse")
            .with_endpoint_override_and_api_key_refresh_config_path(
                "http://mrf.example.test".to_string(),
                "mrf-api-key".to_string(),
                "multi_region_failover.api_key",
            );

        let mut endpoints = config
            .forwarder_config
            .build_routable_endpoints(config.configuration.clone())
            .expect("endpoint should resolve");

        // The override must leave exactly one endpoint, carrying the MRF API key.
        assert_eq!(endpoints.len(), 1);
        let (_, mut endpoint) = endpoints.pop().unwrap().into_parts();
        assert_eq!(endpoint.cached_api_key(), "mrf-api-key");
        assert!(endpoint.has_configuration());
        assert_eq!(endpoint.api_key(), "mrf-api-key");

        // Rotate both keys; the endpoint is expected to end up with the rotated MRF key.
        let primary_rotation = ConfigUpdate::Partial {
            key: "api_key".to_string(),
            value: json!("rotated-primary-api-key"),
        };
        sender
            .send(primary_rotation)
            .await
            .expect("primary API key update should be sent");

        let mrf_rotation = ConfigUpdate::Partial {
            key: "multi_region_failover.api_key".to_string(),
            value: json!("rotated-mrf-api-key"),
        };
        sender
            .send(mrf_rotation)
            .await
            .expect("MRF API key update should be sent");

        // Poll until the rotated MRF key shows up, failing the test once the deadline has passed.
        let deadline = Instant::now() + Duration::from_secs(2);
        while endpoint.api_key() != "rotated-mrf-api-key" {
            assert!(
                Instant::now() < deadline,
                "timed out waiting for endpoint override to refresh from MRF API key"
            );
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }
}