Skip to main content

saluki_components/encoders/datadog/events/
mod.rs

1use async_trait::async_trait;
2use datadog_protos::events as proto;
3use facet::Facet;
4use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
5use protobuf::{rt::WireType, CodedOutputStream};
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::iter::ReusableDeduplicator;
8use saluki_config::GenericConfiguration;
9use saluki_context::tags::Tag;
10use saluki_core::{
11    components::{encoders::*, ComponentContext},
12    data_model::{
13        event::{eventd::EventD, Event, EventType},
14        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
15    },
16    observability::ComponentMetricsExt as _,
17    topology::PayloadsDispatcher,
18};
19use saluki_error::{ErrorContext as _, GenericError};
20use saluki_io::compression::CompressionScheme;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use tracing::{debug, error, warn};
24
25use crate::common::datadog::{
26    clamp_payload_limits,
27    io::RB_BUFFER_CHUNK_SIZE,
28    request_builder::{EndpointEncoder, RequestBuilder},
29    telemetry::ComponentTelemetry,
30    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
31    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
32};
33
/// Upper bound on how many events are packed into a single payload by the request builder.
const MAX_EVENTS_PER_PAYLOAD: usize = 100;

/// Compressor used when `serializer_compressor_kind` is not set in the configuration.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";

/// Protobuf field number under which every encoded event is written (length-delimited) in the payload body.
const EVENTS_FIELD_NUMBER: u32 = 1;
37
/// `Content-Type` header value for the protobuf-encoded request bodies; a clone of it is returned by
/// `EventsEndpointEncoder::content_type`.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
39
40fn default_serializer_compressor_kind() -> String {
41    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
42}
43
/// Serde default for `serializer_zstd_compressor_level`.
///
/// Only consulted when the compressor kind is `zstd`.
const fn default_zstd_compressor_level() -> i32 {
    3
}
47
48const fn default_max_payload_size() -> usize {
49    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
50}
51
52const fn default_max_uncompressed_payload_size() -> usize {
53    DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
54}
55
/// Serde default for `log_payloads`: event payload logging is disabled unless explicitly enabled.
const fn default_log_payloads() -> bool {
    false
}
59
/// Datadog Events incremental encoder.
///
/// Generates Datadog Events payloads for the Datadog platform.
// This struct is the encoder's configuration: `IncrementalEncoderBuilder::build` turns it into a `DatadogEvents`.
// Each `serde(rename = ...)` attribute below names the configuration key a field is read from; `log_payloads` has no
// rename, so it is read from a key matching its field name.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogEventsConfiguration {
    // Both size limits below are passed through `clamp_payload_limits` together with the intake limits in `build`
    // before being handed to the request builder.

    /// Maximum compressed size, in bytes, of an events payload.
    ///
    /// This uses the same generic event payload setting as the Datadog Agent. ADP sends events to
    /// `/api/v1/events_batch`, so the effective value is clamped to that endpoint's global intake limit of 3,200,000
    /// bytes. If set to `0`, every non-empty compressed payload exceeds the limit and is dropped during flush.
    ///
    /// Defaults to 2,621,440 bytes.
    #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
    max_payload_size: usize,

    /// Maximum uncompressed size, in bytes, of an events payload.
    ///
    /// This uses the same generic event payload setting as the Datadog Agent. ADP sends events to
    /// `/api/v1/events_batch`, so the effective value is clamped to that endpoint's global intake limit of 62,914,560
    /// bytes. Values smaller than the minimum endpoint framing size prevent the request builder from starting.
    ///
    /// Defaults to 4,194,304 bytes.
    #[serde(
        rename = "serializer_max_uncompressed_payload_size",
        default = "default_max_uncompressed_payload_size"
    )]
    max_uncompressed_payload_size: usize,

    // `compressor_kind` and `zstd_compressor_level` are passed together to `CompressionScheme::new` in `build`.

    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    // Copied into `DatadogEvents`; when true, `process_event` logs each `EventD` at debug level before encoding it.

    /// Whether to log event payload contents before encoding.
    ///
    /// This logs decoded event objects, not the encoded HTTP body.
    ///
    /// Defaults to `false`.
    #[serde(default = "default_log_payloads")]
    log_payloads: bool,
}
115
116impl DatadogEventsConfiguration {
117    /// Creates a new `DatadogEventsConfiguration` from the given configuration.
118    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
119        Ok(config.as_typed()?)
120    }
121}
122
123#[async_trait]
124impl IncrementalEncoderBuilder for DatadogEventsConfiguration {
125    type Output = DatadogEvents;
126
127    fn input_event_type(&self) -> EventType {
128        EventType::EventD
129    }
130
131    fn output_payload_type(&self) -> PayloadType {
132        PayloadType::Http
133    }
134
135    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
136        let metrics_builder = MetricsBuilder::from_component_context(&context);
137        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
138        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
139
140        // Create our request builder.
141        let mut request_builder =
142            RequestBuilder::new(EventsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
143        let (uncompressed_limit, compressed_limit) = clamp_payload_limits(
144            self.max_uncompressed_payload_size,
145            self.max_payload_size,
146            DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
147            DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
148        );
149        request_builder.with_len_limits(uncompressed_limit, compressed_limit)?;
150        request_builder.with_max_inputs_per_payload(MAX_EVENTS_PER_PAYLOAD);
151
152        Ok(DatadogEvents {
153            request_builder,
154            telemetry,
155            log_payloads: self.log_payloads,
156        })
157    }
158}
159
160impl MemoryBounds for DatadogEventsConfiguration {
161    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
162        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
163        //
164        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect, and however
165        // many payloads it will buffer internally... so realistically the firm limit boils down to the forwarder itself
166        // but we'll have a hard time in the forwarder knowing the maximum size of any given payload being sent in, which
167        // then makes it hard to calculate a proper firm bound even though we know the rest of the values required to
168        // calculate the firm bound.
169        builder.minimum().with_single_value::<DatadogEvents>("component struct");
170
171        builder
172            .firm()
173            // Capture the size of the "split re-encode" buffer in the request builder, which is where we keep owned
174            // versions of events that we encode in case we need to actually re-encode them during a split operation.
175            .with_array::<EventD>("events split re-encode buffer", MAX_EVENTS_PER_PAYLOAD);
176    }
177}
178
179pub struct DatadogEvents {
180    request_builder: RequestBuilder<EventsEndpointEncoder>,
181    telemetry: ComponentTelemetry,
182    log_payloads: bool,
183}
184
185#[async_trait]
186impl IncrementalEncoder for DatadogEvents {
187    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
188        let eventd = match event.try_into_eventd() {
189            Some(eventd) => eventd,
190            None => return Ok(ProcessResult::Continue),
191        };
192
193        if self.log_payloads {
194            debug!(event = ?eventd, "Flushing event.");
195        }
196
197        match self.request_builder.encode(eventd).await {
198            Ok(None) => Ok(ProcessResult::Continue),
199            Ok(Some(eventd)) => Ok(ProcessResult::FlushRequired(Event::EventD(eventd))),
200            Err(e) => {
201                if e.is_recoverable() {
202                    warn!(error = %e, "Failed to encode Datadog event due to recoverable error. Continuing...");
203
204                    // TODO: Get the actual number of events dropped from the error itself.
205                    self.telemetry.events_dropped_encoder().increment(1);
206
207                    Ok(ProcessResult::Continue)
208                } else {
209                    Err(e).error_context("Failed to encode Datadog event due to unrecoverable error.")
210                }
211            }
212        }
213    }
214
215    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
216        let maybe_requests = self.request_builder.flush().await;
217        for maybe_request in maybe_requests {
218            match maybe_request {
219                Ok((events, _data_points, request)) => {
220                    let payload_meta = PayloadMetadata::from_event_count(events);
221                    let http_payload = HttpPayload::new(payload_meta, request);
222                    let payload = Payload::Http(http_payload);
223
224                    dispatcher.dispatch(payload).await?;
225                }
226                Err(e) => error!(error = %e, "Failed to build Datadog events payload. Continuing..."),
227            }
228        }
229
230        Ok(())
231    }
232}
233
234#[derive(Debug)]
235struct EventsEndpointEncoder {
236    tags_deduplicator: ReusableDeduplicator<Tag>,
237}
238
239impl EventsEndpointEncoder {
240    fn new() -> Self {
241        Self {
242            tags_deduplicator: ReusableDeduplicator::new(),
243        }
244    }
245}
246
247impl EndpointEncoder for EventsEndpointEncoder {
248    type Input = EventD;
249    type EncodeError = protobuf::Error;
250
251    fn encoder_name() -> &'static str {
252        "events"
253    }
254
255    fn compressed_size_limit(&self) -> usize {
256        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
257    }
258
259    fn uncompressed_size_limit(&self) -> usize {
260        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
261    }
262
263    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
264        encode_and_write_eventd(input, buffer, &mut self.tags_deduplicator)
265    }
266
267    fn endpoint_uri(&self) -> Uri {
268        PathAndQuery::from_static("/api/v1/events_batch").into()
269    }
270
271    fn endpoint_method(&self) -> Method {
272        Method::POST
273    }
274
275    fn content_type(&self) -> HeaderValue {
276        CONTENT_TYPE_PROTOBUF.clone()
277    }
278}
279
280fn encode_and_write_eventd(
281    eventd: &EventD, buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
282) -> Result<(), protobuf::Error> {
283    let mut output_stream = CodedOutputStream::vec(buf);
284
285    // Write the field tag.
286    output_stream.write_tag(EVENTS_FIELD_NUMBER, WireType::LengthDelimited)?;
287
288    // Write the message.
289    let encoded_eventd = encode_eventd(eventd, tags_deduplicator);
290    output_stream.write_message_no_tag(&encoded_eventd)
291}
292
293fn encode_eventd(eventd: &EventD, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> proto::Event {
294    let mut event = proto::Event::new();
295    event.set_title(eventd.title().into());
296    event.set_text(eventd.text().into());
297
298    if let Some(timestamp) = eventd.timestamp() {
299        event.set_ts(timestamp as i64);
300    }
301
302    if let Some(priority) = eventd.priority() {
303        event.set_priority(priority.as_str().into());
304    }
305
306    if let Some(alert_type) = eventd.alert_type() {
307        event.set_alert_type(alert_type.as_str().into());
308    }
309
310    if let Some(hostname) = eventd.hostname() {
311        event.set_host(hostname.into());
312    }
313
314    if let Some(aggregation_key) = eventd.aggregation_key() {
315        event.set_aggregation_key(aggregation_key.into());
316    }
317
318    if let Some(source_type_name) = eventd.source_type_name() {
319        event.set_source_type_name(source_type_name.into());
320    }
321
322    let chained_tags = eventd.tags().into_iter().chain(eventd.origin_tags());
323    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);
324
325    event.set_tags(deduplicated_tags.map(|tag| tag.as_str().into()).collect());
326
327    event
328}
329
#[cfg(test)]
mod config_smoke {
    // Smoke test tying `DatadogEventsConfiguration` to its config-registry entry
    // (`structs::DATADOG_EVENTS_CONFIGURATION`) via the shared `run_config_smoke_tests` harness.
    use serde_json::json;

    use super::DatadogEventsConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    #[tokio::test]
    async fn smoke_test() {
        // NOTE(review): the meaning of `&[]` and `json!({})` is defined by `run_config_smoke_tests` (presumably no
        // extra keys / an empty base config) — confirm against its signature.
        // The closure must turn each configuration the harness hands it into the typed struct; a failure panics.
        run_config_smoke_tests(structs::DATADOG_EVENTS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogEventsConfiguration>()
                .expect("DatadogEventsConfiguration should deserialize")
        })
        .await
    }
}