Skip to main content

saluki_components/encoders/datadog/traces/
mod.rs

1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7    attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8    AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use opentelemetry_semantic_conventions::resource::{
13    CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
14};
15use piecemeal::{ScratchBuffer, ScratchWriter};
16use saluki_common::strings::StringBuilder;
17use saluki_common::task::HandleExt as _;
18use saluki_config::GenericConfiguration;
19use saluki_context::tags::{SharedTagSet, TagSet};
20use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
21use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
22use saluki_core::{
23    components::{encoders::*, ComponentContext},
24    data_model::{
25        event::{trace::Trace, EventType},
26        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
27    },
28    observability::ComponentMetricsExt as _,
29};
30use saluki_env::host::providers::BoxedHostProvider;
31use saluki_env::{EnvironmentProvider, HostProvider};
32use saluki_error::generic_error;
33use saluki_error::{ErrorContext as _, GenericError};
34use saluki_io::compression::CompressionScheme;
35use saluki_metrics::MetricsBuilder;
36use serde::Deserialize;
37use stringtheory::MetaString;
38use tokio::{
39    select,
40    sync::mpsc::{self, Receiver, Sender},
41    time::sleep,
42};
43use tracing::{debug, error};
44
45use crate::common::datadog::{
46    apm::ApmConfig,
47    io::RB_BUFFER_CHUNK_SIZE,
48    request_builder::{EndpointEncoder, RequestBuilder},
49    telemetry::ComponentTelemetry,
50    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
51};
52use crate::common::otlp::config::TracesConfig;
53use crate::common::otlp::util::{
54    extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
55    DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
56    KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
57};
58
/// Tag key under which the flattened container tags are written into the tracer payload tags.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
/// Upper bound on the number of traces the request builder accepts into a single payload.
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
/// `Content-Type` header value returned by the traces endpoint encoder.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

// Sampling metadata keys / values.
/// Chunk tag key that carries the OTLP sampling rate.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
/// Chunk priority used when a trace carries no sampling priority of its own.
const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP
66
/// Serde default for the compressor kind: `zstd`.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
70
/// Serde default for the zstd compression level.
const fn default_zstd_compressor_level() -> i32 {
    const LEVEL: i32 = 3;
    LEVEL
}
74
/// Serde default for the flush timeout, expressed in seconds.
const fn default_flush_timeout_secs() -> u64 {
    const SECONDS: u64 = 2;
    SECONDS
}
78
/// Serde default for the environment name: `none`.
fn default_env() -> String {
    String::from("none")
}
82
/// Configuration for the Datadog Traces encoder.
///
/// This encoder converts trace events into Datadog's TracerPayload protobuf format and sends them
/// to the Datadog traces intake endpoint (`/api/v0.2/traces`). It handles batching, compression,
/// and enrichment with metadata such as hostname, environment, and container tags.
#[derive(Deserialize)]
pub struct DatadogTraceConfiguration {
    /// Name of the compression scheme handed to `CompressionScheme::new` when the encoder is built.
    ///
    /// Read from the `serializer_compressor_kind` configuration key. Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",  // maps the `serializer_compressor_kind` configuration key onto the `compressor_kind` field.
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level handed to `CompressionScheme::new` alongside the compressor kind.
    ///
    /// Read from the `serializer_zstd_compressor_level` configuration key. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written traces to the in-flight request payload, but it has not yet reached the
    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
    /// before flushing the in-flight request payload.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Hostname used when building the endpoint encoder.
    ///
    /// Not deserialized; populated by `with_environment_provider`. When left unset, an empty hostname is used.
    #[serde(skip)]
    default_hostname: Option<String>,

    /// Agent version string reported in payloads.
    ///
    /// Not deserialized; populated by `from_configuration`.
    #[serde(skip)]
    version: String,

    /// APM configuration.
    ///
    /// Not deserialized here; populated by `from_configuration` via `ApmConfig::from_configuration`.
    #[serde(skip)]
    apm_config: ApmConfig,

