1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7 attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8 AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use facet::Facet;
11use http::{uri::PathAndQuery, HeaderName, HeaderValue, Method, Uri};
12use piecemeal::{ScratchBuffer, ScratchWriter};
13use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
14use saluki_common::collections::FastHashMap;
15use saluki_common::strings::StringBuilder;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::tags::TagSet;
19use saluki_core::data_model::event::trace::AttributeValue;
20use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
21use saluki_core::{
22 components::{encoders::*, ComponentContext},
23 data_model::{
24 event::{trace::Trace, EventType},
25 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
26 },
27 observability::ComponentMetricsExt as _,
28};
29use saluki_env::host::providers::BoxedHostProvider;
30use saluki_env::{EnvironmentProvider, HostProvider};
31use saluki_error::generic_error;
32use saluki_error::{ErrorContext as _, GenericError};
33use saluki_io::compression::CompressionScheme;
34use saluki_metrics::MetricsBuilder;
35use serde::Deserialize;
36use stringtheory::MetaString;
37use tokio::pin;
38use tokio::{
39 select,
40 sync::mpsc::{self, Receiver, Sender},
41 time::sleep,
42};
43use tracing::{debug, error};
44
45use crate::common::datadog::{
46 apm::ApmConfig,
47 io::RB_BUFFER_CHUNK_SIZE,
48 request_builder::{EndpointEncoder, RequestBuilder},
49 telemetry::ComponentTelemetry,
50 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
51};
52use crate::common::otlp::config::TracesConfig;
53use crate::common::otlp::util::{
54 attributes_to_source, extract_container_tags_from_attributes_map, Source as OtlpSource,
55 SourceKind as OtlpSourceKind, KEY_DATADOG_CONTAINER_TAGS,
56};
57
/// Key under which flattened container tags are written into a tracer payload's tags.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
/// Maximum number of traces the request builder is configured to accept per payload.
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
/// `Content-Type` header value reported by the trace endpoint encoder.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

/// Chunk tag key under which the formatted sampling rate is written.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
/// Priority written to a trace chunk when the trace itself carries no priority.
const DEFAULT_CHUNK_PRIORITY: i32 = 1;

