// saluki_components/encoders/datadog/traces/mod.rs

1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7    attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8    AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use facet::Facet;
11use http::{uri::PathAndQuery, HeaderName, HeaderValue, Method, Uri};
12use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
13use opentelemetry_semantic_conventions::resource::{
14    CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
15};
16use piecemeal::{ScratchBuffer, ScratchWriter};
17use saluki_common::strings::StringBuilder;
18use saluki_common::task::HandleExt as _;
19use saluki_config::GenericConfiguration;
20use saluki_context::tags::TagSet;
21use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
22use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
23use saluki_core::{
24    components::{encoders::*, ComponentContext},
25    data_model::{
26        event::{trace::Trace, EventType},
27        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
28    },
29    observability::ComponentMetricsExt as _,
30};
31use saluki_env::host::providers::BoxedHostProvider;
32use saluki_env::{EnvironmentProvider, HostProvider};
33use saluki_error::generic_error;
34use saluki_error::{ErrorContext as _, GenericError};
35use saluki_io::compression::CompressionScheme;
36use saluki_metrics::MetricsBuilder;
37use serde::Deserialize;
38use stringtheory::MetaString;
39use tokio::{
40    select,
41    sync::mpsc::{self, Receiver, Sender},
42    time::sleep,
43};
44use tracing::{debug, error};
45
46use crate::common::datadog::{
47    apm::ApmConfig,
48    io::RB_BUFFER_CHUNK_SIZE,
49    request_builder::{EndpointEncoder, RequestBuilder},
50    telemetry::ComponentTelemetry,
51    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
52};
53use crate::common::otlp::config::TracesConfig;
54use crate::common::otlp::util::{
55    extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
56    DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
57    KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
58};
59
/// Meta key under which flattened container tags are attached to the tracer payload.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
/// Hard cap on the number of traces written into a single request payload.
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
/// Content-Type header value for the protobuf-encoded traces payload.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

// Sampling metadata keys / values.
/// Chunk tag carrying the OTLP probabilistic sampling rate.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP
67
/// Default compression kind used when `serializer_compressor_kind` is not configured.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
71
/// Default zstd compression level used when `serializer_zstd_compressor_level` is not configured.
const fn default_zstd_compressor_level() -> i32 {
    3
}
75
/// Default flush timeout, in seconds, used when `flush_timeout_secs` is not configured.
const fn default_flush_timeout_secs() -> u64 {
    2
}
79
/// Default environment name used when `env` is not configured.
fn default_env() -> String {
    "none".to_owned()
}
83
/// Configuration for the Datadog Traces encoder.
///
/// This encoder converts trace events into Datadog's TracerPayload protobuf format and sends them
/// to the Datadog traces intake endpoint (`/api/v0.2/traces`). It handles batching, compression,
/// and enrichment with metadata such as hostname, environment, and container tags.
#[derive(Deserialize, Facet)]
#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
pub struct DatadogTraceConfiguration {
    /// Compression kind used when serializing request payloads.
    ///
    /// Defaults to "zstd".
    #[serde(
        rename = "serializer_compressor_kind",  // maps the user-facing `serializer_compressor_kind` config key onto this field.
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level applied when the compressor kind is zstd.
    ///
    /// Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written traces to the in-flight request payload, but it has not yet reached the
    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
    /// before flushing the in-flight request payload.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Default hostname attached to payloads; populated via `with_environment_provider`,
    /// never read from user configuration.
    #[serde(skip)]
    default_hostname: Option<String>,

    /// Agent version string reported in the payload; set in `from_configuration`.
    #[serde(skip)]
    version: String,

    /// APM sub-configuration (target/error TPS, error tracking standalone); set in `from_configuration`.
    #[serde(skip)]
    #[facet(opaque)]
    apm_config: ApmConfig,

    /// OTLP traces sub-configuration (sampling, missing-field handling); set in `from_configuration`.
    #[serde(skip)]
    #[facet(opaque)]
    otlp_traces: TracesConfig,

