saluki_components/encoders/datadog/service_checks/
mod.rs1use async_trait::async_trait;
2use facet::Facet;
3use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7 components::{encoders::*, ComponentContext},
8 data_model::{
9 event::{service_check::ServiceCheck, Event, EventType},
10 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
11 },
12 observability::ComponentMetricsExt as _,
13 topology::PayloadsDispatcher,
14};
15use saluki_error::{ErrorContext as _, GenericError};
16use saluki_io::compression::CompressionScheme;
17use saluki_metrics::MetricsBuilder;
18use serde::Deserialize;
19use tracing::{debug, error, warn};
20
21use crate::common::datadog::{
22 clamp_payload_limits,
23 io::RB_BUFFER_CHUNK_SIZE,
24 request_builder::{EndpointEncoder, RequestBuilder},
25 telemetry::ComponentTelemetry,
26 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT, DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
27};
28
/// Default value for the `serializer_compressor_kind` setting.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Maximum number of service checks per payload; applied to the request builder and used when declaring memory
/// bounds.
const MAX_SERVICE_CHECKS_PER_PAYLOAD: usize = 100;

/// `Content-Type` header value returned by the service checks endpoint encoder.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
33
34fn default_serializer_compressor_kind() -> String {
35 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
36}
37
/// Serde default for `serializer_zstd_compressor_level`.
const fn default_zstd_compressor_level() -> i32 {
    const LEVEL: i32 = 3;
    LEVEL
}
41
42const fn default_max_payload_size() -> usize {
43 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
44}
45
46const fn default_max_uncompressed_payload_size() -> usize {
47 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
48}
49
/// Serde default for `log_payloads`: payload logging is off unless explicitly enabled.
const fn default_log_payloads() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
53
/// Configuration for the Datadog service checks encoder.
///
/// Deserialized from a `GenericConfiguration` (see `from_configuration`) and used as the builder for
/// `DatadogServiceChecks`.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogServiceChecksConfiguration {
    /// Compressed payload size limit.
    ///
    /// Read from `serializer_max_payload_size`; defaults to `DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT`. The value is
    /// passed through `clamp_payload_limits` before being applied to the request builder.
    ///
    /// NOTE(review): presumably expressed in bytes -- confirm against `RequestBuilder::with_len_limits`.
    #[serde(rename = "serializer_max_payload_size", default = "default_max_payload_size")]
    max_payload_size: usize,

    /// Uncompressed payload size limit.
    ///
    /// Read from `serializer_max_uncompressed_payload_size`; defaults to
    /// `DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT`. The value is passed through `clamp_payload_limits` before being
    /// applied to the request builder.
    #[serde(
        rename = "serializer_max_uncompressed_payload_size",
        default = "default_max_uncompressed_payload_size"
    )]
    max_uncompressed_payload_size: usize,

    /// Name of the compressor to use, handed to `CompressionScheme::new`.
    ///
    /// Read from `serializer_compressor_kind`; defaults to `"zstd"`.
    #[serde(
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level handed to `CompressionScheme::new` alongside `compressor_kind`.
    ///
    /// Read from `serializer_zstd_compressor_level`; defaults to `3`.
    ///
    /// NOTE(review): presumably only meaningful when the compressor kind is zstd -- confirm in `CompressionScheme`.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Whether each service check is logged at debug level before it is encoded.
    ///
    /// Read from `log_payloads`; defaults to `false`.
    #[serde(default = "default_log_payloads")]
    log_payloads: bool,
}
111
112impl DatadogServiceChecksConfiguration {
113 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
115 Ok(config.as_typed()?)
116 }
117}
118
119#[async_trait]
120impl IncrementalEncoderBuilder for DatadogServiceChecksConfiguration {
121 type Output = DatadogServiceChecks;
122
123 fn input_event_type(&self) -> EventType {
124 EventType::ServiceCheck
125 }
126
127 fn output_payload_type(&self) -> PayloadType {
128 PayloadType::Http
129 }
130
131 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
132 let metrics_builder = MetricsBuilder::from_component_context(&context);
133 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
134 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
135
136 let mut request_builder =
138 RequestBuilder::new(ServiceChecksEndpointEncoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
139 let (uncompressed_limit, compressed_limit) = clamp_payload_limits(
140 self.max_uncompressed_payload_size,
141 self.max_payload_size,
142 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT,
143 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT,
144 );
145 request_builder.with_len_limits(uncompressed_limit, compressed_limit)?;
146 request_builder.with_max_inputs_per_payload(MAX_SERVICE_CHECKS_PER_PAYLOAD);
147
148 Ok(DatadogServiceChecks {
149 request_builder,
150 telemetry,
151 log_payloads: self.log_payloads,
152 })
153 }
154}
155
impl MemoryBounds for DatadogServiceChecksConfiguration {
    /// Declares the memory bounds for the encoder built from this configuration.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum bound: the component struct itself.
        builder
            .minimum()
            .with_single_value::<DatadogServiceChecks>("component struct");

