Skip to main content

saluki_components/encoders/datadog/traces/
mod.rs

1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7    attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8    AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use facet::Facet;
11use http::{uri::PathAndQuery, HeaderName, HeaderValue, Method, Uri};
12use piecemeal::{ScratchBuffer, ScratchWriter};
13use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
14use saluki_common::collections::FastHashMap;
15use saluki_common::strings::StringBuilder;
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::tags::TagSet;
19use saluki_core::data_model::event::trace::AttributeValue;
20use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
21use saluki_core::{
22    components::{encoders::*, ComponentContext},
23    data_model::{
24        event::{trace::Trace, EventType},
25        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
26    },
27    observability::ComponentMetricsExt as _,
28};
29use saluki_env::host::providers::BoxedHostProvider;
30use saluki_env::{EnvironmentProvider, HostProvider};
31use saluki_error::generic_error;
32use saluki_error::{ErrorContext as _, GenericError};
33use saluki_io::compression::CompressionScheme;
34use saluki_metrics::MetricsBuilder;
35use serde::Deserialize;
36use stringtheory::MetaString;
37use tokio::{
38    select,
39    sync::mpsc::{self, Receiver, Sender},
40    time::sleep,
41};
42use tracing::{debug, error};
43
44use crate::common::datadog::{
45    apm::ApmConfig,
46    io::RB_BUFFER_CHUNK_SIZE,
47    request_builder::{EndpointEncoder, RequestBuilder},
48    telemetry::ComponentTelemetry,
49    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
50};
51use crate::common::otlp::config::TracesConfig;
52use crate::common::otlp::util::{
53    attributes_to_source, extract_container_tags_from_attributes_map, Source as OtlpSource,
54    SourceKind as OtlpSourceKind, KEY_DATADOG_CONTAINER_TAGS,
55};
56
/// Tracer payload tag key under which the flattened container tags are written.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";

/// Upper bound on the number of traces packed into a single request payload.
const MAX_TRACES_PER_PAYLOAD: usize = 10_000;
59static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
60
// Sampling metadata: the chunk tag key that carries the OTLP sampling rate, and the priority applied to a
// chunk when its trace has no explicit sampling priority.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP
64
/// Serde default for the payload compression algorithm.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}

/// Serde default for the compression level.
const fn default_zstd_compressor_level() -> i32 {
    3
}

/// Serde default for the pending-request flush timeout, in seconds.
const fn default_flush_timeout_secs() -> u64 {
    2
}

