saluki_components/encoders/datadog/service_checks/
mod.rs1use async_trait::async_trait;
2use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
3use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
4use saluki_config::GenericConfiguration;
5use saluki_core::{
6 components::{encoders::*, ComponentContext},
7 data_model::{
8 event::{service_check::ServiceCheck, Event, EventType},
9 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
10 },
11 observability::ComponentMetricsExt as _,
12 topology::PayloadsDispatcher,
13};
14use saluki_error::{ErrorContext as _, GenericError};
15use saluki_io::compression::CompressionScheme;
16use saluki_metrics::MetricsBuilder;
17use serde::Deserialize;
18use tracing::{error, warn};
19
20use crate::common::datadog::{
21 io::RB_BUFFER_CHUNK_SIZE,
22 request_builder::{EndpointEncoder, RequestBuilder},
23 telemetry::ComponentTelemetry,
24 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
25};
26
/// Compressor name used when `serializer_compressor_kind` is absent from the configuration.
const DEFAULT_SERIALIZER_COMPRESSOR_KIND: &str = "zstd";

/// Upper bound on the number of service checks the request builder places in a single payload.
const MAX_SERVICE_CHECKS_PER_PAYLOAD: usize = 100;
29
30static CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json");
31
32fn default_serializer_compressor_kind() -> String {
33 DEFAULT_SERIALIZER_COMPRESSOR_KIND.to_owned()
34}
35
/// Serde default provider for `serializer_zstd_compressor_level`.
///
/// Returns the compression level used when none is configured.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;

    DEFAULT_LEVEL
}
39
40#[derive(Deserialize)]
44pub struct DatadogServiceChecksConfiguration {
45 #[serde(
49 rename = "serializer_compressor_kind",
50 default = "default_serializer_compressor_kind"
51 )]
52 compressor_kind: String,
53
54 #[serde(
58 rename = "serializer_zstd_compressor_level",
59 default = "default_zstd_compressor_level"
60 )]
61 zstd_compressor_level: i32,
62}
63
64impl DatadogServiceChecksConfiguration {
65 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
67 Ok(config.as_typed()?)
68 }
69}
70
71#[async_trait]
72impl IncrementalEncoderBuilder for DatadogServiceChecksConfiguration {
73 type Output = DatadogServiceChecks;
74
75 fn input_event_type(&self) -> EventType {
76 EventType::ServiceCheck
77 }
78
79 fn output_payload_type(&self) -> PayloadType {
80 PayloadType::Http
81 }
82
83 async fn build(&self, context: ComponentContext) -> Result<Self::Output, GenericError> {
84 let metrics_builder = MetricsBuilder::from_component_context(&context);
85 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
86 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
87
88 let mut request_builder =
90 RequestBuilder::new(ServiceChecksEndpointEncoder, compression_scheme, RB_BUFFER_CHUNK_SIZE).await?;
91 request_builder.with_max_inputs_per_payload(MAX_SERVICE_CHECKS_PER_PAYLOAD);
92
93 Ok(DatadogServiceChecks {
94 request_builder,
95 telemetry,
96 })
97 }
98}
99
100impl MemoryBounds for DatadogServiceChecksConfiguration {
101 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
102 builder
110 .minimum()
111 .with_single_value::<DatadogServiceChecks>("component struct");
112
113 builder
114 .firm()
115 .with_array::<ServiceCheck>("service checks split re-encode buffer", MAX_SERVICE_CHECKS_PER_PAYLOAD);
118 }
119}
120
121pub struct DatadogServiceChecks {
122 request_builder: RequestBuilder<ServiceChecksEndpointEncoder>,
123 telemetry: ComponentTelemetry,
124}
125
126#[async_trait]
127impl IncrementalEncoder for DatadogServiceChecks {
128 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError> {
129 let service_check = match event.try_into_service_check() {
130 Some(eventd) => eventd,
131 None => return Ok(ProcessResult::Continue),
132 };
133
134 match self.request_builder.encode(service_check).await {
135 Ok(None) => Ok(ProcessResult::Continue),
136 Ok(Some(service_check)) => Ok(ProcessResult::FlushRequired(Event::ServiceCheck(service_check))),
137 Err(e) => {
138 if e.is_recoverable() {
139 warn!(error = %e, "Failed to encode Datadog service check due to recoverable error. Continuing...");
140
141 self.telemetry.events_dropped_encoder().increment(1);
143
144 Ok(ProcessResult::Continue)
145 } else {
146 Err(e).error_context("Failed to encode Datadog service check due to unrecoverable error.")
147 }
148 }
149 }
150 }
151
152 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError> {
153 let maybe_requests = self.request_builder.flush().await;
154 for maybe_request in maybe_requests {
155 match maybe_request {
156 Ok((events, request)) => {
157 let payload_meta = PayloadMetadata::from_event_count(events);
158 let http_payload = HttpPayload::new(payload_meta, request);
159 let payload = Payload::Http(http_payload);
160
161 dispatcher.dispatch(payload).await?;
162 }
163 Err(e) => error!(error = %e, "Failed to build Datadog service checks payload. Continuing..."),
164 }
165 }
166
167 Ok(())
168 }
169}
170
171#[derive(Debug)]
172struct ServiceChecksEndpointEncoder;
173
174impl EndpointEncoder for ServiceChecksEndpointEncoder {
175 type Input = ServiceCheck;
176 type EncodeError = serde_json::Error;
177
178 fn encoder_name() -> &'static str {
179 "service_check"
180 }
181
182 fn compressed_size_limit(&self) -> usize {
183 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
184 }
185
186 fn uncompressed_size_limit(&self) -> usize {
187 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
188 }
189
190 fn encode(&mut self, input: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
191 serde_json::to_writer(buffer, input)
192 }
193
194 fn get_payload_prefix(&self) -> Option<&'static [u8]> {
195 Some(b"[")
196 }
197
198 fn get_payload_suffix(&self) -> Option<&'static [u8]> {
199 Some(b"]")
200 }
201
202 fn get_input_separator(&self) -> Option<&'static [u8]> {
203 Some(b",")
204 }
205
206 fn endpoint_uri(&self) -> Uri {
207 PathAndQuery::from_static("/api/v1/check_run").into()
208 }
209
210 fn endpoint_method(&self) -> Method {
211 Method::POST
212 }
213
214 fn content_type(&self) -> HeaderValue {
215 CONTENT_TYPE_JSON.clone()
216 }
217}