saluki_components/encoders/datadog/traces/
mod.rs

1#![allow(dead_code)]
2
3use async_trait::async_trait;
4use datadog_protos::traces::{
5    attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AttributeAnyValue,
6    AttributeArray, AttributeArrayValue, Span as ProtoSpan, SpanEvent as ProtoSpanEvent, SpanLink as ProtoSpanLink,
7    TraceChunk,
8};
9use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
10use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
11use opentelemetry_semantic_conventions::resource::{
12    CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
13};
14use protobuf::{rt::WireType, CodedOutputStream, Message};
15use saluki_common::task::HandleExt as _;
16use saluki_config::GenericConfiguration;
17use saluki_context::tags::TagSet;
18use saluki_core::data_model::event::trace::{
19    AttributeScalarValue, AttributeValue, Span as DdSpan, SpanEvent as DdSpanEvent, SpanLink as DdSpanLink,
20};
21use saluki_core::data_model::event::Event;
22use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
23use saluki_core::{
24    components::{encoders::*, ComponentContext},
25    data_model::{
26        event::{trace::Trace, EventType},
27        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
28    },
29    observability::ComponentMetricsExt as _,
30};
31use saluki_env::host::providers::BoxedHostProvider;
32use saluki_env::{EnvironmentProvider, HostProvider};
33use saluki_error::generic_error;
34use saluki_error::{ErrorContext as _, GenericError};
35use saluki_io::compression::CompressionScheme;
36use saluki_metrics::MetricsBuilder;
37use serde::Deserialize;
38use stringtheory::MetaString;
39use tokio::{
40    select,
41    sync::mpsc::{self, Receiver, Sender},
42    time::sleep,
43};
44use tracing::{debug, error};
45
46use crate::common::datadog::{
47    apm::ApmConfig,
48    io::RB_BUFFER_CHUNK_SIZE,
49    request_builder::{EndpointEncoder, RequestBuilder},
50    telemetry::ComponentTelemetry,
51    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
52};
53use crate::common::otlp::config::TracesConfig;
54use crate::common::otlp::util::{
55    extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
56    DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
57    KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
58};
59
60const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
61const MAX_TRACES_PER_PAYLOAD: usize = 10000;
62static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
63
/// Serde default for the `serializer_compressor_kind` setting: the `"zstd"` compressor.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
67
/// Serde default for the `serializer_zstd_compressor_level` setting.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
71
/// Serde default for the `flush_timeout_secs` setting, in seconds.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_SECS: u64 = 2;
    DEFAULT_SECS
}
75
// Field numbers from lib/protos/datadog/proto/datadog-agent/datadog/trace/tracer_payload.proto. These are used to construct the wire format of the tracer payload and trace chunk.
77const TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER: u32 = 1;
78const TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER: u32 = 2;
79const TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER: u32 = 4;
80const TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER: u32 = 6;
81const TRACER_PAYLOAD_TAGS_FIELD_NUMBER: u32 = 7;
82const TRACER_PAYLOAD_ENV_FIELD_NUMBER: u32 = 8;
83const TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 9;
84const TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER: u32 = 10;
85
86const AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 1;
87const AGENT_PAYLOAD_ENV_FIELD_NUMBER: u32 = 2;
88const AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER: u32 = 5;
89const AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER: u32 = 7;
90const AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER: u32 = 8;
91const AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER: u32 = 9;
92
/// Serde default for the `env` setting: the literal string `"none"`.
fn default_env() -> String {
    String::from("none")
}
96
/// Configuration for the Datadog Traces encoder.
///
/// This encoder converts trace events into Datadog's TracerPayload protobuf format and sends them
/// to the Datadog traces intake endpoint (`/api/v0.2/traces`). It handles batching, compression,
/// and enrichment with metadata such as hostname, environment, and container tags.
///
/// Fields marked `#[serde(skip)]` are not read from the user configuration directly; they are
/// populated by `from_configuration` and `with_environment_provider`.
#[derive(Deserialize)]
pub struct DatadogTraceConfiguration {
    /// Name of the compression scheme used for request payloads.
    ///
    /// Defaults to `"zstd"`.
    #[serde(
        rename = "serializer_compressor_kind",  // Maps the `serializer_compressor_kind` configuration key onto the `compressor_kind` field.
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level handed to the compression scheme together with `compressor_kind`.
    ///
    /// Read from the `serializer_zstd_compressor_level` configuration key. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written traces to the in-flight request payload, but it has not yet reached the
    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
    /// before flushing the in-flight request payload.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Hostname to fall back on when encoding; set by `with_environment_provider`, `None` until then.
    #[serde(skip)]
    default_hostname: Option<String>,

