Skip to main content

saluki_components/encoders/datadog/service_checks/
mod.rs

1use async_trait::async_trait;
2use facet::Facet;
3use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7    components::{encoders::*, ComponentContext},
8    data_model::{
9        event::{service_check::ServiceCheck, Event, EventType},
10        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
11    },
12    observability::ComponentMetricsExt as _,
13    topology::PayloadsDispatcher,
14};
15use saluki_error::{ErrorContext as _, GenericError};
16use saluki_io::compression::CompressionScheme;
17use saluki_metrics::MetricsBuilder;
18use serde::Deserialize;
19use tracing::{debug, error, warn};
20
21use crate::common::datadog::{
22    clamp_payload_limits,
23    io::RB_BUFFER_CHUNK_SIZE,
24    request_builder::{EndpointEncoder, RequestBuilder},
25    telemetry::ComponentTelemetry,
26    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
27};
28
/// Compressor kind used when `serializer_compressor_kind` is not present in the configuration.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Maximum number of service checks the request builder is configured to accept per payload; also used to size the
/// split re-encode buffer in the component's memory bounds.
const MAX_SERVICE_CHECKS_PER_PAYLOAD: usize = 100;

/// Header value returned by `ServiceChecksEndpointEncoder::content_type`.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
33
34fn default_serializer_compressor_kind() -> String {
35    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
36}
37
/// Serde default for `serializer_zstd_compressor_level`.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_ZSTD_LEVEL: i32 = 3;

    DEFAULT_ZSTD_LEVEL
}
41
/// Serde default for `serializer_max_payload_size`: the shared default compressed size limit for serialized payloads.
const fn default_max_payload_size() -> usize {
    DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
}
45
/// Serde default for `serializer_max_uncompressed_payload_size`: the shared default uncompressed size limit for
/// serialized payloads.
const fn default_max_uncompressed_payload_size() -> usize {
    DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
}
49
/// Serde default for `log_payloads`: payload logging is off unless explicitly enabled.
const fn default_log_payloads() -> bool {
    const LOG_PAYLOADS_BY_DEFAULT: bool = false;

    LOG_PAYLOADS_BY_DEFAULT
}
53
/// Configuration for the Datadog Service Checks incremental encoder.
///
/// Builds a [`DatadogServiceChecks`] encoder, which generates Datadog Service Checks payloads for the Datadog
/// platform.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogServiceChecksConfiguration {
    /// Maximum compressed size, in bytes, of a service check payload.
    ///
    /// Deserialized from the `serializer_max_payload_size` key.
    ///
    /// This matches the Datadog Agent's generic payload limit for service checks. The effective value is
    /// clamped to the Agent's default intake-safe limit of 2,621,440 bytes, so larger configured values do not allow
    /// payloads that intake may reject. If set to `0`, every non-empty compressed payload exceeds the limit and is
    /// dropped during flush.
    ///
    /// Defaults to 2,621,440 bytes.
    #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
    max_payload_size: usize,

    /// Maximum uncompressed size, in bytes, of a service check payload.
    ///
    /// Deserialized from the `serializer_max_uncompressed_payload_size` key.
    ///
    /// This matches the Datadog Agent's generic payload limit for service checks. The effective value is
    /// clamped to the Agent's default intake-safe limit of 4,194,304 bytes, so larger configured values do not allow
    /// payloads that intake may reject. Values smaller than the minimum endpoint framing size prevent the request
    /// builder from starting.
    ///
    /// Defaults to 4,194,304 bytes.
    #[serde(
        rename = "serializer_max_uncompressed_payload_size",
        default = "default_max_uncompressed_payload_size"
    )]
    max_uncompressed_payload_size: usize,

    /// Compression kind to use for the request payloads.
    ///
    /// Deserialized from the `serializer_compressor_kind` key and passed, together with the zstd compressor level,
    /// to `CompressionScheme::new` when the encoder is built.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Deserialized from the `serializer_zstd_compressor_level` key.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Whether to log service check payload contents before encoding.
    ///
    /// This logs decoded service check objects, not the encoded HTTP body. Each service check is logged at debug
    /// level as it is processed.
    ///
    /// Defaults to `false`.
    #[serde(default = "default_log_payloads")]
    log_payloads: bool,
}
111
112impl DatadogServiceChecksConfiguration {
113    /// Creates a new `DatadogServiceChecksConfiguration` from the given configuration.
114    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
115        Ok(config.as_typed()?)
116    }
117}
118
119#[async_trait]
120impl IncrementalEncoderBuilder for DatadogServiceChecksConfiguration {
121    type Output = DatadogServiceChecks;
122
123    fn input_event_type(&self) -> EventType {
124        EventType::ServiceCheck
125    }
126
127    fn output_payload_type(&self) -> PayloadType {
128        PayloadType::Http
129    }
130
131    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
132        let metrics_builder = MetricsBuilder::from_component_context(&context);
133        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
134        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
135
136        // Create our request builder.
137        let mut request_builder =
138            RequestBuilder::new(ServiceChecksEndpointEncoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
139        let (uncompressed_limit, compressed_limit) = clamp_payload_limits(
140            self.max_uncompressed_payload_size,
141            self.max_payload_size,
142            DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
143            DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
144        );
145        request_builder.with_len_limits(uncompressed_limit, compressed_limit)?;
146        request_builder.with_max_inputs_per_payload(MAX_SERVICE_CHECKS_PER_PAYLOAD);
147
148        Ok(DatadogServiceChecks {
149            request_builder,
150            telemetry,
151            log_payloads: self.log_payloads,
152        })
153    }
154}
155
impl MemoryBounds for DatadogServiceChecksConfiguration {
    /// Declares the memory bounds of the encoder built from this configuration: the component struct itself as the
    /// minimum, plus a firm bound for the request builder's split re-encode buffer.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        //
        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect, and however
        // many payloads it will buffer internally... so realistically the firm limit boils down to the forwarder itself
        // but we'll have a hard time in the forwarder knowing the maximum size of any given payload being sent in, which
        // then makes it hard to calculate a proper firm bound even though we know the rest of the values required to
        // calculate the firm bound.
        builder
            .minimum()
            .with_single_value::<DatadogServiceChecks>("component struct");

