1#![allow(dead_code)]
2
3use std::{fmt::Write, time::Duration};
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7 attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8 AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use opentelemetry_semantic_conventions::resource::{
13 CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
14};
15use piecemeal::{ScratchBuffer, ScratchWriter};
16use saluki_common::strings::StringBuilder;
17use saluki_common::task::HandleExt as _;
18use saluki_config::GenericConfiguration;
19use saluki_context::tags::{SharedTagSet, TagSet};
20use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
21use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
22use saluki_core::{
23 components::{encoders::*, ComponentContext},
24 data_model::{
25 event::{trace::Trace, EventType},
26 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
27 },
28 observability::ComponentMetricsExt as _,
29};
30use saluki_env::host::providers::BoxedHostProvider;
31use saluki_env::{EnvironmentProvider, HostProvider};
32use saluki_error::generic_error;
33use saluki_error::{ErrorContext as _, GenericError};
34use saluki_io::compression::CompressionScheme;
35use saluki_metrics::MetricsBuilder;
36use serde::Deserialize;
37use stringtheory::MetaString;
38use tokio::{
39 select,
40 sync::mpsc::{self, Receiver, Sender},
41 time::sleep,
42};
43use tracing::{debug, error};
44
45use crate::common::datadog::{
46 apm::ApmConfig,
47 io::RB_BUFFER_CHUNK_SIZE,
48 request_builder::{EndpointEncoder, RequestBuilder},
49 telemetry::ComponentTelemetry,
50 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
51};
52use crate::common::otlp::config::TracesConfig;
53use crate::common::otlp::util::{
54 extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
55 DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
56 KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
57};
58
/// Tracer-payload meta key under which flattened container tags are attached.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
/// Maximum number of traces encoded into a single request payload before the builder splits.
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
// Content-Type header value for the protobuf-encoded trace payloads.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

/// Chunk tag key carrying the OTLP sampling rate applied to a trace.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
/// Default sampling priority assigned to a trace chunk when the trace carries no explicit
/// sampling decision.
const DEFAULT_CHUNK_PRIORITY: i32 = 1;

/// Default compressor kind for the serializer when none is configured.
fn default_serializer_compressor_kind() -> String {
    "zstd".to_string()
}
70
/// Default zstd compression level used by the serializer when none is configured.
const fn default_zstd_compressor_level() -> i32 {
    3
}
74
/// Default flush timeout, in seconds, before pending trace requests are flushed.
const fn default_flush_timeout_secs() -> u64 {
    2
}
78
/// Default environment value applied when none is configured.
fn default_env() -> String {
    String::from("none")
}
82
/// Configuration for the Datadog trace encoder component.
///
/// Most fields are deserialized from the main configuration; fields marked `#[serde(skip)]`
/// are populated programmatically in `from_configuration` / `with_environment_provider`.
#[derive(Deserialize)]
pub struct DatadogTraceConfiguration {
    // Compressor kind used for serialized payloads (defaults to "zstd").
    #[serde(
        rename = "serializer_compressor_kind", default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    // Compression level used when zstd compression is selected (defaults to 3).
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    // Seconds to wait after receiving events before flushing pending requests. A value of 0
    // is clamped to a small non-zero timeout at build time.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    // Default hostname; populated via `with_environment_provider`, not deserialized.
    #[serde(skip)]
    default_hostname: Option<String>,

    // Agent version string; populated in `from_configuration`.
    #[serde(skip)]
    version: String,

    // APM-specific configuration; populated in `from_configuration`.
    #[serde(skip)]
    apm_config: ApmConfig,

    // OTLP traces configuration; populated in `from_configuration`.
    #[serde(skip)]
    otlp_traces: TracesConfig,