/// Serde default for the `serializer_compressor_kind` setting.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
69
/// Serde default for the `serializer_zstd_compressor_level` setting.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
73
/// Serde default for the `flush_timeout_secs` setting, in seconds.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_SECS: u64 = 2;
    DEFAULT_SECS
}
77
/// Serde default for the `env` setting.
fn default_env() -> String {
    String::from("none")
}
81
/// Configuration for the Datadog trace encoder.
///
/// Some fields are deserialized from the generic configuration; the fields marked `#[serde(skip)]` are filled in
/// afterwards by `from_configuration` and `with_environment_provider`.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogTraceConfiguration {
    /// Compressor kind handed to `CompressionScheme::new`.
    ///
    /// Read from `serializer_compressor_kind`; defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind", default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compressor level handed to `CompressionScheme::new`.
    ///
    /// Read from `serializer_zstd_compressor_level`; defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout, in seconds, for pending requests.
    ///
    /// Defaults to 2. A value of 0 is turned into a 10 millisecond timeout when the encoder is built.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Hostname obtained from the environment provider, if `with_environment_provider` was called.
    #[serde(skip)]
    default_hostname: Option<String>,

    /// Agent version string, set by `from_configuration` to `agent-data-plane/<version>`.
    #[serde(skip)]
    version: String,

    /// APM configuration, loaded separately by `from_configuration`.
    #[serde(skip)]
    #[facet(opaque)]
    apm_config: ApmConfig,

    /// OTLP traces configuration, read from `otlp_config.traces` (or its default when absent).
    #[serde(skip)]
    #[facet(opaque)]
    otlp_traces: TracesConfig,

    /// Environment name written into each agent payload; defaults to `none`.
    #[serde(default = "default_env")]
    env: String,
}
129
130impl DatadogTraceConfiguration {
131 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
133 let mut trace_config: Self = config.as_typed()?;
134
135 let app_details = saluki_metadata::get_app_details();
136 trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
137
138 trace_config.apm_config = ApmConfig::from_configuration(config)?;
139 trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
140
141 Ok(trace_config)
142 }
143}
144
145impl DatadogTraceConfiguration {
146 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
148 where
149 E: EnvironmentProvider<Host = BoxedHostProvider>,
150 {
151 let host_provider = environment_provider.host();
152 let hostname = host_provider.get_hostname().await?;
153 self.default_hostname = Some(hostname);
154 Ok(self)
155 }
156}
157
158#[async_trait]
159impl EncoderBuilder for DatadogTraceConfiguration {
160 fn input_event_type(&self) -> EventType {
161 EventType::Trace
162 }
163
164 fn output_payload_type(&self) -> PayloadType {
165 PayloadType::Http
166 }
167
168 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
169 let metrics_builder = MetricsBuilder::from_component_context(&context);
170 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
171 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
172
173 let default_hostname = self.default_hostname.clone().unwrap_or_default();
174 let default_hostname = MetaString::from(default_hostname);
175
176 let mut trace_rb = RequestBuilder::new(
179 TraceEndpointEncoder::new(
180 default_hostname,
181 self.version.clone(),
182 self.env.clone(),
183 self.apm_config.clone(),
184 self.otlp_traces.clone(),
185 ),
186 compression_scheme,
187 RB_BUFFER_CHUNK_SIZE,
188 )
189 .await?;
190 trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
191
192 let flush_timeout = match self.flush_timeout_secs {
193 0 => Duration::from_millis(10),
196 secs => Duration::from_secs(secs),
197 };
198
199 Ok(Box::new(DatadogTrace {
200 trace_rb,
201 telemetry,
202 flush_timeout,
203 }))
204 }
205}
206
impl MemoryBounds for DatadogTraceConfiguration {
    /// Declares the memory bounds of the trace encoder.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum usage: the component struct itself plus the two channels between the encoder and the
        // request builder task. The count of 8 matches the channel capacity used in `DatadogTrace::run`.
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm usage: sized by the maximum number of traces allowed in a single payload.
        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
221
/// Datadog trace encoder.
///
/// Encodes trace events into HTTP payloads by way of a request builder running in a separate task.
pub struct DatadogTrace {
    /// Request builder that encodes traces and produces requests.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    /// Component telemetry, used to count traces dropped during encoding.
    telemetry: ComponentTelemetry,
    /// How long to wait after processing an event buffer before flushing pending requests.
    flush_timeout: Duration,
}
227
#[async_trait]
impl Encoder for DatadogTrace {
    /// Runs the encoder until the event stream ends or the request builder task stops producing payloads.
    ///
    /// Event buffers are forwarded to a spawned request builder task, and payloads produced by that task are
    /// dispatched downstream.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be sent to the request builder task. Payload dispatch
    /// failures are logged and do not stop the encoder.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            trace_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Bounded channels connecting this task with the request builder task: event buffers flow in,
        // payloads flow back out.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Trace encoder started.");

        loop {
            select! {
                // `biased` polls the branches in the order written: health first, then outgoing payloads,
                // then incoming event buffers.
                biased;
                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // The request builder task dropped its sender, so no more payloads will arrive.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // The event stream has ended.
                    None => break,
                },
            }
        }

        // Closing the events channel lets the request builder task finish its work and exit.
        drop(events_tx);

        // Dispatch whatever payloads the request builder task still produces before it exits.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Wait for the request builder task and report how it ended.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Trace encoder stopped.");

