saluki_components/encoders/datadog/stats/
mod.rs1use std::time::Duration;
4
5use async_trait::async_trait;
6use datadog_protos::traces::{
7 ClientGroupedStats as ProtoClientGroupedStats, ClientStatsBucket as ProtoClientStatsBucket,
8 ClientStatsPayload as ProtoClientStatsPayload, StatsPayload as ProtoStatsPayload, Trilean,
9};
10use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use saluki_common::task::HandleExt as _;
13use saluki_config::GenericConfiguration;
14use saluki_core::{
15 components::{encoders::*, ComponentContext},
16 data_model::{
17 event::{
18 trace_stats::{ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, TraceStats},
19 EventType,
20 },
21 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
22 },
23 observability::ComponentMetricsExt as _,
24 topology::{EventsBuffer, PayloadsBuffer},
25};
26use saluki_env::{host::providers::BoxedHostProvider, EnvironmentProvider, HostProvider};
27use saluki_error::{generic_error, ErrorContext as _, GenericError};
28use saluki_io::compression::CompressionScheme;
29use saluki_metrics::MetricsBuilder;
30use serde::Deserialize;
31use stringtheory::MetaString;
32use tokio::{
33 select,
34 sync::mpsc::{self, Receiver, Sender},
35 time::sleep,
36};
37use tracing::{debug, error};
38
39use crate::common::datadog::{
40 io::RB_BUFFER_CHUNK_SIZE,
41 request_builder::{EndpointEncoder, RequestBuilder},
42 telemetry::ComponentTelemetry,
43 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
44};
45
/// Maximum number of inputs allowed in a single payload.
///
/// Passed to `RequestBuilder::with_max_inputs_per_payload` when the encoder is built, and also used to size the
/// firm memory bound declared in `MemoryBounds::specify_bounds`.
const MAX_STATS_PER_PAYLOAD: usize = 4000;
/// `Content-Type` header value returned by `StatsEndpointEncoder::content_type`.
static CONTENT_TYPE_MSGPACK: HeaderValue = HeaderValue::from_static("application/msgpack");
48
/// Returns the flush timeout, in seconds, used when the configuration does not provide one.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_FLUSH_TIMEOUT_SECS: u64 = 2;

    DEFAULT_FLUSH_TIMEOUT_SECS
}
52
/// Returns the environment name (`"none"`) used when the configuration does not provide one.
fn default_env() -> String {
    String::from("none")
}
56
/// Configuration for the Datadog APM Stats encoder.
#[derive(Deserialize)]
pub struct DatadogApmStatsEncoderConfiguration {
    /// Flush timeout, in seconds.
    ///
    /// Defaults to the value of `default_flush_timeout_secs()`. When the encoder is built, a value of `0` is
    /// mapped to a 10 millisecond timeout instead of a zero-length one.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Hostname reported as the agent hostname in stats payloads.
    ///
    /// Never deserialized; populated by `with_environment_provider`. If it is still unset when the encoder is
    /// built, an empty hostname is used.
    #[serde(skip)]
    agent_hostname: Option<String>,

    /// Version string reported as the agent version in stats payloads.
    ///
    /// Never deserialized; populated by `from_configuration`.
    #[serde(skip)]
    agent_version: String,

