saluki_components/encoders/datadog/events/mod.rs

use async_trait::async_trait;
use datadog_protos::events as proto;
use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use protobuf::{rt::WireType, CodedOutputStream};
use saluki_common::iter::ReusableDeduplicator;
use saluki_config::GenericConfiguration;
use saluki_context::tags::Tag;
use saluki_core::{
    components::{encoders::*, ComponentContext},
    data_model::{
        event::{eventd::EventD, Event, EventType},
        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
    },
    observability::ComponentMetricsExt as _,
    topology::PayloadsDispatcher,
};
use saluki_error::{ErrorContext as _, GenericError};
use saluki_io::compression::CompressionScheme;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use tracing::{error, warn};

use crate::common::datadog::{
    io::RB_BUFFER_CHUNK_SIZE,
    request_builder::{EndpointEncoder, RequestBuilder},
    telemetry::ComponentTelemetry,
    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
};

const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";

// Maximum number of events encoded into a single request payload.
const MAX_EVENTS_PER_PAYLOAD: usize = 100;

// Field number of the length-delimited `events` field that each encoded event is written under.
const EVENTS_FIELD_NUMBER: u32 = 1;

static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

fn default_serializer_compressor_kind() -> String {
    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
}

const fn default_zstd_compressor_level() -> i32 {
    3
}

/// Datadog Events incremental encoder.
///
/// Generates Datadog Events payloads for the Datadog platform.
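///
/// A minimal configuration sketch, assuming the configuration is sourced from YAML (the keys match the `serde`
/// renames on the fields below; the exact file format depends on how the `GenericConfiguration` is populated):
///
/// ```yaml
/// serializer_compressor_kind: zstd
/// serializer_zstd_compressor_level: 3
/// ```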
#[derive(Deserialize)]
pub struct DatadogEventsConfiguration {
    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}

impl DatadogEventsConfiguration {
    /// Creates a new `DatadogEventsConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }
}

#[async_trait]
impl IncrementalEncoderBuilder for DatadogEventsConfiguration {
    type Output = DatadogEvents;

    fn input_event_type(&self) -> EventType {
        EventType::EventD
    }

    fn output_payload_type(&self) -> PayloadType {
        PayloadType::Http
    }

    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);

        // Create our request builder.
        let mut request_builder =
            RequestBuilder::new(EventsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
        request_builder.with_max_inputs_per_payload(MAX_EVENTS_PER_PAYLOAD);

        Ok(DatadogEvents {
            request_builder,
            telemetry,
        })
    }
}

impl MemoryBounds for DatadogEventsConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we generate that may be sitting around in-flight?
        //
        // Theoretically, we're limited by the size of the downstream forwarder's interconnect and however many
        // payloads it buffers internally, so the firm limit realistically boils down to the forwarder itself. The
        // forwarder, however, has no way of knowing the maximum size of any given payload sent to it, which makes
        // it hard to calculate a proper firm bound even though we know all of the other values required to do so.
        builder.minimum().with_single_value::<DatadogEvents>("component struct");

        builder
            .firm()
            // Capture the size of the "split re-encode" buffer in the request builder, which is where we keep owned
            // versions of the events we encode, in case we need to re-encode them during a split operation.
            .with_array::<EventD>("events split re-encode buffer", MAX_EVENTS_PER_PAYLOAD);
    }
}

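/// Incremental encoder for sending events to the Datadog platform.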
pub struct DatadogEvents {
    request_builder: RequestBuilder<EventsEndpointEncoder>,
    telemetry: ComponentTelemetry,
}

#[async_trait]
impl IncrementalEncoder for DatadogEvents {
    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
        let eventd = match event.try_into_eventd() {
            Some(eventd) => eventd,
            None => return Ok(ProcessResult::Continue),
        };

        match self.request_builder.encode(eventd).await {
            Ok(None) => Ok(ProcessResult::Continue),
            Ok(Some(eventd)) => Ok(ProcessResult::FlushRequired(Event::EventD(eventd))),
            Err(e) => {
                if e.is_recoverable() {
                    warn!(error = %e, "Failed to encode Datadog event due to recoverable error. Continuing...");

                    // TODO: Get the actual number of events dropped from the error itself.
                    self.telemetry.events_dropped_encoder().increment(1);

                    Ok(ProcessResult::Continue)
                } else {
                    Err(e).error_context("Failed to encode Datadog event due to unrecoverable error.")
                }
            }
        }
    }

    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
        let maybe_requests = self.request_builder.flush().await;
        for maybe_request in maybe_requests {
            match maybe_request {
                Ok((events, request)) => {
                    let payload_meta = PayloadMetadata::from_event_count(events);
                    let http_payload = HttpPayload::new(payload_meta, request);
                    let payload = Payload::Http(http_payload);

                    dispatcher.dispatch(payload).await?;
                }
                Err(e) => error!(error = %e, "Failed to build Datadog events payload. Continuing..."),
            }
        }

        Ok(())
    }
}

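/// [`EndpointEncoder`] for the Datadog Events (v1) intake endpoint.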
#[derive(Debug)]
struct EventsEndpointEncoder {
    tags_deduplicator: ReusableDeduplicator<Tag>,
}

impl EventsEndpointEncoder {
    fn new() -> Self {
        Self {
            tags_deduplicator: ReusableDeduplicator::new(),
        }
    }
}

impl EndpointEncoder for EventsEndpointEncoder {
    type Input = EventD;
    type EncodeError = protobuf::Error;

    fn encoder_name() -> &'static str {
        "events"
    }

    fn compressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    fn uncompressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        encode_and_write_eventd(input, buffer, &mut self.tags_deduplicator)
    }

    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v1/events_batch").into()
    }

    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_PROTOBUF.clone()
    }
}

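/// Encodes the given event and writes it to the buffer as a single length-delimited protobuf field.
///
/// Each event is written as field number 1 (`EVENTS_FIELD_NUMBER`) with the length-delimited wire type, so the
/// per-event buffers concatenate into bytes that decode as a repeated `events` field on the batch payload message
/// expected by the intake. A rough sketch of the resulting wire layout (illustrative, not to scale):
///
/// ```text
/// [tag: field 1, wire type 2][varint length][Event bytes]   <- event 1
/// [tag: field 1, wire type 2][varint length][Event bytes]   <- event 2
/// ...
/// ```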
fn encode_and_write_eventd(
    eventd: &EventD, buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
) -> Result<(), protobuf::Error> {
    let mut output_stream = CodedOutputStream::vec(buf);

    // Write the field tag.
    output_stream.write_tag(EVENTS_FIELD_NUMBER, WireType::LengthDelimited)?;

    // Write the message.
    let encoded_eventd = encode_eventd(eventd, tags_deduplicator);
    output_stream.write_message_no_tag(&encoded_eventd)
}

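/// Converts an `EventD` into its protobuf representation, deduplicating the combined event and origin tags.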
fn encode_eventd(eventd: &EventD, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> proto::Event {
    let mut event = proto::Event::new();
    event.set_title(eventd.title().into());
    event.set_text(eventd.text().into());

    if let Some(timestamp) = eventd.timestamp() {
        event.set_ts(timestamp as i64);
    }

    if let Some(priority) = eventd.priority() {
        event.set_priority(priority.as_str().into());
    }

    if let Some(alert_type) = eventd.alert_type() {
        event.set_alert_type(alert_type.as_str().into());
    }

    if let Some(hostname) = eventd.hostname() {
        event.set_host(hostname.into());
    }

    if let Some(aggregation_key) = eventd.aggregation_key() {
        event.set_aggregation_key(aggregation_key.into());
    }

    if let Some(source_type_name) = eventd.source_type_name() {
        event.set_source_type_name(source_type_name.into());
    }

    let chained_tags = eventd.tags().into_iter().chain(eventd.origin_tags());
    let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);

    event.set_tags(deduplicated_tags.map(|tag| tag.as_str().into()).collect());

    event
}