        Ok(())
    }
}
299
300async fn run_request_builder(
301 mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
302 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
303) -> Result<(), GenericError> {
304 let mut pending_flush = false;
305 let pending_flush_timeout = sleep(flush_timeout);
306 pin!(pending_flush_timeout);
307
308 loop {
309 select! {
310 Some(event_buffer) = events_rx.recv() => {
311 for event in event_buffer {
312 let trace = match event.try_into_trace() {
313 Some(trace) => trace,
314 None => continue,
315 };
316 let trace_to_retry = match trace_request_builder.encode(trace).await {
319 Ok(None) => continue,
320 Ok(Some(trace)) => trace,
321 Err(e) => {
322 error!(error = %e, "Failed to encode trace.");
323 telemetry.events_dropped_encoder().increment(1);
324 continue;
325 }
326 };
327
328 let maybe_requests = trace_request_builder.flush().await;
329 if maybe_requests.is_empty() {
330 panic!("builder told us to flush, but gave us nothing");
331 }
332
333 for maybe_request in maybe_requests {
334 match maybe_request {
335 Ok((events, _data_points, request)) => {
336 let payload_meta = PayloadMetadata::from_event_count(events);
337 let http_payload = HttpPayload::new(payload_meta, request);
338 let payload = Payload::Http(http_payload);
339
340 payloads_tx.send(payload).await
341 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
342 },
343 Err(e) => if e.is_recoverable() {
344 continue;
346 } else {
347 return Err(GenericError::from(e).context("Failed to flush request."));
348 }
349 }
350 }
351
352 if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
354 error!(error = %e, "Failed to encode trace.");
355 telemetry.events_dropped_encoder().increment(1);
356 }
357 }
358
359 debug!("Processed event buffer.");
360
361 if !pending_flush {
363 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
364 pending_flush = true;
365 }
366 },
367 _ = &mut pending_flush_timeout, if pending_flush => {
368 debug!("Flushing pending request(s).");
369
370 pending_flush = false;
371
372 let maybe_trace_requests = trace_request_builder.flush().await;
375 for maybe_request in maybe_trace_requests {
376 match maybe_request {
377 Ok((events, _data_points, request)) => {
378 let payload_meta = PayloadMetadata::from_event_count(events);
379 let http_payload = HttpPayload::new(payload_meta, request);
380 let payload = Payload::Http(http_payload);
381
382 payloads_tx.send(payload).await
383 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
384 },
385 Err(e) => if e.is_recoverable() {
386 continue;
387 } else {
388 return Err(GenericError::from(e).context("Failed to flush request."));
389 }
390 }
391 }
392
393 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
394 },
395
396 else => break,
398 }
399 }
400
401 Ok(())
402}
403
/// Endpoint encoder that serializes traces as agent payloads for the traces intake endpoint.
#[derive(Debug)]
struct TraceEndpointEncoder {
    /// Scratch writer handed to the agent payload builder for each encoded trace.
    scratch: ScratchWriter<Vec<u8>>,
    /// Hostname used as the fallback when resolving a tracer payload's hostname.
    default_hostname: MetaString,
    /// Owned copy of the default hostname, written as the agent payload's host name.
    agent_hostname: String,
    /// Agent version written into each agent payload.
    version: String,
    /// Environment written into each agent payload.
    env: String,
    /// APM configuration; supplies the target/error TPS values and the error tracking standalone flag.
    apm_config: ApmConfig,
    /// OTLP traces configuration; supplies the sampling percentage and `ignore_missing_datadog_fields`.
    otlp_traces: TracesConfig,
    /// Reusable buffer for formatting the sampling rate tag value.
    string_builder: StringBuilder,
    /// Whether error tracking standalone is enabled, as reported by the APM configuration.
    error_tracking_standalone: bool,
    /// Extra request headers returned from `additional_headers`.
    extra_headers: Vec<(HeaderName, HeaderValue)>,
}
417
impl TraceEndpointEncoder {
    /// Creates a new trace endpoint encoder.
    ///
    /// When error tracking standalone is enabled in `apm_config`, the encoder carries an additional
    /// `x-datadog-error-tracking-standalone: true` header; otherwise it carries no extra headers.
    fn new(
        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
    ) -> Self {
        let error_tracking_standalone = apm_config.error_tracking_standalone_enabled();
        let extra_headers = if error_tracking_standalone {
            vec![(
                HeaderName::from_static("x-datadog-error-tracking-standalone"),
                HeaderValue::from_static("true"),
            )]
        } else {
            Vec::new()
        };
        Self {
            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
            // Taken before `default_hostname` is moved into the struct below.
            agent_hostname: default_hostname.as_ref().to_string(),
            default_hostname,
            version,
            env,
            apm_config,
            otlp_traces,
            string_builder: StringBuilder::new(),
            error_tracking_standalone,
            extra_headers,
        }
    }

    /// Encodes a single trace as an agent payload and finishes it into `output_buffer`.
    ///
    /// The agent payload contains one tracer payload, which in turn contains one chunk holding all of the
    /// trace's spans.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the payload builders.
    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
        let sampling_rate = self.sampling_rate();
        let source = attributes_to_source(&trace.attributes);

