saluki_components/encoders/datadog/events/
mod.rs1use async_trait::async_trait;
2use datadog_protos::events as proto;
3use facet::Facet;
4use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
5use protobuf::{rt::WireType, CodedOutputStream};
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::iter::ReusableDeduplicator;
8use saluki_config::GenericConfiguration;
9use saluki_context::tags::Tag;
10use saluki_core::{
11 components::{encoders::*, ComponentContext},
12 data_model::{
13 event::{eventd::EventD, Event, EventType},
14 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
15 },
16 observability::ComponentMetricsExt as _,
17 topology::PayloadsDispatcher,
18};
19use saluki_error::{ErrorContext as _, GenericError};
20use saluki_io::compression::CompressionScheme;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use tracing::{debug, error, warn};
24
25use crate::common::datadog::{
26 clamp_payload_limits,
27 io::RB_BUFFER_CHUNK_SIZE,
28 request_builder::{EndpointEncoder, RequestBuilder},
29 telemetry::ComponentTelemetry,
30 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
31 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
32};
33
/// Compressor kind returned by `default_serializer_compressor_kind`, i.e. the value used when
/// `serializer_compressor_kind` is absent from the configuration.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Value handed to `RequestBuilder::with_max_inputs_per_payload`, and the element count used for the
/// firm memory bound declared in `specify_bounds`.
const MAX_EVENTS_PER_PAYLOAD: usize = 100;
/// Protobuf field number written as a length-delimited tag in front of every encoded event.
const EVENTS_FIELD_NUMBER: u32 = 1;

