Skip to main content

saluki_components/encoders/datadog/events/
mod.rs

1use async_trait::async_trait;
2use datadog_protos::events as proto;
3use facet::Facet;
4use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use protobuf::{rt::WireType, CodedOutputStream};
7use saluki_common::iter::ReusableDeduplicator;
8use saluki_config::GenericConfiguration;
9use saluki_context::tags::Tag;
10use saluki_core::{
11    components::{encoders::*, ComponentContext},
12    data_model::{
13        event::{eventd::EventD, Event, EventType},
14        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
15    },
16    observability::ComponentMetricsExt as _,
17    topology::PayloadsDispatcher,
18};
19use saluki_error::{ErrorContext as _, GenericError};
20use saluki_io::compression::CompressionScheme;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use tracing::{error, warn};
24
25use crate::common::datadog::{
26    io::RB_BUFFER_CHUNK_SIZE,
27    request_builder::{EndpointEncoder, RequestBuilder},
28    telemetry::ComponentTelemetry,
29    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
30};
31
/// Default compression scheme used for serialized request payloads.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Maximum number of events encoded into a single request payload before a flush is forced.
const MAX_EVENTS_PER_PAYLOAD: usize = 100;
/// Protobuf field number of the repeated `events` field in the events batch message.
const EVENTS_FIELD_NUMBER: u32 = 1;

/// `Content-Type` header value for Protocol Buffers-encoded request bodies.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
37
38fn default_serializer_compressor_kind() -> String {
39    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
40}
41
/// Returns the default zstd compression level (3), for use as a serde default.
const fn default_zstd_compressor_level() -> i32 {
    3
}
45
/// Datadog Events incremental encoder.
///
/// Generates Datadog Events payloads for the Datadog platform.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogEventsConfiguration {
    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    // Surfaced in configuration under the `serializer_compressor_kind` key.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Defaults to 3.
    // Surfaced in configuration under the `serializer_zstd_compressor_level` key.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}
70
impl DatadogEventsConfiguration {
    /// Creates a new `DatadogEventsConfiguration` from the given configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration values cannot be deserialized into
    /// `DatadogEventsConfiguration`.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }
}
77
78#[async_trait]
79impl IncrementalEncoderBuilder for DatadogEventsConfiguration {
80    type Output = DatadogEvents;
81
82    fn input_event_type(&self) -> EventType {
83        EventType::EventD
84    }
85
86    fn output_payload_type(&self) -> PayloadType {
87        PayloadType::Http
88    }
89
90    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
91        let metrics_builder = MetricsBuilder::from_component_context(&context);
92        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
93        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
94
95        // Create our request builder.
96        let mut request_builder =
97            RequestBuilder::new(EventsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
98        request_builder.with_max_inputs_per_payload(MAX_EVENTS_PER_PAYLOAD);
99
100        Ok(DatadogEvents {
101            request_builder,
102            telemetry,
103        })
104    }
105}
106
impl MemoryBounds for DatadogEventsConfiguration {
    /// Declares the minimum and firm memory bounds for this component.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        //
        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect, and however
        // many payloads it will buffer internally... so realistically the firm limit boils down to the forwarder itself
        // but we'll have a hard time in the forwarder knowing the maximum size of any given payload being sent in, which
        // then makes it hard to calculate a proper firm bound even though we know the rest of the values required to
        // calculate the firm bound.
        builder.minimum().with_single_value::<DatadogEvents>("component struct");