    /// Environment reported as the agent environment in stats payloads.
    ///
    /// Defaults to the value of `default_env()`.
    #[serde(default = "default_env")]
    env: String,
}
79
80impl DatadogApmStatsEncoderConfiguration {
81 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
83 let mut stats_config: Self = config.as_typed()?;
84 let app_details = saluki_metadata::get_app_details();
85 stats_config.agent_version = format!("agent-data-plane/{}", app_details.version().raw());
86
87 Ok(stats_config)
88 }
89
90 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
92 where
93 E: EnvironmentProvider<Host = BoxedHostProvider>,
94 {
95 let host_provider = environment_provider.host();
96 let hostname = host_provider.get_hostname().await?;
97 self.agent_hostname = Some(hostname);
98 Ok(self)
99 }
100}
101
102#[async_trait]
103impl EncoderBuilder for DatadogApmStatsEncoderConfiguration {
104 fn input_event_type(&self) -> EventType {
105 EventType::TraceStats
106 }
107
108 fn output_payload_type(&self) -> PayloadType {
109 PayloadType::Http
110 }
111
112 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
113 let metrics_builder = MetricsBuilder::from_component_context(&context);
114 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
115 let compression_scheme = CompressionScheme::gzip_default();
116
117 let agent_hostname = MetaString::from(self.agent_hostname.clone().unwrap_or_default());
118 let agent_version = MetaString::from(self.agent_version.clone());
119 let agent_env = MetaString::from(self.env.clone());
120
121 let mut stats_rb = RequestBuilder::new(
122 StatsEndpointEncoder::new(agent_hostname, agent_version, agent_env),
123 compression_scheme,
124 RB_BUFFER_CHUNK_SIZE,
125 )
126 .await?;
127 stats_rb.with_max_inputs_per_payload(MAX_STATS_PER_PAYLOAD);
128
129 let flush_timeout = match self.flush_timeout_secs {
130 0 => Duration::from_millis(10),
133 secs => Duration::from_secs(secs),
134 };
135
136 Ok(Box::new(DatadogStats {
137 stats_rb,
138 telemetry,
139 flush_timeout,
140 }))
141 }
142}
143
144impl MemoryBounds for DatadogApmStatsEncoderConfiguration {
145 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
146 builder
147 .minimum()
148 .with_single_value::<DatadogStats>("component struct")
149 .with_array::<EventsBuffer>("request builder events channel", 8)
150 .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
151
152 builder
153 .firm()
154 .with_array::<TraceStats>("stats split re-encode buffer", MAX_STATS_PER_PAYLOAD);
155 }
156}
157
/// Datadog APM Stats encoder.
///
/// Built by `DatadogApmStatsEncoderConfiguration::build`.
pub struct DatadogStats {
    /// Request builder that encodes trace stats into requests for the stats endpoint.
    stats_rb: RequestBuilder<StatsEndpointEncoder>,
    /// Component telemetry handed to the request builder task.
    telemetry: ComponentTelemetry,
    /// Delay between processing an event buffer and flushing the pending request(s).
    flush_timeout: Duration,
}
163
#[async_trait]
impl Encoder for DatadogStats {
    /// Runs the encoder until the event stream or the request builder's payload channel closes.
    ///
    /// Spawns `run_request_builder` on the global thread pool, forwards incoming event buffers to it, and
    /// dispatches every payload it produces. Dispatch failures are logged and do not stop the encoder.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be sent to the request builder task.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            stats_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Bounded channels connecting this task to the request builder task: events flow to it, payloads flow back.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);

        let request_builder_fut = run_request_builder(stats_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-stats-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog APM Stats encoder started.");

        loop {
            select! {
                // `biased` makes `select!` poll the branches in the order written: health first, then finished
                // payloads, then new events.
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // The request builder task dropped its sender, so no more payloads will arrive.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // The event stream ended.
                    None => break,
                },
            }
        }

        // Dropping the sender closes the events channel, so the request builder's `recv()` returns `None` once
        // the already-buffered events have been consumed.
        drop(events_tx);

        // Dispatch whatever the request builder still produces; ends once it drops its payload sender.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Wait for the task itself and log how it ended; its outcome does not change this function's result.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog APM Stats encoder stopped.");

