1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7 attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8 AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use facet::Facet;
11use http::{uri::PathAndQuery, HeaderName, HeaderValue, Method, Uri};
12use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
13use opentelemetry_semantic_conventions::resource::{
14 CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
15};
16use piecemeal::{ScratchBuffer, ScratchWriter};
17use saluki_common::strings::StringBuilder;
18use saluki_common::task::HandleExt as _;
19use saluki_config::GenericConfiguration;
20use saluki_context::tags::TagSet;
21use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
22use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
23use saluki_core::{
24 components::{encoders::*, ComponentContext},
25 data_model::{
26 event::{trace::Trace, EventType},
27 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
28 },
29 observability::ComponentMetricsExt as _,
30};
31use saluki_env::host::providers::BoxedHostProvider;
32use saluki_env::{EnvironmentProvider, HostProvider};
33use saluki_error::generic_error;
34use saluki_error::{ErrorContext as _, GenericError};
35use saluki_io::compression::CompressionScheme;
36use saluki_metrics::MetricsBuilder;
37use serde::Deserialize;
38use stringtheory::MetaString;
39use tokio::{
40 select,
41 sync::mpsc::{self, Receiver, Sender},
42 time::sleep,
43};
44use tracing::{debug, error};
45
46use crate::common::datadog::{
47 apm::ApmConfig,
48 io::RB_BUFFER_CHUNK_SIZE,
49 request_builder::{EndpointEncoder, RequestBuilder},
50 telemetry::ComponentTelemetry,
51 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
52};
53use crate::common::otlp::config::TracesConfig;
54use crate::common::otlp::util::{
55 extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
56 DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
57 KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
58};
59
/// Tracer payload tag key under which the flattened container tags are written.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
/// Maximum number of traces the request builder is configured to accept per payload; also used to size the firm
/// memory bound for this component.
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
/// Header value returned by `TraceEndpointEncoder::content_type`.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

/// Chunk tag key that carries the OTLP sampling rate (written with two decimal places).
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
/// Priority assigned to a chunk when the trace carries no sampling metadata or no explicit priority.
const DEFAULT_CHUNK_PRIORITY: i32 = 1;

