1#![allow(dead_code)]
2
3use async_trait::async_trait;
4use datadog_protos::traces::{
5 attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AttributeAnyValue,
6 AttributeArray, AttributeArrayValue, Span as ProtoSpan, SpanEvent as ProtoSpanEvent, SpanLink as ProtoSpanLink,
7 TraceChunk,
8};
9use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
10use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
11use opentelemetry_semantic_conventions::resource::{
12 CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
13};
14use protobuf::{rt::WireType, CodedOutputStream, Message};
15use saluki_common::task::HandleExt as _;
16use saluki_config::GenericConfiguration;
17use saluki_context::tags::TagSet;
18use saluki_core::data_model::event::trace::{
19 AttributeScalarValue, AttributeValue, Span as DdSpan, SpanEvent as DdSpanEvent, SpanLink as DdSpanLink,
20};
21use saluki_core::data_model::event::Event;
22use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
23use saluki_core::{
24 components::{encoders::*, ComponentContext},
25 data_model::{
26 event::{trace::Trace, EventType},
27 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
28 },
29 observability::ComponentMetricsExt as _,
30};
31use saluki_env::host::providers::BoxedHostProvider;
32use saluki_env::{EnvironmentProvider, HostProvider};
33use saluki_error::generic_error;
34use saluki_error::{ErrorContext as _, GenericError};
35use saluki_io::compression::CompressionScheme;
36use saluki_metrics::MetricsBuilder;
37use serde::Deserialize;
38use stringtheory::MetaString;
39use tokio::{
40 select,
41 sync::mpsc::{self, Receiver, Sender},
42 time::sleep,
43};
44use tracing::{debug, error};
45
46use crate::common::datadog::{
47 apm::ApmConfig,
48 io::RB_BUFFER_CHUNK_SIZE,
49 request_builder::{EndpointEncoder, RequestBuilder},
50 telemetry::ComponentTelemetry,
51 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
52};
53use crate::common::otlp::config::TracesConfig;
54use crate::common::otlp::util::{
55 extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
56 DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
57 KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
58};
59
60const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
61const MAX_TRACES_PER_PAYLOAD: usize = 10000;
62static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
63
64fn default_serializer_compressor_kind() -> String {
65 "zstd".to_string()
66}
67
68const fn default_zstd_compressor_level() -> i32 {
69 3
70}
71
72const fn default_flush_timeout_secs() -> u64 {
73 2
74}
75
76const TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER: u32 = 1;
78const TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER: u32 = 2;
79const TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER: u32 = 4;
80const TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER: u32 = 6;
81const TRACER_PAYLOAD_TAGS_FIELD_NUMBER: u32 = 7;
82const TRACER_PAYLOAD_ENV_FIELD_NUMBER: u32 = 8;
83const TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 9;
84const TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER: u32 = 10;
85
86const AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER: u32 = 1;
87const AGENT_PAYLOAD_ENV_FIELD_NUMBER: u32 = 2;
88const AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER: u32 = 5;
89const AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER: u32 = 7;
90const AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER: u32 = 8;
91const AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER: u32 = 9;
92
93fn default_env() -> String {
94 "none".to_string()
95}
96
97#[derive(Deserialize)]
103pub struct DatadogTraceConfiguration {
104 #[serde(
105 rename = "serializer_compressor_kind", default = "default_serializer_compressor_kind"
107 )]
108 compressor_kind: String,
109
110 #[serde(
111 rename = "serializer_zstd_compressor_level",
112 default = "default_zstd_compressor_level"
113 )]
114 zstd_compressor_level: i32,
115
116 #[serde(default = "default_flush_timeout_secs")]
124 flush_timeout_secs: u64,
125
126 #[serde(skip)]
127 default_hostname: Option<String>,
128
129 #[serde(skip)]
130 version: String,
131
132 #[serde(skip)]
133 apm_config: ApmConfig,
134
135 #[serde(skip)]
136 otlp_traces: TracesConfig,
137
138 #[serde(default = "default_env")]
139 env: String,
140}
141
142impl DatadogTraceConfiguration {
143 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
145 let mut trace_config: Self = config.as_typed()?;
146
147 let app_details = saluki_metadata::get_app_details();
148 trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
149
150 trace_config.apm_config = ApmConfig::from_configuration(config)?;
151 trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
152
153 Ok(trace_config)
154 }
155}
156
157impl DatadogTraceConfiguration {
158 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
160 where
161 E: EnvironmentProvider<Host = BoxedHostProvider>,
162 {
163 let host_provider = environment_provider.host();
164 let hostname = host_provider.get_hostname().await?;
165 self.default_hostname = Some(hostname);
166 Ok(self)
167 }
168}
169
170#[async_trait]
171impl EncoderBuilder for DatadogTraceConfiguration {
172 fn input_event_type(&self) -> EventType {
173 EventType::Trace
174 }
175
176 fn output_payload_type(&self) -> PayloadType {
177 PayloadType::Http
178 }
179
180 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
181 let metrics_builder = MetricsBuilder::from_component_context(&context);
182 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
183 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
184
185 let default_hostname = self.default_hostname.clone().unwrap_or_default();
186 let default_hostname = MetaString::from(default_hostname);
187
188 let mut trace_rb = RequestBuilder::new(
191 TraceEndpointEncoder::new(
192 default_hostname,
193 self.version.clone(),
194 self.env.clone(),
195 self.apm_config.clone(),
196 self.otlp_traces.clone(),
197 ),
198 compression_scheme,
199 RB_BUFFER_CHUNK_SIZE,
200 )
201 .await?;
202 trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
203
204 let flush_timeout = match self.flush_timeout_secs {
205 0 => std::time::Duration::from_millis(10),
207 secs => std::time::Duration::from_secs(secs),
208 };
209
210 Ok(Box::new(DatadogTrace {
211 trace_rb,
212 telemetry,
213 flush_timeout,
214 }))
215 }
216}
217
218impl MemoryBounds for DatadogTraceConfiguration {
219 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
220 builder
222 .minimum()
223 .with_single_value::<DatadogTrace>("component struct")
224 .with_array::<EventsBuffer>("request builder events channel", 8)
225 .with_array::<PayloadsBuffer>("request builder payloads channel", 8);
226
227 builder
228 .firm()
229 .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
230 }
231}
232
233pub struct DatadogTrace {
234 trace_rb: RequestBuilder<TraceEndpointEncoder>,
235 telemetry: ComponentTelemetry,
236 flush_timeout: std::time::Duration,
237}
238
239#[async_trait]
241impl Encoder for DatadogTrace {
242 async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
243 let Self {
244 trace_rb,
245 telemetry,
246 flush_timeout,
247 } = *self;
248
249 let mut health = context.take_health_handle();
250
251 let (events_tx, events_rx) = mpsc::channel(8);
254 let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
256 let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
257 let request_builder_handle = context
259 .topology_context()
260 .global_thread_pool() .spawn_traced_named("dd-traces-request-builder", request_builder_fut);
262
263 health.mark_ready();
264 debug!("Datadog Trace encoder started.");
265
266 loop {
267 select! {
268 biased; _ = health.live() => continue,
271 maybe_payload = payloads_rx.recv() => match maybe_payload {
272 Some(payload) => {
273 if let Err(e) = context.dispatcher().dispatch(payload).await {
275 error!("Failed to dispatch payload: {}", e);
276 }
277 }
278 None => break,
279 },
280 maybe_event_buffer = context.events().next() => match maybe_event_buffer {
281 Some(event_buffer) => events_tx.send(event_buffer).await
282 .error_context("Failed to send event buffer to request builder task.")?,
283 None => break,
284 },
285 }
286 }
287
288 drop(events_tx);
290
291 while let Some(payload) = payloads_rx.recv().await {
293 if let Err(e) = context.dispatcher().dispatch(payload).await {
294 error!("Failed to dispatch payload: {}", e);
295 }
296 }
297
298 match request_builder_handle.await {
300 Ok(Ok(())) => debug!("Request builder task stopped."),
301 Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
302 Err(e) => error!(error = %e, "Request builder task panicked."),
303 }
304
305 debug!("Datadog Trace encoder stopped.");
306
307 Ok(())
308 }
309}
310
311async fn run_request_builder(
312 mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
313 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
314) -> Result<(), GenericError> {
315 let mut pending_flush = false;
316 let pending_flush_timeout = sleep(flush_timeout);
317 tokio::pin!(pending_flush_timeout);
318
319 loop {
320 select! {
321 Some(event_buffer) = events_rx.recv() => {
322 for event in event_buffer {
323 let trace = match event {
324 Event::Trace(trace) => trace,
325 _ => continue,
326 };
327
328 let trace_to_retry = match trace_request_builder.encode(trace).await {
331 Ok(None) => continue,
332 Ok(Some(trace)) => trace,
333 Err(e) => {
334 error!(error = %e, "Failed to encode trace.");
335 telemetry.events_dropped_encoder().increment(1);
336 continue;
337 }
338 };
339
340 let maybe_requests = trace_request_builder.flush().await;
341 if maybe_requests.is_empty() {
342 panic!("builder told us to flush, but gave us nothing");
343 }
344
345 for maybe_request in maybe_requests {
346 match maybe_request {
347 Ok((events, request)) => {
348 let payload_meta = PayloadMetadata::from_event_count(events);
349 let http_payload = HttpPayload::new(payload_meta, request);
350 let payload = Payload::Http(http_payload);
351
352 payloads_tx.send(payload).await
353 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
354 },
355 Err(e) => if e.is_recoverable() {
356 continue;
358 } else {
359 return Err(GenericError::from(e).context("Failed to flush request."));
360 }
361 }
362 }
363
364 if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
366 error!(error = %e, "Failed to encode trace.");
367 telemetry.events_dropped_encoder().increment(1);
368 }
369 }
370
371 debug!("Processed event buffer.");
372
373 if !pending_flush {
375 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
376 pending_flush = true;
377 }
378 },
379 _ = &mut pending_flush_timeout, if pending_flush => {
380 debug!("Flushing pending request(s).");
381
382 pending_flush = false;
383
384 let maybe_trace_requests = trace_request_builder.flush().await;
387 for maybe_request in maybe_trace_requests {
388 match maybe_request {
389 Ok((events, request)) => {
390 let payload_meta = PayloadMetadata::from_event_count(events);
391 let http_payload = HttpPayload::new(payload_meta, request);
392 let payload = Payload::Http(http_payload);
393
394 payloads_tx.send(payload).await
395 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
396 },
397 Err(e) => if e.is_recoverable() {
398 continue;
399 } else {
400 return Err(GenericError::from(e).context("Failed to flush request."));
401 }
402 }
403 }
404
405 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
406 },
407
408 else => break,
410 }
411 }
412
413 Ok(())
414}
415
416#[derive(Debug)]
417struct TraceEndpointEncoder {
418 tracer_payload_scratch: Vec<u8>,
419 chunk_scratch: Vec<u8>,
420 tags_scratch: Vec<u8>,
421 default_hostname: MetaString,
423 agent_hostname: String,
424 version: String,
425 env: String,
426 apm_config: ApmConfig,
427 otlp_traces: TracesConfig,
428}
429
430impl TraceEndpointEncoder {
431 fn new(
432 default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
433 ) -> Self {
434 Self {
435 tracer_payload_scratch: Vec::new(),
436 chunk_scratch: Vec::new(),
437 tags_scratch: Vec::new(),
438 agent_hostname: default_hostname.as_ref().to_string(),
439 default_hostname,
440 version,
441 env,
442 apm_config,
443 otlp_traces,
444 }
445 }
446
447 fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> Result<(), protobuf::Error> {
448 let resource_tags = trace.resource_tags();
449 let first_span = trace.spans().first();
450 let source = tags_to_source(resource_tags);
451
452 let trace_chunk = self.build_trace_chunk(trace);
453
454 let mut agent_payload_stream = CodedOutputStream::vec(output_buffer);
456
457 agent_payload_stream.write_string(AGENT_PAYLOAD_HOSTNAME_FIELD_NUMBER, &self.agent_hostname)?;
459 agent_payload_stream.write_string(AGENT_PAYLOAD_ENV_FIELD_NUMBER, &self.env)?;
460
461 agent_payload_stream.write_string(AGENT_PAYLOAD_AGENT_VERSION_FIELD_NUMBER, &self.version)?;
462 agent_payload_stream.write_double(
463 AGENT_PAYLOAD_TARGET_TPS_FIELD_NUMBER,
464 self.apm_config.target_traces_per_second(),
465 )?;
466 agent_payload_stream.write_double(
467 AGENT_PAYLOAD_ERROR_TPS_FIELD_NUMBER,
468 self.apm_config.errors_per_second(),
469 )?;
470
471 self.tracer_payload_scratch.clear();
473 let mut tracer_payload_stream = CodedOutputStream::vec(&mut self.tracer_payload_scratch);
474
475 if let Some(container_id) = resolve_container_id(resource_tags, first_span) {
477 tracer_payload_stream.write_string(TRACER_PAYLOAD_CONTAINER_ID_FIELD_NUMBER, container_id)?;
478 }
479
480 if let Some(lang) = get_resource_tag_value(resource_tags, "telemetry.sdk.language") {
481 tracer_payload_stream.write_string(TRACER_PAYLOAD_LANGUAGE_NAME_FIELD_NUMBER, lang)?;
482 }
483
484 let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
486 let tracer_version = format!("otlp-{}", sdk_version);
487 tracer_payload_stream.write_string(TRACER_PAYLOAD_TRACER_VERSION_FIELD_NUMBER, &tracer_version)?;
488
489 self.chunk_scratch.clear();
490 write_message_field(
491 &mut tracer_payload_stream,
492 TRACER_PAYLOAD_CHUNKS_FIELD_NUMBER,
493 &trace_chunk,
494 &mut self.chunk_scratch,
495 )?;
496
497 self.tags_scratch.clear();
498 if let Some(tags) = resolve_container_tags(
499 resource_tags,
500 source.as_ref(),
501 self.otlp_traces.ignore_missing_datadog_fields,
502 ) {
503 write_map_entry_string_string(
504 &mut tracer_payload_stream,
505 TRACER_PAYLOAD_TAGS_FIELD_NUMBER,
506 CONTAINER_TAGS_META_KEY,
507 tags.as_ref(),
508 &mut self.tags_scratch,
509 )?;
510 }
511
512 if let Some(env) = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields) {
513 tracer_payload_stream.write_string(TRACER_PAYLOAD_ENV_FIELD_NUMBER, env)?;
514 }
515
516 if let Some(hostname) = resolve_hostname(
517 resource_tags,
518 source.as_ref(),
519 Some(self.default_hostname.as_ref()),
520 self.otlp_traces.ignore_missing_datadog_fields,
521 ) {
522 tracer_payload_stream.write_string(TRACER_PAYLOAD_HOSTNAME_FIELD_NUMBER, hostname)?;
523 }
524
525 if let Some(app_version) = resolve_app_version(resource_tags) {
526 tracer_payload_stream.write_string(TRACER_PAYLOAD_APP_VERSION_FIELD_NUMBER, app_version)?;
527 }
528
529 tracer_payload_stream.flush()?;
530 drop(tracer_payload_stream);
532
533 agent_payload_stream.write_bytes(AGENT_PAYLOAD_TRACER_PAYLOADS_FIELD_NUMBER, &self.tracer_payload_scratch)?;
535 agent_payload_stream.flush()?;
536
537 Ok(())
538 }
539
540 fn sampling_rate(&self) -> f64 {
541 let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
542 if rate <= 0.0 || rate >= 1.0 {
543 return 1.0;
544 }
545 rate
546 }
547
548 fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk {
549 let mut spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
550 let mut chunk = TraceChunk::new();
551
552 let rate = self.sampling_rate();
553 let mut tags = std::collections::HashMap::new();
554 tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate));
555
556 const PRIORITY_AUTO_KEEP: i32 = 1;
558 chunk.set_priority(PRIORITY_AUTO_KEEP);
559
560 const DECISION_MAKER: &str = "-9";
564 if let Some(first_span) = spans.first_mut() {
565 let mut meta = first_span.take_meta();
566 meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
567 first_span.set_meta(meta);
568 }
569
570 tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
571 chunk.set_tags(tags);
572
573 chunk.set_spans(spans);
574
575 chunk
576 }
577}
578
579impl EndpointEncoder for TraceEndpointEncoder {
580 type Input = Trace;
581 type EncodeError = protobuf::Error;
582 fn encoder_name() -> &'static str {
583 "traces"
584 }
585
586 fn compressed_size_limit(&self) -> usize {
587 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
588 }
589
590 fn uncompressed_size_limit(&self) -> usize {
591 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
592 }
593
594 fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
595 self.encode_tracer_payload(trace, buffer)
596 }
597
598 fn endpoint_uri(&self) -> Uri {
599 PathAndQuery::from_static("/api/v0.2/traces").into()
600 }
601
602 fn endpoint_method(&self) -> Method {
603 Method::POST
604 }
605
606 fn content_type(&self) -> HeaderValue {
607 CONTENT_TYPE_PROTOBUF.clone()
608 }
609}
610
611fn convert_span(span: &DdSpan) -> ProtoSpan {
612 let mut proto = ProtoSpan::new();
613 proto.set_service(span.service().to_string());
614 proto.set_name(span.name().to_string());
615 proto.set_resource(span.resource().to_string());
616 proto.set_traceID(span.trace_id());
617 proto.set_spanID(span.span_id());
618 proto.set_parentID(span.parent_id());
619 proto.set_start(span.start());
620 proto.set_duration(span.duration());
621 proto.set_error(span.error());
622 proto.set_type(span.span_type().to_string());
623
624 proto.set_meta(
625 span.meta()
626 .iter()
627 .map(|(k, v)| (k.to_string(), v.to_string()))
628 .collect(),
629 );
630 proto.set_metrics(span.metrics().iter().map(|(k, v)| (k.to_string(), *v)).collect());
631 proto.set_meta_struct(
632 span.meta_struct()
633 .iter()
634 .map(|(k, v)| (k.to_string(), v.clone()))
635 .collect(),
636 );
637 proto.set_spanLinks(span.span_links().iter().map(convert_span_link).collect());
638 proto.set_spanEvents(span.span_events().iter().map(convert_span_event).collect());
639 proto
640}
641
642fn convert_span_link(link: &DdSpanLink) -> ProtoSpanLink {
643 let mut proto = ProtoSpanLink::new();
644 proto.set_traceID(link.trace_id());
645 proto.set_traceID_high(link.trace_id_high());
646 proto.set_spanID(link.span_id());
647 proto.set_attributes(
648 link.attributes()
649 .iter()
650 .map(|(k, v)| (k.to_string(), v.to_string()))
651 .collect(),
652 );
653 proto.set_tracestate(link.tracestate().to_string());
654 proto.set_flags(link.flags());
655 proto
656}
657
658fn convert_span_event(event: &DdSpanEvent) -> ProtoSpanEvent {
659 let mut proto = ProtoSpanEvent::new();
660 proto.set_time_unix_nano(event.time_unix_nano());
661 proto.set_name(event.name().to_string());
662 proto.set_attributes(
663 event
664 .attributes()
665 .iter()
666 .map(|(k, v)| (k.to_string(), convert_attribute_value(v)))
667 .collect(),
668 );
669 proto
670}
671
672fn convert_attribute_value(value: &AttributeValue) -> AttributeAnyValue {
673 let mut proto = AttributeAnyValue::new();
674 match value {
675 AttributeValue::String(v) => {
676 proto.set_type(AttributeAnyValueType::STRING_VALUE);
677 proto.set_string_value(v.to_string());
678 }
679 AttributeValue::Bool(v) => {
680 proto.set_type(AttributeAnyValueType::BOOL_VALUE);
681 proto.set_bool_value(*v);
682 }
683 AttributeValue::Int(v) => {
684 proto.set_type(AttributeAnyValueType::INT_VALUE);
685 proto.set_int_value(*v);
686 }
687 AttributeValue::Double(v) => {
688 proto.set_type(AttributeAnyValueType::DOUBLE_VALUE);
689 proto.set_double_value(*v);
690 }
691 AttributeValue::Array(values) => {
692 proto.set_type(AttributeAnyValueType::ARRAY_VALUE);
693 let mut array = AttributeArray::new();
694 array.set_values(values.iter().map(convert_attribute_array_value).collect());
695 proto.set_array_value(array);
696 }
697 }
698 proto
699}
700
701fn convert_attribute_array_value(value: &AttributeScalarValue) -> AttributeArrayValue {
702 let mut proto = AttributeArrayValue::new();
703 match value {
704 AttributeScalarValue::String(v) => {
705 proto.set_type(AttributeArrayValueType::STRING_VALUE);
706 proto.set_string_value(v.to_string());
707 }
708 AttributeScalarValue::Bool(v) => {
709 proto.set_type(AttributeArrayValueType::BOOL_VALUE);
710 proto.set_bool_value(*v);
711 }
712 AttributeScalarValue::Int(v) => {
713 proto.set_type(AttributeArrayValueType::INT_VALUE);
714 proto.set_int_value(*v);
715 }
716 AttributeScalarValue::Double(v) => {
717 proto.set_type(AttributeArrayValueType::DOUBLE_VALUE);
718 proto.set_double_value(*v);
719 }
720 }
721 proto
722}
723
724fn write_message_field<M: Message>(
725 output_stream: &mut CodedOutputStream<'_>, field_number: u32, message: &M, scratch_buf: &mut Vec<u8>,
726) -> Result<(), protobuf::Error> {
727 scratch_buf.clear();
728 {
729 let mut nested = CodedOutputStream::vec(scratch_buf);
734 message.write_to(&mut nested)?;
735 nested.flush()?;
736 }
737 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
738 output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
739 output_stream.write_raw_bytes(scratch_buf)?;
740 Ok(())
741}
742
743fn write_map_entry_string_string(
744 output_stream: &mut CodedOutputStream<'_>, field_number: u32, key: &str, value: &str, scratch_buf: &mut Vec<u8>,
745) -> Result<(), protobuf::Error> {
746 scratch_buf.clear();
747 {
748 let mut nested = CodedOutputStream::vec(scratch_buf);
749 nested.write_string(1, key)?;
751 nested.write_string(2, value)?;
752 nested.flush()?;
753 }
754 output_stream.write_tag(field_number, WireType::LengthDelimited)?;
755 output_stream.write_raw_varint32(scratch_buf.len() as u32)?;
756 output_stream.write_raw_bytes(scratch_buf)?;
757 Ok(())
758}
759
760fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
761 resource_tags.get_single_tag(key).and_then(|t| t.value())
762}
763
764fn resolve_hostname<'a>(
765 resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
766 ignore_missing_fields: bool,
767) -> Option<&'a str> {
768 let mut hostname = match source {
769 Some(src) => match src.kind {
770 OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
771 _ => Some(""),
772 },
773 None => default_hostname,
774 };
775
776 if ignore_missing_fields {
777 hostname = Some("");
778 }
779
780 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
781 hostname = Some(value);
782 }
783
784 hostname
785}
786
787fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
788 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
789 return Some(value);
790 }
791 if ignore_missing_fields {
792 return None;
793 }
794 if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
795 return Some(value);
796 }
797 get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
798}
799
800fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
801 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
802 if let Some(value) = get_resource_tag_value(resource_tags, key) {
803 return Some(value);
804 }
805 }
806 if let Some(span) = first_span {
809 for (k, v) in span.meta() {
810 if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
811 return Some(v.as_ref());
812 }
813 }
814 }
815 None
816}
817
818fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
819 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
820 return Some(value);
821 }
822 get_resource_tag_value(resource_tags, SERVICE_VERSION)
823}
824
825fn resolve_container_tags(
826 resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
827) -> Option<MetaString> {
828 if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
832 if !tags.is_empty() {
833 return Some(MetaString::from(tags));
834 }
835 }
836
837 if ignore_missing_fields {
838 return None;
839 }
840 let mut container_tags = TagSet::default();
841 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
842 let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
843 if container_tags.is_empty() && !is_fargate_source {
844 return None;
845 }
846
847 let mut flattened = flatten_container_tag(container_tags);
848 if is_fargate_source {
849 if let Some(src) = source {
850 append_tags(&mut flattened, &src.tag());
851 }
852 }
853
854 if flattened.is_empty() {
855 None
856 } else {
857 Some(MetaString::from(flattened))
858 }
859}
860
861fn flatten_container_tag(tags: TagSet) -> String {
862 let mut flattened = String::new();
863 for tag in tags {
864 if !flattened.is_empty() {
865 flattened.push(',');
866 }
867 flattened.push_str(tag.as_str());
868 }
869 flattened
870}
871
872fn append_tags(target: &mut String, tags: &str) {
873 if tags.is_empty() {
874 return;
875 }
876 if !target.is_empty() {
877 target.push(',');
878 }
879 target.push_str(tags);
880}