saluki_components/forwarders/datadog/
mod.rs1use async_trait::async_trait;
2use http::Uri;
3use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
4use saluki_common::buf::FrozenChunkedBytesBuffer;
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7 components::{forwarders::*, ComponentContext},
8 data_model::payload::PayloadType,
9 observability::ComponentMetricsExt as _,
10};
11use saluki_error::GenericError;
12use saluki_metrics::MetricsBuilder;
13use stringtheory::MetaString;
14use tokio::select;
15use tracing::debug;
16
17use crate::common::datadog::{
18 config::ForwarderConfiguration,
19 io::TransactionForwarder,
20 telemetry::ComponentTelemetry,
21 transaction::{Metadata, Transaction},
22 validation::ValidationReadiness,
23 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
24};
25
26pub struct DatadogConfiguration {
32 forwarder_config: ForwarderConfiguration,
36
37 configuration: Option<GenericConfiguration>,
38}
39
40impl DatadogConfiguration {
41 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
43 let forwarder_config = ForwarderConfiguration::from_configuration(config)?;
44 Ok(Self {
45 forwarder_config,
46 configuration: Some(config.clone()),
47 })
48 }
49
50 pub fn with_endpoint_override_and_api_key_refresh_config_path(
55 mut self, dd_url: String, api_key: String, api_key_refresh_config_path: &'static str,
56 ) -> Self {
57 self.apply_endpoint_override(dd_url, api_key, api_key_refresh_config_path);
58
59 self
60 }
61
62 fn apply_endpoint_override(&mut self, dd_url: String, api_key: String, api_key_refresh_config_path: &'static str) {
63 let endpoint = self.forwarder_config.endpoint_mut();
67 endpoint.clear_additional_endpoints();
68 endpoint.set_dd_url(dd_url);
69 endpoint.set_api_key(api_key);
70 endpoint.set_api_key_refresh_config_path(api_key_refresh_config_path);
71 self.forwarder_config.clear_opw_metrics_endpoint();
72 }
73}
74
75#[async_trait]
76impl ForwarderBuilder for DatadogConfiguration {
77 fn input_payload_type(&self) -> PayloadType {
78 PayloadType::Http
79 }
80
81 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
82 let metrics_builder = MetricsBuilder::from_component_context(&context);
83 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
84 let forwarder = TransactionForwarder::from_config(
85 context,
86 self.forwarder_config.clone(),
87 self.configuration.clone(),
88 get_dd_endpoint_name,
89 telemetry.clone(),
90 metrics_builder,
91 )?;
92
93 Ok(Box::new(Datadog { forwarder }))
94 }
95}
96
97impl MemoryBounds for DatadogConfiguration {
98 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
99 builder
100 .minimum()
101 .with_single_value::<Datadog>("component struct")
102 .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
103
104 builder
105 .firm()
106 .with_expr(UsageExpr::sum(
113 "in-flight requests",
114 UsageExpr::config(
115 "forwarder_retry_queue_payloads_max_size",
116 self.forwarder_config.retry().queue_max_size_bytes() as usize,
117 ),
118 UsageExpr::product(
119 "high priority queue",
120 UsageExpr::config(
121 "forwarder_high_prio_buffer_size",
122 self.forwarder_config.endpoint_buffer_size(),
123 ),
124 UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
127 ),
128 ));
129 }
130}
131
132pub struct Datadog {
133 forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
134}
135
136#[async_trait]
137impl Forwarder for Datadog {
138 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
139 let Self { forwarder } = *self;
140
141 let mut health = context.take_health_handle();
142
143 let mut validation = forwarder.api_key_validator().spawn();
144
145 let forwarder = forwarder.spawn().await;
147
148 debug!("Datadog forwarder started.");
149
150 loop {
151 select! {
152 _ = health.live() => continue,
153 readiness = validation.wait_for_change() => match readiness {
154 ValidationReadiness::Ready => health.mark_ready(),
155 ValidationReadiness::NotReady => health.mark_not_ready(),
156 },
157 maybe_payload = context.payloads().next() => match maybe_payload {
158 Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
159 let (payload_meta, request) = http_payload.into_parts();
160 let transaction_meta = Metadata::from_event_and_data_point_count(
161 payload_meta.event_count(),
162 payload_meta.data_point_count(),
163 );
164 let transaction = Transaction::from_original(transaction_meta, request);
165
166 forwarder.send_transaction(transaction).await?;
167 }
168 None => break,
169 },
170 }
171 }
172
173 validation.abort();
175 forwarder.shutdown().await;
176
177 debug!("Datadog forwarder stopped.");
178
179 Ok(())
180 }
181}
182
183fn get_dd_endpoint_name(uri: &Uri) -> Option<MetaString> {
184 match uri.path() {
185 "/api/v2/logs" => Some(MetaString::from_static("logs_v2")),
186 "/api/v1/series" => Some(MetaString::from_static("series_v1")),
187 "/api/v2/series" => Some(MetaString::from_static("series_v2")),
188 "/api/beta/sketches" => Some(MetaString::from_static("sketches_v2")),
189 "/api/v1/check_run" => Some(MetaString::from_static("check_run_v1")),
190 "/api/v1/events_batch" => Some(MetaString::from_static("events_batch_v1")),
191 "/api/v0.2/traces" => Some(MetaString::from_static("traces_v0.2")),
192 _ => None,
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use saluki_config::ConfigurationLoader;
199 use serde_json::json;
200
201 use super::*;
202
203 #[tokio::test]
204 async fn endpoint_override_refreshes_from_mrf_api_key() {
205 let (generic_config, sender) = ConfigurationLoader::for_tests(
206 Some(json!({
207 "api_key": "primary-api-key",
208 "multi_region_failover": {
209 "api_key": "mrf-api-key"
210 }
211 })),
212 None,
213 true,
214 )
215 .await;
216 let sender = sender.expect("dynamic sender should exist");
217 sender
218 .send(saluki_config::dynamic::ConfigUpdate::Snapshot(json!({})))
219 .await
220 .expect("initial dynamic snapshot should be sent");
221 generic_config.ready().await;
222
223 let config = DatadogConfiguration::from_configuration(&generic_config)
224 .expect("DatadogConfiguration should parse")
225 .with_endpoint_override_and_api_key_refresh_config_path(
226 "http://mrf.example.test".to_string(),
227 "mrf-api-key".to_string(),
228 "multi_region_failover.api_key",
229 );
230
231 let mut endpoints = config
232 .forwarder_config
233 .build_routable_endpoints(config.configuration.clone())
234 .expect("endpoint should resolve");
235
236 assert_eq!(endpoints.len(), 1);
237 let (_, mut endpoint) = endpoints.pop().unwrap().into_parts();
238 assert_eq!(endpoint.cached_api_key(), "mrf-api-key");
239 assert!(endpoint.has_configuration());
240 assert_eq!(endpoint.api_key(), "mrf-api-key");
241
242 sender
243 .send(saluki_config::dynamic::ConfigUpdate::Partial {
244 key: "api_key".to_string(),
245 value: json!("rotated-primary-api-key"),
246 })
247 .await
248 .expect("primary API key update should be sent");
249 sender
250 .send(saluki_config::dynamic::ConfigUpdate::Partial {
251 key: "multi_region_failover.api_key".to_string(),
252 value: json!("rotated-mrf-api-key"),
253 })
254 .await
255 .expect("MRF API key update should be sent");
256
257 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
258 loop {
259 if endpoint.api_key() == "rotated-mrf-api-key" {
260 break;
261 }
262 assert!(
263 std::time::Instant::now() < deadline,
264 "timed out waiting for endpoint override to refresh from MRF API key"
265 );
266 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
267 }
268 }
269}