// saluki_components/encoders/datadog/logs/mod.rs
use async_trait::async_trait;
2use chrono::{SecondsFormat, Utc};
3use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::iter::ReusableDeduplicator;
6use saluki_config::GenericConfiguration;
7use saluki_context::tags::Tag;
8use saluki_core::{
9 components::{encoders::*, ComponentContext},
10 data_model::{
11 event::{log::Log, Event, EventType},
12 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
13 },
14 observability::ComponentMetricsExt as _,
15 topology::PayloadsDispatcher,
16};
17use saluki_error::{ErrorContext as _, GenericError};
18use saluki_io::compression::CompressionScheme;
19use saluki_metrics::MetricsBuilder;
20use serde::Deserialize;
21use serde_json::{Map as JsonMap, Value as JsonValue};
22use tracing::{error, warn};
23
24use crate::common::datadog::{
25 io::RB_BUFFER_CHUNK_SIZE,
26 request_builder::{EndpointEncoder, RequestBuilder},
27 telemetry::ComponentTelemetry,
28 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
29};
30
/// Compressor kind used when none is configured ("zstd").
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Maximum number of log events encoded into a single request payload.
const MAX_LOGS_PER_PAYLOAD: usize = 1000;

/// `Content-Type` header value used for the JSON request bodies sent to the logs intake.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
35
36fn default_serializer_compressor_kind() -> String {
37 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
38}
39
/// Returns the default zstd compression level (serde default hook).
const fn default_zstd_compressor_level() -> i32 {
    3
}
43
/// Configuration for the Datadog Logs encoder component.
#[derive(Deserialize, Debug)]
pub struct DatadogLogsConfiguration {
    // Compression scheme used when building log payloads; defaults to "zstd".
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    // Compression level applied when the zstd compressor is selected; defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}
61
62impl DatadogLogsConfiguration {
63 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
65 Ok(config.as_typed()?)
66 }
67}
68
69#[async_trait]
70impl IncrementalEncoderBuilder for DatadogLogsConfiguration {
71 type Output = DatadogLogs;
72
73 fn input_event_type(&self) -> EventType {
74 EventType::Log
75 }
76
77 fn output_payload_type(&self) -> PayloadType {
78 PayloadType::Http
79 }
80
81 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
82 let metrics_builder = MetricsBuilder::from_component_context(&context);
83 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
84 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
85
86 let mut request_builder =
87 RequestBuilder::new(LogsEndpointEncoder::new(), compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
88 request_builder.with_max_inputs_per_payload(MAX_LOGS_PER_PAYLOAD);
89
90 Ok(DatadogLogs {
91 request_builder,
92 telemetry,
93 })
94 }
95}
96
impl MemoryBounds for DatadogLogsConfiguration {
    /// Declares the memory usage bounds of the built component.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum bound: the component struct itself.
        builder.minimum().with_single_value::<DatadogLogs>("component struct");
        // Firm bound: up to one full payload's worth of buffered logs.
        builder.firm().with_array::<Log>("logs buffer", MAX_LOGS_PER_PAYLOAD);
    }
}
105
/// Datadog Logs incremental encoder.
///
/// Encodes log events into compressed HTTP payloads for the Datadog logs intake.
pub struct DatadogLogs {
    // Accumulates encoded logs and produces the outgoing HTTP requests.
    request_builder: RequestBuilder<LogsEndpointEncoder>,
    // Component telemetry, used here for the dropped-events counter.
    telemetry: ComponentTelemetry,
}
110
111#[async_trait]
112impl IncrementalEncoder for DatadogLogs {
113 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
114 let log: Log = match event {
115 Event::Log(log) => log,
116 _ => return Ok(ProcessResult::Continue),
117 };
118 match self.request_builder.encode(log).await {
119 Ok(None) => Ok(ProcessResult::Continue),
120 Ok(Some(log)) => Ok(ProcessResult::FlushRequired(Event::Log(log))),
121 Err(e) => {
122 if e.is_recoverable() {
123 warn!(error = %e, "Failed to encode Datadog log due to recoverable error. Continuing...");
124
125 self.telemetry.events_dropped_encoder().increment(1);
127 Ok(ProcessResult::Continue)
128 } else {
129 Err(e).error_context("Failed to encode Datadog log due to unrecoverable error.")
130 }
131 }
132 }
133 }
134
135 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
136 let maybe_requests = self.request_builder.flush().await;
137 for maybe_request in maybe_requests {
138 match maybe_request {
139 Ok((events, request)) => {
140 let payload_meta = PayloadMetadata::from_event_count(events);
141 let http_payload = HttpPayload::new(payload_meta, request);
142 let payload = Payload::Http(http_payload);
143 dispatcher.dispatch(payload).await?;
144 }
145 Err(e) => error!(error = %e, "Failed to build Datadog logs payload. Continuing..."),
146 }
147 }
148
149 Ok(())
150 }
151}
152
/// JSON encoder for the Datadog logs intake endpoint (`/api/v2/logs`).
#[derive(Debug)]
struct LogsEndpointEncoder {
    // Reused across logs to deduplicate each log's tags before joining them.
    tags_deduplicator: ReusableDeduplicator<Tag>,
}
157
158impl LogsEndpointEncoder {
159 fn new() -> Self {
160 Self {
161 tags_deduplicator: ReusableDeduplicator::new(),
162 }
163 }
164
165 fn build_agent_json(&mut self, log: &Log) -> JsonValue {
166 let mut obj = JsonMap::new();
167
168 let mut message_inner = JsonMap::new();
170 message_inner.insert("message".to_string(), JsonValue::String(log.message().to_string()));
171 if !log.service().is_empty() {
172 message_inner.insert("service".to_string(), JsonValue::String(log.service().to_string()));
173 }
174 let message_str =
175 serde_json::to_string(&JsonValue::Object(message_inner)).unwrap_or_else(|_| log.message().to_string());
176 obj.insert("message".to_string(), JsonValue::String(message_str));
177
178 if let Some(status) = log.status() {
179 obj.insert("status".to_string(), JsonValue::String(status.as_str().to_string()));
180 }
181 if !log.hostname().is_empty() {
182 obj.insert("hostname".to_string(), JsonValue::String(log.hostname().to_string()));
183 }
184 if !log.service().is_empty() {
185 obj.insert("service".to_string(), JsonValue::String(log.service().to_string()));
186 }
187
188 if let Some(ddsource) = log.source().clone() {
189 obj.insert("ddsource".to_string(), JsonValue::String(ddsource.to_string()));
190 }
191
192 let tags_iter = self.tags_deduplicator.deduplicated(log.tags().into_iter());
194 let tags_vec: Vec<&str> = tags_iter.map(|t| t.as_str()).collect();
195 if !tags_vec.is_empty() {
196 obj.insert("ddtags".to_string(), JsonValue::String(tags_vec.join(",")));
197 }
198
199 let user_provided_timestamp = log.additional_properties().contains_key("timestamp")
201 || log.additional_properties().contains_key("@timestamp");
202 if !user_provided_timestamp {
203 let now_rfc3339 = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
204 obj.insert("@timestamp".to_string(), JsonValue::String(now_rfc3339));
205 }
206
207 for (k, v) in log.additional_properties() {
209 obj.insert(k.to_string(), v.clone());
210 }
211
212 JsonValue::Object(obj)
213 }
214}
215
216impl EndpointEncoder for LogsEndpointEncoder {
217 type Input = Log;
218 type EncodeError = serde_json::Error;
219
220 fn encoder_name() -> &'static str {
221 "logs"
222 }
223
224 fn compressed_size_limit(&self) -> usize {
225 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
226 }
227
228 fn uncompressed_size_limit(&self) -> usize {
229 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
230 }
231
232 fn get_payload_prefix(&self) -> Option<&'static [u8]> {
233 Some(b"[")
234 }
235
236 fn get_payload_suffix(&self) -> Option<&'static [u8]> {
237 Some(b"]")
238 }
239
240 fn get_input_separator(&self) -> Option<&'static [u8]> {
241 Some(b",")
242 }
243
244 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
245 let json = self.build_agent_json(input);
246 serde_json::to_writer(buffer, &json)
247 }
248
249 fn endpoint_uri(&self) -> Uri {
250 PathAndQuery::from_static("/api/v2/logs").into()
251 }
252
253 fn endpoint_method(&self) -> Method {
254 Method::POST
255 }
256
257 fn content_type(&self) -> HeaderValue {
258 CONTENT_TYPE_JSON.clone()
259 }
260}