    /// Agent version string written into each payload; set by `from_configuration` to
    /// `agent-data-plane/<app version>`.
    #[serde(skip)]
    version: String,

    /// APM settings, loaded by `from_configuration` via `ApmConfig::from_configuration`.
    #[serde(skip)]
    apm_config: ApmConfig,

    /// OTLP traces settings, loaded by `from_configuration` from the `otlp_config.traces` key
    /// (the type's default is used when the key is absent).
    #[serde(skip)]
    otlp_traces: TracesConfig,

    /// Environment name written into each payload.
    ///
    /// Defaults to `"none"`.
    #[serde(default = "default_env")]
    env: String,
}
141
142impl DatadogTraceConfiguration {
143    /// Creates a new `DatadogTraceConfiguration` from the given configuration.
144    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
145        let mut trace_config: Self = config.as_typed()?;
146
147        let app_details = saluki_metadata::get_app_details();
148        trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
149
150        trace_config.apm_config = ApmConfig::from_configuration(config)?;
151        trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
152
153        Ok(trace_config)
154    }
155}
156
157impl DatadogTraceConfiguration {
158    /// Sets the default_hostname using the environment provider
159    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
160    where
161        E: EnvironmentProvider<Host = BoxedHostProvider>,
162    {
163        let host_provider = environment_provider.host();
164        let hostname = host_provider.get_hostname().await?;
165        self.default_hostname = Some(hostname);
166        Ok(self)
167    }
168}
169
170#[async_trait]
171impl EncoderBuilder for DatadogTraceConfiguration {
172    fn input_event_type(&self) -> EventType {
173        EventType::Trace
174    }
175
176    fn output_payload_type(&self) -> PayloadType {
177        PayloadType::Http
178    }
179
180    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
181        let metrics_builder = MetricsBuilder::from_component_context(&context);
182        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
183        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
184
185        let default_hostname = self.default_hostname.clone().unwrap_or_default();
186        let default_hostname = MetaString::from(default_hostname);
187
188        // Create request builder for traces which is used to generate HTTP requests.
189
190        let mut trace_rb = RequestBuilder::new(
191            TraceEndpointEncoder::new(
192                default_hostname,
193                self.version.clone(),
194                self.env.clone(),
195                self.apm_config.clone(),
196                self.otlp_traces.clone(),
197            ),
198            compression_scheme,
199            RB_BUFFER_CHUNK_SIZE,
200        )
201        .await?;
202        trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
203
204        let flush_timeout = match self.flush_timeout_secs {
205            // Give ourselves a minimum flush timeout of 10ms to allow for minimal batching
206            0 => std::time::Duration::from_millis(10),
207            secs => std::time::Duration::from_secs(secs),
208        };
209
210        Ok(Box::new(DatadogTrace {
211            trace_rb,
212            telemetry,
213            flush_timeout,
214        }))
215    }
216}
217
impl MemoryBounds for DatadogTraceConfiguration {
    /// Declares the memory bounds of the encoder built from this configuration.
    ///
    /// The minimum bound covers the component struct plus the two 8-slot channels created in
    /// `Encoder::run`; the firm bound additionally accounts for up to `MAX_TRACES_PER_PAYLOAD`
    /// traces.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            // The array length of 8 matches the capacity of the channels created in `run`.
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
232
/// Datadog traces encoder component, created by `DatadogTraceConfiguration::build`.
pub struct DatadogTrace {
    // Request builder that encodes traces and produces HTTP requests for the traces endpoint.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    // Component telemetry handles (used to count dropped events).
    telemetry: ComponentTelemetry,
    // How long to wait after processing an event buffer before flushing a partially filled request.
    flush_timeout: std::time::Duration,
}
238
// Encodes Trace events into HTTP payloads by forwarding event buffers to a spawned request
// builder task and dispatching the payloads that task produces.
#[async_trait]
impl Encoder for DatadogTrace {
    /// Runs the encoder until the events stream or the payloads channel ends.
    ///
    /// Returns an error if an event buffer cannot be handed to the request builder task. Failures
    /// to dispatch a payload, and failures of the request builder task itself, are only logged.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            trace_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // The encoder runs two async loops: the main encoder loop (below) and the request builder loop
        // (`run_request_builder`). This channel carries event buffers from the former to the latter.
        let (events_tx, events_rx) = mpsc::channel(8);
        // This channel carries finished payloads back from the request builder loop so that the main
        // loop can hand them to the dispatcher.
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        // Spawn the request builder task on the global thread pool, this task is responsible for encoding traces and flushing requests.
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool() // Use the shared Tokio runtime thread pool.
            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Trace encoder started.");

