saluki_components/encoders/datadog/logs/
mod.rs1use async_trait::async_trait;
2use chrono::{SecondsFormat, Utc};
3use facet::Facet;
4use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_common::iter::ReusableDeduplicator;
7use saluki_config::GenericConfiguration;
8use saluki_context::tags::Tag;
9use saluki_core::{
10 components::{encoders::*, ComponentContext},
11 data_model::{
12 event::{log::Log, Event, EventType},
13 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
14 },
15 observability::ComponentMetricsExt as _,
16 topology::PayloadsDispatcher,
17};
18use saluki_error::{ErrorContext as _, GenericError};
19use saluki_io::compression::CompressionScheme;
20use saluki_metrics::MetricsBuilder;
21use serde::Deserialize;
22use serde_json::{Map as JsonMap, Value as JsonValue};
23use tracing::{error, warn};
24
25use crate::common::datadog::{
26 io::RB_BUFFER_CHUNK_SIZE,
27 request_builder::{EndpointEncoder, RequestBuilder},
28 telemetry::ComponentTelemetry,
29 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
30};
31
/// Compressor name returned by `default_serializer_compressor_kind` when no value is configured.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Maximum number of logs per payload; handed to the request builder and used to size the memory bounds.
const MAX_LOGS_PER_PAYLOAD: usize = 1000;

