saluki_components/encoders/datadog/traces/
mod.rs

1#![allow(dead_code)]
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use datadog_protos::traces::{
7    attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AttributeAnyValue,
8    AttributeArray, AttributeArrayValue, Span as ProtoSpan, SpanEvent as ProtoSpanEvent, SpanLink as ProtoSpanLink,
9    TraceChunk,
10};
11use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
12use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
13use opentelemetry_semantic_conventions::resource::{
14    CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
15};
16use protobuf::{rt::WireType, CodedOutputStream, Message};
17use saluki_common::task::HandleExt as _;
18use saluki_config::GenericConfiguration;
19use saluki_context::tags::TagSet;
20use saluki_core::data_model::event::trace::{
21    AttributeScalarValue, AttributeValue, Span as DdSpan, SpanEvent as DdSpanEvent, SpanLink as DdSpanLink,
22};
23use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
24use saluki_core::{
25    components::{encoders::*, ComponentContext},
26    data_model::{
27        event::{trace::Trace, EventType},
28        payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
29    },
30    observability::ComponentMetricsExt as _,
31};
32use saluki_env::host::providers::BoxedHostProvider;
33use saluki_env::{EnvironmentProvider, HostProvider};
34use saluki_error::generic_error;
35use saluki_error::{ErrorContext as _, GenericError};
36use saluki_io::compression::CompressionScheme;
37use saluki_metrics::MetricsBuilder;
38use serde::Deserialize;
39use stringtheory::MetaString;
40use tokio::{
41    select,
42    sync::mpsc::{self, Receiver, Sender},
43    time::sleep,
44};
45use tracing::{debug, error};
46
47use crate::common::datadog::{
48    apm::ApmConfig,
49    io::RB_BUFFER_CHUNK_SIZE,
50    request_builder::{EndpointEncoder, RequestBuilder},
51    telemetry::ComponentTelemetry,
52    DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
53};
54use crate::common::otlp::config::TracesConfig;
55use crate::common::otlp::util::{
56    extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
57    DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
58    KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
59};
60
/// Key under which the resolved container tags are written into the tracer payload's `tags` map.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
/// Maximum number of traces the request builder may place into a single payload. Also used to size the
/// firm memory bound for the split/re-encode buffer.
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
/// `Content-Type` header value reported by the trace endpoint encoder.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
64
/// Serde default for the `serializer_compressor_kind` setting: the `zstd` compressor.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
68
/// Serde default for the `serializer_zstd_compressor_level` setting.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
72
/// Serde default for the `flush_timeout_secs` setting, in seconds.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_SECS: u64 = 2;
    DEFAULT_SECS
}
76
// Field numbers from lib/protos/datadog/proto/datadog-agent/datadog/trace/tracer_payload.proto. These are used to
// construct the format of the tracer payload and trace chunk when the payload is written by hand with
// `CodedOutputStream`.
const TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER: u32 = 1;
const TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER: u32 = 2;
const TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER: u32 = 4;
const TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER: u32 = 6;
const TRACER_PAYLOAD_TAGS_FIELD_NUMBER: u32 = 7;
const TRACER_PAYLOAD_ENV_FIELD_NUMBER: u32 = 8;
const TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 9;
const TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER: u32 = 10;

// Field numbers of the outer AgentPayload message (agent_payload.proto) that wraps the tracer payload.
const AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 1;
const AGENT_PAYLOAD_ENV_FIELD_NUMBER: u32 = 2;
const AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER: u32 = 5;
const AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER: u32 = 7;
const AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER: u32 = 8;
const AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER: u32 = 9;
93
/// Serde default for the `env` setting when the configuration does not provide one.
fn default_env() -> String {
    String::from("none")
}
97
/// Configuration for the Datadog Traces encoder.
///
/// This encoder converts trace events into Datadog's TracerPayload protobuf format and sends them
/// to the Datadog traces intake endpoint (`/api/v0.2/traces`). It handles batching, compression,
/// and enrichment with metadata such as hostname, environment, and container tags.
#[derive(Deserialize)]
pub struct DatadogTraceConfiguration {
    /// Name of the compression scheme handed to `CompressionScheme::new`.
    ///
    /// Read from the `serializer_compressor_kind` configuration key. Defaults to `zstd`.
    #[serde(
        rename = "serializer_compressor_kind",  // the configuration key is `serializer_compressor_kind`; the Rust field is `compressor_kind`.
        default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level handed to `CompressionScheme::new` alongside the compressor kind.
    ///
    /// Read from the `serializer_zstd_compressor_level` configuration key. Defaults to 3.
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout for pending requests, in seconds.
    ///
    /// When the encoder has written traces to the in-flight request payload, but it has not yet reached the
    /// payload size limits that would force the payload to be flushed, the encoder will wait for a period of time
    /// before flushing the in-flight request payload.
    ///
    /// Defaults to 2 seconds.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Hostname used when building the endpoint encoder.
    ///
    /// Never deserialized; populated by `with_environment_provider`. When left unset, `build` falls back to an
    /// empty hostname.
    #[serde(skip)]
    default_hostname: Option<String>,