        // Empty payload strings are treated as absent, so the corresponding field is not written.
        let container_id = if !trace.payload.container_id.is_empty() {
            Some(trace.payload.container_id.as_ref())
        } else {
            None
        };
        let lang = if !trace.payload.language_name.is_empty() {
            Some(trace.payload.language_name.as_ref())
        } else {
            None
        };
        let tracer_version = format!("otlp-{}", trace.payload.tracer_version.as_ref());
        let container_tags = resolve_container_tags_from_attrs(
            &trace.attributes,
            source.as_ref(),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        // With `ignore_missing_datadog_fields` set, a missing env is written as an empty string rather than
        // being left out.
        let env = if !trace.payload.env.is_empty() {
            Some(trace.payload.env.as_ref())
        } else if self.otlp_traces.ignore_missing_datadog_fields {
            Some("")
        } else {
            None
        };
        let hostname = resolve_hostname_from_payload(
            trace.payload.hostname.as_ref(),
            source.as_ref(),
            Some(self.default_hostname.as_ref()),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let app_version = if !trace.payload.app_version.is_empty() {
            Some(trace.payload.app_version.as_ref())
        } else {
            None
        };

        // Trace-level values fall back to defaults when the trace does not carry them.
        let priority = trace.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY);
        let dropped_trace = trace.dropped_trace;
        let decision_maker = trace.decision_maker.as_deref();
        let otlp_sr = trace.otlp_sampling_rate.unwrap_or(sampling_rate);

        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);

        // Agent-level fields.
        ap_builder
            .host_name(&self.agent_hostname)?
            .env(&self.env)?
            .agent_version(&self.version)?
            .target_tps(self.apm_config.target_traces_per_second())?
            .error_tps(self.apm_config.errors_per_second())?;

