saluki_components/encoders/datadog/events/
mod.rs1use async_trait::async_trait;
2use datadog_protos::events as proto;
3use facet::Facet;
4use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use protobuf::{rt::WireType, CodedOutputStream};
7use saluki_common::iter::ReusableDeduplicator;
8use saluki_config::GenericConfiguration;
9use saluki_context::tags::Tag;
10use saluki_core::{
11 components::{encoders::*, ComponentContext},
12 data_model::{
13 event::{eventd::EventD, Event, EventType},
14 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
15 },
16 observability::ComponentMetricsExt as _,
17 topology::PayloadsDispatcher,
18};
19use saluki_error::{ErrorContext as _, GenericError};
20use saluki_io::compression::CompressionScheme;
21use saluki_metrics::MetricsBuilder;
22use serde::Deserialize;
23use tracing::{error, warn};
24
25use crate::common::datadog::{
26 io::RB_BUFFER_CHUNK_SIZE,
27 request_builder::{EndpointEncoder, RequestBuilder},
28 telemetry::ComponentTelemetry,
29 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
30};
31
32const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
33const MAX_EVENTS_PER_PAYLOAD: usize = 100;
34const EVENTS_FIELD_NUMBER: u32 = 1;
35
36static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
37
38fn default_serializer_compressor_kind() -> String {
39 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
40}
41
42const fn default_zstd_compressor_level() -> i32 {
43 3
44}
45
46#[derive(Deserialize, Facet)]
50#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
51pub struct DatadogEventsConfiguration {
52 #[serde(
56 rename = "serializer_compressor_kind",
57 default = "default_serializer_compressor_kind"
58 )]
59 compressor_kind: String,
60
61 #[serde(
65 rename = "serializer_zstd_compressor_level",
66 default = "default_zstd_compressor_level"
67 )]
68 zstd_compressor_level: i32,
69}
70
71impl DatadogEventsConfiguration {
72 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
74 Ok(config.as_typed()?)
75 }
76}
77
78#[async_trait]
79impl IncrementalEncoderBuilder for DatadogEventsConfiguration {
80 type Output = DatadogEvents;
81
82 fn input_event_type(&self) -> EventType {
83 EventType::EventD
84 }
85
86 fn output_payload_type(&self) -> PayloadType {
87 PayloadType::Http
88 }
89
90 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
91 let metrics_builder = MetricsBuilder::from_component_context(&context);
92 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
93 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
94
95 let mut request_builder =
97 RequestBuilder::new(EventsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
98 request_builder.with_max_inputs_per_payload(MAX_EVENTS_PER_PAYLOAD);
99
100 Ok(DatadogEvents {
101 request_builder,
102 telemetry,
103 })
104 }
105}
106
107impl MemoryBounds for DatadogEventsConfiguration {
108 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
109 builder.minimum().with_single_value::<DatadogEvents>("component struct");
117
118 builder
119 .firm()
120 .with_array::<EventD>("events split re-encode buffer", MAX_EVENTS_PER_PAYLOAD);
123 }
124}
125
126pub struct DatadogEvents {
127 request_builder: RequestBuilder<EventsEndpointEncoder>,
128 telemetry: ComponentTelemetry,
129}
130
131#[async_trait]
132impl IncrementalEncoder for DatadogEvents {
133 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
134 let eventd = match event.try_into_eventd() {
135 Some(eventd) => eventd,
136 None => return Ok(ProcessResult::Continue),
137 };
138
139 match self.request_builder.encode(eventd).await {
140 Ok(None) => Ok(ProcessResult::Continue),
141 Ok(Some(eventd)) => Ok(ProcessResult::FlushRequired(Event::EventD(eventd))),
142 Err(e) => {
143 if e.is_recoverable() {
144 warn!(error = %e, "Failed to encode Datadog event due to recoverable error. Continuing...");
145
146 self.telemetry.events_dropped_encoder().increment(1);
148
149 Ok(ProcessResult::Continue)
150 } else {
151 Err(e).error_context("Failed to encode Datadog event due to unrecoverable error.")
152 }
153 }
154 }
155 }
156
157 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
158 let maybe_requests = self.request_builder.flush().await;
159 for maybe_request in maybe_requests {
160 match maybe_request {
161 Ok((events, request)) => {
162 let payload_meta = PayloadMetadata::from_event_count(events);
163 let http_payload = HttpPayload::new(payload_meta, request);
164 let payload = Payload::Http(http_payload);
165
166 dispatcher.dispatch(payload).await?;
167 }
168 Err(e) => error!(error = %e, "Failed to build Datadog events payload. Continuing..."),
169 }
170 }
171
172 Ok(())
173 }
174}
175
176#[derive(Debug)]
177struct EventsEndpointEncoder {
178 tags_deduplicator: ReusableDeduplicator<Tag>,
179}
180
181impl EventsEndpointEncoder {
182 fn new() -> Self {
183 Self {
184 tags_deduplicator: ReusableDeduplicator::new(),
185 }
186 }
187}
188
189impl EndpointEncoder for EventsEndpointEncoder {
190 type Input = EventD;
191 type EncodeError = protobuf::Error;
192
193 fn encoder_name() -> &'static str {
194 "events"
195 }
196
197 fn compressed_size_limit(&self) -> usize {
198 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
199 }
200
201 fn uncompressed_size_limit(&self) -> usize {
202 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
203 }
204
205 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
206 encode_and_write_eventd(input, buffer, &mut self.tags_deduplicator)
207 }
208
209 fn endpoint_uri(&self) -> Uri {
210 PathAndQuery::from_static("/api/v1/events_batch").into()
211 }
212
213 fn endpoint_method(&self) -> Method {
214 Method::POST
215 }
216
217 fn content_type(&self) -> HeaderValue {
218 CONTENT_TYPE_PROTOBUF.clone()
219 }
220}
221
222fn encode_and_write_eventd(
223 eventd: &EventD, buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
224) -> Result<(), protobuf::Error> {
225 let mut output_stream = CodedOutputStream::vec(buf);
226
227 output_stream.write_tag(EVENTS_FIELD_NUMBER, WireType::LengthDelimited)?;
229
230 let encoded_eventd = encode_eventd(eventd, tags_deduplicator);
232 output_stream.write_message_no_tag(&encoded_eventd)
233}
234
235fn encode_eventd(eventd: &EventD, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> proto::Event {
236 let mut event = proto::Event::new();
237 event.set_title(eventd.title().into());
238 event.set_text(eventd.text().into());
239
240 if let Some(timestamp) = eventd.timestamp() {
241 event.set_ts(timestamp as i64);
242 }
243
244 if let Some(priority) = eventd.priority() {
245 event.set_priority(priority.as_str().into());
246 }
247
248 if let Some(alert_type) = eventd.alert_type() {
249 event.set_alert_type(alert_type.as_str().into());
250 }
251
252 if let Some(hostname) = eventd.hostname() {
253 event.set_host(hostname.into());
254 }
255
256 if let Some(aggregation_key) = eventd.aggregation_key() {
257 event.set_aggregation_key(aggregation_key.into());
258 }
259
260 if let Some(source_type_name) = eventd.source_type_name() {
261 event.set_source_type_name(source_type_name.into());
262 }
263
264 let chained_tags = eventd.tags().into_iter().chain(eventd.origin_tags());
265 let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);
266
267 event.set_tags(deduplicated_tags.map(|tag| tag.as_str().into()).collect());
268
269 event
270}
271
272#[cfg(test)]
273mod config_smoke {
274 use serde_json::json;
275
276 use super::DatadogEventsConfiguration;
277 use crate::config_registry::structs;
278 use crate::config_registry::test_support::run_config_smoke_tests;
279
280 #[tokio::test]
281 async fn smoke_test() {
282 run_config_smoke_tests(structs::DATADOG_EVENTS_CONFIGURATION, &[], json!({}), |cfg| {
283 cfg.as_typed::<DatadogEventsConfiguration>()
284 .expect("DatadogEventsConfiguration should deserialize")
285 })
286 .await
287 }
288}