    /// Agent version string written into every payload.
    ///
    /// Never deserialized; populated by `from_configuration` as `agent-data-plane/<version>`.
    #[serde(skip)]
    version: String,

    /// APM settings (supplies the target/error traces-per-second values written into each payload).
    ///
    /// Never deserialized directly; populated by `from_configuration`.
    #[serde(skip)]
    apm_config: ApmConfig,

    /// OTLP trace settings, loaded by `from_configuration` from the `otlp_config.traces` key (or its default when
    /// that key is absent).
    #[serde(skip)]
    otlp_traces: TracesConfig,

    /// Environment name written into the outer agent payload. Defaults to `none`.
    #[serde(default = "default_env")]
    env: String,
}
142
143impl DatadogTraceConfiguration {
144    /// Creates a new `DatadogTraceConfiguration` from the given configuration.
145    pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
146        let mut trace_config: Self = config.as_typed()?;
147
148        let app_details = saluki_metadata::get_app_details();
149        trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
150
151        trace_config.apm_config = ApmConfig::from_configuration(config)?;
152        trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
153
154        Ok(trace_config)
155    }
156}
157
158impl DatadogTraceConfiguration {
159    /// Sets the default_hostname using the environment provider
160    pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
161    where
162        E: EnvironmentProvider<Host = BoxedHostProvider>,
163    {
164        let host_provider = environment_provider.host();
165        let hostname = host_provider.get_hostname().await?;
166        self.default_hostname = Some(hostname);
167        Ok(self)
168    }
169}
170
171#[async_trait]
172impl EncoderBuilder for DatadogTraceConfiguration {
173    fn input_event_type(&self) -> EventType {
174        EventType::Trace
175    }
176
177    fn output_payload_type(&self) -> PayloadType {
178        PayloadType::Http
179    }
180
181    async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
182        let metrics_builder = MetricsBuilder::from_component_context(&context);
183        let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
184        let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
185
186        let default_hostname = self.default_hostname.clone().unwrap_or_default();
187        let default_hostname = MetaString::from(default_hostname);
188
189        // Create request builder for traces which is used to generate HTTP requests.
190
191        let mut trace_rb = RequestBuilder::new(
192            TraceEndpointEncoder::new(
193                default_hostname,
194                self.version.clone(),
195                self.env.clone(),
196                self.apm_config.clone(),
197                self.otlp_traces.clone(),
198            ),
199            compression_scheme,
200            RB_BUFFER_CHUNK_SIZE,
201        )
202        .await?;
203        trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
204
205        let flush_timeout = match self.flush_timeout_secs {
206            // We always give ourselves a minimum flush timeout of 10ms to allow for some very minimal amount of
207            // batching, while still practically flushing things almost immediately.
208            0 => Duration::from_millis(10),
209            secs => Duration::from_secs(secs),
210        };
211
212        Ok(Box::new(DatadogTrace {
213            trace_rb,
214            telemetry,
215            flush_timeout,
216        }))
217    }
218}
219
impl MemoryBounds for DatadogTraceConfiguration {
    /// Registers the minimum and firm memory bounds for the traces encoder on `builder`.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // TODO: How do we properly represent the requests we can generate that may be sitting around in-flight?