    /// Environment reported at the agent payload level.
    ///
    /// Defaults to "none".
    #[serde(default = "default_env")]
    env: String,
}
131
132impl DatadogTraceConfiguration {
133    /// Creates a new `DatadogTraceConfiguration` from the given configuration.
134    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
135        let mut trace_config: Self = config.as_typed()?;
136
137        let app_details = saluki_metadata::get_app_details();
138        trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
139
140        trace_config.apm_config = ApmConfig::from_configuration(config)?;
141        trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
142
143        Ok(trace_config)
144    }
145}
146
147impl DatadogTraceConfiguration {
148    /// Sets the default_hostname using the environment provider
149    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
150    where
151        E: EnvironmentProvider<Host = BoxedHostProvider>,
152    {
153        let host_provider = environment_provider.host();
154        let hostname = host_provider.get_hostname().await?;
155        self.default_hostname = Some(hostname);
156        Ok(self)
157    }
158}
159
#[async_trait]
impl EncoderBuilder for DatadogTraceConfiguration {
    fn input_event_type(&self) -> EventType {
        // This encoder only consumes trace events.
        EventType::Trace
    }

    fn output_payload_type(&self) -> PayloadType {
        // Finished output is a fully-formed HTTP request payload.
        PayloadType::Http
    }

    /// Builds the `DatadogTrace` encoder from this configuration.
    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);

        // Fall back to an empty hostname when none was resolved beforehand
        // (see `with_environment_provider`).
        let default_hostname = self.default_hostname.clone().unwrap_or_default();
        let default_hostname = MetaString::from(default_hostname);

        // Create request builder for traces which is used to generate HTTP requests.

        let mut trace_rb = RequestBuilder::new(
            TraceEndpointEncoder::new(
                default_hostname,
                self.version.clone(),
                self.env.clone(),
                self.apm_config.clone(),
                self.otlp_traces.clone(),
            ),
            compression_scheme,
            RB_BUFFER_CHUNK_SIZE,
        )
        .await?;
        trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);

        let flush_timeout = match self.flush_timeout_secs {
            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
            // batching, while still practically flushing things almost immediately.
            0 => Duration::from_millis(10),
            secs => Duration::from_secs(secs),
        };

        Ok(Box::new(DatadogTrace {
            trace_rb,
            telemetry,
            flush_timeout,
        }))
    }
}
208
impl MemoryBounds for DatadogTraceConfiguration {
    /// Declares the memory bounds for the built component.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            // The encoder loop and request builder task communicate over two bounded
            // (capacity 8) channels; account for their buffered entries.
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        builder
            .firm()
            // Sized to the per-payload trace cap (MAX_TRACES_PER_PAYLOAD).
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
223
/// Datadog Traces encoder component.
///
/// Converts trace events into compressed protobuf request payloads for the Datadog traces intake.
pub struct DatadogTrace {
    // Request builder used to encode traces and assemble HTTP requests.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    // Component-level telemetry (e.g. dropped-event counters).
    telemetry: ComponentTelemetry,
    // How long to wait before flushing a partially-filled request payload.
    flush_timeout: Duration,
}
229
// Encodes Trace events to TracerPayloads.
#[async_trait]
impl Encoder for DatadogTrace {
    /// Runs the encoder until its input stream closes, dispatching finished HTTP payloads.
    ///
    /// The heavy lifting (encoding + flushing) happens in a spawned request builder task; this
    /// loop only shuttles event buffers in and finished payloads out.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            trace_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // The encoder runs two async loops, the main encoder loop and the request builder loop,
        // this channel is used to send events from the main encoder loop to the request builder loop safely.
        let (events_tx, events_rx) = mpsc::channel(8);
        // adds a channel to send payloads to the dispatcher and a channel to receive them.
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        // Spawn the request builder task on the global thread pool, this task is responsible for encoding traces and flushing requests.
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool() // Use the shared Tokio runtime thread pool.
            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Trace encoder started.");

        loop {
            select! {
                biased; // makes the branches of the select statement be evaluated in order.

                // Answer liveness probes first so health checks never starve under load.
                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        // Dispatch an HTTP payload to the dispatcher.
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // Payloads channel closed: the request builder task has stopped.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // Input stream closed: begin shutdown.
                    None => break,
                },
            }
        }

        // Drop the events sender, which signals the request builder task to stop.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed, so the request
        // builder's final flush still gets dispatched.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Request build task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Trace encoder stopped.");