/// `Content-Type` header value reported by the logs endpoint encoder.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
36
37fn default_serializer_compressor_kind() -> String {
38 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
39}
40
/// Serde default provider for `serializer_zstd_compressor_level`.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
44
/// Configuration for the Datadog Logs incremental encoder.
///
/// Deserialized from the generic configuration; each field falls back to its serde default function when the
/// corresponding key is absent.
#[derive(Deserialize, Debug, Facet)]
#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
pub struct DatadogLogsConfiguration {
    /// Name of the compression algorithm passed to `CompressionScheme::new`.
    ///
    /// Read from the `serializer_compressor_kind` key; defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level passed to `CompressionScheme::new` alongside the compressor kind.
    ///
    /// Read from the `serializer_zstd_compressor_level` key; defaults to `3`.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}
63
64impl DatadogLogsConfiguration {
65 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
67 Ok(config.as_typed()?)
68 }
69}
70
71#[async_trait]
72impl IncrementalEncoderBuilder for DatadogLogsConfiguration {
73 type Output = DatadogLogs;
74
75 fn input_event_type(&self) -> EventType {
76 EventType::Log
77 }
78
79 fn output_payload_type(&self) -> PayloadType {
80 PayloadType::Http
81 }
82
83 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
84 let metrics_builder = MetricsBuilder::from_component_context(&context);
85 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
86 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
87
88 let mut request_builder =
89 RequestBuilder::new(LogsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
90 request_builder.with_max_inputs_per_payload(MAX_LOGS_PER_PAYLOAD);
91
92 Ok(DatadogLogs {
93 request_builder,
94 telemetry,
95 })
96 }
97}
98
/// Memory accounting for the Datadog Logs encoder.
impl MemoryBounds for DatadogLogsConfiguration {
    /// Declares the memory bounds of the component built from this configuration.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum bound: the encoder component struct itself.
        builder.minimum().with_single_value::<DatadogLogs>("component struct");
        // Firm bound: an array of `MAX_LOGS_PER_PAYLOAD` `Log` values, labelled "logs buffer".
        // NOTE(review): presumably this models the logs held while a payload is being assembled — confirm.
        builder.firm().with_array::<Log>("logs buffer", MAX_LOGS_PER_PAYLOAD);
    }
}
107
/// Datadog Logs incremental encoder.
///
/// Encodes `Log` events into HTTP payloads via a `RequestBuilder` driven by `LogsEndpointEncoder`.
pub struct DatadogLogs {
    /// Accumulates encoded logs and produces the HTTP requests on flush.
    request_builder: RequestBuilder<LogsEndpointEncoder>,
    /// Component telemetry; used to count events dropped by the encoder.
    telemetry: ComponentTelemetry,
}
112
113#[async_trait]
114impl IncrementalEncoder for DatadogLogs {
115 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
116 let log: Log = match event {
117 Event::Log(log) => log,
118 _ => return Ok(ProcessResult::Continue),
119 };
120 match self.request_builder.encode(log).await {
121 Ok(None) => Ok(ProcessResult::Continue),
122 Ok(Some(log)) => Ok(ProcessResult::FlushRequired(Event::Log(log))),
123 Err(e) => {
124 if e.is_recoverable() {
125 warn!(error = %e, "Failed to encode Datadog log due to recoverable error. Continuing...");
126
127 self.telemetry.events_dropped_encoder().increment(1);
129 Ok(ProcessResult::Continue)
130 } else {
131 Err(e).error_context("Failed to encode Datadog log due to unrecoverable error.")
132 }
133 }
134 }
135 }
136
137 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
138 let maybe_requests = self.request_builder.flush().await;
139 for maybe_request in maybe_requests {
140 match maybe_request {
141 Ok((events, request)) => {
142 let payload_meta = PayloadMetadata::from_event_count(events);
143 let http_payload = HttpPayload::new(payload_meta, request);
144 let payload = Payload::Http(http_payload);
145 dispatcher.dispatch(payload).await?;
146 }
147 Err(e) => error!(error = %e, "Failed to build Datadog logs payload. Continuing..."),
148 }
149 }
150
151 Ok(())
152 }
153}
154
/// Endpoint encoder that serializes `Log` values as JSON objects for the logs intake endpoint.
#[derive(Debug)]
struct LogsEndpointEncoder {
    /// Reused across logs to deduplicate each log's tags before they are joined into `ddtags`.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
159
160impl LogsEndpointEncoder {
161 fn new() -> Self {
162 Self {
163 tags_deduplicator: ReusableDeduplicator::new(),
164 }
165 }
166
167 fn build_agent_json(&mut self, log: &Log) -> JsonValue {
168 let mut obj = JsonMap::new();
169
170 let mut message_inner = JsonMap::new();
172 message_inner.insert("message".to_string(), JsonValue::String(log.message().to_string()));
173 if !log.service().is_empty() {
174 message_inner.insert("service".to_string(), JsonValue::String(log.service().to_string()));
175 }
176 let message_str =
177 serde_json::to_string(&JsonValue::Object(message_inner)).unwrap_or_else(|_| log.message().to_string());
178 obj.insert("message".to_string(), JsonValue::String(message_str));
179
180 if let Some(status) = log.status() {
181 obj.insert("status".to_string(), JsonValue::String(status.as_str().to_string()));
182 }
183 if !log.hostname().is_empty() {
184 obj.insert("hostname".to_string(), JsonValue::String(log.hostname().to_string()));
185 }
186 if !log.service().is_empty() {
187 obj.insert("service".to_string(), JsonValue::String(log.service().to_string()));
188 }
189
190 if let Some(ddsource) = log.source().clone() {
191 obj.insert("ddsource".to_string(), JsonValue::String(ddsource.to_string()));
192 }
193
194 let tags_iter = self.tags_deduplicator.deduplicated(log.tags().into_iter());
196 let tags_vec: Vec<&str> = tags_iter.map(|t| t.as_str()).collect();
197 if !tags_vec.is_empty() {
198 obj.insert("ddtags".to_string(), JsonValue::String(tags_vec.join(",")));
199 }
200
201 let user_provided_timestamp = log.additional_properties().contains_key("timestamp")
203 || log.additional_properties().contains_key("@timestamp");
204 if !user_provided_timestamp {
205 let now_rfc3339 = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
206 obj.insert("@timestamp".to_string(), JsonValue::String(now_rfc3339));
207 }
208
209 for (k, v) in log.additional_properties() {
211 obj.insert(k.to_string(), v.clone());
212 }
213
214 JsonValue::Object(obj)
215 }
216}
217
218impl EndpointEncoder for LogsEndpointEncoder {
219 type Input = Log;
220 type EncodeError = serde_json::Error;
221
222 fn encoder_name() -> &'static str {
223 "logs"
224 }
225
226 fn compressed_size_limit(&self) -> usize {
227 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
228 }
229
230 fn uncompressed_size_limit(&self) -> usize {
231 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
232 }
233
234 fn get_payload_prefix(&self) -> Option<&'static [u8]> {
235 Some(b"[")
236 }
237
238 fn get_payload_suffix(&self) -> Option<&'static [u8]> {
239 Some(b"]")
240 }
241
242 fn get_input_separator(&self) -> Option<&'static [u8]> {
243 Some(b",")
244 }
245
246 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
247 let json = self.build_agent_json(input);
248 serde_json::to_writer(buffer, &json)
249 }
250
251 fn endpoint_uri(&self) -> Uri {
252 PathAndQuery::from_static("/api/v2/logs").into()
253 }
254
255 fn endpoint_method(&self) -> Method {
256 Method::POST
257 }
258
259 fn content_type(&self) -> HeaderValue {
260 CONTENT_TYPE_JSON.clone()
261 }
262}
263
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogLogsConfiguration;
    use crate::config_registry::{structs, test_support::run_config_smoke_tests};

    /// Runs the shared configuration smoke tests against `DatadogLogsConfiguration`, starting from an empty
    /// JSON object and requiring that the typed deserialization succeeds.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_LOGS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogLogsConfiguration>()
                .expect("DatadogLogsConfiguration should deserialize")
        })
        .await
    }
}