saluki_components/forwarders/datadog/
mod.rs1use async_trait::async_trait;
2use http::Uri;
3use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
4use saluki_common::buf::FrozenChunkedBytesBuffer;
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7 components::{forwarders::*, ComponentContext},
8 data_model::payload::PayloadType,
9 observability::ComponentMetricsExt as _,
10};
11use saluki_error::GenericError;
12use saluki_metrics::MetricsBuilder;
13use stringtheory::MetaString;
14use tokio::select;
15use tracing::debug;
16
17use crate::common::datadog::{
18 config::ForwarderConfiguration,
19 io::TransactionForwarder,
20 telemetry::ComponentTelemetry,
21 transaction::{Metadata, Transaction},
22 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
23};
24
25pub struct DatadogConfiguration {
31 forwarder_config: ForwarderConfiguration,
35
36 configuration: Option<GenericConfiguration>,
37}
38
39impl DatadogConfiguration {
40 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
42 let forwarder_config = ForwarderConfiguration::from_configuration(config)?;
43 Ok(Self {
44 forwarder_config,
45 configuration: Some(config.clone()),
46 })
47 }
48
49 pub fn with_endpoint_override(mut self, dd_url: String, api_key: String) -> Self {
61 let endpoint = self.forwarder_config.endpoint_mut();
65 endpoint.clear_additional_endpoints();
66 endpoint.set_dd_url(dd_url);
67 endpoint.set_api_key(api_key);
68
69 self
70 }
71}
72
73#[async_trait]
74impl ForwarderBuilder for DatadogConfiguration {
75 fn input_payload_type(&self) -> PayloadType {
76 PayloadType::Http
77 }
78
79 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
80 let metrics_builder = MetricsBuilder::from_component_context(&context);
81 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
82 let forwarder = TransactionForwarder::from_config(
83 context,
84 self.forwarder_config.clone(),
85 self.configuration.clone(),
86 get_dd_endpoint_name,
87 telemetry.clone(),
88 metrics_builder,
89 )?;
90
91 Ok(Box::new(Datadog { forwarder }))
92 }
93}
94
95impl MemoryBounds for DatadogConfiguration {
96 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
97 builder
98 .minimum()
99 .with_single_value::<Datadog>("component struct")
100 .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
101
102 builder
103 .firm()
104 .with_expr(UsageExpr::sum(
111 "in-flight requests",
112 UsageExpr::config(
113 "forwarder_retry_queue_payloads_max_size",
114 self.forwarder_config.retry().queue_max_size_bytes() as usize,
115 ),
116 UsageExpr::product(
117 "high priority queue",
118 UsageExpr::config(
119 "forwarder_high_prio_buffer_size",
120 self.forwarder_config.endpoint_buffer_size(),
121 ),
122 UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
125 ),
126 ));
127 }
128}
129
130pub struct Datadog {
131 forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
132}
133
134#[async_trait]
135impl Forwarder for Datadog {
136 async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
137 let Self { forwarder } = *self;
138
139 let mut health = context.take_health_handle();
140
141 let forwarder = forwarder.spawn().await;
143
144 health.mark_ready();
145 debug!("Datadog forwarder started.");
146
147 loop {
148 select! {
149 _ = health.live() => continue,
150 maybe_payload = context.payloads().next() => match maybe_payload {
151 Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
152 let (payload_meta, request) = http_payload.into_parts();
153 let transaction_meta = Metadata::from_event_count(payload_meta.event_count());
154 let transaction = Transaction::from_original(transaction_meta, request);
155
156 forwarder.send_transaction(transaction).await?;
157 },
158 None => break,
159 },
160 }
161 }
162
163 forwarder.shutdown().await;
165
166 debug!("Datadog forwarder stopped.");
167
168 Ok(())
169 }
170}
171
172fn get_dd_endpoint_name(uri: &Uri) -> Option<MetaString> {
173 match uri.path() {
174 "/api/v2/logs" => Some(MetaString::from_static("logs_v2")),
175 "/api/v2/series" => Some(MetaString::from_static("series_v2")),
176 "/api/beta/sketches" => Some(MetaString::from_static("sketches_v2")),
177 "/api/v1/check_run" => Some(MetaString::from_static("check_run_v1")),
178 "/api/v1/events_batch" => Some(MetaString::from_static("events_batch_v1")),
179 _ => None,
180 }
181}