Skip to main content

saluki_components/encoders/datadog/traces/
mod.rs

1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7    attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8    AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use facet::Facet;
11use http::{uri::PathAndQuery, HeaderName, HeaderValue, Method, Uri};
12use piecemeal::{ScratchBuffer, ScratchWriter};
13use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
14use saluki_common::collections::FastHashMap;
15use saluki_common::strings::StringBuilder;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::tags::TagSet;
19use saluki_core::data_model::event::trace::AttributeValue;
20use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
21use saluki_core::{
22    components::{encoders::*, ComponentContext},
23    data_model::{
24        event::{trace::Trace, EventType},
25        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
26    },
27    observability::ComponentMetricsExt as _,
28};
29use saluki_env::host::providers::BoxedHostProvider;
30use saluki_env::{EnvironmentProvider, HostProvider};
31use saluki_error::generic_error;
32use saluki_error::{ErrorContext as _, GenericError};
33use saluki_io::compression::CompressionScheme;
34use saluki_metrics::MetricsBuilder;
35use serde::Deserialize;
36use stringtheory::MetaString;
37use tokio::pin;
38use tokio::{
39    select,
40    sync::mpsc::{self, Receiver, Sender},
41    time::sleep,
42};
43use tracing::{debug, error};
44
45use crate::common::datadog::{
46    apm::ApmConfig,
47    io::RB_BUFFER_CHUNK_SIZE,
48    request_builder::{EndpointEncoder, RequestBuilder},
49    telemetry::ComponentTelemetry,
50    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
51};
52use crate::common::otlp::config::TracesConfig;
53use crate::common::otlp::util::{
54    attributes_to_source, extract_container_tags_from_attributes_map, Source as OtlpSource,
55    SourceKind as OtlpSourceKind, KEY_DATADOG_CONTAINER_TAGS,
56};
57
58const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
59const MAX_TRACES_PER_PAYLOAD: usize = 10000;
60static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
61
62// Sampling metadata keys / values.
63const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
64const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP
65
/// Serde default for `serializer_compressor_kind`: payloads are compressed with zstd.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
69
/// Serde default for `serializer_zstd_compressor_level`.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_ZSTD_LEVEL: i32 = 3;
    DEFAULT_ZSTD_LEVEL
}
73
/// Serde default for `flush_timeout_secs`, expressed in seconds.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_FLUSH_TIMEOUT_SECS: u64 = 2;
    DEFAULT_FLUSH_TIMEOUT_SECS
}
77
/// Serde default for `env`: the literal string `"none"` when no environment is configured.
fn default_env() -> String {
    String::from("none")
}
81
82/// Configuration for the Datadog Traces encoder.
83///
84/// This encoder converts trace events into Datadog's TracerPayload protobuf format and sends them
85/// to the Datadog traces intake endpoint (`/api/v0.2/traces`). It handles batching, compression,
86/// and enrichment with metadata such as hostname, environment, and container tags.
87#[derive(Deserialize, Facet)]
88#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
89pub struct DatadogTraceConfiguration {
90    #[serde(
91        rename = "serializer_compressor_kind",  // renames the field in the user_configuration from "serializer_compressor_kind" to "compressor_kind".
92        default = "default_serializer_compressor_kind"
93    )]
94    compressor_kind: String,
95
96    #[serde(
97        rename = "serializer_zstd_compressor_level",
98        default = "default_zstd_compressor_level"
99    )]
100    zstd_compressor_level: i32,
101
102    /// Flush timeout for pending requests, in seconds.
103    ///
104    /// When the encoder has written traces to the in-flight request payload, but it hasn't yet reached the
105    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
106    /// before flushing the in-flight request payload.
107    ///
108    /// Defaults to 2 seconds.
109    #[serde(default = "default_flush_timeout_secs")]
110    flush_timeout_secs: u64,
111
112    #[serde(skip)]
113    default_hostname: Option<String>,
114
115    #[serde(skip)]
116    version: String,
117
118    #[serde(skip)]
119    #[facet(opaque)]
120    apm_config: ApmConfig,
121
122    #[serde(skip)]
123    #[facet(opaque)]
124    otlp_traces: TracesConfig,
125
126    #[serde(default = "default_env")]
127    env: String,
128}
129
130impl DatadogTraceConfiguration {
131    /// Creates a new `DatadogTraceConfiguration` from the given configuration.
132    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
133        let mut trace_config: Self = config.as_typed()?;
134
135        let app_details = saluki_metadata::get_app_details();
136        trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
137
138        trace_config.apm_config = ApmConfig::from_configuration(config)?;
139        trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
140
141        Ok(trace_config)
142    }
143}
144
145impl DatadogTraceConfiguration {
146    /// Sets the `default_hostname` using the environment provider
147    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
148    where
149        E: EnvironmentProvider<Host = BoxedHostProvider>,
150    {
151        let host_provider = environment_provider.host();
152        let hostname = host_provider.get_hostname().await?;
153        self.default_hostname = Some(hostname);
154        Ok(self)
155    }
156}
157
158#[async_trait]
159impl EncoderBuilder for DatadogTraceConfiguration {
160    fn input_event_type(&self) -> EventType {
161        EventType::Trace
162    }
163
164    fn output_payload_type(&self) -> PayloadType {
165        PayloadType::Http
166    }
167
168    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
169        let metrics_builder = MetricsBuilder::from_component_context(&context);
170        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
171        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
172
173        let default_hostname = self.default_hostname.clone().unwrap_or_default();
174        let default_hostname = MetaString::from(default_hostname);
175
176        // Create request builder for traces which is used to generate HTTP requests.
177
178        let mut trace_rb = RequestBuilder::new(
179            TraceEndpointEncoder::new(
180                default_hostname,
181                self.version.clone(),
182                self.env.clone(),
183                self.apm_config.clone(),
184                self.otlp_traces.clone(),
185            ),
186            compression_scheme,
187            RB_BUFFER_CHUNK_SIZE,
188        )
189        .await?;
190        trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
191
192        let flush_timeout = match self.flush_timeout_secs {
193            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
194            // batching, while still practically flushing things almost immediately.
195            0 => Duration::from_millis(10),
196            secs => Duration::from_secs(secs),
197        };
198
199        Ok(Box::new(DatadogTrace {
200            trace_rb,
201            telemetry,
202            flush_timeout,
203        }))
204    }
205}
206
207impl MemoryBounds for DatadogTraceConfiguration {
208    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
209        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
210        builder
211            .minimum()
212            .with_single_value::<DatadogTrace>("component struct")
213            .with_array::<EventsBuffer>("request builder events channel", 8)
214            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
215
216        builder
217            .firm()
218            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
219    }
220}
221
222pub struct DatadogTrace {
223    trace_rb: RequestBuilder<TraceEndpointEncoder>,
224    telemetry: ComponentTelemetry,
225    flush_timeout: Duration,
226}
227
228// Encodes Trace events to TracerPayloads.
229#[async_trait]
230impl Encoder for DatadogTrace {
231    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
232        let Self {
233            trace_rb,
234            telemetry,
235            flush_timeout,
236        } = *self;
237
238        let mut health = context.take_health_handle();
239
240        // The encoder runs two async loops, the main encoder loop and the request builder loop,
241        // this channel is used to send events from the main encoder loop to the request builder loop safely.
242        let (events_tx, events_rx) = mpsc::channel(8);
243        // adds a channel to send payloads to the dispatcher and a channel to receive them.
244        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
245        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
246        // Spawn the request builder task on the global thread pool, this task is responsible for encoding traces and flushing requests.
247        let request_builder_handle = context
248            .topology_context()
249            .global_thread_pool() // Use the shared Tokio runtime thread pool.
250            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);
251
252        health.mark_ready();
253        debug!("Datadog Trace encoder started.");
254
255        loop {
256            select! {
257                biased; // makes the branches of the select statement be evaluated in order.
258
259                _ = health.live() => continue,
260                maybe_payload = payloads_rx.recv() => match maybe_payload {
261                    Some(payload) => {
262                        // Dispatch an HTTP payload to the dispatcher.
263                        if let Err(e) = context.dispatcher().dispatch(payload).await {
264                            error!("Failed to dispatch payload: {}", e);
265                        }
266                    }
267                    None => break,
268                },
269                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
270                    Some(event_buffer) => events_tx.send(event_buffer).await
271                        .error_context("Failed to send event buffer to request builder task.")?,
272                    None => break,
273                },
274            }
275        }
276
277        // Drop the events sender, which signals the request builder task to stop.
278        drop(events_tx);
279
280        // Continue draining the payloads receiver until it is closed.
281        while let Some(payload) = payloads_rx.recv().await {
282            if let Err(e) = context.dispatcher().dispatch(payload).await {
283                error!("Failed to dispatch payload: {}", e);
284            }
285        }
286
287        // Request build task should now be stopped.
288        match request_builder_handle.await {
289            Ok(Ok(())) => debug!("Request builder task stopped."),
290            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
291            Err(e) => error!(error = %e, "Request builder task panicked."),
292        }
293
294        debug!("Datadog Trace encoder stopped.");
295
296        Ok(())
297    }
298}
299
300async fn run_request_builder(
301    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
302    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
303) -> Result<(), GenericError> {
304    let mut pending_flush = false;
305    let pending_flush_timeout = sleep(flush_timeout);
306    pin!(pending_flush_timeout);
307
308    loop {
309        select! {
310            Some(event_buffer) = events_rx.recv() => {
311                for event in event_buffer {
312                    let trace = match event.try_into_trace() {
313                        Some(trace) => trace,
314                        None => continue,
315                    };
316                    // Encode the trace. If we get it back, that means the current request is full, and we need to
317                    // flush it before we can try to encode the trace again.
318                    let trace_to_retry = match trace_request_builder.encode(trace).await {
319                        Ok(None) => continue,
320                        Ok(Some(trace)) => trace,
321                        Err(e) => {
322                            error!(error = %e, "Failed to encode trace.");
323                            telemetry.events_dropped_encoder().increment(1);
324                            continue;
325                        }
326                    };
327
328                    let maybe_requests = trace_request_builder.flush().await;
329                    if maybe_requests.is_empty() {
330                        panic!("builder told us to flush, but gave us nothing");
331                    }
332
333                    for maybe_request in maybe_requests {
334                        match maybe_request {
335                            Ok((events, _data_points, request)) => {
336                                let payload_meta = PayloadMetadata::from_event_count(events);
337                                let http_payload = HttpPayload::new(payload_meta, request);
338                                let payload = Payload::Http(http_payload);
339
340                                payloads_tx.send(payload).await
341                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
342                            },
343                            Err(e) => if e.is_recoverable() {
344                                // If the error is recoverable, we'll hold on to the trace to retry it later.
345                                continue;
346                            } else {
347                                return Err(GenericError::from(e).context("Failed to flush request."));
348                            }
349                        }
350                    }
351
352                    // Now try to encode the trace again.
353                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
354                        error!(error = %e, "Failed to encode trace.");
355                        telemetry.events_dropped_encoder().increment(1);
356                    }
357                }
358
359                debug!("Processed event buffer.");
360
361                // If we're not already pending a flush, we'll start the countdown.
362                if !pending_flush {
363                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
364                    pending_flush = true;
365                }
366            },
367            _ = &mut pending_flush_timeout, if pending_flush => {
368                debug!("Flushing pending request(s).");
369
370                pending_flush = false;
371
372                // Once we've encoded and written all traces, we flush the request builders to generate a request with
373                // anything left over. Again, we'll enqueue those requests to be sent immediately.
374                let maybe_trace_requests = trace_request_builder.flush().await;
375                for maybe_request in maybe_trace_requests {
376                    match maybe_request {
377                        Ok((events, _data_points, request)) => {
378                            let payload_meta = PayloadMetadata::from_event_count(events);
379                            let http_payload = HttpPayload::new(payload_meta, request);
380                            let payload = Payload::Http(http_payload);
381
382                            payloads_tx.send(payload).await
383                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
384                        },
385                        Err(e) => if e.is_recoverable() {
386                            continue;
387                        } else {
388                            return Err(GenericError::from(e).context("Failed to flush request."));
389                        }
390                    }
391                }
392
393                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
394            },
395
396            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
397            else => break,
398        }
399    }
400
401    Ok(())
402}
403
/// Endpoint encoder that serializes traces into agent payloads for the traces intake.
#[derive(Debug)]
struct TraceEndpointEncoder {
    /// Scratch space the agent payload is built in before being written to the output buffer.
    scratch: ScratchWriter<Vec<u8>>,
    /// Hostname used for a tracer payload when the trace itself provides none.
    default_hostname: MetaString,
    /// Hostname written into the agent payload; initialized from `default_hostname`.
    agent_hostname: String,
    /// Agent version written into the agent payload.
    version: String,
    /// Environment written into the agent payload.
    env: String,
    /// APM settings (target/error traces per second, error tracking standalone mode).
    apm_config: ApmConfig,
    /// OTLP traces settings (missing-field handling, probabilistic sampler).
    otlp_traces: TracesConfig,
    /// Reusable buffer for formatting the sampling rate tag value.
    string_builder: StringBuilder,
    /// Whether error tracking standalone mode is enabled (cached from `apm_config`).
    error_tracking_standalone: bool,
    /// Additional HTTP headers attached to every request.
    extra_headers: Vec<(HeaderName, HeaderValue)>,
}
417
418impl TraceEndpointEncoder {
419    fn new(
420        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
421    ) -> Self {
422        let error_tracking_standalone = apm_config.error_tracking_standalone_enabled();
423        let extra_headers = if error_tracking_standalone {
424            vec![(
425                HeaderName::from_static("x-datadog-error-tracking-standalone"),
426                HeaderValue::from_static("true"),
427            )]
428        } else {
429            Vec::new()
430        };
431        Self {
432            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
433            agent_hostname: default_hostname.as_ref().to_string(),
434            default_hostname,
435            version,
436            env,
437            apm_config,
438            otlp_traces,
439            string_builder: StringBuilder::new(),
440            error_tracking_standalone,
441            extra_headers,
442        }
443    }
444
445    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
446        let sampling_rate = self.sampling_rate();
447        let source = attributes_to_source(&trace.attributes);
448
449        // Resolve metadata from payload fields and attributes.
450        let container_id = if !trace.payload.container_id.is_empty() {
451            Some(trace.payload.container_id.as_ref())
452        } else {
453            None
454        };
455        let lang = if !trace.payload.language_name.is_empty() {
456            Some(trace.payload.language_name.as_ref())
457        } else {
458            None
459        };
460        let tracer_version = format!("otlp-{}", trace.payload.tracer_version.as_ref());
461        let container_tags = resolve_container_tags_from_attrs(
462            &trace.attributes,
463            source.as_ref(),
464            self.otlp_traces.ignore_missing_datadog_fields,
465        );
466        let env = if !trace.payload.env.is_empty() {
467            Some(trace.payload.env.as_ref())
468        } else if self.otlp_traces.ignore_missing_datadog_fields {
469            Some("")
470        } else {
471            None
472        };
473        let hostname = resolve_hostname_from_payload(
474            trace.payload.hostname.as_ref(),
475            source.as_ref(),
476            Some(self.default_hostname.as_ref()),
477            self.otlp_traces.ignore_missing_datadog_fields,
478        );
479        let app_version = if !trace.payload.app_version.is_empty() {
480            Some(trace.payload.app_version.as_ref())
481        } else {
482            None
483        };
484
485        // Resolve sampling metadata from flat trace fields.
486        let priority = trace.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY);
487        let dropped_trace = trace.dropped_trace;
488        let decision_maker = trace.decision_maker.as_deref();
489        let otlp_sr = trace.otlp_sampling_rate.unwrap_or(sampling_rate);
490
491        // Now incrementally build the payload.
492        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);
493
494        ap_builder
495            .host_name(&self.agent_hostname)?
496            .env(&self.env)?
497            .agent_version(&self.version)?
498            .target_tps(self.apm_config.target_traces_per_second())?
499            .error_tps(self.apm_config.errors_per_second())?;
500
501        ap_builder.add_tracer_payloads(|tp| {
502            if let Some(cid) = container_id {
503                tp.container_id(cid)?;
504            }
505            if let Some(l) = lang {
506                tp.language_name(l)?;
507            }
508            tp.tracer_version(&tracer_version)?;
509
510            // Encode the single TraceChunk containing all spans.
511            tp.add_chunks(|chunk| {
512                chunk.priority(priority)?;
513
514                for span in trace.spans() {
515                    chunk.add_spans(|s| {
516                        s.service(span.service())?
517                            .name(span.name())?
518                            .resource(span.resource())?
519                            .trace_id(trace.trace_id_low)?
520                            .span_id(span.span_id())?
521                            .parent_id(span.parent_id())?
522                            .start(span.start() as i64)?
523                            .duration(span.duration() as i64)?
524                            .error(span.error())?;
525
526                        {
527                            let mut meta = s.meta();
528                            for (k, v) in &span.attributes {
529                                match v {
530                                    AttributeValue::String(str_val) => {
531                                        meta.write_entry(k.as_ref(), str_val.as_ref())?;
532                                    }
533                                    AttributeValue::Bool(b) => {
534                                        meta.write_entry(k.as_ref(), if *b { "true" } else { "false" })?;
535                                    }
536                                    _ => {}
537                                }
538                            }
539                        }
540
541                        {
542                            let mut metrics = s.metrics();
543                            for (k, v) in &span.attributes {
544                                match v {
545                                    AttributeValue::Float(f) => metrics.write_entry(k.as_ref(), *f)?,
546                                    AttributeValue::Int(i) => metrics.write_entry(k.as_ref(), *i as f64)?,
547                                    _ => {}
548                                }
549                            }
550                        }
551
552                        s.type_(span.span_type())?;
553
554                        {
555                            let mut ms = s.meta_struct();
556                            for (k, v) in &span.attributes {
557                                // TODO: Array and KeyValueList could be JSON-serialized into meta_struct;
558                                // skipped until a caller needs them for span-level attributes.
559                                if let AttributeValue::Bytes(bytes) = v {
560                                    ms.write_entry(k.as_ref(), bytes.as_slice())?;
561                                }
562                            }
563                        }
564
565                        for link in span.span_links() {
566                            s.add_span_links(|sl| {
567                                sl.trace_id(link.trace_id())?
568                                    .trace_id_high(link.trace_id_high())?
569                                    .span_id(link.span_id())?;
570                                {
571                                    // TODO: investigate whether non-String attribute values can be
572                                    // serialized directly into the link attributes proto field rather
573                                    // than being silently dropped here.
574                                    let mut attrs = sl.attributes();
575                                    for (k, v) in link.attributes() {
576                                        if let AttributeValue::String(str_val) = v {
577                                            attrs.write_entry(k.as_ref(), str_val.as_ref())?;
578                                        }
579                                    }
580                                }
581                                let tracestate = link.tracestate().to_string();
582                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
583                                Ok(())
584                            })?;
585                        }
586
587                        for event in span.span_events() {
588                            s.add_span_events(|se| {
589                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
590                                {
591                                    let mut attrs = se.attributes();
592                                    for (k, v) in event.attributes() {
593                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
594                                    }
595                                }
596                                Ok(())
597                            })?;
598                        }
599
600                        Ok(())
601                    })?;
602                }
603
604                // Chunk tags.
605                {
606                    let mut tags = chunk.tags();
607                    if let Some(dm) = decision_maker {
608                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
609                    }
610                    if self.error_tracking_standalone {
611                        let trace_has_error = trace.spans().iter().any(|span| {
612                            span.error() != 0
613                                || span
614                                    .attributes
615                                    .get("_dd.span_events.has_exception")
616                                    .and_then(AttributeValue::as_string)
617                                    .is_some_and(|v| v == "true")
618                        });
619                        if trace_has_error {
620                            tags.write_entry("_dd.error_tracking_standalone.error", "true")?;
621                        }
622                    }
623
624                    self.string_builder.clear();
625                    write!(&mut self.string_builder, "{:.2}", otlp_sr)
626                        .expect("should never fail to format sampling rate");
627                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
628                }
629
630                if dropped_trace {
631                    chunk.dropped_trace(true)?;
632                }
633
634                Ok(())
635            })?;
636
637            // Tracer payload tags.
638            if let Some(ct) = container_tags {
639                let mut tags = tp.tags();
640                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
641            }
642
643            if let Some(e) = env {
644                tp.env(e)?;
645            }
646            if let Some(h) = hostname {
647                tp.hostname(h)?;
648            }
649            if let Some(av) = app_version {
650                tp.app_version(av)?;
651            }
652
653            Ok(())
654        })?;
655
656        ap_builder.finish(output_buffer)?;
657
658        Ok(())
659    }
660
661    fn sampling_rate(&self) -> f64 {
662        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
663        if rate <= 0.0 || rate >= 1.0 {
664            return 1.0;
665        }
666        rate
667    }
668}
669
670impl EndpointEncoder for TraceEndpointEncoder {
671    type Input = Trace;
672    type EncodeError = std::io::Error;
673    fn encoder_name() -> &'static str {
674        "traces"
675    }
676
677    fn compressed_size_limit(&self) -> usize {
678        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
679    }
680
681    fn uncompressed_size_limit(&self) -> usize {
682        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
683    }
684
685    fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
686        self.encode_tracer_payload(trace, buffer)
687    }
688
689    fn endpoint_uri(&self) -> Uri {
690        PathAndQuery::from_static("/api/v0.2/traces").into()
691    }
692
693    fn endpoint_method(&self) -> Method {
694        Method::POST
695    }
696
697    fn content_type(&self) -> HeaderValue {
698        CONTENT_TYPE_PROTOBUF.clone()
699    }
700
701    fn additional_headers(&self) -> &[(HeaderName, HeaderValue)] {
702        &self.extra_headers
703    }
704}
705
706fn encode_attribute_value<S: ScratchBuffer>(
707    builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
708) -> std::io::Result<()> {
709    match value {
710        AttributeValue::String(v) => {
711            builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
712        }
713        AttributeValue::Bool(v) => {
714            builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
715        }
716        AttributeValue::Int(v) => {
717            builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
718        }
719        AttributeValue::Float(v) => {
720            builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
721        }
722        AttributeValue::Bytes(_) => {
723            // Bytes are not directly representable in OTLP AnyValue; skip.
724        }
725        AttributeValue::Array(values) => {
726            builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
727                for val in values {
728                    arr.add_values(|av| encode_attribute_array_value(av, val))?;
729                }
730                Ok(())
731            })?;
732        }
733        AttributeValue::KeyValueList(_) => {
734            // KVList encoding not needed for this encoder path; skip.
735        }
736    }
737    Ok(())
738}
739
740fn encode_attribute_array_value<S: ScratchBuffer>(
741    builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeValue,
742) -> std::io::Result<()> {
743    match value {
744        AttributeValue::String(v) => {
745            builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
746        }
747        AttributeValue::Bool(v) => {
748            builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
749        }
750        AttributeValue::Int(v) => {
751            builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
752        }
753        AttributeValue::Float(v) => {
754            builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
755        }
756        AttributeValue::Bytes(_) | AttributeValue::Array(_) | AttributeValue::KeyValueList(_) => {
757            // Nested complex values not representable in OTLP array; skip.
758        }
759    }
760    Ok(())
761}
762
763fn resolve_hostname_from_payload<'a>(
764    payload_hostname: &'a str, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
765    ignore_missing_fields: bool,
766) -> Option<&'a str> {
767    if !payload_hostname.is_empty() {
768        return Some(payload_hostname);
769    }
770    if ignore_missing_fields {
771        return Some("");
772    }
773    match source {
774        Some(src) => match src.kind {
775            OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
776            _ => Some(""),
777        },
778        None => default_hostname,
779    }
780}
781
782fn resolve_container_tags_from_attrs(
783    attributes: &FastHashMap<MetaString, AttributeValue>, source: Option<&OtlpSource>, ignore_missing_fields: bool,
784) -> Option<MetaString> {
785    if let Some(AttributeValue::String(tags)) = attributes.get(KEY_DATADOG_CONTAINER_TAGS) {
786        if !tags.is_empty() {
787            return Some(tags.clone());
788        }
789    }
790
791    if ignore_missing_fields {
792        return None;
793    }
794    let mut container_tags = TagSet::default();
795    extract_container_tags_from_attributes_map(attributes, &mut container_tags);
796    let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
797    if container_tags.is_empty() && !is_fargate_source {
798        return None;
799    }
800
801    let mut flattened = flatten_container_tag(container_tags);
802    if is_fargate_source {
803        if let Some(src) = source {
804            append_tags(&mut flattened, &src.tag());
805        }
806    }
807
808    if flattened.is_empty() {
809        None
810    } else {
811        Some(MetaString::from(flattened))
812    }
813}
814
815fn flatten_container_tag(tags: TagSet) -> String {
816    let mut flattened = String::new();
817    for tag in tags {
818        if !flattened.is_empty() {
819            flattened.push(',');
820        }
821        flattened.push_str(tag.as_str());
822    }
823    flattened
824}
825
/// Appends `tags` to `target`, inserting a comma separator when `target` already has content.
///
/// An empty `tags` leaves `target` untouched.
fn append_tags(target: &mut String, tags: &str) {
    match (tags.is_empty(), target.is_empty()) {
        // Nothing to add.
        (true, _) => {}
        // First entry: no separator needed.
        (false, true) => target.push_str(tags),
        // Subsequent entry: separate from what is already there.
        (false, false) => {
            target.push(',');
            target.push_str(tags);
        }
    }
}
835
#[cfg(test)]
mod tests {
    use datadog_protos::traces::AgentPayload;
    use protobuf::Message as _;
    use saluki_config::ConfigurationLoader;
    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
    use stringtheory::MetaString;

