Skip to main content

saluki_components/forwarders/datadog/
mod.rs

1use async_trait::async_trait;
2use http::Uri;
3use resource_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr};
4use saluki_common::buf::FrozenChunkedBytesBuffer;
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7    components::{forwarders::*, ComponentContext},
8    data_model::payload::PayloadType,
9    observability::ComponentMetricsExt as _,
10};
11use saluki_error::GenericError;
12use saluki_metrics::MetricsBuilder;
13use stringtheory::MetaString;
14use tokio::select;
15use tracing::debug;
16
17use crate::common::datadog::{
18    config::ForwarderConfiguration,
19    io::TransactionForwarder,
20    telemetry::ComponentTelemetry,
21    transaction::{Metadata, Transaction},
22    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
23};
24
25/// Datadog forwarder.
26///
27/// Forwards Datadog-specific payloads to the Datadog platform. Handles the standard Datadog Agent configuration,
28/// in terms of specifying additional endpoints, adding the necessary HTTP request headers for authentication,
29/// identification, and more.
30pub struct DatadogConfiguration {
31    /// Forwarder configuration settings.
32    ///
33    /// See [`ForwarderConfiguration`] for more information about the available settings.
34    forwarder_config: ForwarderConfiguration,
35
36    configuration: Option<GenericConfiguration>,
37}
38
39impl DatadogConfiguration {
40    /// Creates a new `DatadogConfiguration` from the given configuration.
41    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
42        let forwarder_config = ForwarderConfiguration::from_configuration(config)?;
43        Ok(Self {
44            forwarder_config,
45            configuration: Some(config.clone()),
46        })
47    }
48
49    /// Overrides the default endpoint that payloads are sent to.
50    ///
51    /// This overrides any existing endpoint configuration, and manually sets the base endpoint (for example,
52    /// `https://api.datad0g.com`) to be used for all payloads.
53    ///
54    /// This can be used to preserve other configuration settings (forwarder settings, retry, etc) while still allowing
55    /// for overriding _where_ payloads are sent to.
56    ///
57    /// # Errors
58    ///
59    /// If the given request path isn't valid, an error is returned.
60    pub fn with_endpoint_override(mut self, dd_url: String, api_key: String) -> Self {
61        // Clear any existing additional endpoints, and set the new DD URL and API key.
62        //
63        // This ensures that the only endpoint we'll send to is this one.
64        let endpoint = self.forwarder_config.endpoint_mut();
65        endpoint.clear_additional_endpoints();
66        endpoint.set_dd_url(dd_url);
67        endpoint.set_api_key(api_key);
68        self.forwarder_config.clear_opw_metrics_endpoint();
69
70        self
71    }
72}
73
74#[async_trait]
75impl ForwarderBuilder for DatadogConfiguration {
76    fn input_payload_type(&self) -> PayloadType {
77        PayloadType::Http
78    }
79
80    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Forwarder + Send>, GenericError> {
81        let metrics_builder = MetricsBuilder::from_component_context(&context);
82        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
83        let forwarder = TransactionForwarder::from_config(
84            context,
85            self.forwarder_config.clone(),
86            self.configuration.clone(),
87            get_dd_endpoint_name,
88            telemetry.clone(),
89            metrics_builder,
90        )?;
91
92        Ok(Box::new(Datadog { forwarder }))
93    }
94}
95
96impl MemoryBounds for DatadogConfiguration {
97    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
98        builder
99            .minimum()
100            .with_single_value::<Datadog>("component struct")
101            .with_array::<Transaction<FrozenChunkedBytesBuffer>>("requests channel", 8);
102
103        builder
104            .firm()
105            // TODO: This is a little wonky because we're accounting for the firm bound portion of connected encoders here, as this
106            // is the only place where we can calculate how many requests we'll hold on to in memory, which is what ultimately influences
107            // the firm usage.
108            //
109            // We're also cheating by knowing what the largest possible payload is that we'll potentially see, based on the limits on the
110            // Datadog encoders. This won't necessarily hold up for future sources/encoders, but is good enough for now.
111            .with_expr(UsageExpr::sum(
112                "in-flight requests",
113                UsageExpr::config(
114                    "forwarder_retry_queue_payloads_max_size",
115                    self.forwarder_config.retry().queue_max_size_bytes() as usize,
116                ),
117                UsageExpr::product(
118                    "high priority queue",
119                    UsageExpr::config(
120                        "forwarder_high_prio_buffer_size",
121                        self.forwarder_config.endpoint_buffer_size(),
122                    ),
123                    // TODO: The default compressed size limit just so happens to be the biggest one we currently default with on our side,
124                    // but it's not clear that this will always be the case.
125                    UsageExpr::constant("maximum compressed payload size", DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT),
126                ),
127            ));
128    }
129}
130
131pub struct Datadog {
132    forwarder: TransactionForwarder<FrozenChunkedBytesBuffer>,
133}
134
135#[async_trait]
136impl Forwarder for Datadog {
137    async fn run(mut self: Box<Self>, mut context: ForwarderContext) -> Result<(), GenericError> {
138        let Self { forwarder } = *self;
139
140        let mut health = context.take_health_handle();
141
142        // Spawn our forwarder task to handle sending requests.
143        let forwarder = forwarder.spawn().await;
144
145        health.mark_ready();
146        debug!("Datadog forwarder started.");
147
148        loop {
149            select! {
150                _ = health.live() => continue,
151                maybe_payload = context.payloads().next() => match maybe_payload {
152                    Some(payload) => if let Some(http_payload) = payload.try_into_http_payload() {
153                        let (payload_meta, request) = http_payload.into_parts();
154                        let transaction_meta = Metadata::from_event_and_data_point_count(
155                            payload_meta.event_count(),
156                            payload_meta.data_point_count(),
157                        );
158                        let transaction = Transaction::from_original(transaction_meta, request);
159
160                        forwarder.send_transaction(transaction).await?;
161                    }
162                    None => break,
163                },
164            }
165        }
166
167        // Shutdown the forwarder gracefully.
168        forwarder.shutdown().await;
169
170        debug!("Datadog forwarder stopped.");
171
172        Ok(())
173    }
174}
175
176fn get_dd_endpoint_name(uri: &Uri) -> Option<MetaString> {
177    match uri.path() {
178        "/api/v2/logs" => Some(MetaString::from_static("logs_v2")),
179        "/api/v1/series" => Some(MetaString::from_static("series_v1")),
180        "/api/v2/series" => Some(MetaString::from_static("series_v2")),
181        "/api/beta/sketches" => Some(MetaString::from_static("sketches_v2")),
182        "/api/v1/check_run" => Some(MetaString::from_static("check_run_v1")),
183        "/api/v1/events_batch" => Some(MetaString::from_static("events_batch_v1")),
184        "/api/v0.2/traces" => Some(MetaString::from_static("traces_v0.2")),
185        _ => None,
186    }
187}