        ap_builder.add_tracer_payloads(|tp| {
            if let Some(cid) = container_id {
                tp.container_id(cid)?;
            }
            if let Some(l) = lang {
                tp.language_name(l)?;
            }
            tp.tracer_version(&tracer_version)?;

            tp.add_chunks(|chunk| {
                chunk.priority(priority)?;

                for span in trace.spans() {
                    chunk.add_spans(|s| {
                        // Every span is written with the trace-level `trace_id_low` as its trace ID.
                        s.service(span.service())?
                            .name(span.name())?
                            .resource(span.resource())?
                            .trace_id(trace.trace_id_low)?
                            .span_id(span.span_id())?
                            .parent_id(span.parent_id())?
                            .start(span.start() as i64)?
                            .duration(span.duration() as i64)?
                            .error(span.error())?;

                        // String and boolean attributes go into `meta`; booleans are written as "true"/"false".
                        {
                            let mut meta = s.meta();
                            for (k, v) in &span.attributes {
                                match v {
                                    AttributeValue::String(str_val) => {
                                        meta.write_entry(k.as_ref(), str_val.as_ref())?;
                                    }
                                    AttributeValue::Bool(b) => {
                                        meta.write_entry(k.as_ref(), if *b { "true" } else { "false" })?;
                                    }
                                    _ => {}
                                }
                            }
                        }

                        // Float and integer attributes go into `metrics`; integers are converted to `f64`.
                        {
                            let mut metrics = s.metrics();
                            for (k, v) in &span.attributes {
                                match v {
                                    AttributeValue::Float(f) => metrics.write_entry(k.as_ref(), *f)?,
                                    AttributeValue::Int(i) => metrics.write_entry(k.as_ref(), *i as f64)?,
                                    _ => {}
                                }
                            }
                        }

                        s.type_(span.span_type())?;

                        // Byte attributes go into `meta_struct`.
                        {
                            let mut ms = s.meta_struct();
                            for (k, v) in &span.attributes {
                                if let AttributeValue::Bytes(bytes) = v {
                                    ms.write_entry(k.as_ref(), bytes.as_slice())?;
                                }
                            }
                        }

                        for link in span.span_links() {
                            s.add_span_links(|sl| {
                                sl.trace_id(link.trace_id())?
                                    .trace_id_high(link.trace_id_high())?
                                    .span_id(link.span_id())?;
                                // Only string-valued link attributes are written.
                                {
                                    let mut attrs = sl.attributes();
                                    for (k, v) in link.attributes() {
                                        if let AttributeValue::String(str_val) = v {
                                            attrs.write_entry(k.as_ref(), str_val.as_ref())?;
                                        }
                                    }
                                }
                                let tracestate = link.tracestate().to_string();
                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
                                Ok(())
                            })?;
                        }

                        for event in span.span_events() {
                            s.add_span_events(|se| {
                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
                                // Event attributes are written as typed values via `encode_attribute_value`.
                                {
                                    let mut attrs = se.attributes();
                                    for (k, v) in event.attributes() {
                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
                                    }
                                }
                                Ok(())
                            })?;
                        }

                        Ok(())
                    })?;
                }

                // Chunk-level tags: decision maker, error tracking standalone marker, and sampling rate.
                {
                    let mut tags = chunk.tags();
                    if let Some(dm) = decision_maker {
                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
                    }
                    if self.error_tracking_standalone {
                        // A trace counts as an error trace if any span has a non-zero error value or has the
                        // `_dd.span_events.has_exception` attribute set to the string "true".
                        let trace_has_error = trace.spans().iter().any(|span| {
                            span.error() != 0
                                || span
                                    .attributes
                                    .get("_dd.span_events.has_exception")
                                    .and_then(AttributeValue::as_string)
                                    .is_some_and(|v| v == "true")
                        });
                        if trace_has_error {
                            tags.write_entry("_dd.error_tracking_standalone.error", "true")?;
                        }
                    }

                    // The sampling rate is formatted with two decimal places into the reusable string builder.
                    self.string_builder.clear();
                    write!(&mut self.string_builder, "{:.2}", otlp_sr)
                        .expect("should never fail to format sampling rate");
                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
                }

                if dropped_trace {
                    chunk.dropped_trace(true)?;
                }

                Ok(())
            })?;

            // Tracer-payload-level fields, written after the chunk.
            if let Some(ct) = container_tags {
                let mut tags = tp.tags();
                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
            }

            if let Some(e) = env {
                tp.env(e)?;
            }
            if let Some(h) = hostname {
                tp.hostname(h)?;
            }
            if let Some(av) = app_version {
                tp.app_version(av)?;
            }

            Ok(())
        })?;

        ap_builder.finish(output_buffer)?;

