1#![allow(dead_code)]
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use datadog_protos::traces::{
7 attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AttributeAnyValue,
8 AttributeArray, AttributeArrayValue, Span as ProtoSpan, SpanEvent as ProtoSpanEvent, SpanLink as ProtoSpanLink,
9 TraceChunk,
10};
11use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
12use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
13use opentelemetry_semantic_conventions::resource::{
14 CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
15};
16use protobuf::{rt::WireType, CodedOutputStream, Message};
17use saluki_common::task::HandleExt as _;
18use saluki_config::GenericConfiguration;
19use saluki_context::tags::TagSet;
20use saluki_core::data_model::event::trace::{
21 AttributeScalarValue, AttributeValue, Span as DdSpan, SpanEvent as DdSpanEvent, SpanLink as DdSpanLink,
22};
23use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
24use saluki_core::{
25 components::{encoders::*, ComponentContext},
26 data_model::{
27 event::{trace::Trace, EventType},
28 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
29 },
30 observability::ComponentMetricsExt as _,
31};
32use saluki_env::host::providers::BoxedHostProvider;
33use saluki_env::{EnvironmentProvider, HostProvider};
34use saluki_error::generic_error;
35use saluki_error::{ErrorContext as _, GenericError};
36use saluki_io::compression::CompressionScheme;
37use saluki_metrics::MetricsBuilder;
38use serde::Deserialize;
39use stringtheory::MetaString;
40use tokio::{
41 select,
42 sync::mpsc::{self, Receiver, Sender},
43 time::sleep,
44};
45use tracing::{debug, error};
46
47use crate::common::datadog::{
48 apm::ApmConfig,
49 io::RB_BUFFER_CHUNK_SIZE,
50 request_builder::{EndpointEncoder, RequestBuilder},
51 telemetry::ComponentTelemetry,
52 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
53};
54use crate::common::otlp::config::TracesConfig;
55use crate::common::otlp::util::{
56 extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
57 DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
58 KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
59};
60
61const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
62const MAX_TRACES_PER_PAYLOAD: usize = 10000;
63static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
64
65fn default_serializer_compressor_kind() -> String {
66 "zstd".to_string()
67}
68
69const fn default_zstd_compressor_level() -> i32 {
70 3
71}
72
73const fn default_flush_timeout_secs() -> u64 {
74 2
75}
76
77const TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER: u32 = 1;
79const TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER: u32 = 2;
80const TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER: u32 = 4;
81const TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER: u32 = 6;
82const TRACER_PAYLOAD_TAGS_FIELD_NUMBER: u32 = 7;
83const TRACER_PAYLOAD_ENV_FIELD_NUMBER: u32 = 8;
84const TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 9;
85const TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER: u32 = 10;
86
87const AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 1;
88const AGENT_PAYLOAD_ENV_FIELD_NUMBER: u32 = 2;
89const AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER: u32 = 5;
90const AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER: u32 = 7;
91const AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER: u32 = 8;
92const AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER: u32 = 9;
93
94fn default_env() -> String {
95 "none".to_string()
96}
97
98#[derive(Deserialize)]
104pub struct DatadogTraceConfiguration {
105 #[serde(
106 rename = "serializer_compressor_kind", default = "default_serializer_compressor_kind"
108 )]
109 compressor_kind: String,
110
111 #[serde(
112 rename = "serializer_zstd_compressor_level",
113 default = "default_zstd_compressor_level"
114 )]
115 zstd_compressor_level: i32,
116
117 #[serde(default = "default_flush_timeout_secs")]
125 flush_timeout_secs: u64,
126
127 #[serde(skip)]
128 default_hostname: Option<String>,
129
130 #[serde(skip)]
131 version: String,
132
133 #[serde(skip)]
134 apm_config: ApmConfig,
135
136 #[serde(skip)]
137 otlp_traces: TracesConfig,
138
139 #[serde(default = "default_env")]
140 env: String,
141}
142
143impl DatadogTraceConfiguration {
144 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
146 let mut trace_config: Self = config.as_typed()?;
147
148 let app_details = saluki_metadata::get_app_details();
149 trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
150
151 trace_config.apm_config = ApmConfig::from_configuration(config)?;
152 trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
153
154 Ok(trace_config)
155 }
156}
157
158impl DatadogTraceConfiguration {
159 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
161 where
162 E: EnvironmentProvider<Host = BoxedHostProvider>,
163 {
164 let host_provider = environment_provider.host();
165 let hostname = host_provider.get_hostname().await?;
166 self.default_hostname = Some(hostname);
167 Ok(self)
168 }
169}
170
171#[async_trait]
172impl EncoderBuilder for DatadogTraceConfiguration {
173 fn input_event_type(&self) -> EventType {
174 EventType::Trace
175 }
176
177 fn output_payload_type(&self) -> PayloadType {
178 PayloadType::Http
179 }
180
181 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
182 let metrics_builder = MetricsBuilder::from_component_context(&context);
183 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
184 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
185
186 let default_hostname = self.default_hostname.clone().unwrap_or_default();
187 let default_hostname = MetaString::from(default_hostname);
188
189 let mut trace_rb = RequestBuilder::new(
192 TraceEndpointEncoder::new(
193 default_hostname,
194 self.version.clone(),
195 self.env.clone(),
196 self.apm_config.clone(),
197 self.otlp_traces.clone(),
198 ),
199 compression_scheme,
200 RB_BUFFER_CHUNK_SIZE,
201 )
202 .await?;
203 trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
204
205 let flush_timeout = match self.flush_timeout_secs {
206 0 => Duration::from_millis(10),
209 secs => Duration::from_secs(secs),
210 };
211
212 Ok(Box::new(DatadogTrace {
213 trace_rb,
214 telemetry,
215 flush_timeout,
216 }))
217 }
218}
219
220impl MemoryBounds for DatadogTraceConfiguration {
221 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
222 builder
224 .minimum()
225 .with_single_value::<DatadogTrace>("component struct")
226 .with_array::<EventsBuffer>("request builder events channel", 8)
227 .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
228
229 builder
230 .firm()
231 .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
232 }
233}
234
235pub struct DatadogTrace {
236 trace_rb: RequestBuilder<TraceEndpointEncoder>,
237 telemetry: ComponentTelemetry,
238 flush_timeout: Duration,
239}
240
241#[async_trait]
243impl Encoder for DatadogTrace {
244 async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
245 let Self {
246 trace_rb,
247 telemetry,
248 flush_timeout,
249 } = *self;
250
251 let mut health = context.take_health_handle();
252
253 let (events_tx, events_rx) = mpsc::channel(8);
256 let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
258 let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
259 let request_builder_handle = context
261 .topology_context()
262 .global_thread_pool() .spawn_traced_named("dd-traces-request-builder", request_builder_fut);
264
265 health.mark_ready();
266 debug!("Datadog Trace encoder started.");
267
268 loop {
269 select! {
270 biased; _ = health.live() => continue,
273 maybe_payload = payloads_rx.recv() => match maybe_payload {
274 Some(payload) => {
275 if let Err(e) = context.dispatcher().dispatch(payload).await {
277 error!("Failed to dispatch payload: {}", e);
278 }
279 }
280 None => break,
281 },
282 maybe_event_buffer = context.events().next() => match maybe_event_buffer {
283 Some(event_buffer) => events_tx.send(event_buffer).await
284 .error_context("Failed to send event buffer to request builder task.")?,
285 None => break,
286 },
287 }
288 }
289
290 drop(events_tx);
292
293 while let Some(payload) = payloads_rx.recv().await {
295 if let Err(e) = context.dispatcher().dispatch(payload).await {
296 error!("Failed to dispatch payload: {}", e);
297 }
298 }
299
300 match request_builder_handle.await {
302 Ok(Ok(())) => debug!("Request builder task stopped."),
303 Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
304 Err(e) => error!(error = %e, "Request builder task panicked."),
305 }
306
307 debug!("Datadog Trace encoder stopped.");
308
309 Ok(())
310 }
311}
312
313async fn run_request_builder(
314 mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
315 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
316) -> Result<(), GenericError> {
317 let mut pending_flush = false;
318 let pending_flush_timeout = sleep(flush_timeout);
319 tokio::pin!(pending_flush_timeout);
320
321 loop {
322 select! {
323 Some(event_buffer) = events_rx.recv() => {
324 for event in event_buffer {
325 let trace = match event.try_into_trace() {
326 Some(trace) => trace,
327 None => continue,
328 };
329 let trace_to_retry = match trace_request_builder.encode(trace).await {
332 Ok(None) => continue,
333 Ok(Some(trace)) => trace,
334 Err(e) => {
335 error!(error = %e, "Failed to encode trace.");
336 telemetry.events_dropped_encoder().increment(1);
337 continue;
338 }
339 };
340
341 let maybe_requests = trace_request_builder.flush().await;
342 if maybe_requests.is_empty() {
343 panic!("builder told us to flush, but gave us nothing");
344 }
345
346 for maybe_request in maybe_requests {
347 match maybe_request {
348 Ok((events, request)) => {
349 let payload_meta = PayloadMetadata::from_event_count(events);
350 let http_payload = HttpPayload::new(payload_meta, request);
351 let payload = Payload::Http(http_payload);
352
353 payloads_tx.send(payload).await
354 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
355 },
356 Err(e) => if e.is_recoverable() {
357 continue;
359 } else {
360 return Err(GenericError::from(e).context("Failed to flush request."));
361 }
362 }
363 }
364
365 if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
367 error!(error = %e, "Failed to encode trace.");
368 telemetry.events_dropped_encoder().increment(1);
369 }
370 }
371
372 debug!("Processed event buffer.");
373
374 if !pending_flush {
376 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
377 pending_flush = true;
378 }
379 },
380 _ = &mut pending_flush_timeout, if pending_flush => {
381 debug!("Flushing pending request(s).");
382
383 pending_flush = false;
384
385 let maybe_trace_requests = trace_request_builder.flush().await;
388 for maybe_request in maybe_trace_requests {
389 match maybe_request {
390 Ok((events, request)) => {
391 let payload_meta = PayloadMetadata::from_event_count(events);
392 let http_payload = HttpPayload::new(payload_meta, request);
393 let payload = Payload::Http(http_payload);
394
395 payloads_tx.send(payload).await
396 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
397 },
398 Err(e) => if e.is_recoverable() {
399 continue;
400 } else {
401 return Err(GenericError::from(e).context("Failed to flush request."));
402 }
403 }
404 }
405
406 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
407 },
408
409 else => break,
411 }
412 }
413
414 Ok(())
415}
416
417#[derive(Debug)]
418struct TraceEndpointEncoder {
419 tracer_payload_scratch: Vec<u8>,
420 chunk_scratch: Vec<u8>,
421 tags_scratch: Vec<u8>,
422 default_hostname: MetaString,
424 agent_hostname: String,
425 version: String,
426 env: String,
427 apm_config: ApmConfig,
428 otlp_traces: TracesConfig,
429}
430
431impl TraceEndpointEncoder {
432 fn new(
433 default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
434 ) -> Self {
435 Self {
436 tracer_payload_scratch: Vec::new(),
437 chunk_scratch: Vec::new(),
438 tags_scratch: Vec::new(),
439 agent_hostname: default_hostname.as_ref().to_string(),
440 default_hostname,
441 version,
442 env,
443 apm_config,
444 otlp_traces,
445 }
446 }
447
448 fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> Result<(), protobuf::Error> {
449 let resource_tags = trace.resource_tags();
450 let first_span = trace.spans().first();
451 let source = tags_to_source(resource_tags);
452
453 let trace_chunk = self.build_trace_chunk(trace);
454
455 let mut agent_payload_stream = CodedOutputStream::vec(output_buffer);
457
458 agent_payload_stream.write_string(AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER, &self.agent_hostname)?;
460 agent_payload_stream.write_string(AGENT_PAYLOAD_ENV_FIELD_NUMBER, &self.env)?;
461
462 agent_payload_stream.write_string(AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER, &self.version)?;
463 agent_payload_stream.write_double(
464 AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER,
465 self.apm_config.target_traces_per_second(),
466 )?;
467 agent_payload_stream.write_double(
468 AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER,
469 self.apm_config.errors_per_second(),
470 )?;
471
472 self.tracer_payload_scratch.clear();
474 let mut tracer_payload_stream = CodedOutputStream::vec(&mut self.tracer_payload_scratch);
475
476 if let Some(container_id) = resolve_container_id(resource_tags, first_span) {
478 tracer_payload_stream.write_string(TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER, container_id)?;
479 }
480
481 if let Some(lang) = get_resource_tag_value(resource_tags, "telemetry.sdk.language") {
482 tracer_payload_stream.write_string(TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER, lang)?;
483 }
484
485 let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
487 let tracer_version = format!("otlp-{}", sdk_version);
488 tracer_payload_stream.write_string(TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER, &tracer_version)?;
489
490 self.chunk_scratch.clear();
491 write_message_field(
492 &mut tracer_payload_stream,
493 TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER,
494 &trace_chunk,
495 &mut self.chunk_scratch,
496 )?;
497
498 self.tags_scratch.clear();
499 if let Some(tags) = resolve_container_tags(
500 resource_tags,
501 source.as_ref(),
502 self.otlp_traces.ignore_missing_datadog_fields,
503 ) {
504 write_map_entry_string_string(
505 &mut tracer_payload_stream,
506 TRACER_PAYLOAD_TAGS_FIELD_NUMBER,
507 CONTAINER_TAGS_META_KEY,
508 tags.as_ref(),
509 &mut self.tags_scratch,
510 )?;
511 }
512
513 if let Some(env) = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields) {
514 tracer_payload_stream.write_string(TRACER_PAYLOAD_ENV_FIELD_NUMBER, env)?;
515 }
516
517 if let Some(hostname) = resolve_hostname(
518 resource_tags,
519 source.as_ref(),
520 Some(self.default_hostname.as_ref()),
521 self.otlp_traces.ignore_missing_datadog_fields,
522 ) {
523 tracer_payload_stream.write_string(TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER, hostname)?;
524 }
525
526 if let Some(app_version) = resolve_app_version(resource_tags) {
527 tracer_payload_stream.write_string(TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER, app_version)?;
528 }
529
530 tracer_payload_stream.flush()?;
531 drop(tracer_payload_stream);
533
534 agent_payload_stream.write_bytes(AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER, &self.tracer_payload_scratch)?;
536 agent_payload_stream.flush()?;
537
538 Ok(())
539 }
540
541 fn sampling_rate(&self) -> f64 {
542 let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
543 if rate <= 0.0 || rate >= 1.0 {
544 return 1.0;
545 }
546 rate
547 }
548
549 fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk {
550 let mut spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
551 let mut chunk = TraceChunk::new();
552
553 let rate = self.sampling_rate();
554 let mut tags = std::collections::HashMap::new();
555 tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate));
556
557 const PRIORITY_AUTO_KEEP: i32 = 1;
559 chunk.set_priority(PRIORITY_AUTO_KEEP);
560
561 const DECISION_MAKER: &str = "-9";
565 if let Some(first_span) = spans.first_mut() {
566 let mut meta = first_span.take_meta();
567 meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
568 first_span.set_meta(meta);
569 }
570
571 tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
572 chunk.set_tags(tags);
573
574 chunk.set_spans(spans);
575
576 chunk
577 }
578}
579
580impl EndpointEncoder for TraceEndpointEncoder {
581 type Input = Trace;
582 type EncodeError = protobuf::Error;
583 fn encoder_name() -> &'static str {
584 "traces"
585 }
586
587 fn compressed_size_limit(&self) -> usize {
588 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
589 }
590
591 fn uncompressed_size_limit(&self) -> usize {
592 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
593 }
594
595 fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
596 self.encode_tracer_payload(trace, buffer)
597 }
598
599 fn endpoint_uri(&self) -> Uri {
600 PathAndQuery::from_static("/api/v0.2/traces").into()
601 }
602
603 fn endpoint_method(&self) -> Method {
604 Method::POST
605 }
606
607 fn content_type(&self) -> HeaderValue {
608 CONTENT_TYPE_PROTOBUF.clone()
609 }
610}
611
612fn convert_span(span: &DdSpan) -> ProtoSpan {
613 let mut proto = ProtoSpan::new();
614 proto.set_service(span.service().to_string());
615 proto.set_name(span.name().to_string());
616 proto.set_resource(span.resource().to_string());
617 proto.set_traceID(span.trace_id());
618 proto.set_spanID(span.span_id());
619 proto.set_parentID(span.parent_id());
620 proto.set_start(span.start() as i64);
621 proto.set_duration(span.duration() as i64);
622 proto.set_error(span.error());
623 proto.set_type(span.span_type().to_string());
624
625 proto.set_meta(
626 span.meta()
627 .iter()
628 .map(|(k, v)| (k.to_string(), v.to_string()))
629 .collect(),
630 );
631 proto.set_metrics(span.metrics().iter().map(|(k, v)| (k.to_string(), *v)).collect());
632 proto.set_meta_struct(
633 span.meta_struct()
634 .iter()
635 .map(|(k, v)| (k.to_string(), v.clone()))
636 .collect(),
637 );
638 proto.set_spanLinks(span.span_links().iter().map(convert_span_link).collect());
639 proto.set_spanEvents(span.span_events().iter().map(convert_span_event).collect());
640 proto
641}
642
643fn convert_span_link(link: &DdSpanLink) -> ProtoSpanLink {
644 let mut proto = ProtoSpanLink::new();
645 proto.set_traceID(link.trace_id());
646 proto.set_traceID_high(link.trace_id_high());
647 proto.set_spanID(link.span_id());
648 proto.set_attributes(
649 link.attributes()
650 .iter()
651 .map(|(k, v)| (k.to_string(), v.to_string()))
652 .collect(),
653 );
654 proto.set_tracestate(link.tracestate().to_string());
655 proto.set_flags(link.flags());
656 proto
657}
658
659fn convert_span_event(event: &DdSpanEvent) -> ProtoSpanEvent {
660 let mut proto = ProtoSpanEvent::new();
661 proto.set_time_unix_nano(event.time_unix_nano());
662 proto.set_name(event.name().to_string());
663 proto.set_attributes(
664 event
665 .attributes()
666 .iter()
667 .map(|(k, v)| (k.to_string(), convert_attribute_value(v)))
668 .collect(),
669 );
670 proto
671}
672
673fn convert_attribute_value(value: &AttributeValue) -> AttributeAnyValue {
674 let mut proto = AttributeAnyValue::new();
675 match value {
676 AttributeValue::String(v) => {
677 proto.set_type(AttributeAnyValueType::STRING_VALUE);
678 proto.set_string_value(v.to_string());
679 }
680 AttributeValue::Bool(v) => {
681 proto.set_type(AttributeAnyValueType::BOOL_VALUE);
682 proto.set_bool_value(*v);
683 }
684 AttributeValue::Int(v) => {
685 proto.set_type(AttributeAnyValueType::INT_VALUE);
686 proto.set_int_value(*v);
687 }
688 AttributeValue::Double(v) => {
689 proto.set_type(AttributeAnyValueType::DOUBLE_VALUE);
690 proto.set_double_value(*v);
691 }
692 AttributeValue::Array(values) => {
693 proto.set_type(AttributeAnyValueType::ARRAY_VALUE);
694 let mut array = AttributeArray::new();
695 array.set_values(values.iter().map(convert_attribute_array_value).collect());
696 proto.set_array_value(array);
697 }
698 }
699 proto
700}
701
702fn convert_attribute_array_value(value: &AttributeScalarValue) -> AttributeArrayValue {
703 let mut proto = AttributeArrayValue::new();
704 match value {
705 AttributeScalarValue::String(v) => {
706 proto.set_type(AttributeArrayValueType::STRING_VALUE);
707 proto.set_string_value(v.to_string());
708 }
709 AttributeScalarValue::Bool(v) => {
710 proto.set_type(AttributeArrayValueType::BOOL_VALUE);
711 proto.set_bool_value(*v);
712 }
713 AttributeScalarValue::Int(v) => {
714 proto.set_type(AttributeArrayValueType::INT_VALUE);
715 proto.set_int_value(*v);
716 }
717 AttributeScalarValue::Double(v) => {
718 proto.set_type(AttributeArrayValueType::DOUBLE_VALUE);
719 proto.set_double_value(*v);
720 }
721 }
722 proto
723}
724
725fn write_message_field<M: Message>(
726 output_stream: &mut CodedOutputStream<'_>, field_number: u32, message: &M, scratch_buf: &mut Vec<u8>,
727) -> Result<(), protobuf::Error> {
728 scratch_buf.clear();
729 {
730 let mut nested = CodedOutputStream::vec(scratch_buf);
735 message.write_to(&mut nested)?;
736 nested.flush()?;
737 }
738 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
739 output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
740 output_stream.write_raw_bytes(scratch_buf)?;
741 Ok(())
742}
743
744fn write_map_entry_string_string(
745 output_stream: &mut CodedOutputStream<'_>, field_number: u32, key: &str, value: &str, scratch_buf: &mut Vec<u8>,
746) -> Result<(), protobuf::Error> {
747 scratch_buf.clear();
748 {
749 let mut nested = CodedOutputStream::vec(scratch_buf);
750 nested.write_string(1, key)?;
752 nested.write_string(2, value)?;
753 nested.flush()?;
754 }
755 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
756 output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
757 output_stream.write_raw_bytes(scratch_buf)?;
758 Ok(())
759}
760
761fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
762 resource_tags.get_single_tag(key).and_then(|t| t.value())
763}
764
765fn resolve_hostname<'a>(
766 resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
767 ignore_missing_fields: bool,
768) -> Option<&'a str> {
769 let mut hostname = match source {
770 Some(src) => match src.kind {
771 OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
772 _ => Some(""),
773 },
774 None => default_hostname,
775 };
776
777 if ignore_missing_fields {
778 hostname = Some("");
779 }
780
781 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
782 hostname = Some(value);
783 }
784
785 hostname
786}
787
788fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
789 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
790 return Some(value);
791 }
792 if ignore_missing_fields {
793 return None;
794 }
795 if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
796 return Some(value);
797 }
798 get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
799}
800
801fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
802 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
803 if let Some(value) = get_resource_tag_value(resource_tags, key) {
804 return Some(value);
805 }
806 }
807 if let Some(span) = first_span {
810 for (k, v) in span.meta() {
811 if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
812 return Some(v.as_ref());
813 }
814 }
815 }
816 None
817}
818
819fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
820 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
821 return Some(value);
822 }
823 get_resource_tag_value(resource_tags, SERVICE_VERSION)
824}
825
826fn resolve_container_tags(
827 resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
828) -> Option<MetaString> {
829 if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
833 if !tags.is_empty() {
834 return Some(MetaString::from(tags));
835 }
836 }
837
838 if ignore_missing_fields {
839 return None;
840 }
841 let mut container_tags = TagSet::default();
842 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
843 let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
844 if container_tags.is_empty() && !is_fargate_source {
845 return None;
846 }
847
848 let mut flattened = flatten_container_tag(container_tags);
849 if is_fargate_source {
850 if let Some(src) = source {
851 append_tags(&mut flattened, &src.tag());
852 }
853 }
854
855 if flattened.is_empty() {
856 None
857 } else {
858 Some(MetaString::from(flattened))
859 }
860}
861
862fn flatten_container_tag(tags: TagSet) -> String {
863 let mut flattened = String::new();
864 for tag in tags {
865 if !flattened.is_empty() {
866 flattened.push(',');
867 }
868 flattened.push_str(tag.as_str());
869 }
870 flattened
871}
872
873fn append_tags(target: &mut String, tags: &str) {
874 if tags.is_empty() {
875 return;
876 }
877 if !target.is_empty() {
878 target.push(',');
879 }
880 target.push_str(tags);
881}