/// `Content-Type` header value returned by `EventsEndpointEncoder::content_type`.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
39
40fn default_serializer_compressor_kind() -> String {
41 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
42}
43
/// Serde default for `serializer_zstd_compressor_level`.
const fn default_zstd_compressor_level() -> i32 {
    const LEVEL: i32 = 3;
    LEVEL
}
47
48const fn default_max_payload_size() -> usize {
49 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
50}
51
52const fn default_max_uncompressed_payload_size() -> usize {
53 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
54}
55
/// Serde default for `log_payloads`: event logging is off unless explicitly enabled.
const fn default_log_payloads() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
59
60#[derive(Deserialize, Facet)]
64#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
65pub struct DatadogEventsConfiguration {
66 #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
74 max_payload_size: usize,
75
76 #[serde(
84 rename = "serializer_max_uncompressed_payload_size",
85 default = "default_max_uncompressed_payload_size"
86 )]
87 max_uncompressed_payload_size: usize,
88
89 #[serde(
93 rename = "serializer_compressor_kind",
94 default = "default_serializer_compressor_kind"
95 )]
96 compressor_kind: String,
97
98 #[serde(
102 rename = "serializer_zstd_compressor_level",
103 default = "default_zstd_compressor_level"
104 )]
105 zstd_compressor_level: i32,
106
107 #[serde(default = "default_log_payloads")]
113 log_payloads: bool,
114}
115
116impl DatadogEventsConfiguration {
117 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
119 Ok(config.as_typed()?)
120 }
121}
122
123#[async_trait]
124impl IncrementalEncoderBuilder for DatadogEventsConfiguration {
125 type Output = DatadogEvents;
126
127 fn input_event_type(&self) -> EventType {
128 EventType::EventD
129 }
130
131 fn output_payload_type(&self) -> PayloadType {
132 PayloadType::Http
133 }
134
135 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
136 let metrics_builder = MetricsBuilder::from_component_context(&context);
137 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
138 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
139
140 let mut request_builder =
142 RequestBuilder::new(EventsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
143 let (uncompressed_limit, compressed_limit) = clamp_payload_limits(
144 self.max_uncompressed_payload_size,
145 self.max_payload_size,
146 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
147 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT,
148 );
149 request_builder.with_len_limits(uncompressed_limit, compressed_limit)?;
150 request_builder.with_max_inputs_per_payload(MAX_EVENTS_PER_PAYLOAD);
151
152 Ok(DatadogEvents {
153 request_builder,
154 telemetry,
155 log_payloads: self.log_payloads,
156 })
157 }
158}
159
160impl MemoryBounds for DatadogEventsConfiguration {
161 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
162 builder.minimum().with_single_value::<DatadogEvents>("component struct");
170
171 builder
172 .firm()
173 .with_array::<EventD>("events split re-encode buffer", MAX_EVENTS_PER_PAYLOAD);
176 }
177}
178
179pub struct DatadogEvents {
180 request_builder: RequestBuilder<EventsEndpointEncoder>,
181 telemetry: ComponentTelemetry,
182 log_payloads: bool,
183}
184
185#[async_trait]
186impl IncrementalEncoder for DatadogEvents {
187 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
188 let eventd = match event.try_into_eventd() {
189 Some(eventd) => eventd,
190 None => return Ok(ProcessResult::Continue),
191 };
192
193 if self.log_payloads {
194 debug!(event = ?eventd, "Flushing event.");
195 }
196
197 match self.request_builder.encode(eventd).await {
198 Ok(None) => Ok(ProcessResult::Continue),
199 Ok(Some(eventd)) => Ok(ProcessResult::FlushRequired(Event::EventD(eventd))),
200 Err(e) => {
201 if e.is_recoverable() {
202 warn!(error = %e, "Failed to encode Datadog event due to recoverable error. Continuing...");
203
204 self.telemetry.events_dropped_encoder().increment(1);
206
207 Ok(ProcessResult::Continue)
208 } else {
209 Err(e).error_context("Failed to encode Datadog event due to unrecoverable error.")
210 }
211 }
212 }
213 }
214
215 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
216 let maybe_requests = self.request_builder.flush().await;
217 for maybe_request in maybe_requests {
218 match maybe_request {
219 Ok((events, _data_points, request)) => {
220 let payload_meta = PayloadMetadata::from_event_count(events);
221 let http_payload = HttpPayload::new(payload_meta, request);
222 let payload = Payload::Http(http_payload);
223
224 dispatcher.dispatch(payload).await?;
225 }
226 Err(e) => error!(error = %e, "Failed to build Datadog events payload. Continuing..."),
227 }
228 }
229
230 Ok(())
231 }
232}
233
234#[derive(Debug)]
235struct EventsEndpointEncoder {
236 tags_deduplicator: ReusableDeduplicator<Tag>,
237}
238
239impl EventsEndpointEncoder {
240 fn new() -> Self {
241 Self {
242 tags_deduplicator: ReusableDeduplicator::new(),
243 }
244 }
245}
246
247impl EndpointEncoder for EventsEndpointEncoder {
248 type Input = EventD;
249 type EncodeError = protobuf::Error;
250
251 fn encoder_name() -> &'static str {
252 "events"
253 }
254
255 fn compressed_size_limit(&self) -> usize {
256 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
257 }
258
259 fn uncompressed_size_limit(&self) -> usize {
260 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
261 }
262
263 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
264 encode_and_write_eventd(input, buffer, &mut self.tags_deduplicator)
265 }
266
267 fn endpoint_uri(&self) -> Uri {
268 PathAndQuery::from_static("/api/v1/events_batch").into()
269 }
270
271 fn endpoint_method(&self) -> Method {
272 Method::POST
273 }
274
275 fn content_type(&self) -> HeaderValue {
276 CONTENT_TYPE_PROTOBUF.clone()
277 }
278}
279
280fn encode_and_write_eventd(
281 eventd: &EventD, buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
282) -> Result<(), protobuf::Error> {
283 let mut output_stream = CodedOutputStream::vec(buf);
284
285 output_stream.write_tag(EVENTS_FIELD_NUMBER, WireType::LengthDelimited)?;
287
288 let encoded_eventd = encode_eventd(eventd, tags_deduplicator);
290 output_stream.write_message_no_tag(&encoded_eventd)
291}
292
293fn encode_eventd(eventd: &EventD, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> proto::Event {
294 let mut event = proto::Event::new();
295 event.set_title(eventd.title().into());
296 event.set_text(eventd.text().into());
297
298 if let Some(timestamp) = eventd.timestamp() {
299 event.set_ts(timestamp as i64);
300 }
301
302 if let Some(priority) = eventd.priority() {
303 event.set_priority(priority.as_str().into());
304 }
305
306 if let Some(alert_type) = eventd.alert_type() {
307 event.set_alert_type(alert_type.as_str().into());
308 }
309
310 if let Some(hostname) = eventd.hostname() {
311 event.set_host(hostname.into());
312 }
313
314 if let Some(aggregation_key) = eventd.aggregation_key() {
315 event.set_aggregation_key(aggregation_key.into());
316 }
317
318 if let Some(source_type_name) = eventd.source_type_name() {
319 event.set_source_type_name(source_type_name.into());
320 }
321
322 let chained_tags = eventd.tags().into_iter().chain(eventd.origin_tags());
323 let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);
324
325 event.set_tags(deduplicated_tags.map(|tag| tag.as_str().into()).collect());
326
327 event
328}
329
/// Smoke test: the configuration registry entry for this encoder must deserialize into
/// `DatadogEventsConfiguration`.
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogEventsConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_EVENTS_CONFIGURATION, &[], json!({}), |config| {
            let typed = config.as_typed::<DatadogEventsConfiguration>();
            typed.expect("DatadogEventsConfiguration should deserialize")
        })
        .await
    }
}