// saluki_components/encoders/datadog/logs/mod.rs

1use async_trait::async_trait;
2use chrono::{SecondsFormat, Utc};
3use facet::Facet;
4use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_common::iter::ReusableDeduplicator;
7use saluki_config::GenericConfiguration;
8use saluki_context::tags::Tag;
9use saluki_core::{
10    components::{encoders::*, ComponentContext},
11    data_model::{
12        event::{log::Log, Event, EventType},
13        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
14    },
15    observability::ComponentMetricsExt as _,
16    topology::PayloadsDispatcher,
17};
18use saluki_error::{ErrorContext as _, GenericError};
19use saluki_io::compression::CompressionScheme;
20use saluki_metrics::MetricsBuilder;
21use serde::Deserialize;
22use serde_json::{Map as JsonMap, Value as JsonValue};
23use tracing::{error, warn};
24
25use crate::common::datadog::{
26    io::RB_BUFFER_CHUNK_SIZE,
27    request_builder::{EndpointEncoder, RequestBuilder},
28    telemetry::ComponentTelemetry,
29    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
30};
31
// Default compression scheme applied to serialized Logs payloads.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
// Maximum number of log events packed into a single payload before a flush is forced.
const MAX_LOGS_PER_PAYLOAD: usize = 1000;

// Content-Type header value attached to every Logs intake request (JSON body).
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
36
37fn default_serializer_compressor_kind() -> String {
38    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
39}
40
/// Returns the default compression level used when the compressor kind is `zstd`.
const fn default_zstd_compressor_level() -> i32 {
    3
}
44
/// Datadog Logs incremental encoder.
///
/// Configuration for serializing log events into the JSON payload format expected by the
/// Datadog Logs intake, with configurable payload compression.
#[derive(Deserialize, Debug, Facet)]
#[cfg_attr(test, derive(PartialEq, serde::Serialize))]
pub struct DatadogLogsConfiguration {
    /// Compression kind for Logs payloads. Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}
63
64impl DatadogLogsConfiguration {
65    /// Creates a new `DatadogLogsConfiguration` from the given configuration.
66    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
67        Ok(config.as_typed()?)
68    }
69}
70
71#[async_trait]
72impl IncrementalEncoderBuilder for DatadogLogsConfiguration {
73    type Output = DatadogLogs;
74
75    fn input_event_type(&self) -> EventType {
76        EventType::Log
77    }
78
79    fn output_payload_type(&self) -> PayloadType {
80        PayloadType::Http
81    }
82
83    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
84        let metrics_builder = MetricsBuilder::from_component_context(&context);
85        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
86        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
87
88        let mut request_builder =
89            RequestBuilder::new(LogsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
90        request_builder.with_max_inputs_per_payload(MAX_LOGS_PER_PAYLOAD);
91
92        Ok(DatadogLogs {
93            request_builder,
94            telemetry,
95        })
96    }
97}
98
impl MemoryBounds for DatadogLogsConfiguration {
    /// Declares the memory this component can hold at runtime.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?