    use super::*;
    use crate::common::datadog::apm::ApmConfig;
    use crate::common::otlp::config::TracesConfig;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Chunk tag key that the Error Tracking Standalone (ETS) tests look for.
    const ETS_ERROR_TAG_KEY: &str = "_dd.error_tracking_standalone.error";

    /// Builds a `TraceEndpointEncoder` from a test configuration, optionally setting
    /// `APM_ERROR_TRACKING_STANDALONE_ENABLED=true` in the environment passed to the loader.
    async fn make_encoder(ets_enabled: bool) -> TraceEndpointEncoder {
        let mut env_vars: Vec<(String, String)> = Vec::new();
        if ets_enabled {
            env_vars.push(("APM_ERROR_TRACKING_STANDALONE_ENABLED".to_string(), "true".to_string()));
        }

        let (cfg, _) = ConfigurationLoader::for_tests_with_provider_factory(
            None,
            Some(&env_vars),
            false,
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
        let apm_config = ApmConfig::from_configuration(&cfg).expect("ApmConfig should deserialize");

        TraceEndpointEncoder::new(
            MetaString::from("test-host"),
            "0.0.0".to_string(),
            "none".to_string(),
            apm_config,
            TracesConfig::default(),
        )
    }

    /// Wraps a single span into a trace with priority `1`.
    fn single_span_trace(span: DdSpan) -> Trace {
        let mut trace = Trace::new(vec![span]);
        trace.priority = Some(1);
        trace
    }

    /// Single-span trace whose span has its error field set to `0`.
    fn make_trace() -> Trace {
        single_span_trace(DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,    // span_id
            0,    // parent_id
            0,    // start
            1000, // duration
            0,    // error
        ))
    }