/// Serde default for the `env` setting.
fn default_env() -> String {
    "none".to_owned()
}
80
81/// Configuration for the Datadog Traces encoder.
82///
83/// This encoder converts trace events into Datadog's TracerPayload protobuf format and sends them
84/// to the Datadog traces intake endpoint (`/api/v0.2/traces`). It handles batching, compression,
85/// and enrichment with metadata such as hostname, environment, and container tags.
86#[derive(Deserialize, Facet)]
87#[cfg_attr(test, derive(Debug, PartialEq, serde::Serialize))]
88pub struct DatadogTraceConfiguration {
89    #[serde(
90        rename = "serializer_compressor_kind",  // renames the field in the user_configuration from "serializer_compressor_kind" to "compressor_kind".
91        default = "default_serializer_compressor_kind"
92    )]
93    compressor_kind: String,
94
95    #[serde(
96        rename = "serializer_zstd_compressor_level",
97        default = "default_zstd_compressor_level"
98    )]
99    zstd_compressor_level: i32,
100
101    /// Flush timeout for pending requests, in seconds.
102    ///
103    /// When the encoder has written traces to the in-flight request payload, but it hasn't yet reached the
104    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
105    /// before flushing the in-flight request payload.
106    ///
107    /// Defaults to 2 seconds.
108    #[serde(default = "default_flush_timeout_secs")]
109    flush_timeout_secs: u64,
110
111    #[serde(skip)]
112    default_hostname: Option<String>,
113
114    #[serde(skip)]
115    version: String,
116
117    #[serde(skip)]
118    #[facet(opaque)]
119    apm_config: ApmConfig,
120
121    #[serde(skip)]
122    #[facet(opaque)]
123    otlp_traces: TracesConfig,
124
125    #[serde(default = "default_env")]
126    env: String,
127}
128
129impl DatadogTraceConfiguration {
130    /// Creates a new `DatadogTraceConfiguration` from the given configuration.
131    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
132        let mut trace_config: Self = config.as_typed()?;
133
134        let app_details = saluki_metadata::get_app_details();
135        trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
136
137        trace_config.apm_config = ApmConfig::from_configuration(config)?;
138        trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
139
140        Ok(trace_config)
141    }
142}
143
144impl DatadogTraceConfiguration {
145    /// Sets the `default_hostname` using the environment provider
146    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
147    where
148        E: EnvironmentProvider<Host = BoxedHostProvider>,
149    {
150        let host_provider = environment_provider.host();
151        let hostname = host_provider.get_hostname().await?;
152        self.default_hostname = Some(hostname);
153        Ok(self)
154    }
155}
156
157#[async_trait]
158impl EncoderBuilder for DatadogTraceConfiguration {
159    fn input_event_type(&self) -> EventType {
160        EventType::Trace
161    }
162
163    fn output_payload_type(&self) -> PayloadType {
164        PayloadType::Http
165    }
166
167    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
168        let metrics_builder = MetricsBuilder::from_component_context(&context);
169        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
170        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
171
172        let default_hostname = self.default_hostname.clone().unwrap_or_default();
173        let default_hostname = MetaString::from(default_hostname);
174
175        // Create request builder for traces which is used to generate HTTP requests.
176
177        let mut trace_rb = RequestBuilder::new(
178            TraceEndpointEncoder::new(
179                default_hostname,
180                self.version.clone(),
181                self.env.clone(),
182                self.apm_config.clone(),
183                self.otlp_traces.clone(),
184            ),
185            compression_scheme,
186            RB_BUFFER_CHUNK_SIZE,
187        )
188        .await?;
189        trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
190
191        let flush_timeout = match self.flush_timeout_secs {
192            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
193            // batching, while still practically flushing things almost immediately.
194            0 => Duration::from_millis(10),
195            secs => Duration::from_secs(secs),
196        };
197
198        Ok(Box::new(DatadogTrace {
199            trace_rb,
200            telemetry,
201            flush_timeout,
202        }))
203    }
204}
205
206impl MemoryBounds for DatadogTraceConfiguration {
207    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
208        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
209        builder
210            .minimum()
211            .with_single_value::<DatadogTrace>("component struct")
212            .with_array::<EventsBuffer>("request builder events channel", 8)
213            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
214
215        builder
216            .firm()
217            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
218    }
219}
220
221pub struct DatadogTrace {
222    trace_rb: RequestBuilder<TraceEndpointEncoder>,
223    telemetry: ComponentTelemetry,
224    flush_timeout: Duration,
225}
226
227// Encodes Trace events to TracerPayloads.
228#[async_trait]
229impl Encoder for DatadogTrace {
230    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
231        let Self {
232            trace_rb,
233            telemetry,
234            flush_timeout,
235        } = *self;
236
237        let mut health = context.take_health_handle();
238
239        // The encoder runs two async loops, the main encoder loop and the request builder loop,
240        // this channel is used to send events from the main encoder loop to the request builder loop safely.
241        let (events_tx, events_rx) = mpsc::channel(8);
242        // adds a channel to send payloads to the dispatcher and a channel to receive them.
243        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
244        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
245        // Spawn the request builder task on the global thread pool, this task is responsible for encoding traces and flushing requests.
246        let request_builder_handle = context
247            .topology_context()
248            .global_thread_pool() // Use the shared Tokio runtime thread pool.
249            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);
250
251        health.mark_ready();
252        debug!("Datadog Trace encoder started.");
253
254        loop {
255            select! {
256                biased; // makes the branches of the select statement be evaluated in order.
257
258                _ = health.live() => continue,
259                maybe_payload = payloads_rx.recv() => match maybe_payload {
260                    Some(payload) => {
261                        // Dispatch an HTTP payload to the dispatcher.
262                        if let Err(e) = context.dispatcher().dispatch(payload).await {
263                            error!("Failed to dispatch payload: {}", e);
264                        }
265                    }
266                    None => break,
267                },
268                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
269                    Some(event_buffer) => events_tx.send(event_buffer).await
270                        .error_context("Failed to send event buffer to request builder task.")?,
271                    None => break,
272                },
273            }
274        }
275
276        // Drop the events sender, which signals the request builder task to stop.
277        drop(events_tx);
278
279        // Continue draining the payloads receiver until it is closed.
280        while let Some(payload) = payloads_rx.recv().await {
281            if let Err(e) = context.dispatcher().dispatch(payload).await {
282                error!("Failed to dispatch payload: {}", e);
283            }
284        }
285
286        // Request build task should now be stopped.
287        match request_builder_handle.await {
288            Ok(Ok(())) => debug!("Request builder task stopped."),
289            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
290            Err(e) => error!(error = %e, "Request builder task panicked."),
291        }
292
293        debug!("Datadog Trace encoder stopped.");
294
295        Ok(())
296    }
297}
298
299async fn run_request_builder(
300    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
301    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
302) -> Result<(), GenericError> {
303    let mut pending_flush = false;
304    let pending_flush_timeout = sleep(flush_timeout);
305    tokio::pin!(pending_flush_timeout);
306
307    loop {
308        select! {
309            Some(event_buffer) = events_rx.recv() => {
310                for event in event_buffer {
311                    let trace = match event.try_into_trace() {
312                        Some(trace) => trace,
313                        None => continue,
314                    };
315                    // Encode the trace. If we get it back, that means the current request is full, and we need to
316                    // flush it before we can try to encode the trace again.
317                    let trace_to_retry = match trace_request_builder.encode(trace).await {
318                        Ok(None) => continue,
319                        Ok(Some(trace)) => trace,
320                        Err(e) => {
321                            error!(error = %e, "Failed to encode trace.");
322                            telemetry.events_dropped_encoder().increment(1);
323                            continue;
324                        }
325                    };
326
327                    let maybe_requests = trace_request_builder.flush().await;
328                    if maybe_requests.is_empty() {
329                        panic!("builder told us to flush, but gave us nothing");
330                    }
331
332                    for maybe_request in maybe_requests {
333                        match maybe_request {
334                            Ok((events, _data_points, request)) => {
335                                let payload_meta = PayloadMetadata::from_event_count(events);
336                                let http_payload = HttpPayload::new(payload_meta, request);
337                                let payload = Payload::Http(http_payload);
338
339                                payloads_tx.send(payload).await
340                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
341                            },
342                            Err(e) => if e.is_recoverable() {
343                                // If the error is recoverable, we'll hold on to the trace to retry it later.
344                                continue;
345                            } else {
346                                return Err(GenericError::from(e).context("Failed to flush request."));
347                            }
348                        }
349                    }
350
351                    // Now try to encode the trace again.
352                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
353                        error!(error = %e, "Failed to encode trace.");
354                        telemetry.events_dropped_encoder().increment(1);
355                    }
356                }
357
358                debug!("Processed event buffer.");
359
360                // If we're not already pending a flush, we'll start the countdown.
361                if !pending_flush {
362                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
363                    pending_flush = true;
364                }
365            },
366            _ = &mut pending_flush_timeout, if pending_flush => {
367                debug!("Flushing pending request(s).");
368
369                pending_flush = false;
370
371                // Once we've encoded and written all traces, we flush the request builders to generate a request with
372                // anything left over. Again, we'll enqueue those requests to be sent immediately.
373                let maybe_trace_requests = trace_request_builder.flush().await;
374                for maybe_request in maybe_trace_requests {
375                    match maybe_request {
376                        Ok((events, _data_points, request)) => {
377                            let payload_meta = PayloadMetadata::from_event_count(events);
378                            let http_payload = HttpPayload::new(payload_meta, request);
379                            let payload = Payload::Http(http_payload);
380
381                            payloads_tx.send(payload).await
382                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
383                        },
384                        Err(e) => if e.is_recoverable() {
385                            continue;
386                        } else {
387                            return Err(GenericError::from(e).context("Failed to flush request."));
388                        }
389                    }
390                }
391
392                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
393            },
394
395            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
396            else => break,
397        }
398    }
399
400    Ok(())
401}
402
403#[derive(Debug)]
404struct TraceEndpointEncoder {
405    scratch: ScratchWriter<Vec<u8>>,
406    default_hostname: MetaString,
407    agent_hostname: String,
408    version: String,
409    env: String,
410    apm_config: ApmConfig,
411    otlp_traces: TracesConfig,
412    string_builder: StringBuilder,
413    error_tracking_standalone: bool,
414    extra_headers: Vec<(HeaderName, HeaderValue)>,
415}
416
417impl TraceEndpointEncoder {
418    fn new(
419        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
420    ) -> Self {
421        let error_tracking_standalone = apm_config.error_tracking_standalone_enabled();
422        let extra_headers = if error_tracking_standalone {
423            vec![(
424                HeaderName::from_static("x-datadog-error-tracking-standalone"),
425                HeaderValue::from_static("true"),
426            )]
427        } else {
428            Vec::new()
429        };
430        Self {
431            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
432            agent_hostname: default_hostname.as_ref().to_string(),
433            default_hostname,
434            version,
435            env,
436            apm_config,
437            otlp_traces,
438            string_builder: StringBuilder::new(),
439            error_tracking_standalone,
440            extra_headers,
441        }
442    }
443
444    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
445        let sampling_rate = self.sampling_rate();
446        let source = attributes_to_source(&trace.attributes);
447
448        // Resolve metadata from payload fields and attributes.
449        let container_id = if !trace.payload.container_id.is_empty() {
450            Some(trace.payload.container_id.as_ref())
451        } else {
452            None
453        };
454        let lang = if !trace.payload.language_name.is_empty() {
455            Some(trace.payload.language_name.as_ref())
456        } else {
457            None
458        };
459        let tracer_version = format!("otlp-{}", trace.payload.tracer_version.as_ref());
460        let container_tags = resolve_container_tags_from_attrs(
461            &trace.attributes,
462            source.as_ref(),
463            self.otlp_traces.ignore_missing_datadog_fields,
464        );
465        let env = if !trace.payload.env.is_empty() {
466            Some(trace.payload.env.as_ref())
467        } else if self.otlp_traces.ignore_missing_datadog_fields {
468            Some("")
469        } else {
470            None
471        };
472        let hostname = resolve_hostname_from_payload(
473            trace.payload.hostname.as_ref(),
474            source.as_ref(),
475            Some(self.default_hostname.as_ref()),
476            self.otlp_traces.ignore_missing_datadog_fields,
477        );
478        let app_version = if !trace.payload.app_version.is_empty() {
479            Some(trace.payload.app_version.as_ref())
480        } else {
481            None
482        };
483
484        // Resolve sampling metadata from flat trace fields.
485        let priority = trace.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY);
486        let dropped_trace = trace.dropped_trace;
487        let decision_maker = trace.decision_maker.as_deref();
488        let otlp_sr = trace.otlp_sampling_rate.unwrap_or(sampling_rate);
489
490        // Now incrementally build the payload.
491        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);
492
493        ap_builder
494            .host_name(&self.agent_hostname)?
495            .env(&self.env)?
496            .agent_version(&self.version)?
497            .target_tps(self.apm_config.target_traces_per_second())?
498            .error_tps(self.apm_config.errors_per_second())?;
499
500        ap_builder.add_tracer_payloads(|tp| {
501            if let Some(cid) = container_id {
502                tp.container_id(cid)?;
503            }
504            if let Some(l) = lang {
505                tp.language_name(l)?;
506            }
507            tp.tracer_version(&tracer_version)?;
508
509            // Encode the single TraceChunk containing all spans.
510            tp.add_chunks(|chunk| {
511                chunk.priority(priority)?;
512
513                for span in trace.spans() {
514                    chunk.add_spans(|s| {
515                        s.service(span.service())?
516                            .name(span.name())?
517                            .resource(span.resource())?
518                            .trace_id(trace.trace_id_low)?
519                            .span_id(span.span_id())?
520                            .parent_id(span.parent_id())?
521                            .start(span.start() as i64)?
522                            .duration(span.duration() as i64)?
523                            .error(span.error())?;
524
525                        {
526                            let mut meta = s.meta();
527                            for (k, v) in &span.attributes {
528                                match v {
529                                    AttributeValue::String(str_val) => {
530                                        meta.write_entry(k.as_ref(), str_val.as_ref())?;
531                                    }
532                                    AttributeValue::Bool(b) => {
533                                        meta.write_entry(k.as_ref(), if *b { "true" } else { "false" })?;
534                                    }
535                                    _ => {}
536                                }
537                            }
538                        }
539
540                        {
541                            let mut metrics = s.metrics();
542                            for (k, v) in &span.attributes {
543                                match v {
544                                    AttributeValue::Float(f) => metrics.write_entry(k.as_ref(), *f)?,
545                                    AttributeValue::Int(i) => metrics.write_entry(k.as_ref(), *i as f64)?,
546                                    _ => {}
547                                }
548                            }
549                        }
550
551                        s.type_(span.span_type())?;
552
553                        {
554                            let mut ms = s.meta_struct();
555                            for (k, v) in &span.attributes {
556                                // TODO: Array and KeyValueList could be JSON-serialized into meta_struct;
557                                // skipped until a caller needs them for span-level attributes.
558                                if let AttributeValue::Bytes(bytes) = v {
559                                    ms.write_entry(k.as_ref(), bytes.as_slice())?;
560                                }
561                            }
562                        }
563
564                        for link in span.span_links() {
565                            s.add_span_links(|sl| {
566                                sl.trace_id(link.trace_id())?
567                                    .trace_id_high(link.trace_id_high())?
568                                    .span_id(link.span_id())?;
569                                {
570                                    // TODO: investigate whether non-String attribute values can be
571                                    // serialized directly into the link attributes proto field rather
572                                    // than being silently dropped here.
573                                    let mut attrs = sl.attributes();
574                                    for (k, v) in link.attributes() {
575                                        if let AttributeValue::String(str_val) = v {
576                                            attrs.write_entry(k.as_ref(), str_val.as_ref())?;
577                                        }
578                                    }
579                                }
580                                let tracestate = link.tracestate().to_string();
581                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
582                                Ok(())
583                            })?;
584                        }
585
586                        for event in span.span_events() {
587                            s.add_span_events(|se| {
588                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
589                                {
590                                    let mut attrs = se.attributes();
591                                    for (k, v) in event.attributes() {
592                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
593                                    }
594                                }
595                                Ok(())
596                            })?;
597                        }
598
599                        Ok(())
600                    })?;
601                }
602
603                // Chunk tags.
604                {
605                    let mut tags = chunk.tags();
606                    if let Some(dm) = decision_maker {
607                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
608                    }
609                    if self.error_tracking_standalone {
610                        let trace_has_error = trace.spans().iter().any(|span| {
611                            span.error() != 0
612                                || span
613                                    .attributes
614                                    .get("_dd.span_events.has_exception")
615                                    .and_then(AttributeValue::as_string)
616                                    .is_some_and(|v| v == "true")
617                        });
618                        if trace_has_error {
619                            tags.write_entry("_dd.error_tracking_standalone.error", "true")?;
620                        }
621                    }
622
623                    self.string_builder.clear();
624                    write!(&mut self.string_builder, "{:.2}", otlp_sr)
625                        .expect("should never fail to format sampling rate");
626                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
627                }
628
629                if dropped_trace {
630                    chunk.dropped_trace(true)?;
631                }
632
633                Ok(())
634            })?;
635
636            // Tracer payload tags.
637            if let Some(ct) = container_tags {
638                let mut tags = tp.tags();
639                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
640            }
641
642            if let Some(e) = env {
643                tp.env(e)?;
644            }
645            if let Some(h) = hostname {
646                tp.hostname(h)?;
647            }
648            if let Some(av) = app_version {
649                tp.app_version(av)?;
650            }
651
652            Ok(())
653        })?;
654
655        ap_builder.finish(output_buffer)?;
656
657        Ok(())
658    }
659
660    fn sampling_rate(&self) -> f64 {
661        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
662        if rate <= 0.0 || rate >= 1.0 {
663            return 1.0;
664        }
665        rate
666    }
667}
668
669impl EndpointEncoder for TraceEndpointEncoder {
670    type Input = Trace;
671    type EncodeError = std::io::Error;
672    fn encoder_name() -> &'static str {
673        "traces"
674    }
675
676    fn compressed_size_limit(&self) -> usize {
677        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
678    }
679
680    fn uncompressed_size_limit(&self) -> usize {
681        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
682    }
683
684    fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
685        self.encode_tracer_payload(trace, buffer)
686    }
687
688    fn endpoint_uri(&self) -> Uri {
689        PathAndQuery::from_static("/api/v0.2/traces").into()
690    }
691
692    fn endpoint_method(&self) -> Method {
693        Method::POST
694    }
695
696    fn content_type(&self) -> HeaderValue {
697        CONTENT_TYPE_PROTOBUF.clone()
698    }
699
700    fn additional_headers(&self) -> &[(HeaderName, HeaderValue)] {
701        &self.extra_headers
702    }
703}
704
705fn encode_attribute_value<S: ScratchBuffer>(
706    builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
707) -> std::io::Result<()> {
708    match value {
709        AttributeValue::String(v) => {
710            builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
711        }
712        AttributeValue::Bool(v) => {
713            builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
714        }
715        AttributeValue::Int(v) => {
716            builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
717        }
718        AttributeValue::Float(v) => {
719            builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
720        }
721        AttributeValue::Bytes(_) => {
722            // Bytes are not directly representable in OTLP AnyValue; skip.
723        }
724        AttributeValue::Array(values) => {
725            builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
726                for val in values {
727                    arr.add_values(|av| encode_attribute_array_value(av, val))?;
728                }
729                Ok(())
730            })?;
731        }
732        AttributeValue::KeyValueList(_) => {
733            // KVList encoding not needed for this encoder path; skip.
734        }
735    }
736    Ok(())
737}
738
739fn encode_attribute_array_value<S: ScratchBuffer>(
740    builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeValue,
741) -> std::io::Result<()> {
742    match value {
743        AttributeValue::String(v) => {
744            builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
745        }
746        AttributeValue::Bool(v) => {
747            builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
748        }
749        AttributeValue::Int(v) => {
750            builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
751        }
752        AttributeValue::Float(v) => {
753            builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
754        }
755        AttributeValue::Bytes(_) | AttributeValue::Array(_) | AttributeValue::KeyValueList(_) => {
756            // Nested complex values not representable in OTLP array; skip.
757        }
758    }
759    Ok(())
760}
761
762fn resolve_hostname_from_payload<'a>(
763    payload_hostname: &'a str, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
764    ignore_missing_fields: bool,
765) -> Option<&'a str> {
766    if !payload_hostname.is_empty() {
767        return Some(payload_hostname);
768    }
769    if ignore_missing_fields {
770        return Some("");
771    }
772    match source {
773        Some(src) => match src.kind {
774            OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
775            _ => Some(""),
776        },
777        None => default_hostname,
778    }
779}
780
781fn resolve_container_tags_from_attrs(
782    attributes: &FastHashMap<MetaString, AttributeValue>, source: Option<&OtlpSource>, ignore_missing_fields: bool,
783) -> Option<MetaString> {
784    if let Some(AttributeValue::String(tags)) = attributes.get(KEY_DATADOG_CONTAINER_TAGS) {
785        if !tags.is_empty() {
786            return Some(tags.clone());
787        }
788    }
789
790    if ignore_missing_fields {
791        return None;
792    }
793    let mut container_tags = TagSet::default();
794    extract_container_tags_from_attributes_map(attributes, &mut container_tags);
795    let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
796    if container_tags.is_empty() && !is_fargate_source {
797        return None;
798    }
799
800    let mut flattened = flatten_container_tag(container_tags);
801    if is_fargate_source {
802        if let Some(src) = source {
803            append_tags(&mut flattened, &src.tag());
804        }
805    }
806
807    if flattened.is_empty() {
808        None
809    } else {
810        Some(MetaString::from(flattened))
811    }
812}
813
814fn flatten_container_tag(tags: TagSet) -> String {
815    let mut flattened = String::new();
816    for tag in tags {
817        if !flattened.is_empty() {
818            flattened.push(',');
819        }
820        flattened.push_str(tag.as_str());
821    }
822    flattened
823}
824
/// Appends the comma-separated `tags` string onto `target`.
///
/// An empty `tags` leaves `target` unchanged. A `,` separator is inserted only when `target`
/// already holds some text.
fn append_tags(target: &mut String, tags: &str) {
    if !tags.is_empty() {
        let needs_separator = !target.is_empty();
        if needs_separator {
            target.push(',');
        }
        target.push_str(tags);
    }
}
834
#[cfg(test)]
mod tests {
    use datadog_protos::traces::AgentPayload;
    use protobuf::Message as _;
    use saluki_config::ConfigurationLoader;
    use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
    use stringtheory::MetaString;