        // Minimum: the component struct itself plus the two channels linking the encoder loop and the request
        // builder task. The count of 8 mirrors the capacity passed to `mpsc::channel` in `DatadogTrace::run`.
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm: room for up to `MAX_TRACES_PER_PAYLOAD` traces held for split/re-encode.
        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
234
/// Datadog Traces encoder component, created by `DatadogTraceConfiguration::build`.
pub struct DatadogTrace {
    /// Request builder that encodes traces and produces HTTP requests.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    /// Component telemetry, used to count traces dropped by the encoder.
    telemetry: ComponentTelemetry,
    /// How long a partially filled request may sit before it is flushed.
    flush_timeout: Duration,
}
240
241// Encodes Trace events to TracerPayloads.
242#[async_trait]
243impl Encoder for DatadogTrace {
244    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
245        let Self {
246            trace_rb,
247            telemetry,
248            flush_timeout,
249        } = *self;
250
251        let mut health = context.take_health_handle();
252
253        // The encoder runs two async loops, the main encoder loop and the request builder loop,
254        // this channel is used to send events from the main encoder loop to the request builder loop safely.
255        let (events_tx, events_rx) = mpsc::channel(8);
256        // adds a channel to send payloads to the dispatcher and a channel to receive them.
257        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
258        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
259        // Spawn the request builder task on the global thread pool, this task is responsible for encoding traces and flushing requests.
260        let request_builder_handle = context
261            .topology_context()
262            .global_thread_pool() // Use the shared Tokio runtime thread pool.
263            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);
264
265        health.mark_ready();
266        debug!("Datadog Trace encoder started.");
267
268        loop {
269            select! {
270                biased; // makes the branches of the select statement be evaluated in order.
271
272                _ = health.live() => continue,
273                maybe_payload = payloads_rx.recv() => match maybe_payload {
274                    Some(payload) => {
275                        // Dispatch an HTTP payload to the dispatcher.
276                        if let Err(e) = context.dispatcher().dispatch(payload).await {
277                            error!("Failed to dispatch payload: {}", e);
278                        }
279                    }
280                    None => break,
281                },
282                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
283                    Some(event_buffer) => events_tx.send(event_buffer).await
284                        .error_context("Failed to send event buffer to request builder task.")?,
285                    None => break,
286                },
287            }
288        }
289
290        // Drop the events sender, which signals the request builder task to stop.
291        drop(events_tx);
292
293        // Continue draining the payloads receiver until it is closed.
294        while let Some(payload) = payloads_rx.recv().await {
295            if let Err(e) = context.dispatcher().dispatch(payload).await {
296                error!("Failed to dispatch payload: {}", e);
297            }
298        }
299
300        // Request build task should now be stopped.
301        match request_builder_handle.await {
302            Ok(Ok(())) => debug!("Request builder task stopped."),
303            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
304            Err(e) => error!(error = %e, "Request builder task panicked."),
305        }
306
307        debug!("Datadog Trace encoder stopped.");
308
309        Ok(())
310    }
311}
312
313async fn run_request_builder(
314    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
315    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
316) -> Result<(), GenericError> {
317    let mut pending_flush = false;
318    let pending_flush_timeout = sleep(flush_timeout);
319    tokio::pin!(pending_flush_timeout);
320
321    loop {
322        select! {
323            Some(event_buffer) = events_rx.recv() => {
324                for event in event_buffer {
325                    let trace = match event.try_into_trace() {
326                        Some(trace) => trace,
327                        None => continue,
328                    };
329                    // Encode the trace. If we get it back, that means the current request is full, and we need to
330                    // flush it before we can try to encode the trace again.
331                    let trace_to_retry = match trace_request_builder.encode(trace).await {
332                        Ok(None) => continue,
333                        Ok(Some(trace)) => trace,
334                        Err(e) => {
335                            error!(error = %e, "Failed to encode trace.");
336                            telemetry.events_dropped_encoder().increment(1);
337                            continue;
338                        }
339                    };
340
341                    let maybe_requests = trace_request_builder.flush().await;
342                    if maybe_requests.is_empty() {
343                        panic!("builder told us to flush, but gave us nothing");
344                    }
345
346                    for maybe_request in maybe_requests {
347                        match maybe_request {
348                            Ok((events, request)) => {
349                                let payload_meta = PayloadMetadata::from_event_count(events);
350                                let http_payload = HttpPayload::new(payload_meta, request);
351                                let payload = Payload::Http(http_payload);
352
353                                payloads_tx.send(payload).await
354                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
355                            },
356                            Err(e) => if e.is_recoverable() {
357                                // If the error is recoverable, we'll hold on to the trace to retry it later.
358                                continue;
359                            } else {
360                                return Err(GenericError::from(e).context("Failed to flush request."));
361                            }
362                        }
363                    }
364
365                    // Now try to encode the trace again.
366                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
367                        error!(error = %e, "Failed to encode trace.");
368                        telemetry.events_dropped_encoder().increment(1);
369                    }
370                }
371
372                debug!("Processed event buffer.");
373
374                // If we're not already pending a flush, we'll start the countdown.
375                if !pending_flush {
376                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
377                    pending_flush = true;
378                }
379            },
380            _ = &mut pending_flush_timeout, if pending_flush => {
381                debug!("Flushing pending request(s).");
382
383                pending_flush = false;
384
385                // Once we've encoded and written all traces, we flush the request builders to generate a request with
386                // anything left over. Again, we'll enqueue those requests to be sent immediately.
387                let maybe_trace_requests = trace_request_builder.flush().await;
388                for maybe_request in maybe_trace_requests {
389                    match maybe_request {
390                        Ok((events, request)) => {
391                            let payload_meta = PayloadMetadata::from_event_count(events);
392                            let http_payload = HttpPayload::new(payload_meta, request);
393                            let payload = Payload::Http(http_payload);
394
395                            payloads_tx.send(payload).await
396                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
397                        },
398                        Err(e) => if e.is_recoverable() {
399                            continue;
400                        } else {
401                            return Err(GenericError::from(e).context("Failed to flush request."));
402                        }
403                    }
404                }
405
406                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
407            },
408
409            // Event buffers channel has been closed, and we have no pending flushing, so we're all done.
410            else => break,
411        }
412    }
413
414    Ok(())
415}
416
/// Endpoint encoder that serializes a single trace into an agent payload wrapping one tracer payload.
#[derive(Debug)]
struct TraceEndpointEncoder {
    /// Reusable buffer holding the serialized tracer payload before it is nested into the agent payload.
    tracer_payload_scratch: Vec<u8>,
    /// Reusable buffer holding the serialized trace chunk.
    chunk_scratch: Vec<u8>,
    /// Reusable buffer holding the serialized container-tags map entry.
    tags_scratch: Vec<u8>,
    // TODO: do we need additional tags or tag deduplicator?
    /// Hostname passed to hostname resolution as the default.
    default_hostname: MetaString,
    /// Owned copy of `default_hostname` taken at construction; written as the agent payload hostname.
    agent_hostname: String,
    /// Agent version string written into the agent payload.
    version: String,
    /// Environment name written into the agent payload.
    env: String,
    /// Source of the target/error traces-per-second values written into the agent payload.
    apm_config: ApmConfig,
    /// OTLP trace settings (`ignore_missing_datadog_fields`, probabilistic sampler percentage).
    otlp_traces: TracesConfig,
}
430
431impl TraceEndpointEncoder {
432    fn new(
433        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
434    ) -> Self {
435        Self {
436            tracer_payload_scratch: Vec::new(),
437            chunk_scratch: Vec::new(),
438            tags_scratch: Vec::new(),
439            agent_hostname: default_hostname.as_ref().to_string(),
440            default_hostname,
441            version,
442            env,
443            apm_config,
444            otlp_traces,
445        }
446    }
447
448    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> Result<(), protobuf::Error> {
449        let resource_tags = trace.resource_tags();
450        let first_span = trace.spans().first();
451        let source = tags_to_source(resource_tags);
452
453        let trace_chunk = self.build_trace_chunk(trace);
454
455        // Build AgentPayload (outer message) writing to output_buffer
456        let mut agent_payload_stream = CodedOutputStream::vec(output_buffer);
457
458        // Write AgentPayload fields defined in agent_payload.proto
459        agent_payload_stream.write_string(AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER, &self.agent_hostname)?;
460        agent_payload_stream.write_string(AGENT_PAYLOAD_ENV_FIELD_NUMBER, &self.env)?;
461
462        agent_payload_stream.write_string(AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER, &self.version)?;
463        agent_payload_stream.write_double(
464            AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER,
465            self.apm_config.target_traces_per_second(),
466        )?;
467        agent_payload_stream.write_double(
468            AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER,
469            self.apm_config.errors_per_second(),
470        )?;
471
472        // Build TracerPayload (nested message) in scratch buffer
473        self.tracer_payload_scratch.clear();
474        let mut tracer_payload_stream = CodedOutputStream::vec(&mut self.tracer_payload_scratch);
475
476        // Write TracerPayload fields
477        if let Some(container_id) = resolve_container_id(resource_tags, first_span) {
478            tracer_payload_stream.write_string(TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER, container_id)?;
479        }
480
481        if let Some(lang) = get_resource_tag_value(resource_tags, "telemetry.sdk.language") {
482            tracer_payload_stream.write_string(TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER, lang)?;
483        }
484
485        // Write tracer_version for OTLP traces, even if telemetry.sdk.version is missing.
486        let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
487        let tracer_version = format!("otlp-{}", sdk_version);
488        tracer_payload_stream.write_string(TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER, &tracer_version)?;
489
490        self.chunk_scratch.clear();
491        write_message_field(
492            &mut tracer_payload_stream,
493            TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER,
494            &trace_chunk,
495            &mut self.chunk_scratch,
496        )?;
497
498        self.tags_scratch.clear();
499        if let Some(tags) = resolve_container_tags(
500            resource_tags,
501            source.as_ref(),
502            self.otlp_traces.ignore_missing_datadog_fields,
503        ) {
504            write_map_entry_string_string(
505                &mut tracer_payload_stream,
506                TRACER_PAYLOAD_TAGS_FIELD_NUMBER,
507                CONTAINER_TAGS_META_KEY,
508                tags.as_ref(),
509                &mut self.tags_scratch,
510            )?;
511        }
512
513        if let Some(env) = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields) {
514            tracer_payload_stream.write_string(TRACER_PAYLOAD_ENV_FIELD_NUMBER, env)?;
515        }
516
517        if let Some(hostname) = resolve_hostname(
518            resource_tags,
519            source.as_ref(),
520            Some(self.default_hostname.as_ref()),
521            self.otlp_traces.ignore_missing_datadog_fields,
522        ) {
523            tracer_payload_stream.write_string(TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER, hostname)?;
524        }
525
526        if let Some(app_version) = resolve_app_version(resource_tags) {
527            tracer_payload_stream.write_string(TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER, app_version)?;
528        }
529
530        tracer_payload_stream.flush()?;
531        // Drop tracer_payload_stream to release the mutable borrow of tracer_payload_scratch
532        drop(tracer_payload_stream);
533
534        // Write TracerPayload as a nested message in AgentPayload (repeated field)
535        agent_payload_stream.write_bytes(AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER, &self.tracer_payload_scratch)?;
536        agent_payload_stream.flush()?;
537
538        Ok(())
539    }
540
541    fn sampling_rate(&self) -> f64 {
542        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
543        if rate <= 0.0 || rate >= 1.0 {
544            return 1.0;
545        }
546        rate
547    }
548
549    fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk {
550        let mut spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
551        let mut chunk = TraceChunk::new();
552
553        let rate = self.sampling_rate();
554        let mut tags = std::collections::HashMap::new();
555        tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate));
556
557        // TODO: Remove this once we have sampling. We have to hardcode the priority to 1 for now so that intake does not drop the trace.
558        const PRIORITY_AUTO_KEEP: i32 = 1;
559        chunk.set_priority(PRIORITY_AUTO_KEEP);
560
561        // Set _dd.p.dm (decision maker)
562        // Only set if sampling priority is "keep" (which it is, since we set PRIORITY_AUTO_KEEP)
563        // Decision maker "-9" indicates probabilistic sampler made the decision
564        const DECISION_MAKER: &str = "-9";
565        if let Some(first_span) = spans.first_mut() {
566            let mut meta = first_span.take_meta();
567            meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
568            first_span.set_meta(meta);
569        }
570
571        tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
572        chunk.set_tags(tags);
573
574        chunk.set_spans(spans);
575
576        chunk
577    }
578}
579
580impl EndpointEncoder for TraceEndpointEncoder {
581    type Input = Trace;
582    type EncodeError = protobuf::Error;
583    fn encoder_name() -> &'static str {
584        "traces"
585    }
586
587    fn compressed_size_limit(&self) -> usize {
588        DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
589    }
590
591    fn uncompressed_size_limit(&self) -> usize {
592        DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
593    }
594
595    fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
596        self.encode_tracer_payload(trace, buffer)
597    }
598
599    fn endpoint_uri(&self) -> Uri {
600        PathAndQuery::from_static("/api/v0.2/traces").into()
601    }
602
603    fn endpoint_method(&self) -> Method {
604        Method::POST
605    }
606
607    fn content_type(&self) -> HeaderValue {
608        CONTENT_TYPE_PROTOBUF.clone()
609    }
610}
611
612fn convert_span(span: &DdSpan) -> ProtoSpan {
613    let mut proto = ProtoSpan::new();
614    proto.set_service(span.service().to_string());
615    proto.set_name(span.name().to_string());
616    proto.set_resource(span.resource().to_string());
617    proto.set_traceID(span.trace_id());
618    proto.set_spanID(span.span_id());
619    proto.set_parentID(span.parent_id());
620    proto.set_start(span.start() as i64);
621    proto.set_duration(span.duration() as i64);
622    proto.set_error(span.error());
623    proto.set_type(span.span_type().to_string());
624
625    proto.set_meta(
626        span.meta()
627            .iter()
628            .map(|(k, v)| (k.to_string(), v.to_string()))
629            .collect(),
630    );
631    proto.set_metrics(span.metrics().iter().map(|(k, v)| (k.to_string(), *v)).collect());
632    proto.set_meta_struct(
633        span.meta_struct()
634            .iter()
635            .map(|(k, v)| (k.to_string(), v.clone()))
636            .collect(),
637    );
638    proto.set_spanLinks(span.span_links().iter().map(convert_span_link).collect());
639    proto.set_spanEvents(span.span_events().iter().map(convert_span_event).collect());
640    proto
641}
642
643fn convert_span_link(link: &DdSpanLink) -> ProtoSpanLink {
644    let mut proto = ProtoSpanLink::new();
645    proto.set_traceID(link.trace_id());
646    proto.set_traceID_high(link.trace_id_high());
647    proto.set_spanID(link.span_id());
648    proto.set_attributes(
649        link.attributes()
650            .iter()
651            .map(|(k, v)| (k.to_string(), v.to_string()))
652            .collect(),
653    );
654    proto.set_tracestate(link.tracestate().to_string());
655    proto.set_flags(link.flags());
656    proto
657}
658
659fn convert_span_event(event: &DdSpanEvent) -> ProtoSpanEvent {
660    let mut proto = ProtoSpanEvent::new();
661    proto.set_time_unix_nano(event.time_unix_nano());
662    proto.set_name(event.name().to_string());
663    proto.set_attributes(
664        event
665            .attributes()
666            .iter()
667            .map(|(k, v)| (k.to_string(), convert_attribute_value(v)))
668            .collect(),
669    );
670    proto
671}
672
673fn convert_attribute_value(value: &AttributeValue) -> AttributeAnyValue {
674    let mut proto = AttributeAnyValue::new();
675    match value {
676        AttributeValue::String(v) => {
677            proto.set_type(AttributeAnyValueType::STRING_VALUE);
678            proto.set_string_value(v.to_string());
679        }
680        AttributeValue::Bool(v) => {
681            proto.set_type(AttributeAnyValueType::BOOL_VALUE);
682            proto.set_bool_value(*v);
683        }
684        AttributeValue::Int(v) => {
685            proto.set_type(AttributeAnyValueType::INT_VALUE);
686            proto.set_int_value(*v);
687        }
688        AttributeValue::Double(v) => {
689            proto.set_type(AttributeAnyValueType::DOUBLE_VALUE);
690            proto.set_double_value(*v);
691        }
692        AttributeValue::Array(values) => {
693            proto.set_type(AttributeAnyValueType::ARRAY_VALUE);
694            let mut array = AttributeArray::new();
695            array.set_values(values.iter().map(convert_attribute_array_value).collect());
696            proto.set_array_value(array);
697        }
698    }
699    proto
700}
701
702fn convert_attribute_array_value(value: &AttributeScalarValue) -> AttributeArrayValue {
703    let mut proto = AttributeArrayValue::new();
704    match value {
705        AttributeScalarValue::String(v) => {
706            proto.set_type(AttributeArrayValueType::STRING_VALUE);
707            proto.set_string_value(v.to_string());
708        }
709        AttributeScalarValue::Bool(v) => {
710            proto.set_type(AttributeArrayValueType::BOOL_VALUE);
711            proto.set_bool_value(*v);
712        }
713        AttributeScalarValue::Int(v) => {
714            proto.set_type(AttributeArrayValueType::INT_VALUE);
715            proto.set_int_value(*v);
716        }
717        AttributeScalarValue::Double(v) => {
718            proto.set_type(AttributeArrayValueType::DOUBLE_VALUE);
719            proto.set_double_value(*v);
720        }
721    }
722    proto
723}
724
725fn write_message_field<M: Message>(
726    output_stream: &mut CodedOutputStream<'_>, field_number: u32, message: &M, scratch_buf: &mut Vec<u8>,
727) -> Result<(), protobuf::Error> {
728    scratch_buf.clear();
729    {
730        // In protobuf, length-delimited is one of the wire types, it encodes data as [tag][length][value].
731        // We use a nested output stream to write the message to because output_stream requires the size of the message to be known before writing
732        // and the Message type size is not known as it depends on the actual data values and lengths and we
733        // don't know this until runtime.
734        let mut nested = CodedOutputStream::vec(scratch_buf);
735        message.write_to(&mut nested)?;
736        nested.flush()?;
737    }
738    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
739    output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
740    output_stream.write_raw_bytes(scratch_buf)?;
741    Ok(())
742}
743
744fn write_map_entry_string_string(
745    output_stream: &mut CodedOutputStream<'_>, field_number: u32, key: &str, value: &str, scratch_buf: &mut Vec<u8>,
746) -> Result<(), protobuf::Error> {
747    scratch_buf.clear();
748    {
749        let mut nested = CodedOutputStream::vec(scratch_buf);
750        // the field number 1 and 2 correspond to key and value
751        nested.write_string(1, key)?;
752        nested.write_string(2, value)?;
753        nested.flush()?;
754    }
755    output_stream.write_tag(field_number, WireType::LengthDelimited)?;
756    output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
757    output_stream.write_raw_bytes(scratch_buf)?;
758    Ok(())
759}
760
761fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
762    resource_tags.get_single_tag(key).and_then(|t| t.value())
763}
764
765fn resolve_hostname<'a>(
766    resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
767    ignore_missing_fields: bool,
768) -> Option<&'a str> {
769    let mut hostname = match source {
770        Some(src) => match src.kind {
771            OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
772            _ => Some(""),
773        },
774        None => default_hostname,
775    };
776
777    if ignore_missing_fields {
778        hostname = Some("");
779    }
780
781    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
782        hostname = Some(value);
783    }
784
785    hostname
786}
787
788fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
789    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
790        return Some(value);
791    }
792    if ignore_missing_fields {
793        return None;
794    }
795    if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
796        return Some(value);
797    }
798    get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
799}
800
801fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
802    for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
803        if let Some(value) = get_resource_tag_value(resource_tags, key) {
804            return Some(value);
805        }
806    }
807    // TODO: add container id fallback equivalent to cidProvider
808    // https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L414
809    if let Some(span) = first_span {
810        for (k, v) in span.meta() {
811            if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
812                return Some(v.as_ref());
813            }
814        }
815    }
816    None
817}
818
819fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
820    if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
821        return Some(value);
822    }
823    get_resource_tag_value(resource_tags, SERVICE_VERSION)
824}
825
826fn resolve_container_tags(
827    resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
828) -> Option<MetaString> {
829    // TODO: some refactoring is probably needed to normalize this function, the tags should already be normalized
830    // since we do so when we transform OTLP spans to DD spans however to make this class extensible for non otlp traces, we would
831    // need to normalize the tags here.
832    if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
833        if !tags.is_empty() {
834            return Some(MetaString::from(tags));
835        }
836    }
837
838    if ignore_missing_fields {
839        return None;
840    }
841    let mut container_tags = TagSet::default();
842    extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
843    let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
844    if container_tags.is_empty() && !is_fargate_source {
845        return None;
846    }
847
848    let mut flattened = flatten_container_tag(container_tags);
849    if is_fargate_source {
850        if let Some(src) = source {
851            append_tags(&mut flattened, &src.tag());
852        }
853    }
854
855    if flattened.is_empty() {
856        None
857    } else {
858        Some(MetaString::from(flattened))
859    }
860}
861
862fn flatten_container_tag(tags: TagSet) -> String {
863    let mut flattened = String::new();
864    for tag in tags {
865        if !flattened.is_empty() {
866            flattened.push(',');
867        }
868        flattened.push_str(tag.as_str());
869    }
870    flattened
871}
872
/// Appends `tags` to `target`, inserting a comma first when `target` already has content.
///
/// An empty `tags` leaves `target` untouched.
fn append_tags(target: &mut String, tags: &str) {
    if !tags.is_empty() {
        if !target.is_empty() {
            target.push(',');
        }
        target.push_str(tags);
    }
}