        Ok(())
    }

    /// Returns the configured probabilistic sampling rate as a fraction.
    ///
    /// The configured percentage is divided by 100. A resulting rate of 0 or less, or of 1 or more, yields 1.0.
    fn sampling_rate(&self) -> f64 {
        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
        if rate <= 0.0 || rate >= 1.0 {
            return 1.0;
        }
        rate
    }
}
669
670impl EndpointEncoder for TraceEndpointEncoder {
671 type Input = Trace;
672 type EncodeError = std::io::Error;
673 fn encoder_name() -> &'static str {
674 "traces"
675 }
676
677 fn compressed_size_limit(&self) -> usize {
678 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
679 }
680
681 fn uncompressed_size_limit(&self) -> usize {
682 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
683 }
684
685 fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
686 self.encode_tracer_payload(trace, buffer)
687 }
688
689 fn endpoint_uri(&self) -> Uri {
690 PathAndQuery::from_static("/api/v0.2/traces").into()
691 }
692
693 fn endpoint_method(&self) -> Method {
694 Method::POST
695 }
696
697 fn content_type(&self) -> HeaderValue {
698 CONTENT_TYPE_PROTOBUF.clone()
699 }
700
701 fn additional_headers(&self) -> &[(HeaderName, HeaderValue)] {
702 &self.extra_headers
703 }
704}
705
706fn encode_attribute_value<S: ScratchBuffer>(
707 builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
708) -> std::io::Result<()> {
709 match value {
710 AttributeValue::String(v) => {
711 builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
712 }
713 AttributeValue::Bool(v) => {
714 builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
715 }
716 AttributeValue::Int(v) => {
717 builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
718 }
719 AttributeValue::Float(v) => {
720 builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
721 }
722 AttributeValue::Bytes(_) => {
723 }
725 AttributeValue::Array(values) => {
726 builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
727 for val in values {
728 arr.add_values(|av| encode_attribute_array_value(av, val))?;
729 }
730 Ok(())
731 })?;
732 }
733 AttributeValue::KeyValueList(_) => {
734 }
736 }
737 Ok(())
738}
739
740fn encode_attribute_array_value<S: ScratchBuffer>(
741 builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeValue,
742) -> std::io::Result<()> {
743 match value {
744 AttributeValue::String(v) => {
745 builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
746 }
747 AttributeValue::Bool(v) => {
748 builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
749 }
750 AttributeValue::Int(v) => {
751 builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
752 }
753 AttributeValue::Float(v) => {
754 builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
755 }
756 AttributeValue::Bytes(_) | AttributeValue::Array(_) | AttributeValue::KeyValueList(_) => {
757 }
759 }
760 Ok(())
761}
762
763fn resolve_hostname_from_payload<'a>(
764 payload_hostname: &'a str, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
765 ignore_missing_fields: bool,
766) -> Option<&'a str> {
767 if !payload_hostname.is_empty() {
768 return Some(payload_hostname);
769 }
770 if ignore_missing_fields {
771 return Some("");
772 }
773 match source {
774 Some(src) => match src.kind {
775 OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
776 _ => Some(""),
777 },
778 None => default_hostname,
779 }
780}
781
782fn resolve_container_tags_from_attrs(
783 attributes: &FastHashMap<MetaString, AttributeValue>, source: Option<&OtlpSource>, ignore_missing_fields: bool,
784) -> Option<MetaString> {
785 if let Some(AttributeValue::String(tags)) = attributes.get(KEY_DATADOG_CONTAINER_TAGS) {
786 if !tags.is_empty() {
787 return Some(tags.clone());
788 }
789 }
790
791 if ignore_missing_fields {
792 return None;
793 }
794 let mut container_tags = TagSet::default();
795 extract_container_tags_from_attributes_map(attributes, &mut container_tags);
796 let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
797 if container_tags.is_empty() && !is_fargate_source {
798 return None;
799 }
800
801 let mut flattened = flatten_container_tag(container_tags);
802 if is_fargate_source {
803 if let Some(src) = source {
804 append_tags(&mut flattened, &src.tag());
805 }
806 }
807
808 if flattened.is_empty() {
809 None
810 } else {
811 Some(MetaString::from(flattened))
812 }
813}
814
815fn flatten_container_tag(tags: TagSet) -> String {
816 let mut flattened = String::new();
817 for tag in tags {
818 if !flattened.is_empty() {
819 flattened.push(',');
820 }
821 flattened.push_str(tag.as_str());
822 }
823 flattened
824}
825
/// Appends `tags` to `target`, separating them with a comma when `target` already has content.
///
/// Does nothing when `tags` is empty.
fn append_tags(target: &mut String, tags: &str) {
    match (target.is_empty(), tags.is_empty()) {
        // Nothing to append.
        (_, true) => {}
        // First content: no separator needed.
        (true, false) => target.push_str(tags),
        (false, false) => {
            target.push(',');
            target.push_str(tags);
        }
    }
}
835
#[cfg(test)]
mod tests {
    use datadog_protos::traces::AgentPayload;
    use protobuf::Message as _;
    use saluki_config::ConfigurationLoader;
    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
    use stringtheory::MetaString;

    use super::*;
    use crate::common::datadog::apm::ApmConfig;
    use crate::common::otlp::config::TracesConfig;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Chunk tag written for error traces when error tracking standalone (ETS) is enabled.
    const ETS_CHUNK_TAG: &str = "_dd.error_tracking_standalone.error";

