saluki_components/forwarders/datadog/
mod.rs1use async_trait::async_trait;
2use http::Uri;
3use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
4use saluki_common::buf::FrozenChunkedBytesBuffer;
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7 components::{forwarders::*, ComponentContext},
8 data_model::payload::PayloadType,
9 observability::ComponentMetricsExt as _,
10};
11use saluki_error::GenericError;
12use saluki_metrics::MetricsBuilder;
13use stringtheory::MetaString;
14use tokio::select;
15use tracing::debug;
16
17use crate::common::datadog::{
18 config::ForwarderConfiguration,
19 io::TransactionForwarder,
20 telemetry::ComponentTelemetry,
21 transaction::{Metadata, Transaction},
22 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
23};
24
25pub struct DatadogConfiguration {
31 forwarder_config: ForwarderConfiguration,
35
36 configuration: Option<GenericConfiguration>,
37}
38
39impl DatadogConfiguration {
40 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
42 let forwarder_config = ForwarderConfiguration::from_configuration(config)?;
43 Ok(Self {
44 forwarder_config,
45 configuration: Some(config.clone()),
46 })
47 }
48
49 pub fn with_endpoint_override(mut self, dd_url: String, api_key: String) -> Self {
61 let endpoint = self.forwarder_config.endpoint_mut();
65 endpoint.clear_additional_endpoints();
66 endpoint.set_dd_url(dd_url);
67 endpoint.set_api_key(api_key);
68 self.forwarder_config.clear_opw_metrics_endpoint();
69
70 self
71 }
72}
73
74#[async_trait]
75impl ForwarderBuilder for DatadogConfiguration {
76 fn input_payload_type(&self) -> PayloadType {
77 PayloadType::Http
78 }
79
80 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
81 let metrics_builder = MetricsBuilder::from_component_context(&context);
82 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
83 let forwarder = TransactionForwarder::from_config(
84 context,
85 self.forwarder_config.clone(),
86 self.configuration.clone(),
87 get_dd_endpoint_name,
88 telemetry.clone(),
89 metrics_builder,
90 )?;
91
92 Ok(Box::new(Datadog { forwarder }))
93 }
94}
95
96impl MemoryBounds for DatadogConfiguration {
97 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
98 builder
99 .minimum()
100 .with_single_value::<Datadog>("component struct")
101 .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
102
103 builder
104 .firm()
105 .with_expr(UsageExpr::sum(
112 "in-flight requests",
113 UsageExpr::config(
114 "forwarder_retry_queue_payloads_max_size",
115 self.forwarder_config.retry().queue_max_size_bytes() as usize,
116 ),
117 UsageExpr::product(
118 "high priority queue",
119 UsageExpr::config(
120 "forwarder_high_prio_buffer_size",
121 self.forwarder_config.endpoint_buffer_size(),
122 ),
123 UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
126 ),
127 ));
128 }
129}
130
131pub struct Datadog {
132 forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
133}
134
135#[async_trait]
136impl Forwarder for Datadog {
137 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
138 let Self { forwarder } = *self;
139
140 let mut health = context.take_health_handle();
141
142 let forwarder = forwarder.spawn().await;
144
145 health.mark_ready();
146 debug!("Datadog forwarder started.");
147
148 loop {
149 select! {
150 _ = health.live() => continue,
151 maybe_payload = context.payloads().next() => match maybe_payload {
152 Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
153 let (payload_meta, request) = http_payload.into_parts();
154 let transaction_meta = Metadata::from_event_and_data_point_count(
155 payload_meta.event_count(),
156 payload_meta.data_point_count(),
157 );
158 let transaction = Transaction::from_original(transaction_meta, request);
159
160 forwarder.send_transaction(transaction).await?;
161 }
162 None => break,
163 },
164 }
165 }
166
167 forwarder.shutdown().await;
169
170 debug!("Datadog forwarder stopped.");
171
172 Ok(())
173 }
174}
175
176fn get_dd_endpoint_name(uri: &Uri) -> Option<MetaString> {
177 match uri.path() {
178 "/api/v2/logs" => Some(MetaString::from_static("logs_v2")),
179 "/api/v1/series" => Some(MetaString::from_static("series_v1")),
180 "/api/v2/series" => Some(MetaString::from_static("series_v2")),
181 "/api/beta/sketches" => Some(MetaString::from_static("sketches_v2")),
182 "/api/v1/check_run" => Some(MetaString::from_static("check_run_v1")),
183 "/api/v1/events_batch" => Some(MetaString::from_static("events_batch_v1")),
184 "/api/v0.2/traces" => Some(MetaString::from_static("traces_v0.2")),
185 _ => None,
186 }
187}