    /// OTLP traces configuration.
    ///
    /// Not deserialized here; populated by `from_configuration` from the `otlp_config.traces` key, falling back to
    /// the type's default when the key is absent.
    #[serde(skip)]
    otlp_traces: TracesConfig,

    /// Environment name written to the agent payload.
    ///
    /// Defaults to `none`.
    #[serde(default = "default_env")]
    env: String,
}
127
128impl DatadogTraceConfiguration {
129    /// Creates a new `DatadogTraceConfiguration` from the given configuration.
130    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
131        let mut trace_config: Self = config.as_typed()?;
132
133        let app_details = saluki_metadata::get_app_details();
134        trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
135
136        trace_config.apm_config = ApmConfig::from_configuration(config)?;
137        trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
138
139        Ok(trace_config)
140    }
141}
142
143impl DatadogTraceConfiguration {
144    /// Sets the default_hostname using the environment provider
145    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
146    where
147        E: EnvironmentProvider<Host = BoxedHostProvider>,
148    {
149        let host_provider = environment_provider.host();
150        let hostname = host_provider.get_hostname().await?;
151        self.default_hostname = Some(hostname);
152        Ok(self)
153    }
154}
155
156#[async_trait]
157impl EncoderBuilder for DatadogTraceConfiguration {
158    fn input_event_type(&self) -> EventType {
159        EventType::Trace
160    }
161
162    fn output_payload_type(&self) -> PayloadType {
163        PayloadType::Http
164    }
165
166    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
167        let metrics_builder = MetricsBuilder::from_component_context(&context);
168        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
169        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
170
171        let default_hostname = self.default_hostname.clone().unwrap_or_default();
172        let default_hostname = MetaString::from(default_hostname);
173
174        // Create request builder for traces which is used to generate HTTP requests.
175
176        let mut trace_rb = RequestBuilder::new(
177            TraceEndpointEncoder::new(
178                default_hostname,
179                self.version.clone(),
180                self.env.clone(),
181                self.apm_config.clone(),
182                self.otlp_traces.clone(),
183            ),
184            compression_scheme,
185            RB_BUFFER_CHUNK_SIZE,
186        )
187        .await?;
188        trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
189
190        let flush_timeout = match self.flush_timeout_secs {
191            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
192            // batching, while still practically flushing things almost immediately.
193            0 => Duration::from_millis(10),
194            secs => Duration::from_secs(secs),
195        };
196
197        Ok(Box::new(DatadogTrace {
198            trace_rb,
199            telemetry,
200            flush_timeout,
201        }))
202    }
203}
204
impl MemoryBounds for DatadogTraceConfiguration {
    /// Declares the memory this component expects to use.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        // Minimum: the component struct itself plus the two 8-slot channels created in `DatadogTrace::run`.
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm: sized by the per-payload trace limit.
        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
219
/// Datadog traces encoder component.
///
/// Built by `DatadogTraceConfiguration::build`.
pub struct DatadogTrace {
    /// Request builder that encodes traces and produces HTTP requests.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    /// Component telemetry handle, passed on to the request builder task.
    telemetry: ComponentTelemetry,
    /// How long to wait after processing an event buffer before flushing a partially filled request.
    flush_timeout: Duration,
}
225
// Encodes Trace events to TracerPayloads.
#[async_trait]
impl Encoder for DatadogTrace {
    /// Runs the encoder until the upstream event stream or the request builder's payload channel closes.
    ///
    /// Event buffers are forwarded to a spawned request builder task, and the payloads that task produces are
    /// handed to the dispatcher. Dispatch failures are logged and do not stop the loop.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be sent to the request builder task.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            trace_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // The encoder runs two async loops, the main encoder loop and the request builder loop,
        // this channel is used to send events from the main encoder loop to the request builder loop safely.
        let (events_tx, events_rx) = mpsc::channel(8);
        // Payloads flow the other way: the request builder task sends them, and this loop receives and dispatches them.
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        // Spawn the request builder task on the global thread pool, this task is responsible for encoding traces and flushing requests.
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool() // Use the shared Tokio runtime thread pool.
            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Trace encoder started.");