        // Fixed minimum: the component struct itself.
        builder.minimum().with_single_value::<DatadogLogs>("component struct");
        // Firm bound: at most one payload's worth of buffered logs at a time.
        builder.firm().with_array::<Log>("logs buffer", MAX_LOGS_PER_PAYLOAD);
    }
}
107
/// Datadog Logs incremental encoder component.
pub struct DatadogLogs {
    // Incrementally encodes/compresses logs into HTTP request payloads.
    request_builder: RequestBuilder<LogsEndpointEncoder>,
    // Component-level telemetry (e.g. dropped-events counter).
    telemetry: ComponentTelemetry,
}
112
113#[async_trait]
114impl IncrementalEncoder for DatadogLogs {
115    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
116        let log: Log = match event {
117            Event::Log(log) => log,
118            _ => return Ok(ProcessResult::Continue),
119        };
120        match self.request_builder.encode(log).await {
121            Ok(None) => Ok(ProcessResult::Continue),
122            Ok(Some(log)) => Ok(ProcessResult::FlushRequired(Event::Log(log))),
123            Err(e) => {
124                if e.is_recoverable() {
125                    warn!(error = %e, "Failed to encode Datadog log due to recoverable error. Continuing...");
126
127                    // TODO: Get the actual number of events dropped from the error itself.
128                    self.telemetry.events_dropped_encoder().increment(1);
129                    Ok(ProcessResult::Continue)
130                } else {
131                    Err(e).error_context("Failed to encode Datadog log due to unrecoverable error.")
132                }
133            }
134        }
135    }
136
137    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
138        let maybe_requests = self.request_builder.flush().await;
139        for maybe_request in maybe_requests {
140            match maybe_request {
141                Ok((events, request)) => {
142                    let payload_meta = PayloadMetadata::from_event_count(events);
143                    let http_payload = HttpPayload::new(payload_meta, request);
144                    let payload = Payload::Http(http_payload);
145                    dispatcher.dispatch(payload).await?;
146                }
147                Err(e) => error!(error = %e, "Failed to build Datadog logs payload. Continuing..."),
148            }
149        }
150
151        Ok(())
152    }
153}
154
/// `EndpointEncoder` implementation targeting the Datadog Logs intake.
#[derive(Debug)]
struct LogsEndpointEncoder {
    // Reusable scratch state for deduplicating a log's tags between encode calls.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
159
160impl LogsEndpointEncoder {
161    fn new() -> Self {
162        Self {
163            tags_deduplicator: ReusableDeduplicator::new(),
164        }
165    }
166
167    fn build_agent_json(&mut self, log: &Log) -> JsonValue {
168        let mut obj = JsonMap::new();
169
170        // Encode a structured message object as a JSON string in the `message` field.
171        let mut message_inner = JsonMap::new();
172        message_inner.insert("message".to_string(), JsonValue::String(log.message().to_string()));
173        if !log.service().is_empty() {
174            message_inner.insert("service".to_string(), JsonValue::String(log.service().to_string()));
175        }
176        let message_str =
177            serde_json::to_string(&JsonValue::Object(message_inner)).unwrap_or_else(|_| log.message().to_string());
178        obj.insert("message".to_string(), JsonValue::String(message_str));
179
180        if let Some(status) = log.status() {
181            obj.insert("status".to_string(), JsonValue::String(status.as_str().to_string()));
182        }
183        if !log.hostname().is_empty() {
184            obj.insert("hostname".to_string(), JsonValue::String(log.hostname().to_string()));
185        }
186        if !log.service().is_empty() {
187            obj.insert("service".to_string(), JsonValue::String(log.service().to_string()));
188        }
189
190        if let Some(ddsource) = log.source().clone() {
191            obj.insert("ddsource".to_string(), JsonValue::String(ddsource.to_string()));
192        }
193
194        // ddtags: comma-separated, deduplicated
195        let tags_iter = self.tags_deduplicator.deduplicated(log.tags().into_iter());
196        let tags_vec: Vec<&str> = tags_iter.map(|t| t.as_str()).collect();
197        if !tags_vec.is_empty() {
198            obj.insert("ddtags".to_string(), JsonValue::String(tags_vec.join(",")));
199        }
200
201        // Default timestamp (RFC3339 with milliseconds, Z) unless user provided `timestamp` or `@timestamp`.
202        let user_provided_timestamp = log.additional_properties().contains_key("timestamp")
203            || log.additional_properties().contains_key("@timestamp");
204        if !user_provided_timestamp {
205            let now_rfc3339 = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
206            obj.insert("@timestamp".to_string(), JsonValue::String(now_rfc3339));
207        }
208
209        // Last-write-wins: merge AdditionalProperties last
210        for (k, v) in log.additional_properties() {
211            obj.insert(k.to_string(), v.clone());
212        }
213
214        JsonValue::Object(obj)
215    }
216}
217
impl EndpointEncoder for LogsEndpointEncoder {
    type Input = Log;
    type EncodeError = serde_json::Error;

    /// Name identifying this encoder (used for telemetry/diagnostics by the request builder).
    fn encoder_name() -> &'static str {
        "logs"
    }

    /// Maximum compressed payload size accepted by the intake.
    fn compressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    /// Maximum uncompressed payload size accepted by the intake.
    fn uncompressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

    /// Payloads are a JSON array: opened with `[` ...
    fn get_payload_prefix(&self) -> Option<&'static [u8]> {
        Some(b"[")
    }

    /// ... closed with `]` ...
    fn get_payload_suffix(&self) -> Option<&'static [u8]> {
        Some(b"]")
    }

    /// ... with individual log objects separated by commas.
    fn get_input_separator(&self) -> Option<&'static [u8]> {
        Some(b",")
    }

    /// Serializes a single log as a JSON object into `buffer`.
    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        let json = self.build_agent_json(input);
        serde_json::to_writer(buffer, &json)
    }

    /// Datadog Logs intake path (v2).
    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v2/logs").into()
    }

    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_JSON.clone()
    }
}
263
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogLogsConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Verifies that `DatadogLogsConfiguration` deserializes from an empty config map,
    /// i.e. that every field has a working default.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_LOGS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogLogsConfiguration>()
                .expect("DatadogLogsConfiguration should deserialize")
        })
        .await
    }
}