saluki_components/encoders/datadog/service_checks/
mod.rs1use async_trait::async_trait;
2use facet::Facet;
3use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_config::GenericConfiguration;
6use saluki_core::{
7 components::{encoders::*, ComponentContext},
8 data_model::{
9 event::{service_check::ServiceCheck, Event, EventType},
10 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
11 },
12 observability::ComponentMetricsExt as _,
13 topology::PayloadsDispatcher,
14};
15use saluki_error::{ErrorContext as _, GenericError};
16use saluki_io::compression::CompressionScheme;
17use saluki_metrics::MetricsBuilder;
18use serde::Deserialize;
19use tracing::{error, warn};
20
21use crate::common::datadog::{
22 io::RB_BUFFER_CHUNK_SIZE,
23 request_builder::{EndpointEncoder, RequestBuilder},
24 telemetry::ComponentTelemetry,
25 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
26};
27
/// Compressor kind used when `serializer_compressor_kind` is absent from the configuration
/// (returned by `default_serializer_compressor_kind`).
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";
/// Limit passed to `RequestBuilder::with_max_inputs_per_payload`, and the array length used
/// when declaring the firm memory bound for the component.
const MAX_SERVICE_CHECKS_PER_PAYLOAD: usize = 100;

/// Header value handed out by `ServiceChecksEndpointEncoder::content_type`.
static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
32
33fn default_serializer_compressor_kind() -> String {
34 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
35}
36
/// Serde default for the `zstd_compressor_level` field.
///
/// Declared `const` so the value can also be used in constant contexts.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
40
41#[derive(Deserialize, Facet)]
45#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
46pub struct DatadogServiceChecksConfiguration {
47 #[serde(
51 rename = "serializer_compressor_kind",
52 default = "default_serializer_compressor_kind"
53 )]
54 compressor_kind: String,
55
56 #[serde(
60 rename = "serializer_zstd_compressor_level",
61 default = "default_zstd_compressor_level"
62 )]
63 zstd_compressor_level: i32,
64}
65
66impl DatadogServiceChecksConfiguration {
67 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
69 Ok(config.as_typed()?)
70 }
71}
72
73#[async_trait]
74impl IncrementalEncoderBuilder for DatadogServiceChecksConfiguration {
75 type Output = DatadogServiceChecks;
76
77 fn input_event_type(&self) -> EventType {
78 EventType::ServiceCheck
79 }
80
81 fn output_payload_type(&self) -> PayloadType {
82 PayloadType::Http
83 }
84
85 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
86 let metrics_builder = MetricsBuilder::from_component_context(&context);
87 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
88 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
89
90 let mut request_builder =
92 RequestBuilder::new(ServiceChecksEndpointEncoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
93 request_builder.with_max_inputs_per_payload(MAX_SERVICE_CHECKS_PER_PAYLOAD);
94
95 Ok(DatadogServiceChecks {
96 request_builder,
97 telemetry,
98 })
99 }
100}
101
102impl MemoryBounds for DatadogServiceChecksConfiguration {
103 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
104 builder
112 .minimum()
113 .with_single_value::<DatadogServiceChecks>("component struct");
114
115 builder
116 .firm()
117 .with_array::<ServiceCheck>("service checks split re-encode buffer", MAX_SERVICE_CHECKS_PER_PAYLOAD);
120 }
121}
122
123pub struct DatadogServiceChecks {
124 request_builder: RequestBuilder<ServiceChecksEndpointEncoder>,
125 telemetry: ComponentTelemetry,
126}
127
128#[async_trait]
129impl IncrementalEncoder for DatadogServiceChecks {
130 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
131 let service_check = match event.try_into_service_check() {
132 Some(eventd) => eventd,
133 None => return Ok(ProcessResult::Continue),
134 };
135
136 match self.request_builder.encode(service_check).await {
137 Ok(None) => Ok(ProcessResult::Continue),
138 Ok(Some(service_check)) => Ok(ProcessResult::FlushRequired(Event::ServiceCheck(service_check))),
139 Err(e) => {
140 if e.is_recoverable() {
141 warn!(error = %e, "Failed to encode Datadog service check due to recoverable error. Continuing...");
142
143 self.telemetry.events_dropped_encoder().increment(1);
145
146 Ok(ProcessResult::Continue)
147 } else {
148 Err(e).error_context("Failed to encode Datadog service check due to unrecoverable error.")
149 }
150 }
151 }
152 }
153
154 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
155 let maybe_requests = self.request_builder.flush().await;
156 for maybe_request in maybe_requests {
157 match maybe_request {
158 Ok((events, request)) => {
159 let payload_meta = PayloadMetadata::from_event_count(events);
160 let http_payload = HttpPayload::new(payload_meta, request);
161 let payload = Payload::Http(http_payload);
162
163 dispatcher.dispatch(payload).await?;
164 }
165 Err(e) => error!(error = %e, "Failed to build Datadog service checks payload. Continuing..."),
166 }
167 }
168
169 Ok(())
170 }
171}
172
173#[derive(Debug)]
174struct ServiceChecksEndpointEncoder;
175
176impl EndpointEncoder for ServiceChecksEndpointEncoder {
177 type Input = ServiceCheck;
178 type EncodeError = serde_json::Error;
179
180 fn encoder_name() -> &'static str {
181 "service_check"
182 }
183
184 fn compressed_size_limit(&self) -> usize {
185 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
186 }
187
188 fn uncompressed_size_limit(&self) -> usize {
189 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
190 }
191
192 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
193 serde_json::to_writer(buffer, input)
194 }
195
196 fn get_payload_prefix(&self) -> Option<&'static [u8]> {
197 Some(b"[")
198 }
199
200 fn get_payload_suffix(&self) -> Option<&'static [u8]> {
201 Some(b"]")
202 }
203
204 fn get_input_separator(&self) -> Option<&'static [u8]> {
205 Some(b",")
206 }
207
208 fn endpoint_uri(&self) -> Uri {
209 PathAndQuery::from_static("/api/v1/check_run").into()
210 }
211
212 fn endpoint_method(&self) -> Method {
213 Method::POST
214 }
215
216 fn content_type(&self) -> HeaderValue {
217 CONTENT_TYPE_JSON.clone()
218 }
219}
220
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogServiceChecksConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Runs the shared configuration smoke tests against
    /// `DatadogServiceChecksConfiguration`, deserializing it via `as_typed`.
    #[tokio::test]
    async fn smoke_test() {
        let empty_object = json!({});

        run_config_smoke_tests(
            structs::DATADOG_SERVICE_CHECKS_CONFIGURATION,
            &[],
            empty_object,
            |config| {
                config
                    .as_typed::<DatadogServiceChecksConfiguration>()
                    .expect("DatadogServiceChecksConfiguration should deserialize")
            },
        )
        .await
    }
}