        Ok(())
    }
}
301
/// Request builder task: consumes event buffers, encodes traces into request payloads, and
/// forwards finished payloads back to the encoder loop via `payloads_tx`.
///
/// Payloads are flushed either when the in-flight request fills up (the builder hands the trace
/// back) or when `flush_timeout` elapses after the last event buffer was processed. Returns when
/// `events_rx` closes and no flush is pending, or with an error if a payload cannot be sent or a
/// flush fails unrecoverably.
async fn run_request_builder(
    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
) -> Result<(), GenericError> {
    // Tracks whether traces have been written since the last flush; gates the timeout branch.
    let mut pending_flush = false;
    let pending_flush_timeout = sleep(flush_timeout);
    tokio::pin!(pending_flush_timeout);

    loop {
        select! {
            Some(event_buffer) = events_rx.recv() => {
                for event in event_buffer {
                    // Non-trace events are silently skipped.
                    let trace = match event.try_into_trace() {
                        Some(trace) => trace,
                        None => continue,
                    };
                    // Encode the trace. If we get it back, that means the current request is full, and we need to
                    // flush it before we can try to encode the trace again.
                    let trace_to_retry = match trace_request_builder.encode(trace).await {
                        Ok(None) => continue,
                        Ok(Some(trace)) => trace,
                        Err(e) => {
                            error!(error = %e, "Failed to encode trace.");
                            telemetry.events_dropped_encoder().increment(1);
                            continue;
                        }
                    };

                    // Invariant: a builder that reported "full" must yield at least one request.
                    let maybe_requests = trace_request_builder.flush().await;
                    if maybe_requests.is_empty() {
                        panic!("builder told us to flush, but gave us nothing");
                    }

                    for maybe_request in maybe_requests {
                        match maybe_request {
                            Ok((events, request)) => {
                                let payload_meta = PayloadMetadata::from_event_count(events);
                                let http_payload = HttpPayload::new(payload_meta, request);
                                let payload = Payload::Http(http_payload);

                                // A send failure means the encoder loop is gone; bail out entirely.
                                payloads_tx.send(payload).await
                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                            },
                            Err(e) => if e.is_recoverable() {
                                // If the error is recoverable, we'll hold on to the trace to retry it later.
                                continue;
                            } else {
                                return Err(GenericError::from(e).context("Failed to flush request."));
                            }
                        }
                    }

                    // Now try to encode the trace again.
                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
                        error!(error = %e, "Failed to encode trace.");
                        telemetry.events_dropped_encoder().increment(1);
                    }
                }

                debug!("Processed event buffer.");

                // If we're not already pending a flush, we'll start the countdown.
                if !pending_flush {
                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
                    pending_flush = true;
                }
            },
            _ = &mut pending_flush_timeout, if pending_flush => {
                debug!("Flushing pending request(s).");

                pending_flush = false;

                // Once we've encoded and written all traces, we flush the request builders to generate a request with
                // anything left over. Again, we'll enqueue those requests to be sent immediately.
                let maybe_trace_requests = trace_request_builder.flush().await;
                for maybe_request in maybe_trace_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },
                        Err(e) => if e.is_recoverable() {
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
            },

            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
            else => break,
        }
    }

    Ok(())
}
405
/// Endpoint encoder that serializes traces into Datadog `AgentPayload` protobuf messages.
#[derive(Debug)]
struct TraceEndpointEncoder {
    // Reusable scratch buffer for incremental protobuf encoding.
    scratch: ScratchWriter<Vec<u8>>,
    // Default hostname used when none can be resolved from a trace's resource tags.
    default_hostname: MetaString,
    // Hostname reported at the agent payload level (copied from `default_hostname` at build time).
    agent_hostname: String,
    // Agent version string reported in the payload.
    version: String,
    // Environment reported at the agent payload level.
    env: String,
    // APM configuration (TPS targets, error tracking standalone flag).
    apm_config: ApmConfig,
    // OTLP traces configuration (sampling percentage, missing-field handling).
    otlp_traces: TracesConfig,
    // Reusable string buffer for formatting small values (e.g. the sampling rate tag).
    string_builder: StringBuilder,
    // Whether error tracking standalone mode is enabled (cached from `apm_config`).
    error_tracking_standalone: bool,
    // Extra request headers; populated when error tracking standalone is enabled.
    extra_headers: Vec<(HeaderName, HeaderValue)>,
}
419
impl TraceEndpointEncoder {
    /// Creates a new `TraceEndpointEncoder`.
    ///
    /// Pre-computes the error-tracking-standalone request header so `additional_headers` can
    /// return it without per-request allocation.
    fn new(
        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
    ) -> Self {
        let error_tracking_standalone = apm_config.error_tracking_standalone_enabled();
        let extra_headers = if error_tracking_standalone {
            vec![(
                HeaderName::from_static("x-datadog-error-tracking-standalone"),
                HeaderValue::from_static("true"),
            )]
        } else {
            Vec::new()
        };
        Self {
            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
            // The agent-level hostname mirrors the default hostname; built before
            // `default_hostname` is moved into the struct below.
            agent_hostname: default_hostname.as_ref().to_string(),
            default_hostname,
            version,
            env,
            apm_config,
            otlp_traces,
            string_builder: StringBuilder::new(),
            error_tracking_standalone,
            extra_headers,
        }
    }