    // Environment to set on the agent payload (defaults to "none").
    #[serde(default = "default_env")]
    env: String,
}
127
128impl DatadogTraceConfiguration {
129 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
131 let mut trace_config: Self = config.as_typed()?;
132
133 let app_details = saluki_metadata::get_app_details();
134 trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
135
136 trace_config.apm_config = ApmConfig::from_configuration(config)?;
137 trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
138
139 Ok(trace_config)
140 }
141}
142
143impl DatadogTraceConfiguration {
144 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
146 where
147 E: EnvironmentProvider<Host = BoxedHostProvider>,
148 {
149 let host_provider = environment_provider.host();
150 let hostname = host_provider.get_hostname().await?;
151 self.default_hostname = Some(hostname);
152 Ok(self)
153 }
154}
155
156#[async_trait]
157impl EncoderBuilder for DatadogTraceConfiguration {
158 fn input_event_type(&self) -> EventType {
159 EventType::Trace
160 }
161
162 fn output_payload_type(&self) -> PayloadType {
163 PayloadType::Http
164 }
165
166 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
167 let metrics_builder = MetricsBuilder::from_component_context(&context);
168 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
169 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
170
171 let default_hostname = self.default_hostname.clone().unwrap_or_default();
172 let default_hostname = MetaString::from(default_hostname);
173
174 let mut trace_rb = RequestBuilder::new(
177 TraceEndpointEncoder::new(
178 default_hostname,
179 self.version.clone(),
180 self.env.clone(),
181 self.apm_config.clone(),
182 self.otlp_traces.clone(),
183 ),
184 compression_scheme,
185 RB_BUFFER_CHUNK_SIZE,
186 )
187 .await?;
188 trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
189
190 let flush_timeout = match self.flush_timeout_secs {
191 0 => Duration::from_millis(10),
194 secs => Duration::from_secs(secs),
195 };
196
197 Ok(Box::new(DatadogTrace {
198 trace_rb,
199 telemetry,
200 flush_timeout,
201 }))
202 }
203}
204
impl MemoryBounds for DatadogTraceConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum bounds: the component struct itself plus the two fixed-capacity (8) channels
        // connecting the encoder task and the request builder task.
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm bounds: worst-case set of traces buffered for re-encoding when a payload splits.
        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
219
/// Datadog trace encoder component.
///
/// Encodes traces into Datadog agent payloads and dispatches the resulting HTTP payloads.
pub struct DatadogTrace {
    // Builds compressed HTTP requests from encoded traces.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    // Component-level telemetry (e.g. dropped-event counters).
    telemetry: ComponentTelemetry,
    // How long to wait after receiving events before flushing pending requests.
    flush_timeout: Duration,
}
225
#[async_trait]
impl Encoder for DatadogTrace {
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            trace_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // The request builder runs on its own task: event buffers flow in over `events_tx`,
        // finished payloads come back over `payloads_rx`.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Trace encoder started.");