        // Firm bound: an array of up to `MAX_SERVICE_CHECKS_PER_PAYLOAD` service checks.
        // NOTE(review): the label suggests this accounts for the buffer used when re-encoding inputs after a payload
        // split -- confirm against `RequestBuilder`.
        builder
            .firm()
            .with_array::<ServiceCheck>("service checks split re-encode buffer", MAX_SERVICE_CHECKS_PER_PAYLOAD);
    }
}
176
/// Incremental encoder that turns service check events into HTTP payloads via a `RequestBuilder`.
pub struct DatadogServiceChecks {
    /// Request builder that service checks are encoded into and that yields requests on flush.
    request_builder: RequestBuilder<ServiceChecksEndpointEncoder>,
    /// Component telemetry; used to count events dropped on recoverable encode errors.
    telemetry: ComponentTelemetry,
    /// Whether each service check is logged at debug level before being encoded.
    log_payloads: bool,
}
182
183#[async_trait]
184impl IncrementalEncoder for DatadogServiceChecks {
185 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
186 let service_check = match event.try_into_service_check() {
187 Some(eventd) => eventd,
188 None => return Ok(ProcessResult::Continue),
189 };
190
191 if self.log_payloads {
192 debug!(?service_check, "Flushing service check.");
193 }
194
195 match self.request_builder.encode(service_check).await {
196 Ok(None) => Ok(ProcessResult::Continue),
197 Ok(Some(service_check)) => Ok(ProcessResult::FlushRequired(Event::ServiceCheck(service_check))),
198 Err(e) => {
199 if e.is_recoverable() {
200 warn!(error = %e, "Failed to encode Datadog service check due to recoverable error. Continuing...");
201
202 self.telemetry.events_dropped_encoder().increment(1);
204
205 Ok(ProcessResult::Continue)
206 } else {
207 Err(e).error_context("Failed to encode Datadog service check due to unrecoverable error.")
208 }
209 }
210 }
211 }
212
213 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
214 let maybe_requests = self.request_builder.flush().await;
215 for maybe_request in maybe_requests {
216 match maybe_request {
217 Ok((events, _data_points, request)) => {
218 let payload_meta = PayloadMetadata::from_event_count(events);
219 let http_payload = HttpPayload::new(payload_meta, request);
220 let payload = Payload::Http(http_payload);
221
222 dispatcher.dispatch(payload).await?;
223 }
224 Err(e) => error!(error = %e, "Failed to build Datadog service checks payload. Continuing..."),
225 }
226 }
227
228 Ok(())
229 }
230}
231
/// Stateless `EndpointEncoder` implementation for the service checks endpoint (`/api/v1/check_run`).
#[derive(Debug)]
struct ServiceChecksEndpointEncoder;
234
235impl EndpointEncoder for ServiceChecksEndpointEncoder {
236 type Input = ServiceCheck;
237 type EncodeError = serde_json::Error;
238
239 fn encoder_name() -> &'static str {
240 "service_check"
241 }
242
243 fn compressed_size_limit(&self) -> usize {
244 DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT
245 }
246
247 fn uncompressed_size_limit(&self) -> usize {
248 DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT
249 }
250
251 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
252 serde_json::to_writer(buffer, input)
253 }
254
255 fn get_payload_prefix(&self) -> Option<&'static [u8]> {
256 Some(b"[")
257 }
258
259 fn get_payload_suffix(&self) -> Option<&'static [u8]> {
260 Some(b"]")
261 }
262
263 fn get_input_separator(&self) -> Option<&'static [u8]> {
264 Some(b",")
265 }
266
267 fn endpoint_uri(&self) -> Uri {
268 PathAndQuery::from_static("/api/v1/check_run").into()
269 }
270
271 fn endpoint_method(&self) -> Method {
272 Method::POST
273 }
274
275 fn content_type(&self) -> HeaderValue {
276 CONTENT_TYPE_JSON.clone()
277 }
278}
279
// Smoke test checking that `DatadogServiceChecksConfiguration` deserializes through the shared config smoke-test
// harness.
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogServiceChecksConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    // Runs the shared harness for the `DATADOG_SERVICE_CHECKS_CONFIGURATION` registry entry with an empty slice and
    // an empty JSON object; the closure deserializes the configuration and panics if that fails.
    // NOTE(review): the meaning of the `&[]` and `json!({})` arguments is defined by `run_config_smoke_tests` --
    // confirm there.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_SERVICE_CHECKS_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogServiceChecksConfiguration>()
                .expect("DatadogServiceChecksConfiguration should deserialize")
        })
        .await
    }
}