saluki_components/encoders/datadog/logs/
mod.rs

1use async_trait::async_trait;
2use chrono::{SecondsFormat, Utc};
3use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::iter::ReusableDeduplicator;
6use saluki_config::GenericConfiguration;
7use saluki_context::tags::Tag;
8use saluki_core::{
9    components::{encoders::*, ComponentContext},
10    data_model::{
11        event::{log::Log, Event, EventType},
12        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
13    },
14    observability::ComponentMetricsExt as _,
15    topology::PayloadsDispatcher,
16};
17use saluki_error::{ErrorContext as _, GenericError};
18use saluki_io::compression::CompressionScheme;
19use saluki_metrics::MetricsBuilder;
20use serde::Deserialize;
21use serde_json::{Map as JsonMap, Value as JsonValue};
22use tracing::{error, warn};
23
24use crate::common::datadog::{
25    io::RB_BUFFER_CHUNK_SIZE,
26    request_builder::{EndpointEncoder, RequestBuilder},
27    telemetry::ComponentTelemetry,
28    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
29};
30
/// Compression scheme used when `serializer_compressor_kind` is not set in the configuration.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Maximum number of logs encoded into a single request payload before a flush is forced.
const MAX_LOGS_PER_PAYLOAD: usize = 1000;

/// `Content-Type` header value attached to every logs request payload.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
35
36fn default_serializer_compressor_kind() -> String {
37    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
38}
39
/// Returns the default compression level (3) used when the compressor kind is `zstd`.
const fn default_zstd_compressor_level() -> i32 {
    3
}
43
/// Datadog Logs incremental encoder.
///
/// Deserialized from the generic configuration; `serde(rename = ...)` maps each field to the
/// Agent-style configuration key, with defaults applied when a key is absent.
#[derive(Deserialize, Debug)]
pub struct DatadogLogsConfiguration {
    /// Compression kind for Logs payloads. Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}
61
62impl DatadogLogsConfiguration {
63    /// Creates a new `DatadogLogsConfiguration` from the given configuration.
64    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
65        Ok(config.as_typed()?)
66    }
67}
68
69#[async_trait]
70impl IncrementalEncoderBuilder for DatadogLogsConfiguration {
71    type Output = DatadogLogs;
72
73    fn input_event_type(&self) -> EventType {
74        EventType::Log
75    }
76
77    fn output_payload_type(&self) -> PayloadType {
78        PayloadType::Http
79    }
80
81    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
82        let metrics_builder = MetricsBuilder::from_component_context(&context);
83        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
84        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
85
86        let mut request_builder =
87            RequestBuilder::new(LogsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
88        request_builder.with_max_inputs_per_payload(MAX_LOGS_PER_PAYLOAD);
89
90        Ok(DatadogLogs {
91            request_builder,
92            telemetry,
93        })
94    }
95}
96
impl MemoryBounds for DatadogLogsConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?

        // Minimum: the component struct itself, always resident once the encoder is built.
        builder.minimum().with_single_value::<DatadogLogs>("component struct");
        // Firm: worst-case buffered logs for a single payload (capped by MAX_LOGS_PER_PAYLOAD).
        builder.firm().with_array::<Log>("logs buffer", MAX_LOGS_PER_PAYLOAD);
    }
}
105
/// Datadog Logs incremental encoder.
///
/// Encodes log events one at a time into compressed request payloads via the request builder,
/// and reports drop counts through the component telemetry.
pub struct DatadogLogs {
    request_builder: RequestBuilder<LogsEndpointEncoder>,
    telemetry: ComponentTelemetry,
}
110
111#[async_trait]
112impl IncrementalEncoder for DatadogLogs {
113    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
114        let log: Log = match event {
115            Event::Log(log) => log,
116            _ => return Ok(ProcessResult::Continue),
117        };
118        match self.request_builder.encode(log).await {
119            Ok(None) => Ok(ProcessResult::Continue),
120            Ok(Some(log)) => Ok(ProcessResult::FlushRequired(Event::Log(log))),
121            Err(e) => {
122                if e.is_recoverable() {
123                    warn!(error = %e, "Failed to encode Datadog log due to recoverable error. Continuing...");
124
125                    // TODO: Get the actual number of events dropped from the error itself.
126                    self.telemetry.events_dropped_encoder().increment(1);
127                    Ok(ProcessResult::Continue)
128                } else {
129                    Err(e).error_context("Failed to encode Datadog log due to unrecoverable error.")
130                }
131            }
132        }
133    }
134
135    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
136        let maybe_requests = self.request_builder.flush().await;
137        for maybe_request in maybe_requests {
138            match maybe_request {
139                Ok((events, request)) => {
140                    let payload_meta = PayloadMetadata::from_event_count(events);
141                    let http_payload = HttpPayload::new(payload_meta, request);
142                    let payload = Payload::Http(http_payload);
143                    dispatcher.dispatch(payload).await?;
144                }
145                Err(e) => error!(error = %e, "Failed to build Datadog logs payload. Continuing..."),
146            }
147        }
148
149        Ok(())
150    }
151}
152
/// `EndpointEncoder` implementation for the Datadog logs intake.
#[derive(Debug)]
struct LogsEndpointEncoder {
    /// Reused across encode calls so tag deduplication does not reallocate per log.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
157
158impl LogsEndpointEncoder {
159    fn new() -> Self {
160        Self {
161            tags_deduplicator: ReusableDeduplicator::new(),
162        }
163    }
164
165    fn build_agent_json(&mut self, log: &Log) -> JsonValue {
166        let mut obj = JsonMap::new();
167
168        // Encode a structured message object as a JSON string in the `message` field.
169        let mut message_inner = JsonMap::new();
170        message_inner.insert("message".to_string(), JsonValue::String(log.message().to_string()));
171        if !log.service().is_empty() {
172            message_inner.insert("service".to_string(), JsonValue::String(log.service().to_string()));
173        }
174        let message_str =
175            serde_json::to_string(&JsonValue::Object(message_inner)).unwrap_or_else(|_| log.message().to_string());
176        obj.insert("message".to_string(), JsonValue::String(message_str));
177
178        if let Some(status) = log.status() {
179            obj.insert("status".to_string(), JsonValue::String(status.as_str().to_string()));
180        }
181        if !log.hostname().is_empty() {
182            obj.insert("hostname".to_string(), JsonValue::String(log.hostname().to_string()));
183        }
184        if !log.service().is_empty() {
185            obj.insert("service".to_string(), JsonValue::String(log.service().to_string()));
186        }
187
188        if let Some(ddsource) = log.source().clone() {
189            obj.insert("ddsource".to_string(), JsonValue::String(ddsource.to_string()));
190        }
191
192        // ddtags: comma-separated, deduplicated
193        let tags_iter = self.tags_deduplicator.deduplicated(log.tags().into_iter());
194        let tags_vec: Vec<&str> = tags_iter.map(|t| t.as_str()).collect();
195        if !tags_vec.is_empty() {
196            obj.insert("ddtags".to_string(), JsonValue::String(tags_vec.join(",")));
197        }
198
199        // Default timestamp (RFC3339 with milliseconds, Z) unless user provided `timestamp` or `@timestamp`.
200        let user_provided_timestamp = log.additional_properties().contains_key("timestamp")
201            || log.additional_properties().contains_key("@timestamp");
202        if !user_provided_timestamp {
203            let now_rfc3339 = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
204            obj.insert("@timestamp".to_string(), JsonValue::String(now_rfc3339));
205        }
206
207        // Last-write-wins: merge AdditionalProperties last
208        for (k, v) in log.additional_properties() {
209            obj.insert(k.to_string(), v.clone());
210        }
211
212        JsonValue::Object(obj)
213    }
214}
215
216impl EndpointEncoder for LogsEndpointEncoder {
217    type Input = Log;
218    type EncodeError = serde_json::Error;
219
220    fn encoder_name() -> &'static str {
221        "logs"
222    }
223
224    fn compressed_size_limit(&self) -> usize {
225        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
226    }
227
228    fn uncompressed_size_limit(&self) -> usize {
229        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
230    }
231
232    fn get_payload_prefix(&self) -> Option<&'static [u8]> {
233        Some(b"[")
234    }
235
236    fn get_payload_suffix(&self) -> Option<&'static [u8]> {
237        Some(b"]")
238    }
239
240    fn get_input_separator(&self) -> Option<&'static [u8]> {
241        Some(b",")
242    }
243
244    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
245        let json = self.build_agent_json(input);
246        serde_json::to_writer(buffer, &json)
247    }
248
249    fn endpoint_uri(&self) -> Uri {
250        PathAndQuery::from_static("/api/v2/logs").into()
251    }
252
253    fn endpoint_method(&self) -> Method {
254        Method::POST
255    }
256
257    fn content_type(&self) -> HeaderValue {
258        CONTENT_TYPE_JSON.clone()
259    }
260}