        loop {
            select! {
                biased; // Poll the branches in declaration order instead of randomly.

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        // Dispatch an HTTP payload to the dispatcher.
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // The request builder task dropped its sender, so no more payloads will arrive.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // The events stream has ended; move on to shutdown.
                    None => break,
                },
            }
        }

        // Drop the events sender, which signals the request builder task to stop.
        drop(events_tx);

        // Continue draining the payloads receiver until it is closed.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Request build task should now be stopped.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Trace encoder stopped.");

        Ok(())
    }
}
310
311async fn run_request_builder(
312    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
313    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
314) -> Result<(), GenericError> {
315    let mut pending_flush = false;
316    let pending_flush_timeout = sleep(flush_timeout);
317    tokio::pin!(pending_flush_timeout);
318
319    loop {
320        select! {
321            Some(event_buffer) = events_rx.recv() => {
322                for event in event_buffer {
323                    let trace = match event {
324                        Event::Trace(trace) => trace,
325                        _ => continue,
326                    };
327
328                    // Encode the trace. If we get it back, that means the current request is full, and we need to
329                    // flush it before we can try to encode the trace again.
330                    let trace_to_retry = match trace_request_builder.encode(trace).await {
331                        Ok(None) => continue,
332                        Ok(Some(trace)) => trace,
333                        Err(e) => {
334                            error!(error = %e, "Failed to encode trace.");
335                            telemetry.events_dropped_encoder().increment(1);
336                            continue;
337                        }
338                    };
339
340                    let maybe_requests = trace_request_builder.flush().await;
341                    if maybe_requests.is_empty() {
342                        panic!("builder told us to flush, but gave us nothing");
343                    }
344
345                    for maybe_request in maybe_requests {
346                        match maybe_request {
347                            Ok((events, request)) => {
348                                let payload_meta = PayloadMetadata::from_event_count(events);
349                                let http_payload = HttpPayload::new(payload_meta, request);
350                                let payload = Payload::Http(http_payload);
351
352                                payloads_tx.send(payload).await
353                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
354                            },
355                            Err(e) => if e.is_recoverable() {
356                                // If the error is recoverable, we'll hold on to the trace to retry it later.
357                                continue;
358                            } else {
359                                return Err(GenericError::from(e).context("Failed to flush request."));
360                            }
361                        }
362                    }
363
364                    // Now try to encode the trace again.
365                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
366                        error!(error = %e, "Failed to encode trace.");
367                        telemetry.events_dropped_encoder().increment(1);
368                    }
369                }
370
371                debug!("Processed event buffer.");
372
373                // If we're not already pending a flush, we'll start the countdown.
374                if !pending_flush {
375                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
376                    pending_flush = true;
377                }
378            },
379            _ = &mut pending_flush_timeout, if pending_flush => {
380                debug!("Flushing pending request(s).");
381
382                pending_flush = false;
383
384                // Once we've encoded and written all traces, we flush the request builders to generate a request with
385                // anything left over. Again, we'll enqueue those requests to be sent immediately.
386                let maybe_trace_requests = trace_request_builder.flush().await;
387                for maybe_request in maybe_trace_requests {
388                    match maybe_request {
389                        Ok((events, request)) => {
390                            let payload_meta = PayloadMetadata::from_event_count(events);
391                            let http_payload = HttpPayload::new(payload_meta, request);
392                            let payload = Payload::Http(http_payload);
393
394                            payloads_tx.send(payload).await
395                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
396                        },
397                        Err(e) => if e.is_recoverable() {
398                            continue;
399                        } else {
400                            return Err(GenericError::from(e).context("Failed to flush request."));
401                        }
402                    }
403                }
404
405                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
406            },
407
408            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
409            else => break,
410        }
411    }
412
413    Ok(())
414}
415
/// Endpoint encoder that serializes a trace into an `AgentPayload` wrapping a single
/// `TracerPayload`, reusing scratch buffers between calls.
#[derive(Debug)]
struct TraceEndpointEncoder {
    // Scratch buffer holding the serialized TracerPayload before it is embedded in the AgentPayload.
    tracer_payload_scratch: Vec<u8>,
    // Scratch buffer holding the serialized TraceChunk.
    chunk_scratch: Vec<u8>,
    // Scratch buffer holding the serialized tags map entry.
    tags_scratch: Vec<u8>,
    // TODO: do we need additional tags or a tag deduplicator?
    // Hostname passed as the fallback when resolving the TracerPayload hostname.
    default_hostname: MetaString,
    // Hostname written to the AgentPayload; initialized from `default_hostname`.
    agent_hostname: String,
    // Agent version string written to the AgentPayload.
    version: String,
    // Environment written to the AgentPayload.
    env: String,
    // Source of the target/error traces-per-second values written to the AgentPayload.
    apm_config: ApmConfig,
    // OTLP traces settings (sampling percentage, `ignore_missing_datadog_fields`).
    otlp_traces: TracesConfig,
}
429
430impl TraceEndpointEncoder {
431    fn new(
432        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
433    ) -> Self {
434        Self {
435            tracer_payload_scratch: Vec::new(),
436            chunk_scratch: Vec::new(),
437            tags_scratch: Vec::new(),
438            agent_hostname: default_hostname.as_ref().to_string(),
439            default_hostname,
440            version,
441            env,
442            apm_config,
443            otlp_traces,
444        }
445    }
446
    /// Serializes `trace` as an `AgentPayload` containing one `TracerPayload` and appends the
    /// bytes to `output_buffer`.
    ///
    /// The `TracerPayload` is first built in `tracer_payload_scratch` (its length must be known
    /// before it can be embedded), then written as a length-delimited field of the `AgentPayload`.
    /// Optional `TracerPayload` fields are only written when a value can be resolved from the
    /// trace's resource tags (or, for the container ID, the first span).
    ///
    /// # Errors
    ///
    /// Returns any error produced by the underlying protobuf output streams.
    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> Result<(), protobuf::Error> {
        let resource_tags = trace.resource_tags();
        let first_span = trace.spans().first();
        let source = tags_to_source(resource_tags);

        let trace_chunk = self.build_trace_chunk(trace);

        // Build AgentPayload (outer message) writing to output_buffer
        let mut agent_payload_stream = CodedOutputStream::vec(output_buffer);

        // Write AgentPayload fields defined in agent_payload.proto
        agent_payload_stream.write_string(AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER, &self.agent_hostname)?;
        agent_payload_stream.write_string(AGENT_PAYLOAD_ENV_FIELD_NUMBER, &self.env)?;

        agent_payload_stream.write_string(AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER, &self.version)?;
        agent_payload_stream.write_double(
            AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER,
            self.apm_config.target_traces_per_second(),
        )?;
        agent_payload_stream.write_double(
            AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER,
            self.apm_config.errors_per_second(),
        )?;

        // Build TracerPayload (nested message) in scratch buffer
        self.tracer_payload_scratch.clear();
        let mut tracer_payload_stream = CodedOutputStream::vec(&mut self.tracer_payload_scratch);

        // Write TracerPayload fields
        if let Some(container_id) = resolve_container_id(resource_tags, first_span) {
            tracer_payload_stream.write_string(TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER, container_id)?;
        }

        if let Some(lang) = get_resource_tag_value(resource_tags, "telemetry.sdk.language") {
            tracer_payload_stream.write_string(TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER, lang)?;
        }

        // Write tracer_version for OTLP traces, even if telemetry.sdk.version is missing.
        // With no SDK version tag this yields the bare prefix "otlp-".
        let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
        let tracer_version = format!("otlp-{}", sdk_version);
        tracer_payload_stream.write_string(TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER, &tracer_version)?;

        // The trace chunk is serialized into `chunk_scratch` and written as a length-delimited field.
        // (`write_message_field` clears the scratch buffer itself as well.)
        self.chunk_scratch.clear();
        write_message_field(
            &mut tracer_payload_stream,
            TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER,
            &trace_chunk,
            &mut self.chunk_scratch,
        )?;

        // Container tags, when resolvable, are written as a single entry of the tags map under
        // `CONTAINER_TAGS_META_KEY`.
        self.tags_scratch.clear();
        if let Some(tags) = resolve_container_tags(
            resource_tags,
            source.as_ref(),
            self.otlp_traces.ignore_missing_datadog_fields,
        ) {
            write_map_entry_string_string(
                &mut tracer_payload_stream,
                TRACER_PAYLOAD_TAGS_FIELD_NUMBER,
                CONTAINER_TAGS_META_KEY,
                tags.as_ref(),
                &mut self.tags_scratch,
            )?;
        }

        if let Some(env) = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields) {
            tracer_payload_stream.write_string(TRACER_PAYLOAD_ENV_FIELD_NUMBER, env)?;
        }

        if let Some(hostname) = resolve_hostname(
            resource_tags,
            source.as_ref(),
            Some(self.default_hostname.as_ref()),
            self.otlp_traces.ignore_missing_datadog_fields,
        ) {
            tracer_payload_stream.write_string(TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER, hostname)?;
        }

        if let Some(app_version) = resolve_app_version(resource_tags) {
            tracer_payload_stream.write_string(TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER, app_version)?;
        }

        tracer_payload_stream.flush()?;
        // Drop tracer_payload_stream to release the mutable borrow of tracer_payload_scratch
        drop(tracer_payload_stream);

        // Write TracerPayload as a nested message in AgentPayload (repeated field)
        // This field is written after the higher-numbered fields above because its content is only
        // complete at this point.
        agent_payload_stream.write_bytes(AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER, &self.tracer_payload_scratch)?;
        agent_payload_stream.flush()?;

        Ok(())
    }