        loop {
            select! {
                // `biased` keeps polling order fixed: liveness first, then finished payloads,
                // then new event buffers.
                biased;

                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    None => break,
                },
            }
        }

        // Dropping the events sender signals the request builder task to finish up and exit.
        drop(events_tx);

        // Drain any payloads produced during shutdown so nothing in flight is lost.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Trace encoder stopped.");

        Ok(())
    }
}
297
/// Request builder task loop.
///
/// Receives event buffers over `events_rx`, encodes each contained trace into the request
/// builder, and sends finished request payloads over `payloads_tx`. Requests are flushed
/// either when the builder reports it is full (encode returns the overflowing trace) or once
/// `flush_timeout` has elapsed after the most recent event buffer.
///
/// Returns normally once `events_rx` is closed and drained, or with an error when a flush
/// fails unrecoverably or the payloads channel is closed.
async fn run_request_builder(
    mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
    mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
) -> Result<(), GenericError> {
    // Tracks whether encoded-but-unflushed data is waiting on the timeout below.
    let mut pending_flush = false;
    let pending_flush_timeout = sleep(flush_timeout);
    tokio::pin!(pending_flush_timeout);

    loop {
        select! {
            Some(event_buffer) = events_rx.recv() => {
                for event in event_buffer {
                    // Skip any events in the buffer that are not traces.
                    let trace = match event.try_into_trace() {
                        Some(trace) => trace,
                        None => continue,
                    };

                    // `Ok(Some(trace))` means the builder is full: this trace was NOT encoded
                    // and must be retried after flushing the current request(s).
                    let trace_to_retry = match trace_request_builder.encode(trace).await {
                        Ok(None) => continue,
                        Ok(Some(trace)) => trace,
                        Err(e) => {
                            error!(error = %e, "Failed to encode trace.");
                            telemetry.events_dropped_encoder().increment(1);
                            continue;
                        }
                    };

                    let maybe_requests = trace_request_builder.flush().await;
                    if maybe_requests.is_empty() {
                        // Invariant violation: `encode` asked for a flush, so there must be data.
                        panic!("builder told us to flush, but gave us nothing");
                    }

                    for maybe_request in maybe_requests {
                        match maybe_request {
                            Ok((events, request)) => {
                                let payload_meta = PayloadMetadata::from_event_count(events);
                                let http_payload = HttpPayload::new(payload_meta, request);
                                let payload = Payload::Http(http_payload);

                                payloads_tx.send(payload).await
                                    .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                            },
                            // Recoverable flush errors drop only that request; anything else
                            // aborts the task.
                            Err(e) => if e.is_recoverable() {
                                continue;
                            } else {
                                return Err(GenericError::from(e).context("Failed to flush request."));
                            }
                        }
                    }

                    // The builder has room again: retry the trace that overflowed it.
                    if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
                        error!(error = %e, "Failed to encode trace.");
                        telemetry.events_dropped_encoder().increment(1);
                    }
                }

                debug!("Processed event buffer.");

                // Arm the flush timeout so partially-filled requests eventually go out.
                if !pending_flush {
                    pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
                    pending_flush = true;
                }
            },
            _ = &mut pending_flush_timeout, if pending_flush => {
                debug!("Flushing pending request(s).");

                pending_flush = false;

                let maybe_trace_requests = trace_request_builder.flush().await;
                for maybe_request in maybe_trace_requests {
                    match maybe_request {
                        Ok((events, request)) => {
                            let payload_meta = PayloadMetadata::from_event_count(events);
                            let http_payload = HttpPayload::new(payload_meta, request);
                            let payload = Payload::Http(http_payload);

                            payloads_tx.send(payload).await
                                .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
                        },
                        Err(e) => if e.is_recoverable() {
                            continue;
                        } else {
                            return Err(GenericError::from(e).context("Failed to flush request."));
                        }
                    }
                }

                debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
            },

            // All branches disabled: the events channel is closed and no flush is pending.
            else => break,
        }
    }

    Ok(())
}
401
/// `EndpointEncoder` implementation for Datadog trace payloads (`/api/v0.2/traces`).
#[derive(Debug)]
struct TraceEndpointEncoder {
    // Scratch buffer backing the protobuf payload builders.
    scratch: ScratchWriter<Vec<u8>>,
    // Fallback hostname used when resolving the per-tracer-payload hostname.
    default_hostname: MetaString,
    // Hostname set on the top-level agent payload.
    agent_hostname: String,
    // Agent version string set on the top-level agent payload.
    version: String,
    // Environment set on the top-level agent payload.
    env: String,
    // APM configuration (supplies target/error TPS for the agent payload).
    apm_config: ApmConfig,
    // OTLP traces configuration (sampling percentage, missing-field behavior).
    otlp_traces: TracesConfig,
    // Reusable buffer for formatting the sampling-rate tag value without reallocating.
    string_builder: StringBuilder,
}
413
impl TraceEndpointEncoder {
    /// Creates a new `TraceEndpointEncoder`.
    ///
    /// `default_hostname` seeds both the top-level agent payload hostname and the
    /// per-tracer-payload fallback hostname.
    fn new(
        default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
    ) -> Self {
        Self {
            scratch: ScratchWriter::new(Vec::with_capacity(8192)),
            // Read from `default_hostname` here, before it is moved into the struct below.
            agent_hostname: default_hostname.as_ref().to_string(),
            default_hostname,
            version,
            env,
            apm_config,
            otlp_traces,
            string_builder: StringBuilder::new(),
        }
    }

    /// Encodes a single trace as a protobuf `AgentPayload`, appending the serialized bytes to
    /// `output_buffer`.
    ///
    /// Payload-level metadata (container ID, container tags, env, hostname, app version) is
    /// resolved from the trace's resource tags, with fallbacks to span metadata, the detected
    /// OTLP source, and encoder defaults.
    fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
        let sampling_rate = self.sampling_rate();
        let resource_tags = trace.resource_tags();
        let first_span = trace.spans().first();
        let source = tags_to_source(resource_tags);

        let container_id = resolve_container_id(resource_tags, first_span);
        let lang = get_resource_tag_value(resource_tags, "telemetry.sdk.language");
        let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
        let tracer_version = format!("otlp-{}", sdk_version);
        let container_tags = resolve_container_tags(
            resource_tags,
            source.as_ref(),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let env = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields);
        let hostname = resolve_hostname(
            resource_tags,
            source.as_ref(),
            Some(self.default_hostname.as_ref()),
            self.otlp_traces.ignore_missing_datadog_fields,
        );
        let app_version = resolve_app_version(resource_tags);

