1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7 attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8 AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use facet::Facet;
11use http::{uri::PathAndQuery, HeaderName, HeaderValue, Method, Uri};
12use piecemeal::{ScratchBuffer, ScratchWriter};
13use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
14use saluki_common::collections::FastHashMap;
15use saluki_common::strings::StringBuilder;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::tags::TagSet;
19use saluki_core::data_model::event::trace::AttributeValue;
20use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
21use saluki_core::{
22 components::{encoders::*, ComponentContext},
23 data_model::{
24 event::{trace::Trace, EventType},
25 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
26 },
27 observability::ComponentMetricsExt as _,
28};
29use saluki_env::host::providers::BoxedHostProvider;
30use saluki_env::{EnvironmentProvider, HostProvider};
31use saluki_error::generic_error;
32use saluki_error::{ErrorContext as _, GenericError};
33use saluki_io::compression::CompressionScheme;
34use saluki_metrics::MetricsBuilder;
35use serde::Deserialize;
36use stringtheory::MetaString;
37use tokio::{
38 select,
39 sync::mpsc::{self, Receiver, Sender},
40 time::sleep,
41};
42use tracing::{debug, error};
43
44use crate::common::datadog::{
45 apm::ApmConfig,
46 io::RB_BUFFER_CHUNK_SIZE,
47 request_builder::{EndpointEncoder, RequestBuilder},
48 telemetry::ComponentTelemetry,
49 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
50};
51use crate::common::otlp::config::TracesConfig;
52use crate::common::otlp::util::{
53 attributes_to_source, extract_container_tags_from_attributes_map, Source as OtlpSource,
54 SourceKind as OtlpSourceKind, KEY_DATADOG_CONTAINER_TAGS,
55};
56
57const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
58const MAX_TRACES_PER_PAYLOAD: usize = 10000;
59static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
60
61const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
63const DEFAULT_CHUNK_PRIORITY: i32 = 1; fn default_serializer_compressor_kind() -> String {
66 "zstd".to_string()
67}
68
69const fn default_zstd_compressor_level() -> i32 {
70 3
71}
72
73const fn default_flush_timeout_secs() -> u64 {
74 2
75}
76
77fn default_env() -> String {
78 "none".to_string()
79}
80
81#[derive(Deserialize, Facet)]
87#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
88pub struct DatadogTraceConfiguration {
89 #[serde(
90 rename = "serializer_compressor_kind", default = "default_serializer_compressor_kind"
92 )]
93 compressor_kind: String,
94
95 #[serde(
96 rename = "serializer_zstd_compressor_level",
97 default = "default_zstd_compressor_level"
98 )]
99 zstd_compressor_level: i32,
100
101 #[serde(default = "default_flush_timeout_secs")]
109 flush_timeout_secs: u64,
110
111 #[serde(skip)]
112 default_hostname: Option<String>,
113
114 #[serde(skip)]
115 version: String,
116
117 #[serde(skip)]
118 #[facet(opaque)]
119 apm_config: ApmConfig,
120
121 #[serde(skip)]
122 #[facet(opaque)]
123 otlp_traces: TracesConfig,
124
125 #[serde(default = "default_env")]
126 env: String,
127}
128
129impl DatadogTraceConfiguration {
130 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
132 let mut trace_config: Self = config.as_typed()?;
133
134 let app_details = saluki_metadata::get_app_details();
135 trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
136
137 trace_config.apm_config = ApmConfig::from_configuration(config)?;
138 trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
139
140 Ok(trace_config)
141 }
142}
143
144impl DatadogTraceConfiguration {
145 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
147 where
148 E: EnvironmentProvider<Host = BoxedHostProvider>,
149 {
150 let host_provider = environment_provider.host();
151 let hostname = host_provider.get_hostname().await?;
152 self.default_hostname = Some(hostname);
153 Ok(self)
154 }
155}
156
157#[async_trait]
158impl EncoderBuilder for DatadogTraceConfiguration {
159 fn input_event_type(&self) -> EventType {
160 EventType::Trace
161 }
162
163 fn output_payload_type(&self) -> PayloadType {
164 PayloadType::Http
165 }
166
167 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
168 let metrics_builder = MetricsBuilder::from_component_context(&context);
169 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
170 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
171
172 let default_hostname = self.default_hostname.clone().unwrap_or_default();
173 let default_hostname = MetaString::from(default_hostname);
174
175 let mut trace_rb = RequestBuilder::new(
178 TraceEndpointEncoder::new(
179 default_hostname,
180 self.version.clone(),
181 self.env.clone(),
182 self.apm_config.clone(),
183 self.otlp_traces.clone(),
184 ),
185 compression_scheme,
186 RB_BUFFER_CHUNK_SIZE,
187 )
188 .await?;
189 trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
190
191 let flush_timeout = match self.flush_timeout_secs {
192 0 => Duration::from_millis(10),
195 secs => Duration::from_secs(secs),
196 };
197
198 Ok(Box::new(DatadogTrace {
199 trace_rb,
200 telemetry,
201 flush_timeout,
202 }))
203 }
204}
205
206impl MemoryBounds for DatadogTraceConfiguration {
207 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
208 builder
210 .minimum()
211 .with_single_value::<DatadogTrace>("component struct")
212 .with_array::<EventsBuffer>("request builder events channel", 8)
213 .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
214
215 builder
216 .firm()
217 .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
218 }
219}
220
221pub struct DatadogTrace {
222 trace_rb: RequestBuilder<TraceEndpointEncoder>,
223 telemetry: ComponentTelemetry,
224 flush_timeout: Duration,
225}
226
227#[async_trait]
229impl Encoder for DatadogTrace {
230 async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
231 let Self {
232 trace_rb,
233 telemetry,
234 flush_timeout,
235 } = *self;
236
237 let mut health = context.take_health_handle();
238
239 let (events_tx, events_rx) = mpsc::channel(8);
242 let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
244 let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
245 let request_builder_handle = context
247 .topology_context()
248 .global_thread_pool() .spawn_traced_named("dd-traces-request-builder", request_builder_fut);
250
251 health.mark_ready();
252 debug!("Datadog Trace encoder started.");
253
254 loop {
255 select! {
256 biased; _ = health.live() => continue,
259 maybe_payload = payloads_rx.recv() => match maybe_payload {
260 Some(payload) => {
261 if let Err(e) = context.dispatcher().dispatch(payload).await {
263 error!("Failed to dispatch payload: {}", e);
264 }
265 }
266 None => break,
267 },
268 maybe_event_buffer = context.events().next() => match maybe_event_buffer {
269 Some(event_buffer) => events_tx.send(event_buffer).await
270 .error_context("Failed to send event buffer to request builder task.")?,
271 None => break,
272 },
273 }
274 }
275
276 drop(events_tx);
278
279 while let Some(payload) = payloads_rx.recv().await {
281 if let Err(e) = context.dispatcher().dispatch(payload).await {
282 error!("Failed to dispatch payload: {}", e);
283 }
284 }
285
286 match request_builder_handle.await {
288 Ok(Ok(())) => debug!("Request builder task stopped."),
289 Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
290 Err(e) => error!(error = %e, "Request builder task panicked."),
291 }
292
293 debug!("Datadog Trace encoder stopped.");
294
295 Ok(())
296 }
297}
298
299async fn run_request_builder(
300 mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
301 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
302) -> Result<(), GenericError> {
303 let mut pending_flush = false;
304 let pending_flush_timeout = sleep(flush_timeout);
305 tokio::pin!(pending_flush_timeout);
306
307 loop {
308 select! {
309 Some(event_buffer) = events_rx.recv() => {
310 for event in event_buffer {
311 let trace = match event.try_into_trace() {
312 Some(trace) => trace,
313 None => continue,
314 };
315 let trace_to_retry = match trace_request_builder.encode(trace).await {
318 Ok(None) => continue,
319 Ok(Some(trace)) => trace,
320 Err(e) => {
321 error!(error = %e, "Failed to encode trace.");
322 telemetry.events_dropped_encoder().increment(1);
323 continue;
324 }
325 };
326
327 let maybe_requests = trace_request_builder.flush().await;
328 if maybe_requests.is_empty() {
329 panic!("builder told us to flush, but gave us nothing");
330 }
331
332 for maybe_request in maybe_requests {
333 match maybe_request {
334 Ok((events, _data_points, request)) => {
335 let payload_meta = PayloadMetadata::from_event_count(events);
336 let http_payload = HttpPayload::new(payload_meta, request);
337 let payload = Payload::Http(http_payload);
338
339 payloads_tx.send(payload).await
340 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
341 },
342 Err(e) => if e.is_recoverable() {
343 continue;
345 } else {
346 return Err(GenericError::from(e).context("Failed to flush request."));
347 }
348 }
349 }
350
351 if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
353 error!(error = %e, "Failed to encode trace.");
354 telemetry.events_dropped_encoder().increment(1);
355 }
356 }
357
358 debug!("Processed event buffer.");
359
360 if !pending_flush {
362 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
363 pending_flush = true;
364 }
365 },
366 _ = &mut pending_flush_timeout, if pending_flush => {
367 debug!("Flushing pending request(s).");
368
369 pending_flush = false;
370
371 let maybe_trace_requests = trace_request_builder.flush().await;
374 for maybe_request in maybe_trace_requests {
375 match maybe_request {
376 Ok((events, _data_points, request)) => {
377 let payload_meta = PayloadMetadata::from_event_count(events);
378 let http_payload = HttpPayload::new(payload_meta, request);
379 let payload = Payload::Http(http_payload);
380
381 payloads_tx.send(payload).await
382 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
383 },
384 Err(e) => if e.is_recoverable() {
385 continue;
386 } else {
387 return Err(GenericError::from(e).context("Failed to flush request."));
388 }
389 }
390 }
391
392 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
393 },
394
395 else => break,
397 }
398 }
399
400 Ok(())
401}
402
403#[derive(Debug)]
404struct TraceEndpointEncoder {
405 scratch: ScratchWriter<Vec<u8>>,
406 default_hostname: MetaString,
407 agent_hostname: String,
408 version: String,
409 env: String,
410 apm_config: ApmConfig,
411 otlp_traces: TracesConfig,
412 string_builder: StringBuilder,
413 error_tracking_standalone: bool,
414 extra_headers: Vec<(HeaderName, HeaderValue)>,
415}
416
417impl TraceEndpointEncoder {
418 fn new(
419 default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
420 ) -> Self {
421 let error_tracking_standalone = apm_config.error_tracking_standalone_enabled();
422 let extra_headers = if error_tracking_standalone {
423 vec![(
424 HeaderName::from_static("x-datadog-error-tracking-standalone"),
425 HeaderValue::from_static("true"),
426 )]
427 } else {
428 Vec::new()
429 };
430 Self {
431 scratch: ScratchWriter::new(Vec::with_capacity(8192)),
432 agent_hostname: default_hostname.as_ref().to_string(),
433 default_hostname,
434 version,
435 env,
436 apm_config,
437 otlp_traces,
438 string_builder: StringBuilder::new(),
439 error_tracking_standalone,
440 extra_headers,
441 }
442 }
443
444 fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
445 let sampling_rate = self.sampling_rate();
446 let source = attributes_to_source(&trace.attributes);
447
448 let container_id = if !trace.payload.container_id.is_empty() {
450 Some(trace.payload.container_id.as_ref())
451 } else {
452 None
453 };
454 let lang = if !trace.payload.language_name.is_empty() {
455 Some(trace.payload.language_name.as_ref())
456 } else {
457 None
458 };
459 let tracer_version = format!("otlp-{}", trace.payload.tracer_version.as_ref());
460 let container_tags = resolve_container_tags_from_attrs(
461 &trace.attributes,
462 source.as_ref(),
463 self.otlp_traces.ignore_missing_datadog_fields,
464 );
465 let env = if !trace.payload.env.is_empty() {
466 Some(trace.payload.env.as_ref())
467 } else if self.otlp_traces.ignore_missing_datadog_fields {
468 Some("")
469 } else {
470 None
471 };
472 let hostname = resolve_hostname_from_payload(
473 trace.payload.hostname.as_ref(),
474 source.as_ref(),
475 Some(self.default_hostname.as_ref()),
476 self.otlp_traces.ignore_missing_datadog_fields,
477 );
478 let app_version = if !trace.payload.app_version.is_empty() {
479 Some(trace.payload.app_version.as_ref())
480 } else {
481 None
482 };
483
484 let priority = trace.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY);
486 let dropped_trace = trace.dropped_trace;
487 let decision_maker = trace.decision_maker.as_deref();
488 let otlp_sr = trace.otlp_sampling_rate.unwrap_or(sampling_rate);
489
490 let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);
492
493 ap_builder
494 .host_name(&self.agent_hostname)?
495 .env(&self.env)?
496 .agent_version(&self.version)?
497 .target_tps(self.apm_config.target_traces_per_second())?
498 .error_tps(self.apm_config.errors_per_second())?;
499
500 ap_builder.add_tracer_payloads(|tp| {
501 if let Some(cid) = container_id {
502 tp.container_id(cid)?;
503 }
504 if let Some(l) = lang {
505 tp.language_name(l)?;
506 }
507 tp.tracer_version(&tracer_version)?;
508
509 tp.add_chunks(|chunk| {
511 chunk.priority(priority)?;
512
513 for span in trace.spans() {
514 chunk.add_spans(|s| {
515 s.service(span.service())?
516 .name(span.name())?
517 .resource(span.resource())?
518 .trace_id(trace.trace_id_low)?
519 .span_id(span.span_id())?
520 .parent_id(span.parent_id())?
521 .start(span.start() as i64)?
522 .duration(span.duration() as i64)?
523 .error(span.error())?;
524
525 {
526 let mut meta = s.meta();
527 for (k, v) in &span.attributes {
528 match v {
529 AttributeValue::String(str_val) => {
530 meta.write_entry(k.as_ref(), str_val.as_ref())?;
531 }
532 AttributeValue::Bool(b) => {
533 meta.write_entry(k.as_ref(), if *b { "true" } else { "false" })?;
534 }
535 _ => {}
536 }
537 }
538 }
539
540 {
541 let mut metrics = s.metrics();
542 for (k, v) in &span.attributes {
543 match v {
544 AttributeValue::Float(f) => metrics.write_entry(k.as_ref(), *f)?,
545 AttributeValue::Int(i) => metrics.write_entry(k.as_ref(), *i as f64)?,
546 _ => {}
547 }
548 }
549 }
550
551 s.type_(span.span_type())?;
552
553 {
554 let mut ms = s.meta_struct();
555 for (k, v) in &span.attributes {
556 if let AttributeValue::Bytes(bytes) = v {
559 ms.write_entry(k.as_ref(), bytes.as_slice())?;
560 }
561 }
562 }
563
564 for link in span.span_links() {
565 s.add_span_links(|sl| {
566 sl.trace_id(link.trace_id())?
567 .trace_id_high(link.trace_id_high())?
568 .span_id(link.span_id())?;
569 {
570 let mut attrs = sl.attributes();
574 for (k, v) in link.attributes() {
575 if let AttributeValue::String(str_val) = v {
576 attrs.write_entry(k.as_ref(), str_val.as_ref())?;
577 }
578 }
579 }
580 let tracestate = link.tracestate().to_string();
581 sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
582 Ok(())
583 })?;
584 }
585
586 for event in span.span_events() {
587 s.add_span_events(|se| {
588 se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
589 {
590 let mut attrs = se.attributes();
591 for (k, v) in event.attributes() {
592 attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
593 }
594 }
595 Ok(())
596 })?;
597 }
598
599 Ok(())
600 })?;
601 }
602
603 {
605 let mut tags = chunk.tags();
606 if let Some(dm) = decision_maker {
607 tags.write_entry(TAG_DECISION_MAKER, dm)?;
608 }
609 if self.error_tracking_standalone {
610 let trace_has_error = trace.spans().iter().any(|span| {
611 span.error() != 0
612 || span
613 .attributes
614 .get("_dd.span_events.has_exception")
615 .and_then(AttributeValue::as_string)
616 .is_some_and(|v| v == "true")
617 });
618 if trace_has_error {
619 tags.write_entry("_dd.error_tracking_standalone.error", "true")?;
620 }
621 }
622
623 self.string_builder.clear();
624 write!(&mut self.string_builder, "{:.2}", otlp_sr)
625 .expect("should never fail to format sampling rate");
626 tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
627 }
628
629 if dropped_trace {
630 chunk.dropped_trace(true)?;
631 }
632
633 Ok(())
634 })?;
635
636 if let Some(ct) = container_tags {
638 let mut tags = tp.tags();
639 tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
640 }
641
642 if let Some(e) = env {
643 tp.env(e)?;
644 }
645 if let Some(h) = hostname {
646 tp.hostname(h)?;
647 }
648 if let Some(av) = app_version {
649 tp.app_version(av)?;
650 }
651
652 Ok(())
653 })?;
654
655 ap_builder.finish(output_buffer)?;
656
657 Ok(())
658 }
659
660 fn sampling_rate(&self) -> f64 {
661 let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
662 if rate <= 0.0 || rate >= 1.0 {
663 return 1.0;
664 }
665 rate
666 }
667}
668
669impl EndpointEncoder for TraceEndpointEncoder {
670 type Input = Trace;
671 type EncodeError = std::io::Error;
672 fn encoder_name() -> &'static str {
673 "traces"
674 }
675
676 fn compressed_size_limit(&self) -> usize {
677 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
678 }
679
680 fn uncompressed_size_limit(&self) -> usize {
681 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
682 }
683
684 fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
685 self.encode_tracer_payload(trace, buffer)
686 }
687
688 fn endpoint_uri(&self) -> Uri {
689 PathAndQuery::from_static("/api/v0.2/traces").into()
690 }
691
692 fn endpoint_method(&self) -> Method {
693 Method::POST
694 }
695
696 fn content_type(&self) -> HeaderValue {
697 CONTENT_TYPE_PROTOBUF.clone()
698 }
699
700 fn additional_headers(&self) -> &[(HeaderName, HeaderValue)] {
701 &self.extra_headers
702 }
703}
704
705fn encode_attribute_value<S: ScratchBuffer>(
706 builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
707) -> std::io::Result<()> {
708 match value {
709 AttributeValue::String(v) => {
710 builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
711 }
712 AttributeValue::Bool(v) => {
713 builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
714 }
715 AttributeValue::Int(v) => {
716 builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
717 }
718 AttributeValue::Float(v) => {
719 builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
720 }
721 AttributeValue::Bytes(_) => {
722 }
724 AttributeValue::Array(values) => {
725 builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
726 for val in values {
727 arr.add_values(|av| encode_attribute_array_value(av, val))?;
728 }
729 Ok(())
730 })?;
731 }
732 AttributeValue::KeyValueList(_) => {
733 }
735 }
736 Ok(())
737}
738
739fn encode_attribute_array_value<S: ScratchBuffer>(
740 builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeValue,
741) -> std::io::Result<()> {
742 match value {
743 AttributeValue::String(v) => {
744 builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
745 }
746 AttributeValue::Bool(v) => {
747 builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
748 }
749 AttributeValue::Int(v) => {
750 builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
751 }
752 AttributeValue::Float(v) => {
753 builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
754 }
755 AttributeValue::Bytes(_) | AttributeValue::Array(_) | AttributeValue::KeyValueList(_) => {
756 }
758 }
759 Ok(())
760}
761
762fn resolve_hostname_from_payload<'a>(
763 payload_hostname: &'a str, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
764 ignore_missing_fields: bool,
765) -> Option<&'a str> {
766 if !payload_hostname.is_empty() {
767 return Some(payload_hostname);
768 }
769 if ignore_missing_fields {
770 return Some("");
771 }
772 match source {
773 Some(src) => match src.kind {
774 OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
775 _ => Some(""),
776 },
777 None => default_hostname,
778 }
779}
780
781fn resolve_container_tags_from_attrs(
782 attributes: &FastHashMap<MetaString, AttributeValue>, source: Option<&OtlpSource>, ignore_missing_fields: bool,
783) -> Option<MetaString> {
784 if let Some(AttributeValue::String(tags)) = attributes.get(KEY_DATADOG_CONTAINER_TAGS) {
785 if !tags.is_empty() {
786 return Some(tags.clone());
787 }
788 }
789
790 if ignore_missing_fields {
791 return None;
792 }
793 let mut container_tags = TagSet::default();
794 extract_container_tags_from_attributes_map(attributes, &mut container_tags);
795 let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
796 if container_tags.is_empty() && !is_fargate_source {
797 return None;
798 }
799
800 let mut flattened = flatten_container_tag(container_tags);
801 if is_fargate_source {
802 if let Some(src) = source {
803 append_tags(&mut flattened, &src.tag());
804 }
805 }
806
807 if flattened.is_empty() {
808 None
809 } else {
810 Some(MetaString::from(flattened))
811 }
812}
813
814fn flatten_container_tag(tags: TagSet) -> String {
815 let mut flattened = String::new();
816 for tag in tags {
817 if !flattened.is_empty() {
818 flattened.push(',');
819 }
820 flattened.push_str(tag.as_str());
821 }
822 flattened
823}
824
825fn append_tags(target: &mut String, tags: &str) {
826 if tags.is_empty() {
827 return;
828 }
829 if !target.is_empty() {
830 target.push(',');
831 }
832 target.push_str(tags);
833}
834
835#[cfg(test)]
836mod tests {
837 use datadog_protos::traces::AgentPayload;
838 use protobuf::Message as _;
839 use saluki_config::ConfigurationLoader;
840 use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
841 use stringtheory::MetaString;
842
843 use super::*;
844 use crate::common::datadog::apm::ApmConfig;
845 use crate::common::otlp::config::TracesConfig;
846 use crate::config::{DatadogRemapper, KEY_ALIASES};
847
848 async fn make_encoder(ets_enabled: bool) -> TraceEndpointEncoder {
849 let env_vars: Vec<(String, String)> = if ets_enabled {
850 vec![("APM_ERROR_TRACKING_STANDALONE_ENABLED".to_string(), "true".to_string())]
851 } else {
852 vec![]
853 };
854 let (cfg, _) = ConfigurationLoader::for_tests_with_provider_factory(
855 None,
856 Some(&env_vars),
857 false,
858 KEY_ALIASES,
859 DatadogRemapper::new,
860 )
861 .await;
862 let apm_config = ApmConfig::from_configuration(&cfg).expect("ApmConfig should deserialize");
863 TraceEndpointEncoder::new(
864 MetaString::from("test-host"),
865 "0.0.0".to_string(),
866 "none".to_string(),
867 apm_config,
868 TracesConfig::default(),
869 )
870 }
871
872 fn make_trace() -> Trace {
873 let span = DdSpan::new(
874 MetaString::from("svc"),
875 MetaString::from("op"),
876 MetaString::from("res"),
877 MetaString::from("web"),
878 1, 0, 0, 1000, 0, );
884 let mut trace = Trace::new(vec![span]);
885 trace.priority = Some(1);
886 trace
887 }
888
889 fn make_error_trace() -> Trace {
890 let span = DdSpan::new(
891 MetaString::from("svc"),
892 MetaString::from("op"),
893 MetaString::from("res"),
894 MetaString::from("web"),
895 1, 0, 0, 1000, 1, );
901 let mut trace = Trace::new(vec![span]);
902 trace.priority = Some(1);
903 trace
904 }
905
906 #[tokio::test]
907 async fn ets_header_present_when_enabled() {
908 let encoder = make_encoder(true).await;
909 let headers = encoder.additional_headers();
910 assert_eq!(headers.len(), 1);
911 assert_eq!(headers[0].0.as_str(), "x-datadog-error-tracking-standalone");
912 assert_eq!(headers[0].1, "true");
913 }
914
915 #[tokio::test]
916 async fn ets_header_absent_when_disabled() {
917 let encoder = make_encoder(false).await;
918 assert!(encoder.additional_headers().is_empty());
919 }
920
921 #[tokio::test]
922 async fn ets_chunk_tag_present_for_error_trace() {
923 let mut encoder = make_encoder(true).await;
924 let trace = make_error_trace();
925 let mut buf = Vec::new();
926 encoder.encode(&trace, &mut buf).expect("encode should succeed");
927 let payload = AgentPayload::parse_from_bytes(&buf).expect("should parse AgentPayload");
928 let tag_value = payload
929 .tracerPayloads
930 .iter()
931 .flat_map(|tp| tp.chunks.iter())
932 .find_map(|chunk| {
933 chunk
934 .tags
935 .get("_dd.error_tracking_standalone.error")
936 .map(|v| v.as_str())
937 });
938 assert_eq!(
939 tag_value,
940 Some("true"),
941 "ETS chunk tag should be present for error traces when ETS is enabled"
942 );
943 }
944
945 #[tokio::test]
946 async fn ets_chunk_tag_absent_for_non_error_trace() {
947 let mut encoder = make_encoder(true).await;
948 let trace = make_trace(); let mut buf = Vec::new();
950 encoder.encode(&trace, &mut buf).expect("encode should succeed");
951 let payload = AgentPayload::parse_from_bytes(&buf).expect("should parse AgentPayload");
952 let has_tag = payload
953 .tracerPayloads
954 .iter()
955 .flat_map(|tp| tp.chunks.iter())
956 .any(|chunk| chunk.tags.contains_key("_dd.error_tracking_standalone.error"));
957 assert!(!has_tag, "ETS chunk tag should be absent for non-error traces");
958 }
959
960 #[tokio::test]
961 async fn ets_chunk_tag_absent_when_disabled() {
962 let mut encoder = make_encoder(false).await;
963 let trace = make_trace();
964 let mut buf = Vec::new();
965 encoder.encode(&trace, &mut buf).expect("encode should succeed");
966 let payload = AgentPayload::parse_from_bytes(&buf).expect("should parse AgentPayload");
967 let has_tag = payload
968 .tracerPayloads
969 .iter()
970 .flat_map(|tp| tp.chunks.iter())
971 .any(|chunk| chunk.tags.contains_key("_dd.error_tracking_standalone.error"));
972 assert!(!has_tag, "ETS chunk tag should be absent when ETS is disabled");
973 }
974}
975
976#[cfg(test)]
977mod config_smoke {
978 use serde_json::json;
979
980 use super::DatadogTraceConfiguration;
981 use crate::config_registry::structs;
982 use crate::config_registry::test_support::run_config_smoke_tests;
983
984 #[tokio::test]
985 async fn smoke_test() {
986 run_config_smoke_tests(structs::DATADOG_TRACE_CONFIGURATION, &[], json!({}), |cfg| {
987 cfg.as_typed::<DatadogTraceConfiguration>()
988 .expect("DatadogTraceConfiguration should deserialize")
989 })
990 .await
991 }
992}