saluki_components/encoders/datadog/events/
mod.rs1use async_trait::async_trait;
2use datadog_protos::events as proto;
3use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use protobuf::{rt::WireType, CodedOutputStream};
6use saluki_common::iter::ReusableDeduplicator;
7use saluki_config::GenericConfiguration;
8use saluki_context::tags::Tag;
9use saluki_core::{
10 components::{encoders::*, ComponentContext},
11 data_model::{
12 event::{eventd::EventD, Event, EventType},
13 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
14 },
15 observability::ComponentMetricsExt as _,
16 topology::PayloadsDispatcher,
17};
18use saluki_error::{ErrorContext as _, GenericError};
19use saluki_io::compression::CompressionScheme;
20use saluki_metrics::MetricsBuilder;
21use serde::Deserialize;
22use tracing::{error, warn};
23
24use crate::common::datadog::{
25 io::RB_BUFFER_CHUNK_SIZE,
26 request_builder::{EndpointEncoder, RequestBuilder},
27 telemetry::ComponentTelemetry,
28 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
29};
30
/// Field number written as the length-delimited tag in front of every encoded event.
const EVENTS_FIELD_NUMBER: u32 = 1;

/// Upper bound on the number of events the request builder is allowed to place in one payload.
const MAX_EVENTS_PER_PAYLOAD: usize = 100;

