saluki_components/encoders/datadog/traces/mod.rs

#![allow(dead_code)]

use std::time::Duration;

use async_trait::async_trait;
use datadog_protos::traces::builders::{
    attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
    AttributeAnyValueBuilder, AttributeArrayValueBuilder,
};
use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use opentelemetry_semantic_conventions::resource::{
    CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
};
use piecemeal::{ScratchBuffer, ScratchWriter};
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_context::tags::{SharedTagSet, TagSet};
use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
use saluki_core::{
    components::{encoders::*, ComponentContext},
    data_model::{
        event::{trace::Trace, EventType},
        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
    },
    observability::ComponentMetricsExt as _,
};
use saluki_env::host::providers::BoxedHostProvider;
use saluki_env::{EnvironmentProvider, HostProvider};
use saluki_error::generic_error;
use saluki_error::{ErrorContext as _, GenericError};
use saluki_io::compression::CompressionScheme;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use stringtheory::MetaString;
use tokio::{
    select,
    sync::mpsc::{self, Receiver, Sender},
    time::sleep,
};
use tracing::{debug, error};

use crate::common::datadog::{
    apm::ApmConfig,
    io::RB_BUFFER_CHUNK_SIZE,
    request_builder::{EndpointEncoder, RequestBuilder},
    telemetry::ComponentTelemetry,
    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
};
use crate::common::otlp::config::TracesConfig;
use crate::common::otlp::util::{
    extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
    DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
    KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
};

const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

// Sampling metadata keys / values.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP

fn default_serializer_compressor_kind() -> String {
    "zstd".to_string()
}

const fn default_zstd_compressor_level() -> i32 {
    3
}

const fn default_flush_timeout_secs() -> u64 {
    2
}

fn default_env() -> String {
    "none".to_string()
}

/// Configuration for the Datadog Traces encoder.
///
/// This encoder converts trace events into Datadog's TracerPayload protobuf format and sends them
/// to the Datadog traces intake endpoint (`/api/v0.2/traces`). It handles batching, compression,
/// and enrichment with metadata such as hostname, environment, and container tags.
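///
/// # Example
///
/// A minimal construction sketch (marked `ignore`, so it is not compiled as a doctest). It assumes a
/// `GenericConfiguration` named `config` and an environment provider named `env_provider` are already
/// in scope, and only uses the constructors defined in this module:
///
/// ```ignore
/// let trace_config = DatadogTraceConfiguration::from_configuration(&config)?
///     .with_environment_provider(env_provider)
///     .await?;
/// ```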
#[derive(Deserialize)]
pub struct DatadogTraceConfiguration {
    #[serde(
        // Deserialized from the "serializer_compressor_kind" key in the user configuration.
        rename = "serializer_compressor_kind",
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written traces to the in-flight request payload but the payload has not yet reached
    /// the size limits that would force a flush, the encoder waits this long before flushing the payload anyway.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    #[serde(skip)]
    default_hostname: Option<String>,

    #[serde(skip)]
    version: String,

    #[serde(skip)]
    apm_config: ApmConfig,

    #[serde(skip)]
    otlp_traces: TracesConfig,

    #[serde(default = "default_env")]
    env: String,
}

impl DatadogTraceConfiguration {
    /// Creates a new `DatadogTraceConfiguration` from the given configuration.
    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
        let mut trace_config: Self = config.as_typed()?;

        let app_details = saluki_metadata::get_app_details();
        trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());

        trace_config.apm_config = ApmConfig::from_configuration(config)?;
        trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();

        Ok(trace_config)
    }
}

impl DatadogTraceConfiguration {
    /// Sets the default hostname using the given environment provider.
    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
    where
        E: EnvironmentProvider<Host = BoxedHostProvider>,
    {
        let host_provider = environment_provider.host();
        let hostname = host_provider.get_hostname().await?;
        self.default_hostname = Some(hostname);
        Ok(self)
    }
}

#[async_trait]
impl EncoderBuilder for DatadogTraceConfiguration {
    fn input_event_type(&self) -> EventType {
        EventType::Trace
    }

    fn output_payload_type(&self) -> PayloadType {
        PayloadType::Http
    }

    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
        let metrics_builder = MetricsBuilder::from_component_context(&context);
        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);

        let default_hostname = self.default_hostname.clone().unwrap_or_default();
        let default_hostname = MetaString::from(default_hostname);

        // Create the request builder for traces, which is used to generate HTTP requests.
        let mut trace_rb = RequestBuilder::new(
            TraceEndpointEncoder::new(
                default_hostname,
                self.version.clone(),
                self.env.clone(),
                self.apm_config.clone(),
                self.otlp_traces.clone(),
            ),
            compression_scheme,
            RB_BUFFER_CHUNK_SIZE,
        )
        .await?;
        trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);

        let flush_timeout = match self.flush_timeout_secs {
            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
            // batching, while still practically flushing things almost immediately.
            0 => Duration::from_millis(10),
            secs => Duration::from_secs(secs),
        };

        Ok(Box::new(DatadogTrace {
            trace_rb,
            telemetry,
            flush_timeout,
        }))
    }
}

impl MemoryBounds for DatadogTraceConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}

pub struct DatadogTrace {
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    telemetry: ComponentTelemetry,
    flush_timeout: Duration,
}

// Encodes Trace events to TracerPayloads.
#[async_trait]
impl Encoder for DatadogTrace {
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            trace_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // The encoder runs two async loops: the main encoder loop and the request builder loop. This channel is
        // used to safely send event buffers from the main encoder loop to the request builder loop.
        let (events_tx, events_rx) = mpsc::channel(8);
        // Channel used by the request builder loop to send generated payloads back to the main loop for dispatch.
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        // Spawn the request builder task on the global thread pool; it is responsible for encoding traces and
        // flushing requests.
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool() // Use the shared Tokio runtime thread pool.
            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Trace encoder started.");

        loop {
            select! {
                biased; // Evaluate the branches in declaration order.

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        // Forward the generated HTTP payload to the dispatcher.
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Drop the events sender, which signals the request builder task to stop.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // The request builder task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Trace encoder stopped.");

        Ok(())
    }
}

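/// Runs the request builder loop: receives event buffers, encodes each trace into the in-flight
/// request, flushes completed requests (when the builder reports it is full or the flush timeout
/// elapses), and sends the resulting payloads back over `payloads_tx` for dispatch.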
async fn run_request_builder(
    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
) -> Result<(), GenericError> {
    let mut pending_flush = false;
    let pending_flush_timeout = sleep(flush_timeout);
    tokio::pin!(pending_flush_timeout);

    loop {
        select! {
            Some(event_buffer) = events_rx.recv() => {
                for event in event_buffer {
                    let trace = match event.try_into_trace() {
                        Some(trace) => trace,
                        None => continue,
                    };
                    // Encode the trace. If we get it back, that means the current request is full, and we need to
                    // flush it before we can try to encode the trace again.
                    let trace_to_retry = match trace_request_builder.encode(trace).await {
                        Ok(None) => continue,
                        Ok(Some(trace)) => trace,
                        Err(e) => {
                            error!(error = %e, "Failed to encode trace.");
                            telemetry.events_dropped_encoder().increment(1);
                            continue;
                        }
                    };

                    let maybe_requests = trace_request_builder.flush().await;
                    if maybe_requests.is_empty() {
                        panic!("builder told us to flush, but gave us nothing");
                    }

                    for maybe_request in maybe_requests {
                        match maybe_request {
                            Ok((events, request)) => {
                                let payload_meta = PayloadMetadata::from_event_count(events);
                                let http_payload = HttpPayload::new(payload_meta, request);
                                let payload = Payload::Http(http_payload);

                                payloads_tx.send(payload).await
                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                            },
                            Err(e) => if e.is_recoverable() {
                                // If the error is recoverable, we'll hold on to the trace to retry it later.
                                continue;
                            } else {
                                return Err(GenericError::from(e).context("Failed to flush request."));
                            }
                        }
                    }

                    // Now try to encode the trace again.
                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
                        error!(error = %e, "Failed to encode trace.");
                        telemetry.events_dropped_encoder().increment(1);
                    }
                }

                debug!("Processed event buffer.");

                // If we're not already pending a flush, we'll start the countdown.
                if !pending_flush {
                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
                    pending_flush = true;
                }
            },
            _ = &mut pending_flush_timeout, if pending_flush => {
                debug!("Flushing pending request(s).");

                pending_flush = false;

                // Once we've encoded and written all traces, flush the request builder to generate a request with
                // anything left over, and send the resulting payload(s) downstream immediately.
                let maybe_trace_requests = trace_request_builder.flush().await;
                for maybe_request in maybe_trace_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },
                        Err(e) => if e.is_recoverable() {
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
            },

            // The event buffers channel has been closed and there is no pending flush, so we're all done.
            else => break,
        }
    }

    Ok(())
}

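/// `EndpointEncoder` implementation for the Datadog traces intake.
///
/// Encodes each `Trace` into Datadog's `AgentPayload` protobuf using `AgentPayloadBuilder` and a scratch
/// buffer, enriched with hostname, environment, version, container, and sampling metadata resolved from
/// the trace's resource tags.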
#[derive(Debug)]
struct TraceEndpointEncoder {
    scratch: ScratchWriter<Vec<u8>>,
    default_hostname: MetaString,
    agent_hostname: String,
    version: String,
    env: String,
    apm_config: ApmConfig,
    otlp_traces: TracesConfig,
}

impl TraceEndpointEncoder {
    fn new(
        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
    ) -> Self {
        Self {
            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
            agent_hostname: default_hostname.as_ref().to_string(),
            default_hostname,
            version,
            env,
            apm_config,
            otlp_traces,
        }
    }

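    /// Encodes a single `Trace` as an `AgentPayload` containing one `TracerPayload` whose single chunk
    /// holds all of the trace's spans, appending the serialized bytes to `output_buffer`.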
    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
        let sampling_rate = self.sampling_rate();
        let resource_tags = trace.resource_tags();
        let first_span = trace.spans().first();
        let source = tags_to_source(resource_tags);

        // Resolve metadata from resource tags.
        let container_id = resolve_container_id(resource_tags, first_span);
        let lang = get_resource_tag_value(resource_tags, "telemetry.sdk.language");
        let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
        let tracer_version = format!("otlp-{}", sdk_version);
        let container_tags = resolve_container_tags(
            resource_tags,
            source.as_ref(),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let env = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields);
        let hostname = resolve_hostname(
            resource_tags,
            source.as_ref(),
            Some(self.default_hostname.as_ref()),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let app_version = resolve_app_version(resource_tags);

        // Resolve sampling metadata.
        let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() {
            Some(sampling) => (
                sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY),
                sampling.dropped_trace,
                sampling.decision_maker.as_deref(),
                sampling
                    .otlp_sampling_rate
                    .as_ref()
                    .map(|sr| sr.to_string())
                    .unwrap_or_else(|| format!("{:.2}", sampling_rate)),
            ),
            None => (DEFAULT_CHUNK_PRIORITY, false, None, format!("{:.2}", sampling_rate)),
        };

        // Now incrementally build the payload.
        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);

        ap_builder
            .host_name(&self.agent_hostname)?
            .env(&self.env)?
            .agent_version(&self.version)?
            .target_tps(self.apm_config.target_traces_per_second())?
            .error_tps(self.apm_config.errors_per_second())?;

        ap_builder.add_tracer_payloads(|tp| {
            if let Some(cid) = container_id {
                tp.container_id(cid)?;
            }
            if let Some(l) = lang {
                tp.language_name(l)?;
            }
            tp.tracer_version(&tracer_version)?;

            // Encode the single TraceChunk containing all spans.
            tp.add_chunks(|chunk| {
                chunk.priority(priority)?;

                for span in trace.spans() {
                    chunk.add_spans(|s| {
                        s.service(span.service())?
                            .name(span.name())?
                            .resource(span.resource())?
                            .trace_id(span.trace_id())?
                            .span_id(span.span_id())?
                            .parent_id(span.parent_id())?
                            .start(span.start() as i64)?
                            .duration(span.duration() as i64)?
                            .error(span.error())?;

                        {
                            let mut meta = s.meta();
                            for (k, v) in span.meta() {
                                meta.write_entry(k.as_ref(), v.as_ref())?;
                            }
                        }

                        {
                            let mut metrics = s.metrics();
                            for (k, v) in span.metrics() {
                                metrics.write_entry(k.as_ref(), *v)?;
                            }
                        }

                        s.type_(span.span_type())?;

                        {
                            let mut ms = s.meta_struct();
                            for (k, v) in span.meta_struct() {
                                ms.write_entry(k.as_ref(), v.as_slice())?;
                            }
                        }

                        for link in span.span_links() {
                            s.add_span_links(|sl| {
                                sl.trace_id(link.trace_id())?
                                    .trace_id_high(link.trace_id_high())?
                                    .span_id(link.span_id())?;
                                {
                                    let mut attrs = sl.attributes();
                                    for (k, v) in link.attributes() {
                                        attrs.write_entry(&**k, &**v)?;
                                    }
                                }
                                let tracestate = link.tracestate().to_string();
                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
                                Ok(())
                            })?;
                        }

                        for event in span.span_events() {
                            s.add_span_events(|se| {
                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
                                {
                                    let mut attrs = se.attributes();
                                    for (k, v) in event.attributes() {
                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
                                    }
                                }
                                Ok(())
                            })?;
                        }

                        Ok(())
                    })?;
                }

                // Chunk tags.
                {
                    let mut tags = chunk.tags();
                    if let Some(dm) = decision_maker {
                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
                    }
                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, otlp_sr.as_str())?;
                }

                if dropped_trace {
                    chunk.dropped_trace(true)?;
                }

                Ok(())
            })?;

            // Tracer payload tags.
            if let Some(ct) = container_tags {
                let mut tags = tp.tags();
                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
            }

            if let Some(e) = env {
                tp.env(e)?;
            }
            if let Some(h) = hostname {
                tp.hostname(h)?;
            }
            if let Some(av) = app_version {
                tp.app_version(av)?;
            }

            Ok(())
        })?;

        ap_builder.finish(output_buffer)?;

        Ok(())
    }

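    /// Returns the configured probabilistic sampling rate as a fraction, or 1.0 when the configured
    /// percentage does not map to a rate strictly between 0 and 1.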
    fn sampling_rate(&self) -> f64 {
        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
        if rate <= 0.0 || rate >= 1.0 {
            return 1.0;
        }
        rate
    }
}

impl EndpointEncoder for TraceEndpointEncoder {
    type Input = Trace;
    type EncodeError = std::io::Error;

    fn encoder_name() -> &'static str {
        "traces"
    }

    fn compressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    fn uncompressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

    fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        self.encode_tracer_payload(trace, buffer)
    }

    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v0.2/traces").into()
    }

    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_PROTOBUF.clone()
    }
}

fn encode_attribute_value<S: ScratchBuffer>(
    builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
) -> std::io::Result<()> {
    match value {
        AttributeValue::String(v) => {
            builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
        }
        AttributeValue::Bool(v) => {
            builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
        }
        AttributeValue::Int(v) => {
            builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
        }
        AttributeValue::Double(v) => {
            builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
        }
        AttributeValue::Array(values) => {
            builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
                for val in values {
                    arr.add_values(|av| encode_attribute_array_value(av, val))?;
                }
                Ok(())
            })?;
        }
    }
    Ok(())
}

fn encode_attribute_array_value<S: ScratchBuffer>(
    builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeScalarValue,
) -> std::io::Result<()> {
    match value {
        AttributeScalarValue::String(v) => {
            builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
        }
        AttributeScalarValue::Bool(v) => {
            builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
        }
        AttributeScalarValue::Int(v) => {
            builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
        }
        AttributeScalarValue::Double(v) => {
            builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
        }
    }
    Ok(())
}

fn get_resource_tag_value<'a>(resource_tags: &'a SharedTagSet, key: &str) -> Option<&'a str> {
    resource_tags.get_single_tag(key).and_then(|t| t.value())
}

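/// Resolves the hostname to set on the tracer payload.
///
/// The `KEY_DATADOG_HOST` resource tag always takes precedence. Otherwise the hostname is derived from
/// the detected source (its identifier for a hostname-kind source, empty otherwise), falling back to the
/// default hostname when no source is detected; `ignore_missing_fields` forces an empty hostname instead
/// of any source- or default-derived value.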
fn resolve_hostname<'a>(
    resource_tags: &'a SharedTagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
    ignore_missing_fields: bool,
) -> Option<&'a str> {
    let mut hostname = match source {
        Some(src) => match src.kind {
            OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
            _ => Some(""),
        },
        None => default_hostname,
    };

    if ignore_missing_fields {
        hostname = Some("");
    }

    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
        hostname = Some(value);
    }

    hostname
}

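/// Resolves the environment to set on the tracer payload: the `KEY_DATADOG_ENVIRONMENT` resource tag
/// wins; otherwise, unless `ignore_missing_fields` is set, fall back to the OTLP deployment environment
/// resource attributes.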
fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Option<&str> {
    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
        return Some(value);
    }
    if ignore_missing_fields {
        return None;
    }
    if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
        return Some(value);
    }
    get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
}

fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
        if let Some(value) = get_resource_tag_value(resource_tags, key) {
            return Some(value);
        }
    }
    // TODO: add container id fallback equivalent to cidProvider
    // https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L414
    if let Some(span) = first_span {
        for (k, v) in span.meta() {
            if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
                return Some(v.as_ref());
            }
        }
    }
    None
}

fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> {
    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
        return Some(value);
    }
    get_resource_tag_value(resource_tags, SERVICE_VERSION)
}

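/// Resolves the flattened container tags string for the tracer payload: the `KEY_DATADOG_CONTAINER_TAGS`
/// resource tag wins when non-empty; otherwise, unless `ignore_missing_fields` is set, container tags are
/// extracted from the resource attributes and, for an AWS ECS Fargate source, combined with the source tag.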
fn resolve_container_tags(
    resource_tags: &SharedTagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
) -> Option<MetaString> {
    // TODO: Some refactoring is probably needed here. The tags should already be normalized, since we normalize
    // them when transforming OTLP spans into DD spans; however, to make this code usable for non-OTLP traces, we
    // would need to normalize the tags here as well.
    if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
        if !tags.is_empty() {
            return Some(MetaString::from(tags));
        }
    }

    if ignore_missing_fields {
        return None;
    }
    let mut container_tags = TagSet::default();
    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
    let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
    if container_tags.is_empty() && !is_fargate_source {
        return None;
    }

    let mut flattened = flatten_container_tag(container_tags);
    if is_fargate_source {
        if let Some(src) = source {
            append_tags(&mut flattened, &src.tag());
        }
    }

    if flattened.is_empty() {
        None
    } else {
        Some(MetaString::from(flattened))
    }
}

fn flatten_container_tag(tags: TagSet) -> String {
    let mut flattened = String::new();
    for tag in tags {
        if !flattened.is_empty() {
            flattened.push(',');
        }
        flattened.push_str(tag.as_str());
    }
    flattened
}

fn append_tags(target: &mut String, tags: &str) {
    if tags.is_empty() {
        return;
    }
    if !target.is_empty() {
        target.push(',');
    }
    target.push_str(tags);
}