        builder
            .firm()
            // Capture the size of the "split re-encode" buffer in the request builder, which is where we keep owned
            // versions of events that we encode in case we need to actually re-encode them during a split operation.
            // It is sized for `MAX_SERVICE_CHECKS_PER_PAYLOAD` entries, the same cap applied to the request builder.
            .with_array::<ServiceCheck>("service checks split re-encode buffer", MAX_SERVICE_CHECKS_PER_PAYLOAD);
    }
}
176
/// Datadog Service Checks incremental encoder.
///
/// Built from a [`DatadogServiceChecksConfiguration`]. Service check events are encoded through the request builder,
/// and the requests it produces on flush are dispatched as HTTP payloads.
pub struct DatadogServiceChecks {
    /// Request builder that service checks are encoded into and that is flushed to produce requests.
    request_builder: RequestBuilder<ServiceChecksEndpointEncoder>,
    /// Component telemetry; used to count events dropped by the encoder.
    telemetry: ComponentTelemetry,
    /// Whether each service check is logged at debug level before it is encoded.
    log_payloads: bool,
}
182
183#[async_trait]
184impl IncrementalEncoder for DatadogServiceChecks {
185    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
186        let service_check = match event.try_into_service_check() {
187            Some(eventd) => eventd,
188            None => return Ok(ProcessResult::Continue),
189        };
190
191        if self.log_payloads {
192            debug!(?service_check, "Flushing service check.");
193        }
194
195        match self.request_builder.encode(service_check).await {
196            Ok(None) => Ok(ProcessResult::Continue),
197            Ok(Some(service_check)) => Ok(ProcessResult::FlushRequired(Event::ServiceCheck(service_check))),
198            Err(e) => {
199                if e.is_recoverable() {
200                    warn!(error = %e, "Failed to encode Datadog service check due to recoverable error. Continuing...");
201
202                    // TODO: Get the actual number of events dropped from the error itself.
203                    self.telemetry.events_dropped_encoder().increment(1);
204
205                    Ok(ProcessResult::Continue)
206                } else {
207                    Err(e).error_context("Failed to encode Datadog service check due to unrecoverable error.")
208                }
209            }
210        }
211    }
212
213    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
214        let maybe_requests = self.request_builder.flush().await;
215        for maybe_request in maybe_requests {
216            match maybe_request {
217                Ok((events, _data_points, request)) => {
218                    let payload_meta = PayloadMetadata::from_event_count(events);
219                    let http_payload = HttpPayload::new(payload_meta, request);
220                    let payload = Payload::Http(http_payload);
221
222                    dispatcher.dispatch(payload).await?;
223                }
224                Err(e) => error!(error = %e, "Failed to build Datadog service checks payload. Continuing..."),
225            }
226        }
227
228        Ok(())
229    }
230}
231
/// Endpoint encoder for service checks: serializes each [`ServiceCheck`] as JSON into a comma-separated array that
/// is sent with `POST` to `/api/v1/check_run`.
#[derive(Debug)]
struct ServiceChecksEndpointEncoder;
234
235impl EndpointEncoder for ServiceChecksEndpointEncoder {
236    type Input = ServiceCheck;
237    type EncodeError = serde_json::Error;
238
239    fn encoder_name() -> &'static str {
240        "service_check"
241    }
242
243    fn compressed_size_limit(&self) -> usize {
244        DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
245    }
246
247    fn uncompressed_size_limit(&self) -> usize {
248        DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
249    }
250
251    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
252        serde_json::to_writer(buffer, input)
253    }
254
255    fn get_payload_prefix(&self) -> Option<&'static [u8]> {
256        Some(b"[")
257    }
258
259    fn get_payload_suffix(&self) -> Option<&'static [u8]> {
260        Some(b"]")
261    }
262
263    fn get_input_separator(&self) -> Option<&'static [u8]> {
264        Some(b",")
265    }
266
267    fn endpoint_uri(&self) -> Uri {
268        PathAndQuery::from_static("/api/v1/check_run").into()
269    }
270
271    fn endpoint_method(&self) -> Method {
272        Method::POST
273    }
274
275    fn content_type(&self) -> HeaderValue {
276        CONTENT_TYPE_JSON.clone()
277    }
278}
279
/// Smoke tests for deserializing [`DatadogServiceChecksConfiguration`] through the configuration registry.
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogServiceChecksConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Runs the shared config smoke tests for the service checks configuration, starting from an empty JSON object.
    #[tokio::test]
    async fn smoke_test() {
        // The closure deserializes the configuration handed to it by the helper and panics if that fails.
        run_config_smoke_tests(structs::DATADOG_SERVICE_CHECKS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogServiceChecksConfiguration>()
                .expect("DatadogServiceChecksConfiguration should deserialize")
        })
        .await
    }
}