saluki_components/encoders/datadog/service_checks/
mod.rs

use async_trait::async_trait;
use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_core::{
    components::{encoders::*, ComponentContext},
    data_model::{
        event::{service_check::ServiceCheck, Event, EventType},
        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
    },
    observability::ComponentMetricsExt as _,
    topology::PayloadsDispatcher,
};
use saluki_error::{ErrorContext as _, GenericError};
use saluki_io::compression::CompressionScheme;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use tracing::{error, warn};

use crate::common::datadog::{
    io::RB_BUFFER_CHUNK_SIZE,
    request_builder::{EndpointEncoder, RequestBuilder},
    telemetry::ComponentTelemetry,
    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
};

const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
const MAX_SERVICE_CHECKS_PER_PAYLOAD: usize = 100;

static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");

fn default_serializer_compressor_kind() -> String {
    DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
}

const fn default_zstd_compressor_level() -> i32 {
    3
}

/// Datadog Service Checks incremental encoder.
///
/// Generates Datadog Service Checks payloads for the Datadog platform.
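///
/// Compression behavior is driven by the `serializer_compressor_kind` and
/// `serializer_zstd_compressor_level` configuration keys (see the field documentation below for defaults).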
#[derive(Deserialize)]
pub struct DatadogServiceChecksConfiguration {
    /// Compression kind to use for the request payloads.
    ///
    /// Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level to use when the compressor kind is `zstd`.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,
}

impl DatadogServiceChecksConfiguration {
    /// Creates a new `DatadogServiceChecksConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        Ok(config.as_typed()?)
    }
}

#[async_trait]
impl IncrementalEncoderBuilder for DatadogServiceChecksConfiguration {
    type Output = DatadogServiceChecks;

    fn input_event_type(&self) -> EventType {
        EventType::ServiceCheck
    }

    fn output_payload_type(&self) -> PayloadType {
        PayloadType::Http
    }

    async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
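        // Set up component telemetry and the configured compression scheme before constructing the request builder.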
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);

        // Create our request builder.
        let mut request_builder =
            RequestBuilder::new(ServiceChecksEndpointEncoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
        request_builder.with_max_inputs_per_payload(MAX_SERVICE_CHECKS_PER_PAYLOAD);

        Ok(DatadogServiceChecks {
            request_builder,
            telemetry,
        })
    }
}

impl MemoryBounds for DatadogServiceChecksConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we generate that may still be sitting around in-flight?
        //
        // Theoretically, we're limited by the size of the downstream forwarder's interconnect and however many
        // payloads it buffers internally, so the firm limit realistically boils down to the forwarder itself.
        // However, the forwarder has no way of knowing the maximum size of any given payload sent to it, which
        // makes it hard to calculate a proper firm bound here even though we know the rest of the values required
        // to do so.
        builder
            .minimum()
            .with_single_value::<DatadogServiceChecks>("component struct");

        builder
            .firm()
            // Capture the size of the "split re-encode" buffer in the request builder, which is where we keep owned
            // versions of events that we encode in case we need to actually re-encode them during a split operation.
            .with_array::<ServiceCheck>("service checks split re-encode buffer", MAX_SERVICE_CHECKS_PER_PAYLOAD);
    }
}

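/// Incremental encoder that builds Datadog Service Checks request payloads from `ServiceCheck` events.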
pub struct DatadogServiceChecks {
    request_builder: RequestBuilder<ServiceChecksEndpointEncoder>,
    telemetry: ComponentTelemetry,
}

#[async_trait]
impl IncrementalEncoder for DatadogServiceChecks {
    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
        let service_check = match event.try_into_service_check() {
            Some(service_check) => service_check,
            None => return Ok(ProcessResult::Continue),
        };

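        // Try to encode the service check into the in-flight payload. If the request builder hands it back to us,
        // the current payload is full and must be flushed before this service check can be encoded.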
        match self.request_builder.encode(service_check).await {
            Ok(None) => Ok(ProcessResult::Continue),
            Ok(Some(service_check)) => Ok(ProcessResult::FlushRequired(Event::ServiceCheck(service_check))),
            Err(e) => {
                if e.is_recoverable() {
                    warn!(error = %e, "Failed to encode Datadog service check due to recoverable error. Continuing...");

                    // TODO: Get the actual number of events dropped from the error itself.
                    self.telemetry.events_dropped_encoder().increment(1);

                    Ok(ProcessResult::Continue)
                } else {
                    Err(e).error_context("Failed to encode Datadog service check due to unrecoverable error.")
                }
            }
        }
    }

    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
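        // Flushing may yield multiple requests (e.g., if the buffered service checks had to be split across
        // payloads), so dispatch every request that was built successfully and log the ones that weren't.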
        let maybe_requests = self.request_builder.flush().await;
        for maybe_request in maybe_requests {
            match maybe_request {
                Ok((events, request)) => {
                    let payload_meta = PayloadMetadata::from_event_count(events);
                    let http_payload = HttpPayload::new(payload_meta, request);
                    let payload = Payload::Http(http_payload);

                    dispatcher.dispatch(payload).await?;
                }
                Err(e) => error!(error = %e, "Failed to build Datadog service checks payload. Continuing..."),
            }
        }

        Ok(())
    }
}

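/// `EndpointEncoder` implementation that serializes service checks as a JSON array and submits them to the
/// service check intake endpoint (`/api/v1/check_run`).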
#[derive(Debug)]
struct ServiceChecksEndpointEncoder;

impl EndpointEncoder for ServiceChecksEndpointEncoder {
    type Input = ServiceCheck;
    type EncodeError = serde_json::Error;

    fn encoder_name() -> &'static str {
        "service_check"
    }

    fn compressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    fn uncompressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

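    // Each service check is serialized as a single JSON object; the payload prefix/suffix and input separator below
    // wrap the serialized objects into a well-formed JSON array.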
    fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        serde_json::to_writer(buffer, input)
    }

    fn get_payload_prefix(&self) -> Option<&'static [u8]> {
        Some(b"[")
    }

    fn get_payload_suffix(&self) -> Option<&'static [u8]> {
        Some(b"]")
    }

    fn get_input_separator(&self) -> Option<&'static [u8]> {
        Some(b",")
    }

    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v1/check_run").into()
    }

    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_JSON.clone()
    }
}