/// Compressor kind used when `serializer_compressor_kind` is absent from the configuration.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
34
35static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
36
37fn default_serializer_compressor_kind() -> String {
38 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
39}
40
/// Serde default for the zstd compression level.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
44
45#[derive(Deserialize)]
49pub struct DatadogEventsConfiguration {
50 #[serde(
54 rename = "serializer_compressor_kind",
55 default = "default_serializer_compressor_kind"
56 )]
57 compressor_kind: String,
58
59 #[serde(
63 rename = "serializer_zstd_compressor_level",
64 default = "default_zstd_compressor_level"
65 )]
66 zstd_compressor_level: i32,
67}
68
69impl DatadogEventsConfiguration {
70 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
72 Ok(config.as_typed()?)
73 }
74}
75
76#[async_trait]
77impl IncrementalEncoderBuilder for DatadogEventsConfiguration {
78 type Output = DatadogEvents;
79
80 fn input_event_type(&self) -> EventType {
81 EventType::EventD
82 }
83
84 fn output_payload_type(&self) -> PayloadType {
85 PayloadType::Http
86 }
87
88 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
89 let metrics_builder = MetricsBuilder::from_component_context(&context);
90 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
91 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
92
93 let mut request_builder =
95 RequestBuilder::new(EventsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
96 request_builder.with_max_inputs_per_payload(MAX_EVENTS_PER_PAYLOAD);
97
98 Ok(DatadogEvents {
99 request_builder,
100 telemetry,
101 })
102 }
103}
104
105impl MemoryBounds for DatadogEventsConfiguration {
106 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
107 builder.minimum().with_single_value::<DatadogEvents>("component struct");
115
116 builder
117 .firm()
118 .with_array::<EventD>("events split re-encode buffer", MAX_EVENTS_PER_PAYLOAD);
121 }
122}
123
124pub struct DatadogEvents {
125 request_builder: RequestBuilder<EventsEndpointEncoder>,
126 telemetry: ComponentTelemetry,
127}
128
129#[async_trait]
130impl IncrementalEncoder for DatadogEvents {
131 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
132 let eventd = match event.try_into_eventd() {
133 Some(eventd) => eventd,
134 None => return Ok(ProcessResult::Continue),
135 };
136
137 match self.request_builder.encode(eventd).await {
138 Ok(None) => Ok(ProcessResult::Continue),
139 Ok(Some(eventd)) => Ok(ProcessResult::FlushRequired(Event::EventD(eventd))),
140 Err(e) => {
141 if e.is_recoverable() {
142 warn!(error = %e, "Failed to encode Datadog event due to recoverable error. Continuing...");
143
144 self.telemetry.events_dropped_encoder().increment(1);
146
147 Ok(ProcessResult::Continue)
148 } else {
149 Err(e).error_context("Failed to encode Datadog event due to unrecoverable error.")
150 }
151 }
152 }
153 }
154
155 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
156 let maybe_requests = self.request_builder.flush().await;
157 for maybe_request in maybe_requests {
158 match maybe_request {
159 Ok((events, request)) => {
160 let payload_meta = PayloadMetadata::from_event_count(events);
161 let http_payload = HttpPayload::new(payload_meta, request);
162 let payload = Payload::Http(http_payload);
163
164 dispatcher.dispatch(payload).await?;
165 }
166 Err(e) => error!(error = %e, "Failed to build Datadog events payload. Continuing..."),
167 }
168 }
169
170 Ok(())
171 }
172}
173
174#[derive(Debug)]
175struct EventsEndpointEncoder {
176 tags_deduplicator: ReusableDeduplicator<Tag>,
177}
178
179impl EventsEndpointEncoder {
180 fn new() -> Self {
181 Self {
182 tags_deduplicator: ReusableDeduplicator::new(),
183 }
184 }
185}
186
187impl EndpointEncoder for EventsEndpointEncoder {
188 type Input = EventD;
189 type EncodeError = protobuf::Error;
190
191 fn encoder_name() -> &'static str {
192 "events"
193 }
194
195 fn compressed_size_limit(&self) -> usize {
196 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
197 }
198
199 fn uncompressed_size_limit(&self) -> usize {
200 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
201 }
202
203 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
204 encode_and_write_eventd(input, buffer, &mut self.tags_deduplicator)
205 }
206
207 fn endpoint_uri(&self) -> Uri {
208 PathAndQuery::from_static("/api/v1/events_batch").into()
209 }
210
211 fn endpoint_method(&self) -> Method {
212 Method::POST
213 }
214
215 fn content_type(&self) -> HeaderValue {
216 CONTENT_TYPE_PROTOBUF.clone()
217 }
218}
219
220fn encode_and_write_eventd(
221 eventd: &EventD, buf: &mut Vec<u8>, tags_deduplicator: &mut ReusableDeduplicator<Tag>,
222) -> Result<(), protobuf::Error> {
223 let mut output_stream = CodedOutputStream::vec(buf);
224
225 output_stream.write_tag(EVENTS_FIELD_NUMBER, WireType::LengthDelimited)?;
227
228 let encoded_eventd = encode_eventd(eventd, tags_deduplicator);
230 output_stream.write_message_no_tag(&encoded_eventd)
231}
232
233fn encode_eventd(eventd: &EventD, tags_deduplicator: &mut ReusableDeduplicator<Tag>) -> proto::Event {
234 let mut event = proto::Event::new();
235 event.set_title(eventd.title().into());
236 event.set_text(eventd.text().into());
237
238 if let Some(timestamp) = eventd.timestamp() {
239 event.set_ts(timestamp as i64);
240 }
241
242 if let Some(priority) = eventd.priority() {
243 event.set_priority(priority.as_str().into());
244 }
245
246 if let Some(alert_type) = eventd.alert_type() {
247 event.set_alert_type(alert_type.as_str().into());
248 }
249
250 if let Some(hostname) = eventd.hostname() {
251 event.set_host(hostname.into());
252 }
253
254 if let Some(aggregation_key) = eventd.aggregation_key() {
255 event.set_aggregation_key(aggregation_key.into());
256 }
257
258 if let Some(source_type_name) = eventd.source_type_name() {
259 event.set_source_type_name(source_type_name.into());
260 }
261
262 let chained_tags = eventd.tags().into_iter().chain(eventd.origin_tags());
263 let deduplicated_tags = tags_deduplicator.deduplicated(chained_tags);
264
265 event.set_tags(deduplicated_tags.map(|tag| tag.as_str().into()).collect());
266
267 event
268}