    /// Builds an encoder whose APM configuration has ETS enabled or disabled.
    async fn make_encoder(ets_enabled: bool) -> TraceEndpointEncoder {
        let mut env_vars: Vec<(String, String)> = Vec::new();
        if ets_enabled {
            env_vars.push(("APM_ERROR_TRACKING_STANDALONE_ENABLED".to_string(), "true".to_string()));
        }

        let (cfg, _) = ConfigurationLoader::for_tests_with_provider_factory(
            None,
            Some(&env_vars),
            false,
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
        let apm_config = ApmConfig::from_configuration(&cfg).expect("ApmConfig should deserialize");

        TraceEndpointEncoder::new(
            MetaString::from("test-host"),
            "0.0.0".to_string(),
            "none".to_string(),
            apm_config,
            TracesConfig::default(),
        )
    }

    /// Builds a single-span trace with priority 1.
    ///
    /// NOTE(review): the final `DdSpan::new` argument is presumably the span's error value (0 here); confirm
    /// against the `Span::new` signature.
    fn make_trace() -> Trace {
        let ok_span = DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,
            0,
            0,
            1000,
            0,
        );
        let mut built = Trace::new(vec![ok_span]);
        built.priority = Some(1);
        built
    }

    /// Builds a single-span trace with priority 1 whose final `DdSpan::new` argument is 1 instead of 0.
    ///
    /// NOTE(review): that argument is presumably the span's error value; confirm against the `Span::new`
    /// signature.
    fn make_error_trace() -> Trace {
        let error_span = DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,
            0,
            0,
            1000,
            1,
        );
        let mut built = Trace::new(vec![error_span]);
        built.priority = Some(1);
        built
    }

    /// Encodes `trace` with `encoder` and parses the output back into an `AgentPayload`.
    fn encode_and_parse(encoder: &mut TraceEndpointEncoder, trace: &Trace) -> AgentPayload {
        let mut encoded = Vec::new();
        encoder.encode(trace, &mut encoded).expect("encode should succeed");
        AgentPayload::parse_from_bytes(&encoded).expect("should parse AgentPayload")
    }

    /// Returns the value of the ETS chunk tag from the first chunk that carries it.
    fn ets_chunk_tag(payload: &AgentPayload) -> Option<&str> {
        payload
            .tracerPayloads
            .iter()
            .flat_map(|tp| tp.chunks.iter())
            .find_map(|chunk| chunk.tags.get(ETS_CHUNK_TAG).map(|v| v.as_str()))
    }

    #[tokio::test]
    async fn ets_header_present_when_enabled() {
        let encoder = make_encoder(true).await;
        let headers = encoder.additional_headers();
        assert_eq!(headers.len(), 1);

        let (name, value) = &headers[0];
        assert_eq!(name.as_str(), "x-datadog-error-tracking-standalone");
        assert_eq!(*value, "true");
    }

    #[tokio::test]
    async fn ets_header_absent_when_disabled() {
        let encoder = make_encoder(false).await;
        assert!(encoder.additional_headers().is_empty());
    }

    #[tokio::test]
    async fn ets_chunk_tag_present_for_error_trace() {
        let mut encoder = make_encoder(true).await;
        let payload = encode_and_parse(&mut encoder, &make_error_trace());
        assert_eq!(
            ets_chunk_tag(&payload),
            Some("true"),
            "ETS chunk tag should be present for error traces when ETS is enabled"
        );
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_for_non_error_trace() {
        let mut encoder = make_encoder(true).await;
        let payload = encode_and_parse(&mut encoder, &make_trace());
        let has_tag = ets_chunk_tag(&payload).is_some();
        assert!(!has_tag, "ETS chunk tag should be absent for non-error traces");
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_when_disabled() {
        let mut encoder = make_encoder(false).await;
        let payload = encode_and_parse(&mut encoder, &make_trace());
        let has_tag = ets_chunk_tag(&payload).is_some();
        assert!(!has_tag, "ETS chunk tag should be absent when ETS is disabled");
    }
}
976
/// Configuration smoke test for `DatadogTraceConfiguration`.
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::config_registry::structs;
    use datadog_agent_config_testing::run_config_smoke_tests;
    use serde_json::json;

    use super::DatadogTraceConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared config smoke tests against `DatadogTraceConfiguration`.
    ///
    /// The closure deserializes the struct from each configuration it is handed and panics if that fails.
    // NOTE(review): the meaning of the `&[]` and `json!({})` arguments is defined by
    // `run_config_smoke_tests`, which is not visible here — confirm before changing them.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::DATADOG_TRACE_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                cfg.as_typed::<DatadogTraceConfiguration>()
                    .expect("DatadogTraceConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }
}