// saluki_components/encoders/datadog/service_checks/mod.rs
1use async_trait::async_trait;
2use facet::Facet;
3use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7    components::{encoders::*, ComponentContext},
8    data_model::{
9        event::{service_check::ServiceCheck, Event, EventType},
10        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
11    },
12    observability::ComponentMetricsExt as _,
13    topology::PayloadsDispatcher,
14};
15use saluki_error::{ErrorContext as _, GenericError};
16use saluki_io::compression::CompressionScheme;
17use saluki_metrics::MetricsBuilder;
18use serde::Deserialize;
19use tracing::{error, warn};
20
21use crate::common::datadog::{
22    io::RB_BUFFER_CHUNK_SIZE,
23    request_builder::{EndpointEncoder, RequestBuilder},
24    telemetry::ComponentTelemetry,
25    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
26};
27
28const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
29const MAX_SERVICE_CHECKS_PER_PAYLOAD: usize = 100;
30
31static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
32
33fn default_serializer_compressor_kind() -> String {
34    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
35}
36
/// Returns the default zstd compression level (3) used when the configuration omits one.
const fn default_zstd_compressor_level() -> i32 {
    3
}
40
41/// Datadog Service Checks incremental encoder.
42///
43/// Generates Datadog Service Checks payloads for the Datadog platform.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogServiceChecksConfiguration {
    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Ignored for other compressor kinds. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}
65
66impl DatadogServiceChecksConfiguration {
67    /// Creates a new `DatadogServiceChecksConfiguration` from the given configuration.
68    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
69        Ok(config.as_typed()?)
70    }
71}
72
73#[async_trait]
74impl IncrementalEncoderBuilder for DatadogServiceChecksConfiguration {
75    type Output = DatadogServiceChecks;
76
77    fn input_event_type(&self) -> EventType {
78        EventType::ServiceCheck
79    }
80
81    fn output_payload_type(&self) -> PayloadType {
82        PayloadType::Http
83    }
84
85    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
86        let metrics_builder = MetricsBuilder::from_component_context(&context);
87        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
88        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
89
90        // Create our request builder.
91        let mut request_builder =
92            RequestBuilder::new(ServiceChecksEndpointEncoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
93        request_builder.with_max_inputs_per_payload(MAX_SERVICE_CHECKS_PER_PAYLOAD);
94
95        Ok(DatadogServiceChecks {
96            request_builder,
97            telemetry,
98        })
99    }
100}
101
impl MemoryBounds for DatadogServiceChecksConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        //
        // Theoretically, we'll end up being limited by the size of the downstream forwarder's interconnect, and however
        // many payloads it will buffer internally... so realistically the firm limit boils down to the forwarder itself
        // but we'll have a hard time in the forwarder knowing the maximum size of any given payload being sent in, which
        // then makes it hard to calculate a proper firm bound even though we know the rest of the values required to
        // calculate the firm bound.
        builder
            .minimum()
            .with_single_value::<DatadogServiceChecks>("component struct");

        builder
            .firm()
            // Capture the size of the "split re-encode" buffer in the request builder, which is where we keep owned
            // versions of events that we encode in case we need to actually re-encode them during a split operation.
            // The buffer is bounded by `MAX_SERVICE_CHECKS_PER_PAYLOAD`, set on the request builder in `build`.
            .with_array::<ServiceCheck>("service checks split re-encode buffer", MAX_SERVICE_CHECKS_PER_PAYLOAD);
    }
}
122
/// Incremental encoder that turns service check events into Datadog intake HTTP payloads.
pub struct DatadogServiceChecks {
    // Encodes service checks and assembles them into compressed HTTP requests.
    request_builder: RequestBuilder<ServiceChecksEndpointEncoder>,
    // Component telemetry (e.g. counter for events dropped during encoding).
    telemetry: ComponentTelemetry,
}
127
128#[async_trait]
129impl IncrementalEncoder for DatadogServiceChecks {
130    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
131        let service_check = match event.try_into_service_check() {
132            Some(eventd) => eventd,
133            None => return Ok(ProcessResult::Continue),
134        };
135
136        match self.request_builder.encode(service_check).await {
137            Ok(None) => Ok(ProcessResult::Continue),
138            Ok(Some(service_check)) => Ok(ProcessResult::FlushRequired(Event::ServiceCheck(service_check))),
139            Err(e) => {
140                if e.is_recoverable() {
141                    warn!(error = %e, "Failed to encode Datadog service check due to recoverable error. Continuing...");
142
143                    // TODO: Get the actual number of events dropped from the error itself.
144                    self.telemetry.events_dropped_encoder().increment(1);
145
146                    Ok(ProcessResult::Continue)
147                } else {
148                    Err(e).error_context("Failed to encode Datadog service check due to unrecoverable error.")
149                }
150            }
151        }
152    }
153
154    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
155        let maybe_requests = self.request_builder.flush().await;
156        for maybe_request in maybe_requests {
157            match maybe_request {
158                Ok((events, request)) => {
159                    let payload_meta = PayloadMetadata::from_event_count(events);
160                    let http_payload = HttpPayload::new(payload_meta, request);
161                    let payload = Payload::Http(http_payload);
162
163                    dispatcher.dispatch(payload).await?;
164                }
165                Err(e) => error!(error = %e, "Failed to build Datadog service checks payload. Continuing..."),
166            }
167        }
168
169        Ok(())
170    }
171}
172
/// `EndpointEncoder` targeting the Datadog service checks intake endpoint (`/api/v1/check_run`).
#[derive(Debug)]
struct ServiceChecksEndpointEncoder;
175
impl EndpointEncoder for ServiceChecksEndpointEncoder {
    type Input = ServiceCheck;
    type EncodeError = serde_json::Error;

    fn encoder_name() -> &'static str {
        "service_check"
    }

    fn compressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    fn uncompressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

    // Serializes a single service check as JSON directly into the payload buffer.
    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        serde_json::to_writer(buffer, input)
    }

    // Prefix/separator/suffix frame the individually-encoded service checks as a JSON array:
    // `[` + check `,` check ... + `]`.
    fn get_payload_prefix(&self) -> Option<&'static [u8]> {
        Some(b"[")
    }

    fn get_payload_suffix(&self) -> Option<&'static [u8]> {
        Some(b"]")
    }

    fn get_input_separator(&self) -> Option<&'static [u8]> {
        Some(b",")
    }

    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v1/check_run").into()
    }

    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_JSON.clone()
    }
}
220
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogServiceChecksConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    // Verifies the configuration struct deserializes from an empty config (all defaults) via the
    // shared config-registry smoke-test harness.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_SERVICE_CHECKS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogServiceChecksConfiguration>()
                .expect("DatadogServiceChecksConfiguration should deserialize")
        })
        .await
    }
}