    /// Single-span trace whose span has its error field set to `1`.
    fn make_error_trace() -> Trace {
        single_span_trace(DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,    // span_id
            0,    // parent_id
            0,    // start
            1000, // duration
            1,    // error
        ))
    }

    /// Encodes `trace` with `encoder` and parses the produced bytes back into an `AgentPayload`.
    fn encode_to_payload(encoder: &mut TraceEndpointEncoder, trace: &Trace) -> AgentPayload {
        let mut buf = Vec::new();
        encoder.encode(trace, &mut buf).expect("encode should succeed");
        AgentPayload::parse_from_bytes(&buf).expect("should parse AgentPayload")
    }

    /// Returns the value of the ETS tag from the first chunk (across all tracer payloads) that carries it.
    fn ets_chunk_tag(payload: &AgentPayload) -> Option<&str> {
        for tracer_payload in payload.tracerPayloads.iter() {
            for chunk in tracer_payload.chunks.iter() {
                if let Some(value) = chunk.tags.get(ETS_ERROR_TAG_KEY) {
                    return Some(value.as_str());
                }
            }
        }
        None
    }

    #[tokio::test]
    async fn ets_header_present_when_enabled() {
        let encoder = make_encoder(true).await;
        let headers = encoder.additional_headers();
        assert_eq!(headers.len(), 1);
        assert_eq!(headers[0].0.as_str(), "x-datadog-error-tracking-standalone");
        assert_eq!(headers[0].1, "true");
    }

    #[tokio::test]
    async fn ets_header_absent_when_disabled() {
        let encoder = make_encoder(false).await;
        let headers = encoder.additional_headers();
        assert!(headers.is_empty());
    }

    #[tokio::test]
    async fn ets_chunk_tag_present_for_error_trace() {
        let mut encoder = make_encoder(true).await;
        let payload = encode_to_payload(&mut encoder, &make_error_trace());
        assert_eq!(
            ets_chunk_tag(&payload),
            Some("true"),
            "ETS chunk tag should be present for error traces when ETS is enabled"
        );
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_for_non_error_trace() {
        let mut encoder = make_encoder(true).await;
        // The span in this trace carries no error.
        let payload = encode_to_payload(&mut encoder, &make_trace());
        assert!(
            ets_chunk_tag(&payload).is_none(),
            "ETS chunk tag should be absent for non-error traces"
        );
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_when_disabled() {
        let mut encoder = make_encoder(false).await;
        let payload = encode_to_payload(&mut encoder, &make_trace());
        assert!(
            ets_chunk_tag(&payload).is_none(),
            "ETS chunk tag should be absent when ETS is disabled"
        );
    }
}
976
#[cfg(test)]
mod config_smoke {
    use datadog_agent_config_testing::{config_registry::structs, run_config_smoke_tests};
    use serde_json::json;

    use super::DatadogTraceConfiguration;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Runs the shared configuration smoke tests for `structs::DATADOG_TRACE_CONFIGURATION`.
    ///
    /// The closure handed to the runner deserializes the configuration into `DatadogTraceConfiguration` and panics
    /// if that fails.
    #[tokio::test]
    async fn smoke_test() {
        let base_values = json!({});

        // The closure stays inline so its parameter type is inferred from the runner's signature.
        run_config_smoke_tests(
            structs::DATADOG_TRACE_CONFIGURATION,
            &[],
            base_values,
            |cfg| {
                let typed = cfg.as_typed::<DatadogTraceConfiguration>();
                typed.expect("DatadogTraceConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
    }
}
1001}