        loop {
            select! {
                biased; // makes the branches of the select statement be evaluated in order.

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        // Dispatch an HTTP payload to the dispatcher.
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // The payload sender is gone, so the request builder task has ended.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    // NOTE(review): while this send is awaited, `payloads_rx` is not drained. If the events channel
                    // is full and the request builder task is itself blocked sending into a full payloads channel,
                    // neither side can make progress -- confirm whether this can happen in practice.
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // Upstream event stream has ended.
                    None => break,
                },
            }
        }

        // Drop the events sender, which signals the request builder task to stop.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Request build task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Trace encoder stopped.");

        Ok(())
    }
}
297
298async fn run_request_builder(
299    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
300    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
301) -> Result<(), GenericError> {
302    let mut pending_flush = false;
303    let pending_flush_timeout = sleep(flush_timeout);
304    tokio::pin!(pending_flush_timeout);
305
306    loop {
307        select! {
308            Some(event_buffer) = events_rx.recv() => {
309                for event in event_buffer {
310                    let trace = match event.try_into_trace() {
311                        Some(trace) => trace,
312                        None => continue,
313                    };
314                    // Encode the trace. If we get it back, that means the current request is full, and we need to
315                    // flush it before we can try to encode the trace again.
316                    let trace_to_retry = match trace_request_builder.encode(trace).await {
317                        Ok(None) => continue,
318                        Ok(Some(trace)) => trace,
319                        Err(e) => {
320                            error!(error = %e, "Failed to encode trace.");
321                            telemetry.events_dropped_encoder().increment(1);
322                            continue;
323                        }
324                    };
325
326                    let maybe_requests = trace_request_builder.flush().await;
327                    if maybe_requests.is_empty() {
328                        panic!("builder told us to flush, but gave us nothing");
329                    }
330
331                    for maybe_request in maybe_requests {
332                        match maybe_request {
333                            Ok((events, request)) => {
334                                let payload_meta = PayloadMetadata::from_event_count(events);
335                                let http_payload = HttpPayload::new(payload_meta, request);
336                                let payload = Payload::Http(http_payload);
337
338                                payloads_tx.send(payload).await
339                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
340                            },
341                            Err(e) => if e.is_recoverable() {
342                                // If the error is recoverable, we'll hold on to the trace to retry it later.
343                                continue;
344                            } else {
345                                return Err(GenericError::from(e).context("Failed to flush request."));
346                            }
347                        }
348                    }
349
350                    // Now try to encode the trace again.
351                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
352                        error!(error = %e, "Failed to encode trace.");
353                        telemetry.events_dropped_encoder().increment(1);
354                    }
355                }
356
357                debug!("Processed event buffer.");
358
359                // If we're not already pending a flush, we'll start the countdown.
360                if !pending_flush {
361                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
362                    pending_flush = true;
363                }
364            },
365            _ = &mut pending_flush_timeout, if pending_flush => {
366                debug!("Flushing pending request(s).");
367
368                pending_flush = false;
369
370                // Once we've encoded and written all traces, we flush the request builders to generate a request with
371                // anything left over. Again, we'll enqueue those requests to be sent immediately.
372                let maybe_trace_requests = trace_request_builder.flush().await;
373                for maybe_request in maybe_trace_requests {
374                    match maybe_request {
375                        Ok((events, request)) => {
376                            let payload_meta = PayloadMetadata::from_event_count(events);
377                            let http_payload = HttpPayload::new(payload_meta, request);
378                            let payload = Payload::Http(http_payload);
379
380                            payloads_tx.send(payload).await
381                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
382                        },
383                        Err(e) => if e.is_recoverable() {
384                            continue;
385                        } else {
386                            return Err(GenericError::from(e).context("Failed to flush request."));
387                        }
388                    }
389                }
390
391                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
392            },
393
394            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
395            else => break,
396        }
397    }
398
399    Ok(())
400}
401
/// Endpoint encoder that serializes a `Trace` into an agent payload for the traces intake.
#[derive(Debug)]
struct TraceEndpointEncoder {
    /// Scratch writer handed to the agent payload builder while encoding.
    scratch: ScratchWriter<Vec<u8>>,
    /// Hostname passed to `resolve_hostname`, which uses it only when no source is derived from resource tags.
    default_hostname: MetaString,
    /// Owned copy of `default_hostname`, written as the agent payload's host name.
    agent_hostname: String,
    /// Agent version string written to the agent payload.
    version: String,
    /// Environment name written to the agent payload.
    env: String,
    /// APM configuration; supplies the target traces-per-second and errors-per-second values.
    apm_config: ApmConfig,
    /// OTLP traces configuration; supplies the sampling percentage and the `ignore_missing_datadog_fields` flag.
    otlp_traces: TracesConfig,
    /// Reusable buffer for formatting the sampling rate tag value.
    string_builder: StringBuilder,
}
413
impl TraceEndpointEncoder {
    /// Creates an encoder with an 8 KiB scratch buffer.
    ///
    /// `default_hostname` is stored both as given and as an owned `String` copy (`agent_hostname`).
    fn new(
        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
    ) -> Self {
        Self {
            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
            agent_hostname: default_hostname.as_ref().to_string(),
            default_hostname,
            version,
            env,
            apm_config,
            otlp_traces,
            string_builder: StringBuilder::new(),
        }
    }