        Ok(())
    }
}
229
230async fn run_request_builder(
231 mut stats_request_builder: RequestBuilder<StatsEndpointEncoder>, telemetry: ComponentTelemetry,
232 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
233) -> Result<(), GenericError> {
234 let mut pending_flush = false;
235 let pending_flush_timeout = sleep(flush_timeout);
236 tokio::pin!(pending_flush_timeout);
237
238 loop {
239 select! {
240 Some(event_buffer) = events_rx.recv() => {
241 for event in event_buffer {
242 let trace_stats = match event.try_into_trace_stats() {
243 Some(stats) => stats,
244 None => continue,
245 };
246
247 let stats_to_retry = match stats_request_builder.encode(trace_stats).await {
250 Ok(None) => continue,
251 Ok(Some(stats)) => stats,
252 Err(e) => {
253 error!(error = %e, "Failed to encode stats.");
254 telemetry.events_dropped_encoder().increment(1);
255 continue;
256 }
257 };
258
259 let maybe_requests = stats_request_builder.flush().await;
260 if maybe_requests.is_empty() {
261 panic!("builder told us to flush, but gave us nothing");
262 }
263
264 for maybe_request in maybe_requests {
265 match maybe_request {
266 Ok((events, request)) => {
267 let payload_meta = PayloadMetadata::from_event_count(events);
268 let http_payload = HttpPayload::new(payload_meta, request);
269 let payload = Payload::Http(http_payload);
270
271 payloads_tx.send(payload).await
272 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
273 },
274 Err(e) => if e.is_recoverable() {
275 continue;
276 } else {
277 return Err(GenericError::from(e).context("Failed to flush request."));
278 }
279 }
280 }
281
282 if let Err(e) = stats_request_builder.encode(stats_to_retry).await {
283 error!(error = %e, "Failed to encode stats.");
284 telemetry.events_dropped_encoder().increment(1);
285 }
286 }
287
288 debug!("Processed event buffer.");
289
290 if !pending_flush {
292 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
293 pending_flush = true;
294 }
295 },
296 _ = &mut pending_flush_timeout, if pending_flush => {
297 debug!("Flushing pending request(s).");
298
299 pending_flush = false;
300
301 let maybe_stats_requests = stats_request_builder.flush().await;
302 for maybe_request in maybe_stats_requests {
303 match maybe_request {
304 Ok((events, request)) => {
305 let payload_meta = PayloadMetadata::from_event_count(events);
306 let http_payload = HttpPayload::new(payload_meta, request);
307 let payload = Payload::Http(http_payload);
308
309 payloads_tx.send(payload).await
310 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
311 },
312 Err(e) => if e.is_recoverable() {
313 continue;
314 } else {
315 return Err(GenericError::from(e).context("Failed to flush request."));
316 }
317 }
318 }
319
320 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
321 },
322
323 else => break,
324 }
325 }
326
327 Ok(())
328}
329
/// Endpoint encoder for the Datadog APM stats endpoint.
///
/// Holds the agent-level values that are set on every stats payload it produces.
#[derive(Debug)]
struct StatsEndpointEncoder {
    /// Value written to the payload's agent hostname.
    agent_hostname: MetaString,
    /// Value written to the payload's agent version.
    agent_version: MetaString,
    /// Value written to the payload's agent environment.
    agent_env: MetaString,
}
336
337impl StatsEndpointEncoder {
338 fn new(agent_hostname: MetaString, agent_version: MetaString, agent_env: MetaString) -> Self {
339 Self {
340 agent_hostname,
341 agent_version,
342 agent_env,
343 }
344 }
345
346 fn to_proto_stats_payload(&self, stats: &TraceStats) -> ProtoStatsPayload {
347 let mut payload = ProtoStatsPayload::new();
348 payload.set_agentHostname(self.agent_hostname.to_string());
349 payload.set_agentEnv(self.agent_env.to_string());
350 payload.set_agentVersion(self.agent_version.to_string());
351 payload.set_clientComputed(false);
352 payload.set_splitPayload(false);
353 payload.set_stats(stats.stats().iter().map(convert_client_stats_payload).collect());
354
355 payload
356 }
357}
358
359fn convert_client_stats_payload(client_payload: &ClientStatsPayload) -> ProtoClientStatsPayload {
360 let mut proto_client = ProtoClientStatsPayload::new();
361 proto_client.set_hostname(client_payload.hostname().to_string());
362 proto_client.set_env(client_payload.env().to_string());
363 proto_client.set_version(client_payload.version().to_string());
364 proto_client.set_lang(client_payload.lang().to_string());
365 proto_client.set_tracerVersion(client_payload.tracer_version().to_string());
366 proto_client.set_runtimeID(client_payload.runtime_id().to_string());
367 proto_client.set_sequence(client_payload.sequence());
368 proto_client.set_agentAggregation(client_payload.agent_aggregation().to_string());
369 proto_client.set_service(client_payload.service().to_string());
370 proto_client.set_containerID(client_payload.container_id().to_string());
371 proto_client.set_tags(client_payload.tags().iter().map(|s| s.to_string()).collect());
372 proto_client.set_git_commit_sha(client_payload.git_commit_sha().to_string());
373 proto_client.set_image_tag(client_payload.image_tag().to_string());
374 proto_client.set_process_tags_hash(client_payload.process_tags_hash());
375 proto_client.set_process_tags(client_payload.process_tags().to_string());
376 proto_client.set_stats(client_payload.stats().iter().map(convert_client_stats_bucket).collect());
377 proto_client
378}
379
380fn convert_client_stats_bucket(bucket: &ClientStatsBucket) -> ProtoClientStatsBucket {
381 let mut proto_bucket = ProtoClientStatsBucket::new();
382 proto_bucket.set_start(bucket.start());
383 proto_bucket.set_duration(bucket.duration());
384 proto_bucket.set_agentTimeShift(bucket.agent_time_shift());
385 proto_bucket.set_stats(bucket.stats().iter().map(convert_client_grouped_stats).collect());
386 proto_bucket
387}
388
389fn convert_client_grouped_stats(grouped: &ClientGroupedStats) -> ProtoClientGroupedStats {
390 let mut proto_grouped = ProtoClientGroupedStats::new();
391 proto_grouped.set_service(grouped.service().to_string());
392 proto_grouped.set_name(grouped.name().to_string());
393 proto_grouped.set_resource(grouped.resource().to_string());
394 proto_grouped.set_HTTP_status_code(grouped.http_status_code());
395 proto_grouped.set_type(grouped.span_type().to_string());
396 proto_grouped.set_DB_type(grouped.db_type().to_string());
397 proto_grouped.set_hits(grouped.hits());
398 proto_grouped.set_errors(grouped.errors());
399 proto_grouped.set_duration(grouped.duration());
400 proto_grouped.set_okSummary(grouped.ok_summary().to_vec());
401 proto_grouped.set_errorSummary(grouped.error_summary().to_vec());
402 proto_grouped.set_synthetics(grouped.synthetics());
403 proto_grouped.set_topLevelHits(grouped.top_level_hits());
404 proto_grouped.set_span_kind(grouped.span_kind().to_string());
405 proto_grouped.set_peer_tags(grouped.peer_tags().iter().map(|s| s.to_string()).collect());
406 proto_grouped.set_is_trace_root(match grouped.is_trace_root() {
407 None => Trilean::NOT_SET,
408 Some(true) => Trilean::TRUE,
409 Some(false) => Trilean::FALSE,
410 });
411 proto_grouped.set_GRPC_status_code(grouped.grpc_status_code().to_string());
412 proto_grouped.set_HTTP_method(grouped.http_method().to_string());
413 proto_grouped.set_HTTP_endpoint(grouped.http_endpoint().to_string());
414 proto_grouped
415}
416
/// Error returned when a stats payload cannot be encoded as MessagePack.
///
/// Wraps the underlying `rmp_serde::encode::Error`, which is also exposed as the error's source.
#[derive(Debug)]
pub struct StatsEncodeError(rmp_serde::encode::Error);
420
421impl std::fmt::Display for StatsEncodeError {
422 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
423 write!(f, "failed to encode stats as MessagePack: {}", self.0)
424 }
425}
426
427impl std::error::Error for StatsEncodeError {
428 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
429 Some(&self.0)
430 }
431}
432
433impl EndpointEncoder for StatsEndpointEncoder {
434 type Input = TraceStats;
435 type EncodeError = StatsEncodeError;
436
437 fn encoder_name() -> &'static str {
438 "stats"
439 }
440
441 fn compressed_size_limit(&self) -> usize {
442 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
443 }
444
445 fn uncompressed_size_limit(&self) -> usize {
446 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
447 }
448
449 fn encode(&mut self, stats: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
450 let payload = self.to_proto_stats_payload(stats);
451 rmp_serde::encode::write_named(buffer, &payload).map_err(StatsEncodeError)?;
452 Ok(())
453 }
454
455 fn endpoint_uri(&self) -> Uri {
456 PathAndQuery::from_static("/api/v0.2/stats").into()
457 }
458
459 fn endpoint_method(&self) -> Method {
460 Method::POST
461 }
462
463 fn content_type(&self) -> HeaderValue {
464 CONTENT_TYPE_MSGPACK.clone()
465 }
466}