    use super::*;
    use crate::common::datadog::apm::ApmConfig;
    use crate::common::otlp::config::TracesConfig;
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Chunk tag these tests look for on encoded error traces when ETS is enabled.
    const ETS_CHUNK_TAG: &str = "_dd.error_tracking_standalone.error";

    /// Builds an encoder from test configuration.
    ///
    /// When `ets_enabled` is set, `APM_ERROR_TRACKING_STANDALONE_ENABLED=true` is supplied as an
    /// environment variable to the configuration loader.
    async fn make_encoder(ets_enabled: bool) -> TraceEndpointEncoder {
        let env_vars: Vec<(String, String)> = ets_enabled
            .then(|| ("APM_ERROR_TRACKING_STANDALONE_ENABLED".to_string(), "true".to_string()))
            .into_iter()
            .collect();
        let (cfg, _) = ConfigurationLoader::for_tests_with_provider_factory(
            None,
            Some(&env_vars),
            false,
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await;
        let apm_config = ApmConfig::from_configuration(&cfg).expect("ApmConfig should deserialize");
        TraceEndpointEncoder::new(
            MetaString::from("test-host"),
            "0.0.0".to_string(),
            "none".to_string(),
            apm_config,
            TracesConfig::default(),
        )
    }

    /// Builds a one-span trace with priority 1.
    ///
    /// The span uses `svc`/`op`/`res`/`web`, span id 1, no parent, start 0 and duration 1000. Its
    /// error flag is 1 when `is_error` is set and 0 otherwise.
    fn build_trace(is_error: bool) -> Trace {
        let error = if is_error { 1 } else { 0 };
        let span = DdSpan::new(
            MetaString::from("svc"),
            MetaString::from("op"),
            MetaString::from("res"),
            MetaString::from("web"),
            1,     // span_id
            0,     // parent_id
            0,     // start
            1000,  // duration
            error, // error
        );
        let mut trace = Trace::new(vec![span]);
        trace.priority = Some(1);
        trace
    }

    /// Encodes `trace` with `encoder` and parses the bytes back into an `AgentPayload`.
    fn encode_to_payload(encoder: &mut TraceEndpointEncoder, trace: &Trace) -> AgentPayload {
        let mut buf = Vec::new();
        encoder.encode(trace, &mut buf).expect("encode should succeed");
        AgentPayload::parse_from_bytes(&buf).expect("should parse AgentPayload")
    }

    /// Returns the ETS chunk tag value from the first chunk (across all tracer payloads) carrying it.
    fn ets_chunk_tag(payload: &AgentPayload) -> Option<&str> {
        payload
            .tracerPayloads
            .iter()
            .flat_map(|tp| tp.chunks.iter())
            .find_map(|chunk| chunk.tags.get(ETS_CHUNK_TAG).map(|v| v.as_str()))
    }

    #[tokio::test]
    async fn ets_header_present_when_enabled() {
        let encoder = make_encoder(true).await;
        let headers = encoder.additional_headers();
        assert_eq!(headers.len(), 1);
        assert_eq!(headers[0].0.as_str(), "x-datadog-error-tracking-standalone");
        assert_eq!(headers[0].1, "true");
    }

    #[tokio::test]
    async fn ets_header_absent_when_disabled() {
        let encoder = make_encoder(false).await;
        assert!(encoder.additional_headers().is_empty());
    }

    #[tokio::test]
    async fn ets_chunk_tag_present_for_error_trace() {
        let mut encoder = make_encoder(true).await;
        let payload = encode_to_payload(&mut encoder, &build_trace(true));
        assert_eq!(
            ets_chunk_tag(&payload),
            Some("true"),
            "ETS chunk tag should be present for error traces when ETS is enabled"
        );
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_for_non_error_trace() {
        let mut encoder = make_encoder(true).await;
        let payload = encode_to_payload(&mut encoder, &build_trace(false));
        assert!(
            ets_chunk_tag(&payload).is_none(),
            "ETS chunk tag should be absent for non-error traces"
        );
    }

    #[tokio::test]
    async fn ets_chunk_tag_absent_when_disabled() {
        let mut encoder = make_encoder(false).await;
        let payload = encode_to_payload(&mut encoder, &build_trace(false));
        assert!(
            ets_chunk_tag(&payload).is_none(),
            "ETS chunk tag should be absent when ETS is disabled"
        );
    }
}
975
#[cfg(test)]
mod config_smoke {
    use serde_json::json;

    use super::DatadogTraceConfiguration;
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Runs the shared `run_config_smoke_tests` harness for the `DATADOG_TRACE_CONFIGURATION`
    /// registry entry.
    ///
    /// The harness receives no extra entries and an empty JSON object. The callback asserts that
    /// each configuration it is handed deserializes as `DatadogTraceConfiguration`.
    #[tokio::test]
    async fn smoke_test() {
        let base_config = json!({});
        run_config_smoke_tests(structs::DATADOG_TRACE_CONFIGURATION, &[], base_config, |config| {
            config
                .as_typed::<DatadogTraceConfiguration>()
                .expect("DatadogTraceConfiguration should deserialize")
        })
        .await
    }
}