        // Chunk-level sampling metadata; defaults apply when the trace has no sampling info.
        let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() {
            Some(sampling) => (
                sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY),
                sampling.dropped_trace,
                sampling.decision_maker.as_deref(),
                sampling.otlp_sampling_rate.unwrap_or(sampling_rate),
            ),
            None => (DEFAULT_CHUNK_PRIORITY, false, None, sampling_rate),
        };

        let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);

        // Top-level agent payload fields.
        ap_builder
            .host_name(&self.agent_hostname)?
            .env(&self.env)?
            .agent_version(&self.version)?
            .target_tps(self.apm_config.target_traces_per_second())?
            .error_tps(self.apm_config.errors_per_second())?;

        // One tracer payload containing one chunk with all of the trace's spans.
        ap_builder.add_tracer_payloads(|tp| {
            if let Some(cid) = container_id {
                tp.container_id(cid)?;
            }
            if let Some(l) = lang {
                tp.language_name(l)?;
            }
            tp.tracer_version(&tracer_version)?;

            tp.add_chunks(|chunk| {
                chunk.priority(priority)?;

                for span in trace.spans() {
                    chunk.add_spans(|s| {
                        s.service(span.service())?
                            .name(span.name())?
                            .resource(span.resource())?
                            .trace_id(span.trace_id())?
                            .span_id(span.span_id())?
                            .parent_id(span.parent_id())?
                            .start(span.start() as i64)?
                            .duration(span.duration() as i64)?
                            .error(span.error())?;

                        // Span meta: string key/value pairs.
                        {
                            let mut meta = s.meta();
                            for (k, v) in span.meta() {
                                meta.write_entry(k.as_ref(), v.as_ref())?;
                            }
                        }

                        // Span metrics: numeric key/value pairs.
                        {
                            let mut metrics = s.metrics();
                            for (k, v) in span.metrics() {
                                metrics.write_entry(k.as_ref(), *v)?;
                            }
                        }

                        s.type_(span.span_type())?;

                        // Span meta struct: binary-valued entries.
                        {
                            let mut ms = s.meta_struct();
                            for (k, v) in span.meta_struct() {
                                ms.write_entry(k.as_ref(), v.as_slice())?;
                            }
                        }

                        for link in span.span_links() {
                            s.add_span_links(|sl| {
                                sl.trace_id(link.trace_id())?
                                    .trace_id_high(link.trace_id_high())?
                                    .span_id(link.span_id())?;
                                {
                                    let mut attrs = sl.attributes();
                                    for (k, v) in link.attributes() {
                                        attrs.write_entry(&**k, &**v)?;
                                    }
                                }
                                let tracestate = link.tracestate().to_string();
                                sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
                                Ok(())
                            })?;
                        }

                        for event in span.span_events() {
                            s.add_span_events(|se| {
                                se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
                                {
                                    let mut attrs = se.attributes();
                                    for (k, v) in event.attributes() {
                                        attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
                                    }
                                }
                                Ok(())
                            })?;
                        }

                        Ok(())
                    })?;
                }

                // Chunk-level tags: sampling decision maker (if any) plus the OTLP sampling rate.
                {
                    let mut tags = chunk.tags();
                    if let Some(dm) = decision_maker {
                        tags.write_entry(TAG_DECISION_MAKER, dm)?;
                    }

                    // Reuse the string builder to format the rate without a fresh allocation.
                    self.string_builder.clear();
                    write!(&mut self.string_builder, "{:.2}", otlp_sr)
                        .expect("should never fail to format sampling rate");
                    tags.write_entry(TAG_OTLP_SAMPLING_RATE, self.string_builder.as_str())?;
                }

                if dropped_trace {
                    chunk.dropped_trace(true)?;
                }

                Ok(())
            })?;

            // Tracer-payload-level tags and resolved metadata.
            if let Some(ct) = container_tags {
                let mut tags = tp.tags();
                tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
            }

            if let Some(e) = env {
                tp.env(e)?;
            }
            if let Some(h) = hostname {
                tp.hostname(h)?;
            }
            if let Some(av) = app_version {
                tp.app_version(av)?;
            }

            Ok(())
        })?;

        ap_builder.finish(output_buffer)?;

        Ok(())
    }

    /// Returns the configured probabilistic sampling rate as a fraction.
    ///
    /// Out-of-range percentages (<= 0% or >= 100%) are treated as 1.0.
    fn sampling_rate(&self) -> f64 {
        let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
        if rate <= 0.0 || rate >= 1.0 {
            return 1.0;
        }
        rate
    }
}
610
611impl EndpointEncoder for TraceEndpointEncoder {
612 type Input = Trace;
613 type EncodeError = std::io::Error;
614 fn encoder_name() -> &'static str {
615 "traces"
616 }
617
618 fn compressed_size_limit(&self) -> usize {
619 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
620 }
621
622 fn uncompressed_size_limit(&self) -> usize {
623 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
624 }
625
626 fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
627 self.encode_tracer_payload(trace, buffer)
628 }
629
630 fn endpoint_uri(&self) -> Uri {
631 PathAndQuery::from_static("/api/v0.2/traces").into()
632 }
633
634 fn endpoint_method(&self) -> Method {
635 Method::POST
636 }
637
638 fn content_type(&self) -> HeaderValue {
639 CONTENT_TYPE_PROTOBUF.clone()
640 }
641}
642
643fn encode_attribute_value<S: ScratchBuffer>(
644 builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
645) -> std::io::Result<()> {
646 match value {
647 AttributeValue::String(v) => {
648 builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
649 }
650 AttributeValue::Bool(v) => {
651 builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
652 }
653 AttributeValue::Int(v) => {
654 builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
655 }
656 AttributeValue::Double(v) => {
657 builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
658 }
659 AttributeValue::Array(values) => {
660 builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
661 for val in values {
662 arr.add_values(|av| encode_attribute_array_value(av, val))?;
663 }
664 Ok(())
665 })?;
666 }
667 }
668 Ok(())
669}
670
671fn encode_attribute_array_value<S: ScratchBuffer>(
672 builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeScalarValue,
673) -> std::io::Result<()> {
674 match value {
675 AttributeScalarValue::String(v) => {
676 builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
677 }
678 AttributeScalarValue::Bool(v) => {
679 builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
680 }
681 AttributeScalarValue::Int(v) => {
682 builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
683 }
684 AttributeScalarValue::Double(v) => {
685 builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
686 }
687 }
688 Ok(())
689}
690
691fn get_resource_tag_value<'a>(resource_tags: &'a SharedTagSet, key: &str) -> Option<&'a str> {
692 resource_tags.get_single_tag(key).and_then(|t| t.value())
693}
694
695fn resolve_hostname<'a>(
696 resource_tags: &'a SharedTagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
697 ignore_missing_fields: bool,
698) -> Option<&'a str> {
699 let mut hostname = match source {
700 Some(src) => match src.kind {
701 OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
702 _ => Some(""),
703 },
704 None => default_hostname,
705 };
706
707 if ignore_missing_fields {
708 hostname = Some("");
709 }
710
711 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
712 hostname = Some(value);
713 }
714
715 hostname
716}
717
718fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Option<&str> {
719 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
720 return Some(value);
721 }
722 if ignore_missing_fields {
723 return None;
724 }
725 if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
726 return Some(value);
727 }
728 get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
729}
730
731fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
732 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
733 if let Some(value) = get_resource_tag_value(resource_tags, key) {
734 return Some(value);
735 }
736 }
737 if let Some(span) = first_span {
740 for (k, v) in span.meta() {
741 if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
742 return Some(v.as_ref());
743 }
744 }
745 }
746 None
747}
748
749fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> {
750 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
751 return Some(value);
752 }
753 get_resource_tag_value(resource_tags, SERVICE_VERSION)
754}
755
756fn resolve_container_tags(
757 resource_tags: &SharedTagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
758) -> Option<MetaString> {
759 if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
763 if !tags.is_empty() {
764 return Some(MetaString::from(tags));
765 }
766 }
767
768 if ignore_missing_fields {
769 return None;
770 }
771 let mut container_tags = TagSet::default();
772 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
773 let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
774 if container_tags.is_empty() && !is_fargate_source {
775 return None;
776 }
777
778 let mut flattened = flatten_container_tag(container_tags);
779 if is_fargate_source {
780 if let Some(src) = source {
781 append_tags(&mut flattened, &src.tag());
782 }
783 }
784
785 if flattened.is_empty() {
786 None
787 } else {
788 Some(MetaString::from(flattened))
789 }
790}
791
792fn flatten_container_tag(tags: TagSet) -> String {
793 let mut flattened = String::new();
794 for tag in tags {
795 if !flattened.is_empty() {
796 flattened.push(',');
797 }
798 flattened.push_str(tag.as_str());
799 }
800 flattened
801}
802
/// Appends `tags` to `target`, inserting a comma separator when `target` already has content.
/// An empty `tags` string leaves `target` untouched.
fn append_tags(target: &mut String, tags: &str) {
    if !tags.is_empty() {
        if !target.is_empty() {
            target.push(',');
        }
        target.push_str(tags);
    }
}