saluki_components/forwarders/datadog/
mod.rs1use async_trait::async_trait;
2use http::Uri;
3use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
4use saluki_common::buf::FrozenChunkedBytesBuffer;
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7 components::{forwarders::*, ComponentContext},
8 data_model::payload::PayloadType,
9 observability::ComponentMetricsExt as _,
10};
11use saluki_error::GenericError;
12use saluki_metrics::MetricsBuilder;
13use serde::Deserialize;
14use stringtheory::MetaString;
15use tokio::select;
16use tracing::debug;
17
18use crate::common::datadog::{
19 config::ForwarderConfiguration,
20 io::TransactionForwarder,
21 telemetry::ComponentTelemetry,
22 transaction::{Metadata, Transaction},
23 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
24};
25
26#[derive(Deserialize)]
32pub struct DatadogConfiguration {
33 #[serde(flatten)]
37 forwarder_config: ForwarderConfiguration,
38
39 #[serde(skip)]
40 configuration: Option<GenericConfiguration>,
41}
42
43impl DatadogConfiguration {
44 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
46 let mut forwarder_config: DatadogConfiguration = config.as_typed()?;
47 forwarder_config.configuration = Some(config.clone());
48 Ok(forwarder_config)
49 }
50
51 pub fn with_endpoint_override(mut self, dd_url: String, api_key: String) -> Self {
63 let endpoint = self.forwarder_config.endpoint_mut();
67 endpoint.clear_additional_endpoints();
68 endpoint.set_dd_url(dd_url);
69 endpoint.set_api_key(api_key);
70
71 self
72 }
73}
74
75#[async_trait]
76impl ForwarderBuilder for DatadogConfiguration {
77 fn input_payload_type(&self) -> PayloadType {
78 PayloadType::Http
79 }
80
81 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
82 let metrics_builder = MetricsBuilder::from_component_context(&context);
83 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
84 let forwarder = TransactionForwarder::from_config(
85 context,
86 self.forwarder_config.clone(),
87 self.configuration.clone(),
88 get_dd_endpoint_name,
89 telemetry.clone(),
90 metrics_builder,
91 )?;
92
93 Ok(Box::new(Datadog { forwarder }))
94 }
95}
96
97impl MemoryBounds for DatadogConfiguration {
98 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
99 builder
100 .minimum()
101 .with_single_value::<Datadog>("component struct")
102 .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
103
104 builder
105 .firm()
106 .with_expr(UsageExpr::sum(
113 "in-flight requests",
114 UsageExpr::config(
115 "forwarder_retry_queue_payloads_max_size",
116 self.forwarder_config.retry().queue_max_size_bytes() as usize,
117 ),
118 UsageExpr::product(
119 "high priority queue",
120 UsageExpr::config(
121 "forwarder_high_prio_buffer_size",
122 self.forwarder_config.endpoint_buffer_size(),
123 ),
124 UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
127 ),
128 ));
129 }
130}
131
132pub struct Datadog {
133 forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
134}
135
136#[async_trait]
137impl Forwarder for Datadog {
138 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
139 let Self { forwarder } = *self;
140
141 let mut health = context.take_health_handle();
142
143 let forwarder = forwarder.spawn().await;
145
146 health.mark_ready();
147 debug!("Datadog forwarder started.");
148
149 loop {
150 select! {
151 _ = health.live() => continue,
152 maybe_payload = context.payloads().next() => match maybe_payload {
153 Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
154 let (payload_meta, request) = http_payload.into_parts();
155 let transaction_meta = Metadata::from_event_count(payload_meta.event_count());
156 let transaction = Transaction::from_original(transaction_meta, request);
157
158 forwarder.send_transaction(transaction).await?;
159 },
160 None => break,
161 },
162 }
163 }
164
165 forwarder.shutdown().await;
167
168 debug!("Datadog forwarder stopped.");
169
170 Ok(())
171 }
172}
173
174fn get_dd_endpoint_name(uri: &Uri) -> Option<MetaString> {
175 match uri.path() {
176 "/api/v2/logs" => Some(MetaString::from_static("logs_v2")),
177 "/api/v2/series" => Some(MetaString::from_static("series_v2")),
178 "/api/beta/sketches" => Some(MetaString::from_static("sketches_v2")),
179 "/api/v1/check_run" => Some(MetaString::from_static("check_run_v1")),
180 "/api/v1/events_batch" => Some(MetaString::from_static("events_batch_v1")),
181 _ => None,
182 }
183}