/// Serde default for the `serializer_compressor_kind` setting: `"zstd"`.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
71
/// Serde default for the `serializer_zstd_compressor_level` setting.
const fn default_zstd_compressor_level() -> i32 {
    const LEVEL: i32 = 3;
    LEVEL
}
75
/// Serde default for the `flush_timeout_secs` setting, in seconds.
const fn default_flush_timeout_secs() -> u64 {
    const SECONDS: u64 = 2;
    SECONDS
}
79
/// Serde default for the `env` setting: `"none"`.
fn default_env() -> String {
    String::from("none")
}
83
/// Configuration for the Datadog trace encoder.
///
/// Deserialized from the generic configuration; the `#[serde(skip)]` fields are filled in afterwards by
/// `from_configuration` and `with_environment_provider`.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogTraceConfiguration {
    /// Compressor kind handed to `CompressionScheme::new`.
    ///
    /// Read from `serializer_compressor_kind`; defaults to `"zstd"`.
    #[serde(
        rename = "serializer_compressor_kind", default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level handed to `CompressionScheme::new` alongside the compressor kind.
    ///
    /// Read from `serializer_zstd_compressor_level`; defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout, in seconds, for pending requests. Defaults to 2.
    ///
    /// A value of 0 is mapped to a 10 millisecond timeout when the encoder is built.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Default hostname, populated by `with_environment_provider`. Not deserialized.
    #[serde(skip)]
    default_hostname: Option<String>,

    /// Agent version string (`agent-data-plane/<version>`), populated by `from_configuration`. Not deserialized.
    #[serde(skip)]
    version: String,

    /// APM configuration, loaded separately by `from_configuration`. Not deserialized.
    #[serde(skip)]
    #[facet(opaque)]
    apm_config: ApmConfig,

    /// OTLP traces configuration, read from `otlp_config.traces` by `from_configuration`. Not deserialized.
    #[serde(skip)]
    #[facet(opaque)]
    otlp_traces: TracesConfig,

    /// Environment written to the agent payload. Defaults to `"none"`.
    #[serde(default = "default_env")]
    env: String,
}
131
132impl DatadogTraceConfiguration {
133 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
135 let mut trace_config: Self = config.as_typed()?;
136
137 let app_details = saluki_metadata::get_app_details();
138 trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
139
140 trace_config.apm_config = ApmConfig::from_configuration(config)?;
141 trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
142
143 Ok(trace_config)
144 }
145}
146
147impl DatadogTraceConfiguration {
148 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
150 where
151 E: EnvironmentProvider<Host = BoxedHostProvider>,
152 {
153 let host_provider = environment_provider.host();
154 let hostname = host_provider.get_hostname().await?;
155 self.default_hostname = Some(hostname);
156 Ok(self)
157 }
158}
159
160#[async_trait]
161impl EncoderBuilder for DatadogTraceConfiguration {
162 fn input_event_type(&self) -> EventType {
163 EventType::Trace
164 }
165
166 fn output_payload_type(&self) -> PayloadType {
167 PayloadType::Http
168 }
169
170 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
171 let metrics_builder = MetricsBuilder::from_component_context(&context);
172 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
173 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
174
175 let default_hostname = self.default_hostname.clone().unwrap_or_default();
176 let default_hostname = MetaString::from(default_hostname);
177
178 let mut trace_rb = RequestBuilder::new(
181 TraceEndpointEncoder::new(
182 default_hostname,
183 self.version.clone(),
184 self.env.clone(),
185 self.apm_config.clone(),
186 self.otlp_traces.clone(),
187 ),
188 compression_scheme,
189 RB_BUFFER_CHUNK_SIZE,
190 )
191 .await?;
192 trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
193
194 let flush_timeout = match self.flush_timeout_secs {
195 0 => Duration::from_millis(10),
198 secs => Duration::from_secs(secs),
199 };
200
201 Ok(Box::new(DatadogTrace {
202 trace_rb,
203 telemetry,
204 flush_timeout,
205 }))
206 }
207}
208
impl MemoryBounds for DatadogTraceConfiguration {
    /// Declares the memory bounds for the trace encoder.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum: the component struct itself plus the two channels, each accounted for with 8 slots
        // (matching the channel capacity used in `DatadogTrace::run`).
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm: up to `MAX_TRACES_PER_PAYLOAD` traces accounted for the split re-encode buffer.
        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
223
/// Datadog trace encoder component.
///
/// Created by `DatadogTraceConfiguration::build` and driven by its `Encoder::run` implementation.
pub struct DatadogTrace {
    /// Request builder that encodes traces and produces flushed requests.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    /// Component telemetry, used to count events dropped by the encoder.
    telemetry: ComponentTelemetry,
    /// How long to wait after processing an event buffer before flushing pending requests.
    flush_timeout: Duration,
}
229
230#[async_trait]
232impl Encoder for DatadogTrace {
233 async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
234 let Self {
235 trace_rb,
236 telemetry,
237 flush_timeout,
238 } = *self;
239
240 let mut health = context.take_health_handle();
241
242 let (events_tx, events_rx) = mpsc::channel(8);
245 let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
247 let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
248 let request_builder_handle = context
250 .topology_context()
251 .global_thread_pool() .spawn_traced_named("dd-traces-request-builder", request_builder_fut);
253
254 health.mark_ready();
255 debug!("Datadog Trace encoder started.");
256
257 loop {
258 select! {
259 biased; _ = health.live() => continue,
262 maybe_payload = payloads_rx.recv() => match maybe_payload {
263 Some(payload) => {
264 if let Err(e) = context.dispatcher().dispatch(payload).await {
266 error!("Failed to dispatch payload: {}", e);
267 }
268 }
269 None => break,
270 },
271 maybe_event_buffer = context.events().next() => match maybe_event_buffer {
272 Some(event_buffer) => events_tx.send(event_buffer).await
273 .error_context("Failed to send event buffer to request builder task.")?,
274 None => break,
275 },
276 }
277 }
278
279 drop(events_tx);
281
282 while let Some(payload) = payloads_rx.recv().await {
284 if let Err(e) = context.dispatcher().dispatch(payload).await {
285 error!("Failed to dispatch payload: {}", e);
286 }
287 }
288
289 match request_builder_handle.await {
291 Ok(Ok(())) => debug!("Request builder task stopped."),
292 Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
293 Err(e) => error!(error = %e, "Request builder task panicked."),
294 }
295
296 debug!("Datadog Trace encoder stopped.");
297
298 Ok(())
299 }
300}
301
302async fn run_request_builder(
303 mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
304 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
305) -> Result<(), GenericError> {
306 let mut pending_flush = false;
307 let pending_flush_timeout = sleep(flush_timeout);
308 tokio::pin!(pending_flush_timeout);
309
310 loop {
311 select! {
312 Some(event_buffer) = events_rx.recv() => {
313 for event in event_buffer {
314 let trace = match event.try_into_trace() {
315 Some(trace) => trace,
316 None => continue,
317 };
318 let trace_to_retry = match trace_request_builder.encode(trace).await {
321 Ok(None) => continue,
322 Ok(Some(trace)) => trace,
323 Err(e) => {
324 error!(error = %e, "Failed to encode trace.");
325 telemetry.events_dropped_encoder().increment(1);
326 continue;
327 }
328 };
329
330 let maybe_requests = trace_request_builder.flush().await;
331 if maybe_requests.is_empty() {
332 panic!("builder told us to flush, but gave us nothing");
333 }
334
335 for maybe_request in maybe_requests {
336 match maybe_request {
337 Ok((events, request)) => {
338 let payload_meta = PayloadMetadata::from_event_count(events);
339 let http_payload = HttpPayload::new(payload_meta, request);
340 let payload = Payload::Http(http_payload);
341
342 payloads_tx.send(payload).await
343 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
344 },
345 Err(e) => if e.is_recoverable() {
346 continue;
348 } else {
349 return Err(GenericError::from(e).context("Failed to flush request."));
350 }
351 }
352 }
353
354 if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
356 error!(error = %e, "Failed to encode trace.");
357 telemetry.events_dropped_encoder().increment(1);
358 }
359 }
360
361 debug!("Processed event buffer.");
362
363 if !pending_flush {
365 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
366 pending_flush = true;
367 }
368 },
369 _ = &mut pending_flush_timeout, if pending_flush => {
370 debug!("Flushing pending request(s).");
371
372 pending_flush = false;
373
374 let maybe_trace_requests = trace_request_builder.flush().await;
377 for maybe_request in maybe_trace_requests {
378 match maybe_request {
379 Ok((events, request)) => {
380 let payload_meta = PayloadMetadata::from_event_count(events);
381 let http_payload = HttpPayload::new(payload_meta, request);
382 let payload = Payload::Http(http_payload);
383
384 payloads_tx.send(payload).await
385 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
386 },
387 Err(e) => if e.is_recoverable() {
388 continue;
389 } else {
390 return Err(GenericError::from(e).context("Failed to flush request."));
391 }
392 }
393 }
394
395 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
396 },
397
398 else => break,
400 }
401 }
402
403 Ok(())
404}
405
/// Endpoint encoder that serializes a `Trace` into an agent payload for the traces endpoint.
#[derive(Debug)]
struct TraceEndpointEncoder {
    /// Scratch writer handed to `AgentPayloadBuilder` for each encoded trace.
    scratch: ScratchWriter<Vec<u8>>,
    /// Fallback hostname passed to `resolve_hostname` when resolving the tracer payload hostname.
    default_hostname: MetaString,
    /// Owned copy of the default hostname, written as the agent payload's `host_name`.
    agent_hostname: String,
    /// Agent version string written to the agent payload.
    version: String,
    /// Environment written to the agent payload.
    env: String,
    /// APM configuration; supplies the target TPS and error TPS values written to the agent payload.
    apm_config: ApmConfig,
    /// OTLP traces configuration; supplies `ignore_missing_datadog_fields` and the probabilistic sampler
    /// percentage.
    otlp_traces: TracesConfig,
    /// Reusable buffer used to format the sampling rate tag value.
    string_builder: StringBuilder,
    /// Whether error tracking standalone is enabled, as reported by `apm_config` at construction time.
    error_tracking_standalone: bool,
    /// Additional request headers; contains the error tracking standalone header when that mode is enabled.
    extra_headers: Vec<(HeaderName, HeaderValue)>,
}
419
impl TraceEndpointEncoder {
    /// Creates a new `TraceEndpointEncoder`.
    ///
    /// When error tracking standalone is enabled in `apm_config`, the
    /// `x-datadog-error-tracking-standalone: true` header is added to the extra headers.
    fn new(
        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
    ) -> Self {
        let error_tracking_standalone = apm_config.error_tracking_standalone_enabled();
        let extra_headers = if error_tracking_standalone {
            vec![(
                HeaderName::from_static("x-datadog-error-tracking-standalone"),
                HeaderValue::from_static("true"),
            )]
        } else {
            Vec::new()
        };
        Self {
            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
            // Owned copy taken before `default_hostname` is moved into the struct.
            agent_hostname: default_hostname.as_ref().to_string(),
            default_hostname,
            version,
            env,
            apm_config,
            otlp_traces,
            string_builder: StringBuilder::new(),
            error_tracking_standalone,
            extra_headers,
        }
    }