    /// Encodes `trace` as an agent payload holding one tracer payload with one chunk, and writes the result
    /// into `output_buffer`.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error reported by the payload builders.
    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
        let sampling_rate = self.sampling_rate();
        let resource_tags = trace.resource_tags();
        let first_span = trace.spans().first();
        let source = tags_to_source(resource_tags);

        // Resolve metadata from resource tags.
        let container_id = resolve_container_id(resource_tags, first_span);
        let lang = get_resource_tag_value(resource_tags, "telemetry.sdk.language");
        // A missing SDK version yields the tracer version "otlp-".
        let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
        let tracer_version = format!("otlp-{}", sdk_version);
        let container_tags = resolve_container_tags(
            resource_tags,
            source.as_ref(),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let env = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields);
        let hostname = resolve_hostname(
            resource_tags,
            source.as_ref(),
            Some(self.default_hostname.as_ref()),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let app_version = resolve_app_version(resource_tags);

        // Resolve sampling metadata.
        // Anything the trace does not specify falls back to the default priority / configured sampling rate.
        let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() {
            Some(sampling) => (
                sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY),
                sampling.dropped_trace,
                sampling.decision_maker.as_deref(),
                sampling.otlp_sampling_rate.unwrap_or(sampling_rate),
            ),
            None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate),
        };

        // Now incrementally build the payload.
        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);

        // Agent-level fields come from the encoder's own configuration, not from the trace.
        ap_builder
            .host_name(&self.agent_hostname)?
            .env(&self.env)?
            .agent_version(&self.version)?
            .target_tps(self.apm_config.target_traces_per_second())?
            .error_tps(self.apm_config.errors_per_second())?;

        ap_builder.add_tracer_payloads(|tp| {
            if let Some(cid) = container_id {
                tp.container_id(cid)?;
            }
            if let Some(l) = lang {
                tp.language_name(l)?;
            }
            tp.tracer_version(&tracer_version)?;

            // Encode the single TraceChunk containing all spans.
            tp.add_chunks(|chunk| {
                chunk.priority(priority)?;

                for span in trace.spans() {
                    chunk.add_spans(|s| {
                        s.service(span.service())?
                            .name(span.name())?
                            .resource(span.resource())?
                            .trace_id(span.trace_id())?
                            .span_id(span.span_id())?
                            .parent_id(span.parent_id())?
                            .start(span.start() as i64)?
                            .duration(span.duration() as i64)?
                            .error(span.error())?;

                        // String-valued span metadata.
                        {
                            let mut meta = s.meta();
                            for (k, v) in span.meta() {
                                meta.write_entry(k.as_ref(), v.as_ref())?;
                            }
                        }

                        // Span metrics.
                        {
                            let mut metrics = s.metrics();
                            for (k, v) in span.metrics() {
                                metrics.write_entry(k.as_ref(), *v)?;
                            }
                        }

                        s.type_(span.span_type())?;

                        // Span metadata written from byte slices.
                        {
                            let mut ms = s.meta_struct();
                            for (k, v) in span.meta_struct() {
                                ms.write_entry(k.as_ref(), v.as_slice())?;
                            }
                        }

                        for link in span.span_links() {
                            s.add_span_links(|sl| {
                                sl.trace_id(link.trace_id())?
                                    .trace_id_high(link.trace_id_high())?
                                    .span_id(link.span_id())?;
                                {
                                    let mut attrs = sl.attributes();
                                    for (k, v) in link.attributes() {
                                        attrs.write_entry(&**k, &**v)?;
                                    }
                                }
                                let tracestate = link.tracestate().to_string();
                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
                                Ok(())
                            })?;
                        }

                        for event in span.span_events() {
                            s.add_span_events(|se| {
                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
                                {
                                    let mut attrs = se.attributes();
                                    for (k, v) in event.attributes() {
                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
                                    }
                                }
                                Ok(())
                            })?;
                        }

                        Ok(())
                    })?;
                }

                // Chunk tags.
                {
                    let mut tags = chunk.tags();
                    if let Some(dm) = decision_maker {
                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
                    }

                    // The sampling rate is formatted with two decimal places into the reusable string builder.
                    self.string_builder.clear();
                    write!(&mut self.string_builder, "{:.2}", otlp_sr)
                        .expect("should never fail to format sampling rate");
                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
                }

                // The dropped-trace flag is only written when it is set.
                if dropped_trace {
                    chunk.dropped_trace(true)?;
                }

                Ok(())
            })?;

            // Tracer payload tags.
            if let Some(ct) = container_tags {
                let mut tags = tp.tags();
                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
            }

            if let Some(e) = env {
                tp.env(e)?;
            }
            if let Some(h) = hostname {
                tp.hostname(h)?;
            }
            if let Some(av) = app_version {
                tp.app_version(av)?;
            }

            Ok(())
        })?;

        ap_builder.finish(output_buffer)?;

        Ok(())
    }

    /// Returns the configured probabilistic sampling percentage as a rate.
    ///
    /// The percentage is divided by 100; a resulting rate that is `<= 0.0` or `>= 1.0` is replaced by `1.0`.
    fn sampling_rate(&self) -> f64 {
        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
        if rate <= 0.0 || rate >= 1.0 {
            return 1.0;
        }
        rate
    }
}
610
impl EndpointEncoder for TraceEndpointEncoder {
    type Input = Trace;
    type EncodeError = std::io::Error;
    /// Name identifying this encoder.
    fn encoder_name() -> &'static str {
        "traces"
    }

    /// Compressed payload size limit: the default intake limit.
    fn compressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
    }

    /// Uncompressed payload size limit: the default intake limit.
    fn uncompressed_size_limit(&self) -> usize {
        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
    }

    /// Encodes a single trace into `buffer` by delegating to `encode_tracer_payload`.
    fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
        self.encode_tracer_payload(trace, buffer)
    }

    /// Path of the traces intake endpoint.
    fn endpoint_uri(&self) -> Uri {
        PathAndQuery::from_static("/api/v0.2/traces").into()
    }

    /// Requests are sent with `POST`.
    fn endpoint_method(&self) -> Method {
        Method::POST
    }

    /// Payloads are protobuf (`application/x-protobuf`).
    fn content_type(&self) -> HeaderValue {
        CONTENT_TYPE_PROTOBUF.clone()
    }
}
642
/// Writes `value` into `builder`, setting the type discriminant that matches the variant before the value itself.
///
/// Array values are written element by element through `encode_attribute_array_value`.
///
/// # Errors
///
/// Propagates any I/O error reported by the builder.
fn encode_attribute_value<S: ScratchBuffer>(
    builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
) -> std::io::Result<()> {
    match value {
        AttributeValue::String(v) => {
            builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
        }
        AttributeValue::Bool(v) => {
            builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
        }
        AttributeValue::Int(v) => {
            builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
        }
        AttributeValue::Double(v) => {
            builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
        }
        AttributeValue::Array(values) => {
            builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
                for val in values {
                    arr.add_values(|av| encode_attribute_array_value(av, val))?;
                }
                Ok(())
            })?;
        }
    }
    Ok(())
}
670
/// Writes a scalar array element into `builder`, setting the type discriminant that matches the variant
/// before the value itself.
///
/// # Errors
///
/// Propagates any I/O error reported by the builder.
fn encode_attribute_array_value<S: ScratchBuffer>(
    builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeScalarValue,
) -> std::io::Result<()> {
    match value {
        AttributeScalarValue::String(v) => {
            builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
        }
        AttributeScalarValue::Bool(v) => {
            builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
        }
        AttributeScalarValue::Int(v) => {
            builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
        }
        AttributeScalarValue::Double(v) => {
            builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
        }
    }
    Ok(())
}
690
691fn get_resource_tag_value<'a>(resource_tags: &'a SharedTagSet, key: &str) -> Option<&'a str> {
692    resource_tags.get_single_tag(key).and_then(|t| t.value())
693}
694
695fn resolve_hostname<'a>(
696    resource_tags: &'a SharedTagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
697    ignore_missing_fields: bool,
698) -> Option<&'a str> {
699    let mut hostname = match source {
700        Some(src) => match src.kind {
701            OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
702            _ => Some(""),
703        },
704        None => default_hostname,
705    };
706
707    if ignore_missing_fields {
708        hostname = Some("");
709    }
710
711    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
712        hostname = Some(value);
713    }
714
715    hostname
716}
717
718fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Option<&str> {
719    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
720        return Some(value);
721    }
722    if ignore_missing_fields {
723        return None;
724    }
725    if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
726        return Some(value);
727    }
728    get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
729}
730
731fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
732    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
733        if let Some(value) = get_resource_tag_value(resource_tags, key) {
734            return Some(value);
735        }
736    }
737    // TODO: add container id fallback equivalent to cidProvider
738    // https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L414
739    if let Some(span) = first_span {
740        for (k, v) in span.meta() {
741            if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
742                return Some(v.as_ref());
743            }
744        }
745    }
746    None
747}
748
749fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> {
750    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
751        return Some(value);
752    }
753    get_resource_tag_value(resource_tags, SERVICE_VERSION)
754}
755
756fn resolve_container_tags(
757    resource_tags: &SharedTagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
758) -> Option<MetaString> {
759    // TODO: some refactoring is probably needed to normalize this function, the tags should already be normalized
760    // since we do so when we transform OTLP spans to DD spans however to make this class extensible for non otlp traces, we would
761    // need to normalize the tags here.
762    if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
763        if !tags.is_empty() {
764            return Some(MetaString::from(tags));
765        }
766    }
767
768    if ignore_missing_fields {
769        return None;
770    }
771    let mut container_tags = TagSet::default();
772    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
773    let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
774    if container_tags.is_empty() && !is_fargate_source {
775        return None;
776    }
777
778    let mut flattened = flatten_container_tag(container_tags);
779    if is_fargate_source {
780        if let Some(src) = source {
781            append_tags(&mut flattened, &src.tag());
782        }
783    }
784
785    if flattened.is_empty() {
786        None
787    } else {
788        Some(MetaString::from(flattened))
789    }
790}
791
792fn flatten_container_tag(tags: TagSet) -> String {
793    let mut flattened = String::new();
794    for tag in tags {
795        if !flattened.is_empty() {
796            flattened.push(',');
797        }
798        flattened.push_str(tag.as_str());
799    }
800    flattened
801}
802
/// Appends `tags` to `target`, separating it from existing content with a comma.
///
/// An empty `tags` leaves `target` untouched.
fn append_tags(target: &mut String, tags: &str) {
    match (target.is_empty(), tags.is_empty()) {
        // Nothing to append.
        (_, true) => {}
        // First entry: no separator needed.
        (true, false) => target.push_str(tags),
        (false, false) => {
            target.push(',');
            target.push_str(tags);
        }
    }
}
811}