539
540    fn sampling_rate(&self) -> f64 {
541        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
542        if rate <= 0.0 || rate >= 1.0 {
543            return 1.0;
544        }
545        rate
546    }
547
548    fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk {
549        let mut spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
550        let mut chunk = TraceChunk::new();
551
552        let rate = self.sampling_rate();
553        let mut tags = std::collections::HashMap::new();
554        tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate));
555
556        // TODO: Remove this once we have sampling. We have to hardcode the priority to 1 for now so that intake does not drop the trace.
557        const PRIORITY_AUTO_KEEP: i32 = 1;
558        chunk.set_priority(PRIORITY_AUTO_KEEP);
559
560        // Set _dd.p.dm (decision maker)
561        // Only set if sampling priority is "keep" (which it is, since we set PRIORITY_AUTO_KEEP)
562        // Decision maker "-9" indicates probabilistic sampler made the decision
563        const DECISION_MAKER: &str = "-9";
564        if let Some(first_span) = spans.first_mut() {
565            let mut meta = first_span.take_meta();
566            meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
567            first_span.set_meta(meta);
568        }
569
570        tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
571        chunk.set_tags(tags);
572
573        chunk.set_spans(spans);
574
575        chunk
576    }
577}
578
579impl EndpointEncoder for TraceEndpointEncoder {
580    type Input = Trace;
581    type EncodeError = protobuf::Error;
582    fn encoder_name() -> &'static str {
583        "traces"
584    }
585
586    fn compressed_size_limit(&self) -> usize {
587        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
588    }
589
590    fn uncompressed_size_limit(&self) -> usize {
591        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
592    }
593
594    fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
595        self.encode_tracer_payload(trace, buffer)
596    }
597
598    fn endpoint_uri(&self) -> Uri {
599        PathAndQuery::from_static("/api/v0.2/traces").into()
600    }
601
602    fn endpoint_method(&self) -> Method {
603        Method::POST
604    }
605
606    fn content_type(&self) -> HeaderValue {
607        CONTENT_TYPE_PROTOBUF.clone()
608    }
609}
610
611fn convert_span(span: &DdSpan) -> ProtoSpan {
612    let mut proto = ProtoSpan::new();
613    proto.set_service(span.service().to_string());
614    proto.set_name(span.name().to_string());
615    proto.set_resource(span.resource().to_string());
616    proto.set_traceID(span.trace_id());
617    proto.set_spanID(span.span_id());
618    proto.set_parentID(span.parent_id());
619    proto.set_start(span.start());
620    proto.set_duration(span.duration());
621    proto.set_error(span.error());
622    proto.set_type(span.span_type().to_string());
623
624    proto.set_meta(
625        span.meta()
626            .iter()
627            .map(|(k, v)| (k.to_string(), v.to_string()))
628            .collect(),
629    );
630    proto.set_metrics(span.metrics().iter().map(|(k, v)| (k.to_string(), *v)).collect());
631    proto.set_meta_struct(
632        span.meta_struct()
633            .iter()
634            .map(|(k, v)| (k.to_string(), v.clone()))
635            .collect(),
636    );
637    proto.set_spanLinks(span.span_links().iter().map(convert_span_link).collect());
638    proto.set_spanEvents(span.span_events().iter().map(convert_span_event).collect());
639    proto
640}
641
642fn convert_span_link(link: &DdSpanLink) -> ProtoSpanLink {
643    let mut proto = ProtoSpanLink::new();
644    proto.set_traceID(link.trace_id());
645    proto.set_traceID_high(link.trace_id_high());
646    proto.set_spanID(link.span_id());
647    proto.set_attributes(
648        link.attributes()
649            .iter()
650            .map(|(k, v)| (k.to_string(), v.to_string()))
651            .collect(),
652    );
653    proto.set_tracestate(link.tracestate().to_string());
654    proto.set_flags(link.flags());
655    proto
656}
657
658fn convert_span_event(event: &DdSpanEvent) -> ProtoSpanEvent {
659    let mut proto = ProtoSpanEvent::new();
660    proto.set_time_unix_nano(event.time_unix_nano());
661    proto.set_name(event.name().to_string());
662    proto.set_attributes(
663        event
664            .attributes()
665            .iter()
666            .map(|(k, v)| (k.to_string(), convert_attribute_value(v)))
667            .collect(),
668    );
669    proto
670}
671
672fn convert_attribute_value(value: &AttributeValue) -> AttributeAnyValue {
673    let mut proto = AttributeAnyValue::new();
674    match value {
675        AttributeValue::String(v) => {
676            proto.set_type(AttributeAnyValueType::STRING_VALUE);
677            proto.set_string_value(v.to_string());
678        }
679        AttributeValue::Bool(v) => {
680            proto.set_type(AttributeAnyValueType::BOOL_VALUE);
681            proto.set_bool_value(*v);
682        }
683        AttributeValue::Int(v) => {
684            proto.set_type(AttributeAnyValueType::INT_VALUE);
685            proto.set_int_value(*v);
686        }
687        AttributeValue::Double(v) => {
688            proto.set_type(AttributeAnyValueType::DOUBLE_VALUE);
689            proto.set_double_value(*v);
690        }
691        AttributeValue::Array(values) => {
692            proto.set_type(AttributeAnyValueType::ARRAY_VALUE);
693            let mut array = AttributeArray::new();
694            array.set_values(values.iter().map(convert_attribute_array_value).collect());
695            proto.set_array_value(array);
696        }
697    }
698    proto
699}
700
701fn convert_attribute_array_value(value: &AttributeScalarValue) -> AttributeArrayValue {
702    let mut proto = AttributeArrayValue::new();
703    match value {
704        AttributeScalarValue::String(v) => {
705            proto.set_type(AttributeArrayValueType::STRING_VALUE);
706            proto.set_string_value(v.to_string());
707        }
708        AttributeScalarValue::Bool(v) => {
709            proto.set_type(AttributeArrayValueType::BOOL_VALUE);
710            proto.set_bool_value(*v);
711        }
712        AttributeScalarValue::Int(v) => {
713            proto.set_type(AttributeArrayValueType::INT_VALUE);
714            proto.set_int_value(*v);
715        }
716        AttributeScalarValue::Double(v) => {
717            proto.set_type(AttributeArrayValueType::DOUBLE_VALUE);
718            proto.set_double_value(*v);
719        }
720    }
721    proto
722}
723
724fn write_message_field<M: Message>(
725    output_stream: &mut CodedOutputStream<'_>, field_number: u32, message: &M, scratch_buf: &mut Vec<u8>,
726) -> Result<(), protobuf::Error> {
727    scratch_buf.clear();
728    {
729        // In protobuf, length-delimited is one of the wire types, it encodes data as [tag][length][value].
730        // We use a nested output stream to write the message to because output_stream requires the size of the message to be known before writing
731        // and the Message type size is not known as it depends on the actual data values and lengths and we
732        // don't know this until runtime.
733        let mut nested = CodedOutputStream::vec(scratch_buf);
734        message.write_to(&mut nested)?;
735        nested.flush()?;
736    }
737    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
738    output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
739    output_stream.write_raw_bytes(scratch_buf)?;
740    Ok(())
741}
742
743fn write_map_entry_string_string(
744    output_stream: &mut CodedOutputStream<'_>, field_number: u32, key: &str, value: &str, scratch_buf: &mut Vec<u8>,
745) -> Result<(), protobuf::Error> {
746    scratch_buf.clear();
747    {
748        let mut nested = CodedOutputStream::vec(scratch_buf);
749        // the field number 1 and 2 correspond to key and value
750        nested.write_string(1, key)?;
751        nested.write_string(2, value)?;
752        nested.flush()?;
753    }
754    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
755    output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
756    output_stream.write_raw_bytes(scratch_buf)?;
757    Ok(())
758}
759
760fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
761    resource_tags.get_single_tag(key).and_then(|t| t.value())
762}
763
764fn resolve_hostname<'a>(
765    resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
766    ignore_missing_fields: bool,
767) -> Option<&'a str> {
768    let mut hostname = match source {
769        Some(src) => match src.kind {
770            OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
771            _ => Some(""),
772        },
773        None => default_hostname,
774    };
775
776    if ignore_missing_fields {
777        hostname = Some("");
778    }
779
780    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
781        hostname = Some(value);
782    }
783
784    hostname
785}
786
787fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
788    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
789        return Some(value);
790    }
791    if ignore_missing_fields {
792        return None;
793    }
794    if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
795        return Some(value);
796    }
797    get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
798}
799
800fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
801    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
802        if let Some(value) = get_resource_tag_value(resource_tags, key) {
803            return Some(value);
804        }
805    }
806    // TODO: add container id fallback equivalent to cidProvider
807    // https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L414
808    if let Some(span) = first_span {
809        for (k, v) in span.meta() {
810            if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
811                return Some(v.as_ref());
812            }
813        }
814    }
815    None
816}
817
818fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
819    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
820        return Some(value);
821    }
822    get_resource_tag_value(resource_tags, SERVICE_VERSION)
823}
824
825fn resolve_container_tags(
826    resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
827) -> Option<MetaString> {
828    // TODO: some refactoring is probably needed to normalize this function, the tags should already be normalized
829    // since we do so when we transform OTLP spans to DD spans however to make this class extensible for non otlp traces, we would
830    // need to normalize the tags here.
831    if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
832        if !tags.is_empty() {
833            return Some(MetaString::from(tags));
834        }
835    }
836
837    if ignore_missing_fields {
838        return None;
839    }
840    let mut container_tags = TagSet::default();
841    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
842    let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
843    if container_tags.is_empty() && !is_fargate_source {
844        return None;
845    }
846
847    let mut flattened = flatten_container_tag(container_tags);
848    if is_fargate_source {
849        if let Some(src) = source {
850            append_tags(&mut flattened, &src.tag());
851        }
852    }
853
854    if flattened.is_empty() {
855        None
856    } else {
857        Some(MetaString::from(flattened))
858    }
859}
860
861fn flatten_container_tag(tags: TagSet) -> String {
862    let mut flattened = String::new();
863    for tag in tags {
864        if !flattened.is_empty() {
865            flattened.push(',');
866        }
867        flattened.push_str(tag.as_str());
868    }
869    flattened
870}
871
/// Appends `tags` to `target`, inserting a `,` separator when `target` already has content.
///
/// Does nothing when `tags` is empty, so no dangling separator is ever added.
fn append_tags(target: &mut String, tags: &str) {
    match (target.is_empty(), tags.is_empty()) {
        // Nothing to append.
        (_, true) => {}
        // First entry: no separator needed.
        (true, false) => target.push_str(tags),
        // Subsequent entry: separate it from the existing content.
        (false, false) => {
            target.push(',');
            target.push_str(tags);
        }
    }
}