    /// Encodes a single trace as a complete `AgentPayload` protobuf message, appending the
    /// serialized bytes to `output_buffer`.
    ///
    /// The payload carries one tracer payload with a single chunk holding all of the trace's
    /// spans, enriched with metadata (container ID, hostname, env, app version, container tags)
    /// resolved from the trace's resource tags.
    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
        let sampling_rate = self.sampling_rate();
        let resource_tags = trace.resource_tags();
        let first_span = trace.spans().first();
        let source = tags_to_source(resource_tags);

        // Resolve metadata from resource tags.
        let container_id = resolve_container_id(resource_tags, first_span);
        let lang = get_resource_tag_value(resource_tags, "telemetry.sdk.language");
        let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
        let tracer_version = format!("otlp-{}", sdk_version);
        let container_tags = resolve_container_tags(
            resource_tags,
            source.as_ref(),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let env = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields);
        let hostname = resolve_hostname(
            resource_tags,
            source.as_ref(),
            Some(self.default_hostname.as_ref()),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let app_version = resolve_app_version(resource_tags);

        // Resolve sampling metadata, falling back to auto-keep priority and the configured
        // sampling rate when the trace carries no sampling information.
        let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() {
            Some(sampling) => (
                sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY),
                sampling.dropped_trace,
                sampling.decision_maker.as_deref(),
                sampling.otlp_sampling_rate.unwrap_or(sampling_rate),
            ),
            None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate),
        };

        // Now incrementally build the payload.
        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);

        ap_builder
            .host_name(&self.agent_hostname)?
            .env(&self.env)?
            .agent_version(&self.version)?
            .target_tps(self.apm_config.target_traces_per_second())?
            .error_tps(self.apm_config.errors_per_second())?;

        ap_builder.add_tracer_payloads(|tp| {
            if let Some(cid) = container_id {
                tp.container_id(cid)?;
            }
            if let Some(l) = lang {
                tp.language_name(l)?;
            }
            tp.tracer_version(&tracer_version)?;

            // Encode the single TraceChunk containing all spans.
            tp.add_chunks(|chunk| {
                chunk.priority(priority)?;

                for span in trace.spans() {
                    chunk.add_spans(|s| {
                        s.service(span.service())?
                            .name(span.name())?
                            .resource(span.resource())?
                            .trace_id(span.trace_id())?
                            .span_id(span.span_id())?
                            .parent_id(span.parent_id())?
                            .start(span.start() as i64)?
                            .duration(span.duration() as i64)?
                            .error(span.error())?;

                        // Span string metadata (`meta` map).
                        {
                            let mut meta = s.meta();
                            for (k, v) in span.meta() {
                                meta.write_entry(k.as_ref(), v.as_ref())?;
                            }
                        }

                        // Span numeric metadata (`metrics` map).
                        {
                            let mut metrics = s.metrics();
                            for (k, v) in span.metrics() {
                                metrics.write_entry(k.as_ref(), *v)?;
                            }
                        }

                        s.type_(span.span_type())?;

                        // Span binary metadata (`meta_struct` map).
                        {
                            let mut ms = s.meta_struct();
                            for (k, v) in span.meta_struct() {
                                ms.write_entry(k.as_ref(), v.as_slice())?;
                            }
                        }

                        for link in span.span_links() {
                            s.add_span_links(|sl| {
                                sl.trace_id(link.trace_id())?
                                    .trace_id_high(link.trace_id_high())?
                                    .span_id(link.span_id())?;
                                {
                                    let mut attrs = sl.attributes();
                                    for (k, v) in link.attributes() {
                                        attrs.write_entry(&**k, &**v)?;
                                    }
                                }
                                let tracestate = link.tracestate().to_string();
                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
                                Ok(())
                            })?;
                        }

                        for event in span.span_events() {
                            s.add_span_events(|se| {
                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
                                {
                                    let mut attrs = se.attributes();
                                    for (k, v) in event.attributes() {
                                        // Span event attribute values may be scalar or array, so
                                        // they go through the typed attribute encoder.
                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
                                    }
                                }
                                Ok(())
                            })?;
                        }

                        Ok(())
                    })?;
                }

                // Chunk tags.
                {
                    let mut tags = chunk.tags();
                    if let Some(dm) = decision_maker {
                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
                    }
                    if self.error_tracking_standalone {
                        // In error tracking standalone mode, mark chunks whose trace contains at
                        // least one errored span or a span flagged as having an exception event.
                        let trace_has_error = trace.spans().iter().any(|span| {
                            span.error() != 0
                                || span
                                    .meta()
                                    .get("_dd.span_events.has_exception")
                                    .is_some_and(|v| v == "true")
                        });
                        if trace_has_error {
                            tags.write_entry("_dd.error_tracking_standalone.error", "true")?;
                        }
                    }

                    // Format the sampling rate (two decimal places) into the reusable string
                    // builder to avoid a per-trace allocation.
                    self.string_builder.clear();
                    write!(&mut self.string_builder, "{:.2}", otlp_sr)
                        .expect("should never fail to format sampling rate");
                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
                }

                if dropped_trace {
                    chunk.dropped_trace(true)?;
                }

                Ok(())
            })?;

            // Tracer payload tags.
            if let Some(ct) = container_tags {
                let mut tags = tp.tags();
                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
            }

            if let Some(e) = env {
                tp.env(e)?;
            }
            if let Some(h) = hostname {
                tp.hostname(h)?;
            }
            if let Some(av) = app_version {
                tp.app_version(av)?;
            }

            Ok(())
        })?;

        ap_builder.finish(output_buffer)?;

        Ok(())
    }

    /// Returns the configured probabilistic sampling rate as a fraction, treating
    /// out-of-range values (including exactly 0% and 100%) as 1.0.
    fn sampling_rate(&self) -> f64 {
        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
        if rate <= 0.0 || rate >= 1.0 {
            return 1.0;
        }
        rate
    }
}
639
640impl EndpointEncoder for TraceEndpointEncoder {
641    type Input = Trace;
642    type EncodeError = std::io::Error;
643    fn encoder_name() -> &'static str {
644        "traces"
645    }
646
647    fn compressed_size_limit(&self) -> usize {
648        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
649    }
650
651    fn uncompressed_size_limit(&self) -> usize {
652        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
653    }
654
655    fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
656        self.encode_tracer_payload(trace, buffer)
657    }
658
659    fn endpoint_uri(&self) -> Uri {
660        PathAndQuery::from_static("/api/v0.2/traces").into()
661    }
662
663    fn endpoint_method(&self) -> Method {
664        Method::POST
665    }
666
667    fn content_type(&self) -> HeaderValue {
668        CONTENT_TYPE_PROTOBUF.clone()
669    }
670
671    fn additional_headers(&self) -> &[(HeaderName, HeaderValue)] {
672        &self.extra_headers
673    }
674}
675
/// Encodes a span event attribute value into the protobuf `AttributeAnyValue` builder.
///
/// Each scalar variant is written with its matching type discriminant; array values recurse
/// into `encode_attribute_array_value` for each element.
fn encode_attribute_value<S: ScratchBuffer>(
    builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
) -> std::io::Result<()> {
    match value {
        AttributeValue::String(v) => {
            builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
        }
        AttributeValue::Bool(v) => {
            builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
        }
        AttributeValue::Int(v) => {
            builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
        }
        AttributeValue::Double(v) => {
            builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
        }
        AttributeValue::Array(values) => {
            builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
                for val in values {
                    arr.add_values(|av| encode_attribute_array_value(av, val))?;
                }
                Ok(())
            })?;
        }
    }
    Ok(())
}
703
/// Encodes a scalar element of an array-valued span event attribute into the protobuf
/// `AttributeArrayValue` builder, writing the type discriminant alongside the value.
fn encode_attribute_array_value<S: ScratchBuffer>(
    builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeScalarValue,
) -> std::io::Result<()> {
    match value {
        AttributeScalarValue::String(v) => {
            builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
        }
        AttributeScalarValue::Bool(v) => {
            builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
        }
        AttributeScalarValue::Int(v) => {
            builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
        }
        AttributeScalarValue::Double(v) => {
            builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
        }
    }
    Ok(())
}
723
724fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
725    resource_tags.get_single_tag(key).and_then(|t| t.value())
726}
727
728fn resolve_hostname<'a>(
729    resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
730    ignore_missing_fields: bool,
731) -> Option<&'a str> {
732    let mut hostname = match source {
733        Some(src) => match src.kind {
734            OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
735            _ => Some(""),
736        },
737        None => default_hostname,
738    };
739
740    if ignore_missing_fields {
741        hostname = Some("");
742    }
743
744    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
745        hostname = Some(value);
746    }
747
748    hostname
749}
750
751fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
752    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
753        return Some(value);
754    }
755    if ignore_missing_fields {
756        return None;
757    }
758    if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
759        return Some(value);
760    }
761    get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
762}
763
764fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
765    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
766        if let Some(value) = get_resource_tag_value(resource_tags, key) {
767            return Some(value);
768        }
769    }
770    // TODO: add container id fallback equivalent to cidProvider
771    // https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L414
772    if let Some(span) = first_span {
773        for (k, v) in span.meta() {
774            if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
775                return Some(v.as_ref());
776            }
777        }
778    }
779    None
780}
781
782fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
783    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
784        return Some(value);
785    }
786    get_resource_tag_value(resource_tags, SERVICE_VERSION)
787}
788
789fn resolve_container_tags(
790    resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
791) -> Option<MetaString> {
792    // TODO: some refactoring is probably needed to normalize this function, the tags should already be normalized
793    // since we do so when we transform OTLP spans to DD spans however to make this class extensible for non otlp traces, we would
794    // need to normalize the tags here.
795    if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
796        if !tags.is_empty() {
797            return Some(MetaString::from(tags));
798        }
799    }
800
801    if ignore_missing_fields {
802        return None;
803    }
804    let mut container_tags = TagSet::default();
805    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
806    let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
807    if container_tags.is_empty() && !is_fargate_source {
808        return None;
809    }
810
811    let mut flattened = flatten_container_tag(container_tags);
812    if is_fargate_source {
813        if let Some(src) = source {
814            append_tags(&mut flattened, &src.tag());
815        }
816    }
817
818    if flattened.is_empty() {
819        None
820    } else {
821        Some(MetaString::from(flattened))
822    }
823}
824
825fn flatten_container_tag(tags: TagSet) -> String {
826    let mut flattened = String::new();
827    for tag in tags {
828        if !flattened.is_empty() {
829            flattened.push(',');
830        }
831        flattened.push_str(tag.as_str());
832    }
833    flattened
834}
835
/// Appends `tags` to `target`, inserting a comma separator when both sides are
/// non-empty. Appending an empty `tags` string is a no-op.
fn append_tags(target: &mut String, tags: &str) {
    if !tags.is_empty() {
        if !target.is_empty() {
            target.push(',');
        }
        target.push_str(tags);
    }
}
845
#[cfg(test)]
mod tests {
    use datadog_protos::traces::AgentPayload;
    use protobuf::Message as _;
    use saluki_config::ConfigurationLoader;
    use saluki_context::tags::TagSet;
    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace, TraceSampling};
    use stringtheory::MetaString;

    use super::*;
    use crate::common::datadog::apm::ApmConfig;
    use crate::common::otlp::config::TracesConfig;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Chunk tag key set by the encoder when Error Tracking Standalone (ETS) is enabled
    /// and the trace contains an error.
    const ETS_CHUNK_TAG: &str = "_dd.error_tracking_standalone.error";

    /// Builds a `TraceEndpointEncoder` from a test configuration, optionally enabling
    /// Error Tracking Standalone via the environment.
    async fn make_encoder(ets_enabled: bool) -> TraceEndpointEncoder {
        let env_vars: Vec<(String, String)> = if ets_enabled {
            vec![("APM_ERROR_TRACKING_STANDALONE_ENABLED".to_string(), "true".to_string())]
        } else {
            vec![]
        };
        let (cfg, _) = ConfigurationLoader::for_tests_with_provider_factory(
            None,
            Some(&env_vars),
            false,
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
        let apm_config = ApmConfig::from_configuration(&cfg).expect("ApmConfig should deserialize");
        TraceEndpointEncoder::new(
            MetaString::from("test-host"),
            "0.0.0".to_string(),
            "none".to_string(),
            apm_config,
            TracesConfig::default(),
        )
    }

    /// Builds a single-span trace with sampling set. When `with_error` is true,
    /// the span is flagged as errored.
    fn make_trace_with_error(with_error: bool) -> Trace {
        let span = DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,                              // trace_id
            1,                              // span_id
            0,                              // parent_id
            0,                              // start
            1000,                           // duration
            if with_error { 1 } else { 0 }, // error
        );
        let mut trace = Trace::new(vec![span], TagSet::default());
        trace.set_sampling(Some(TraceSampling::new(false, Some(1), None, None)));
        trace
    }

    /// Encodes `trace` and returns the value of the ETS chunk tag from the first
    /// chunk (across all tracer payloads) that carries it, if any.
    fn encode_and_get_ets_tag(encoder: &mut TraceEndpointEncoder, trace: &Trace) -> Option<String> {
        let mut buf = Vec::new();
        encoder.encode(trace, &mut buf).expect("encode should succeed");
        let payload = AgentPayload::parse_from_bytes(&buf).expect("should parse AgentPayload");
        payload
            .tracerPayloads
            .iter()
            .flat_map(|tp| tp.chunks.iter())
            .find_map(|chunk| chunk.tags.get(ETS_CHUNK_TAG).cloned())
    }

    #[tokio::test]
    async fn ets_header_present_when_enabled() {
        let encoder = make_encoder(true).await;
        let headers = encoder.additional_headers();
        assert_eq!(headers.len(), 1);
        assert_eq!(headers[0].0.as_str(), "x-datadog-error-tracking-standalone");
        assert_eq!(headers[0].1, "true");
    }

    #[tokio::test]
    async fn ets_header_absent_when_disabled() {
        let encoder = make_encoder(false).await;
        assert!(encoder.additional_headers().is_empty());
    }

    #[tokio::test]
    async fn ets_chunk_tag_present_for_error_trace() {
        let mut encoder = make_encoder(true).await;
        let tag_value = encode_and_get_ets_tag(&mut encoder, &make_trace_with_error(true));
        assert_eq!(
            tag_value.as_deref(),
            Some("true"),
            "ETS chunk tag should be present for error traces when ETS is enabled"
        );
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_for_non_error_trace() {
        let mut encoder = make_encoder(true).await;
        let tag_value = encode_and_get_ets_tag(&mut encoder, &make_trace_with_error(false));
        assert!(tag_value.is_none(), "ETS chunk tag should be absent for non-error traces");
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_when_disabled() {
        let mut encoder = make_encoder(false).await;
        let tag_value = encode_and_get_ets_tag(&mut encoder, &make_trace_with_error(false));
        assert!(tag_value.is_none(), "ETS chunk tag should be absent when ETS is disabled");
    }
}
989
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogTraceConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Smoke test: `DatadogTraceConfiguration` must deserialize cleanly from an
    /// empty JSON configuration via the shared smoke-test harness.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DATADOG_TRACE_CONFIGURATION, &[], json!({}), |config| {
            config
                .as_typed::<DatadogTraceConfiguration>()
                .expect("DatadogTraceConfiguration should deserialize")
        })
        .await
    }
}