    /// Encodes `trace` as an agent payload and writes the result to `output_buffer`.
    ///
    /// The agent payload contains one tracer payload, which in turn contains a single chunk holding every span
    /// of the trace. Payload-level fields (container ID, language, tracer version, container tags, env,
    /// hostname, app version) are resolved from the trace's resource tags.
    ///
    /// # Errors
    ///
    /// Returns an error if any builder write fails.
    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
        // Everything derived from the trace and configuration is resolved up front, before the payload builder
        // takes a mutable borrow of `self.scratch`.
        let sampling_rate = self.sampling_rate();
        let resource_tags = trace.resource_tags();
        let first_span = trace.spans().first();
        let source = tags_to_source(resource_tags);

        let container_id = resolve_container_id(resource_tags, first_span);
        let lang = get_resource_tag_value(resource_tags, "telemetry.sdk.language");
        let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
        let tracer_version = format!("otlp-{}", sdk_version);
        let container_tags = resolve_container_tags(
            resource_tags,
            source.as_ref(),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let env = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields);
        let hostname = resolve_hostname(
            resource_tags,
            source.as_ref(),
            Some(self.default_hostname.as_ref()),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let app_version = resolve_app_version(resource_tags);

        // Fall back to the default priority and the configured sampling rate wherever the trace does not carry
        // its own sampling metadata.
        let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() {
            Some(sampling) => (
                sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY),
                sampling.dropped_trace,
                sampling.decision_maker.as_deref(),
                sampling.otlp_sampling_rate.unwrap_or(sampling_rate),
            ),
            None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate),
        };

        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);

        // Agent-level fields.
        ap_builder
            .host_name(&self.agent_hostname)?
            .env(&self.env)?
            .agent_version(&self.version)?
            .target_tps(self.apm_config.target_traces_per_second())?
            .error_tps(self.apm_config.errors_per_second())?;

        ap_builder.add_tracer_payloads(|tp| {
            if let Some(cid) = container_id {
                tp.container_id(cid)?;
            }
            if let Some(l) = lang {
                tp.language_name(l)?;
            }
            tp.tracer_version(&tracer_version)?;

            tp.add_chunks(|chunk| {
                chunk.priority(priority)?;

                for span in trace.spans() {
                    chunk.add_spans(|s| {
                        s.service(span.service())?
                            .name(span.name())?
                            .resource(span.resource())?
                            .trace_id(span.trace_id())?
                            .span_id(span.span_id())?
                            .parent_id(span.parent_id())?
                            .start(span.start() as i64)?
                            .duration(span.duration() as i64)?
                            .error(span.error())?;

                        // Each map writer lives in its own scope so it is dropped before `s` is used again.
                        {
                            let mut meta = s.meta();
                            for (k, v) in span.meta() {
                                meta.write_entry(k.as_ref(), v.as_ref())?;
                            }
                        }

                        {
                            let mut metrics = s.metrics();
                            for (k, v) in span.metrics() {
                                metrics.write_entry(k.as_ref(), *v)?;
                            }
                        }

                        s.type_(span.span_type())?;

                        {
                            let mut ms = s.meta_struct();
                            for (k, v) in span.meta_struct() {
                                ms.write_entry(k.as_ref(), v.as_slice())?;
                            }
                        }

                        for link in span.span_links() {
                            s.add_span_links(|sl| {
                                sl.trace_id(link.trace_id())?
                                    .trace_id_high(link.trace_id_high())?
                                    .span_id(link.span_id())?;
                                {
                                    let mut attrs = sl.attributes();
                                    for (k, v) in link.attributes() {
                                        attrs.write_entry(&**k, &**v)?;
                                    }
                                }
                                let tracestate = link.tracestate().to_string();
                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
                                Ok(())
                            })?;
                        }

                        for event in span.span_events() {
                            s.add_span_events(|se| {
                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
                                {
                                    let mut attrs = se.attributes();
                                    for (k, v) in event.attributes() {
                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
                                    }
                                }
                                Ok(())
                            })?;
                        }

                        Ok(())
                    })?;
                }

                // Chunk-level tags: decision maker, error tracking standalone marker, and sampling rate.
                {
                    let mut tags = chunk.tags();
                    if let Some(dm) = decision_maker {
                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
                    }
                    if self.error_tracking_standalone {
                        // A trace counts as erroneous if any span has a non-zero error value or carries the
                        // `_dd.span_events.has_exception` meta entry set to "true".
                        let trace_has_error = trace.spans().iter().any(|span| {
                            span.error() != 0
                                || span
                                    .meta()
                                    .get("_dd.span_events.has_exception")
                                    .is_some_and(|v| v == "true")
                        });
                        if trace_has_error {
                            tags.write_entry("_dd.error_tracking_standalone.error", "true")?;
                        }
                    }

                    // The reusable string builder avoids allocating a new string per trace for the rate.
                    self.string_builder.clear();
                    write!(&mut self.string_builder, "{:.2}", otlp_sr)
                        .expect("should never fail to format sampling rate");
                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
                }

                if dropped_trace {
                    chunk.dropped_trace(true)?;
                }

                Ok(())
            })?;

            // Remaining tracer payload fields are written after the chunk.
            if let Some(ct) = container_tags {
                let mut tags = tp.tags();
                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
            }

            if let Some(e) = env {
                tp.env(e)?;
            }
            if let Some(h) = hostname {
                tp.hostname(h)?;
            }
            if let Some(av) = app_version {
                tp.app_version(av)?;
            }

            Ok(())
        })?;

        ap_builder.finish(output_buffer)?;

        Ok(())
    }

    /// Returns the configured probabilistic sampling percentage as a rate.
    ///
    /// A rate at or below 0.0, or at or above 1.0, yields 1.0.
    fn sampling_rate(&self) -> f64 {
        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
        if rate <= 0.0 || rate >= 1.0 {
            return 1.0;
        }
        rate
    }
}
639
640impl EndpointEncoder for TraceEndpointEncoder {
641 type Input = Trace;
642 type EncodeError = std::io::Error;
643 fn encoder_name() -> &'static str {
644 "traces"
645 }
646
647 fn compressed_size_limit(&self) -> usize {
648 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
649 }
650
651 fn uncompressed_size_limit(&self) -> usize {
652 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
653 }
654
655 fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
656 self.encode_tracer_payload(trace, buffer)
657 }
658
659 fn endpoint_uri(&self) -> Uri {
660 PathAndQuery::from_static("/api/v0.2/traces").into()
661 }
662
663 fn endpoint_method(&self) -> Method {
664 Method::POST
665 }
666
667 fn content_type(&self) -> HeaderValue {
668 CONTENT_TYPE_PROTOBUF.clone()
669 }
670
671 fn additional_headers(&self) -> &[(HeaderName, HeaderValue)] {
672 &self.extra_headers
673 }
674}
675
676fn encode_attribute_value<S: ScratchBuffer>(
677 builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
678) -> std::io::Result<()> {
679 match value {
680 AttributeValue::String(v) => {
681 builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
682 }
683 AttributeValue::Bool(v) => {
684 builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
685 }
686 AttributeValue::Int(v) => {
687 builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
688 }
689 AttributeValue::Double(v) => {
690 builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
691 }
692 AttributeValue::Array(values) => {
693 builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
694 for val in values {
695 arr.add_values(|av| encode_attribute_array_value(av, val))?;
696 }
697 Ok(())
698 })?;
699 }
700 }
701 Ok(())
702}
703
704fn encode_attribute_array_value<S: ScratchBuffer>(
705 builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeScalarValue,
706) -> std::io::Result<()> {
707 match value {
708 AttributeScalarValue::String(v) => {
709 builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
710 }
711 AttributeScalarValue::Bool(v) => {
712 builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
713 }
714 AttributeScalarValue::Int(v) => {
715 builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
716 }
717 AttributeScalarValue::Double(v) => {
718 builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
719 }
720 }
721 Ok(())
722}
723
724fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
725 resource_tags.get_single_tag(key).and_then(|t| t.value())
726}
727
728fn resolve_hostname<'a>(
729 resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
730 ignore_missing_fields: bool,
731) -> Option<&'a str> {
732 let mut hostname = match source {
733 Some(src) => match src.kind {
734 OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
735 _ => Some(""),
736 },
737 None => default_hostname,
738 };
739
740 if ignore_missing_fields {
741 hostname = Some("");
742 }
743
744 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
745 hostname = Some(value);
746 }
747
748 hostname
749}
750
751fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
752 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
753 return Some(value);
754 }
755 if ignore_missing_fields {
756 return None;
757 }
758 if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
759 return Some(value);
760 }
761 get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
762}
763
764fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
765 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
766 if let Some(value) = get_resource_tag_value(resource_tags, key) {
767 return Some(value);
768 }
769 }
770 if let Some(span) = first_span {
773 for (k, v) in span.meta() {
774 if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
775 return Some(v.as_ref());
776 }
777 }
778 }
779 None
780}
781
782fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
783 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
784 return Some(value);
785 }
786 get_resource_tag_value(resource_tags, SERVICE_VERSION)
787}
788
789fn resolve_container_tags(
790 resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
791) -> Option<MetaString> {
792 if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
796 if !tags.is_empty() {
797 return Some(MetaString::from(tags));
798 }
799 }
800
801 if ignore_missing_fields {
802 return None;
803 }
804 let mut container_tags = TagSet::default();
805 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
806 let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
807 if container_tags.is_empty() && !is_fargate_source {
808 return None;
809 }
810
811 let mut flattened = flatten_container_tag(container_tags);
812 if is_fargate_source {
813 if let Some(src) = source {
814 append_tags(&mut flattened, &src.tag());
815 }
816 }
817
818 if flattened.is_empty() {
819 None
820 } else {
821 Some(MetaString::from(flattened))
822 }
823}
824
825fn flatten_container_tag(tags: TagSet) -> String {
826 let mut flattened = String::new();
827 for tag in tags {
828 if !flattened.is_empty() {
829 flattened.push(',');
830 }
831 flattened.push_str(tag.as_str());
832 }
833 flattened
834}
835
/// Appends `tags` to `target`, inserting a comma separator when `target` already has content.
///
/// Does nothing when `tags` is empty.
fn append_tags(target: &mut String, tags: &str) {
    match (target.is_empty(), tags.is_empty()) {
        // Nothing to append.
        (_, true) => {}
        // First entry: no separator needed.
        (true, false) => target.push_str(tags),
        (false, false) => {
            target.push(',');
            target.push_str(tags);
        }
    }
}
845
#[cfg(test)]
mod tests {
    use datadog_protos::traces::AgentPayload;
    use protobuf::Message as _;
    use saluki_config::ConfigurationLoader;
    use saluki_context::tags::TagSet;
    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace, TraceSampling};
    use stringtheory::MetaString;

    use super::*;
    use crate::common::datadog::apm::ApmConfig;
    use crate::common::otlp::config::TracesConfig;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Chunk tag written for error traces when error tracking standalone (ETS) is enabled.
    const ETS_CHUNK_TAG: &str = "_dd.error_tracking_standalone.error";

    /// Builds an encoder whose APM configuration has ETS enabled or disabled.
    async fn make_encoder(ets_enabled: bool) -> TraceEndpointEncoder {
        let mut env_vars: Vec<(String, String)> = Vec::new();
        if ets_enabled {
            env_vars.push(("APM_ERROR_TRACKING_STANDALONE_ENABLED".to_string(), "true".to_string()));
        }

        let (cfg, _) = ConfigurationLoader::for_tests_with_provider_factory(
            None,
            Some(&env_vars),
            false,
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
        let apm_config = ApmConfig::from_configuration(&cfg).expect("ApmConfig should deserialize");

        TraceEndpointEncoder::new(
            MetaString::from("test-host"),
            "0.0.0".to_string(),
            "none".to_string(),
            apm_config,
            TracesConfig::default(),
        )
    }

    /// Wraps a single span in a trace with explicit sampling metadata.
    fn into_sampled_trace(span: DdSpan) -> Trace {
        let mut trace = Trace::new(vec![span], TagSet::default());
        trace.set_sampling(Some(TraceSampling::new(false, Some(1), None, None)));
        trace
    }

    /// Single-span trace whose span has an error value of 0.
    fn make_trace() -> Trace {
        into_sampled_trace(DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,
            1,
            0,
            0,
            1000,
            0,
        ))
    }

    /// Single-span trace whose span has an error value of 1.
    fn make_error_trace() -> Trace {
        into_sampled_trace(DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,
            1,
            0,
            0,
            1000,
            1,
        ))
    }

    /// Encodes `trace` with `encoder` and parses the output back into an `AgentPayload`.
    fn encode_and_parse(encoder: &mut TraceEndpointEncoder, trace: &Trace) -> AgentPayload {
        let mut buf = Vec::new();
        encoder.encode(trace, &mut buf).expect("encode should succeed");
        AgentPayload::parse_from_bytes(&buf).expect("should parse AgentPayload")
    }

    /// Returns the value of the ETS chunk tag from the first chunk that carries it.
    fn ets_chunk_tag_value(payload: &AgentPayload) -> Option<&str> {
        payload
            .tracerPayloads
            .iter()
            .flat_map(|tp| tp.chunks.iter())
            .find_map(|chunk| chunk.tags.get(ETS_CHUNK_TAG).map(|v| v.as_str()))
    }

    #[tokio::test]
    async fn ets_header_present_when_enabled() {
        let encoder = make_encoder(true).await;
        let headers = encoder.additional_headers();
        assert_eq!(headers.len(), 1);

        let (name, value) = &headers[0];
        assert_eq!(name.as_str(), "x-datadog-error-tracking-standalone");
        assert_eq!(*value, "true");
    }

    #[tokio::test]
    async fn ets_header_absent_when_disabled() {
        let encoder = make_encoder(false).await;
        assert!(encoder.additional_headers().is_empty());
    }

    #[tokio::test]
    async fn ets_chunk_tag_present_for_error_trace() {
        let mut encoder = make_encoder(true).await;
        let payload = encode_and_parse(&mut encoder, &make_error_trace());
        assert_eq!(
            ets_chunk_tag_value(&payload),
            Some("true"),
            "ETS chunk tag should be present for error traces when ETS is enabled"
        );
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_for_non_error_trace() {
        let mut encoder = make_encoder(true).await;
        let payload = encode_and_parse(&mut encoder, &make_trace());
        let has_tag = ets_chunk_tag_value(&payload).is_some();
        assert!(!has_tag, "ETS chunk tag should be absent for non-error traces");
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_when_disabled() {
        let mut encoder = make_encoder(false).await;
        let payload = encode_and_parse(&mut encoder, &make_trace());
        let has_tag = ets_chunk_tag_value(&payload).is_some();
        assert!(!has_tag, "ETS chunk tag should be absent when ETS is disabled");
    }
}
989
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogTraceConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Runs the shared configuration smoke tests against `DatadogTraceConfiguration`, using typed
    /// deserialization of the configuration as the check.
    #[tokio::test]
    async fn smoke_test() {
        // NOTE(review): the `&[]` and `json!({})` arguments are passed through as-is; their meaning is defined
        // by `run_config_smoke_tests` — confirm against that helper.
        run_config_smoke_tests(structs::DATADOG_TRACE_CONFIGURATION, &[], json!({}), |cfg| {
            cfg.as_typed::<DatadogTraceConfiguration>()
                .expect("DatadogTraceConfiguration should deserialize")
        })
        .await
    }
}