        builder
            .firm()
            // Capture the size of the "split re-encode" buffer in the request builder, which is where we keep owned
            // versions of events that we encode in case we need to actually re-encode them during a split operation.
            .with_array::<EventD>("events split re-encode buffer", MAX_EVENTS_PER_PAYLOAD);
    }
}
125
/// Datadog Events incremental encoder.
///
/// Encodes `EventD` events into protobuf-encoded, compressed HTTP payloads for the
/// Datadog events intake.
pub struct DatadogEvents {
    // Builds the HTTP requests for the events endpoint.
    request_builder: RequestBuilder<EventsEndpointEncoder>,
    // Component-level telemetry (e.g. dropped-event counters).
    telemetry: ComponentTelemetry,
}
130
131#[async_trait]
132impl IncrementalEncoder for DatadogEvents {
133    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
134        let eventd = match event.try_into_eventd() {
135            Some(eventd) => eventd,
136            None => return Ok(ProcessResult::Continue),
137        };
138
139        match self.request_builder.encode(eventd).await {
140            Ok(None) => Ok(ProcessResult::Continue),
141            Ok(Some(eventd)) => Ok(ProcessResult::FlushRequired(Event::EventD(eventd))),
142            Err(e) => {
143                if e.is_recoverable() {
144                    warn!(error = %e, "Failed to encode Datadog event due to recoverable error. Continuing...");
145
146                    // TODO: Get the actual number of events dropped from the error itself.
147                    self.telemetry.events_dropped_encoder().increment(1);
148
149                    Ok(ProcessResult::Continue)
150                } else {
151                    Err(e).error_context("Failed to encode Datadog event due to unrecoverable error.")
152                }
153            }
154        }
155    }
156
157    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
158        let maybe_requests = self.request_builder.flush().await;
159        for maybe_request in maybe_requests {
160            match maybe_request {
161                Ok((events, request)) => {
162                    let payload_meta = PayloadMetadata::from_event_count(events);
163                    let http_payload = HttpPayload::new(payload_meta, request);
164                    let payload = Payload::Http(http_payload);
165
166                    dispatcher.dispatch(payload).await?;
167                }
168                Err(e) => error!(error = %e, "Failed to build Datadog events payload. Continuing..."),
169            }
170        }
171
172        Ok(())
173    }
174}
175
/// `EndpointEncoder` implementation for the Datadog events intake endpoint.
#[derive(Debug)]
struct EventsEndpointEncoder {
    // Reusable scratch state for deduplicating event tags without reallocating per event.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
180
impl EventsEndpointEncoder {
    /// Creates a new `EventsEndpointEncoder` with a fresh tags deduplicator.
    fn new() -> Self {
        Self {
            tags_deduplicator: ReusableDeduplicator::new(),
        }
    }
}
188
impl EndpointEncoder for EventsEndpointEncoder {
    type Input = EventD;
    type EncodeError = protobuf::Error;

    fn encoder_name() -> &'static str {
        "events"
    }

    fn compressed_size_limit(&self) -> usize {
        // Shared intake-wide limit on the compressed payload size.
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    fn uncompressed_size_limit(&self) -> usize {
        // Shared intake-wide limit on the uncompressed payload size.
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        // Appends the protobuf-framed event to `buffer`, deduplicating its tags.
        encode_and_write_eventd(input, buffer, &mut self.tags_deduplicator)
    }

    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v1/events_batch").into()
    }

    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_PROTOBUF.clone()
    }
}
221
222fn encode_and_write_eventd(
223    eventd: &EventD, buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
224) -> Result<(), protobuf::Error> {
225    let mut output_stream = CodedOutputStream::vec(buf);
226
227    // Write the field tag.
228    output_stream.write_tag(EVENTS_FIELD_NUMBER, WireType::LengthDelimited)?;
229
230    // Write the message.
231    let encoded_eventd = encode_eventd(eventd, tags_deduplicator);
232    output_stream.write_message_no_tag(&encoded_eventd)
233}
234
235fn encode_eventd(eventd: &EventD, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> proto::Event {
236    let mut event = proto::Event::new();
237    event.set_title(eventd.title().into());
238    event.set_text(eventd.text().into());
239
240    if let Some(timestamp) = eventd.timestamp() {
241        event.set_ts(timestamp as i64);
242    }
243
244    if let Some(priority) = eventd.priority() {
245        event.set_priority(priority.as_str().into());
246    }
247
248    if let Some(alert_type) = eventd.alert_type() {
249        event.set_alert_type(alert_type.as_str().into());
250    }
251
252    if let Some(hostname) = eventd.hostname() {
253        event.set_host(hostname.into());
254    }
255
256    if let Some(aggregation_key) = eventd.aggregation_key() {
257        event.set_aggregation_key(aggregation_key.into());
258    }
259
260    if let Some(source_type_name) = eventd.source_type_name() {
261        event.set_source_type_name(source_type_name.into());
262    }
263
264    let chained_tags = eventd.tags().into_iter().chain(eventd.origin_tags());
265    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);
266
267    event.set_tags(deduplicated_tags.map(|tag| tag.as_str().into()).collect());
268
269    event
270}
271
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogEventsConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Verifies that an empty JSON configuration deserializes into
    /// `DatadogEventsConfiguration` via the serde defaults.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_EVENTS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogEventsConfiguration>()
                .expect("DatadogEventsConfiguration should deserialize")
        })
        .await
    }
}