1#![allow(dead_code)]
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use datadog_protos::traces::builders::{
7 attribute_any_value::AttributeAnyValueType, attribute_array_value::AttributeArrayValueType, AgentPayloadBuilder,
8 AttributeAnyValueBuilder, AttributeArrayValueBuilder,
9};
10use http::{uri::PathAndQuery, HeaderValue, Method, Uri};
11use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
12use opentelemetry_semantic_conventions::resource::{
13 CONTAINER_ID, DEPLOYMENT_ENVIRONMENT_NAME, K8S_POD_UID, SERVICE_VERSION,
14};
15use piecemeal::{ScratchBuffer, ScratchWriter};
16use saluki_common::task::HandleExt as _;
17use saluki_config::GenericConfiguration;
18use saluki_context::tags::{SharedTagSet, TagSet};
19use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
20use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
21use saluki_core::{
22 components::{encoders::*, ComponentContext},
23 data_model::{
24 event::{trace::Trace, EventType},
25 payload::{HttpPayload, Payload, PayloadMetadata, PayloadType},
26 },
27 observability::ComponentMetricsExt as _,
28};
29use saluki_env::host::providers::BoxedHostProvider;
30use saluki_env::{EnvironmentProvider, HostProvider};
31use saluki_error::generic_error;
32use saluki_error::{ErrorContext as _, GenericError};
33use saluki_io::compression::CompressionScheme;
34use saluki_metrics::MetricsBuilder;
35use serde::Deserialize;
36use stringtheory::MetaString;
37use tokio::{
38 select,
39 sync::mpsc::{self, Receiver, Sender},
40 time::sleep,
41};
42use tracing::{debug, error};
43
44use crate::common::datadog::{
45 apm::ApmConfig,
46 io::RB_BUFFER_CHUNK_SIZE,
47 request_builder::{EndpointEncoder, RequestBuilder},
48 telemetry::ComponentTelemetry,
49 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
50};
51use crate::common::otlp::config::TracesConfig;
52use crate::common::otlp::util::{
53 extract_container_tags_from_resource_tagset, tags_to_source, Source as OtlpSource, SourceKind as OtlpSourceKind,
54 DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_CONTAINER_TAGS, KEY_DATADOG_ENVIRONMENT,
55 KEY_DATADOG_HOST, KEY_DATADOG_VERSION,
56};
57
/// Key under which the flattened container tags are written into a tracer payload's tags.
const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
/// Limit passed to the request builder's `with_max_inputs_per_payload`; also sizes the firm memory bound.
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
/// `Content-Type` header value returned by the trace endpoint encoder.
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

/// Chunk tag key under which the OTLP sampling rate string is written.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
/// Priority assigned to a chunk when the trace carries no explicit sampling priority.
const DEFAULT_CHUNK_PRIORITY: i32 = 1;

/// Serde default for the `serializer_compressor_kind` setting.
fn default_serializer_compressor_kind() -> String {
    String::from("zstd")
}
69
/// Serde default for the `serializer_zstd_compressor_level` setting.
const fn default_zstd_compressor_level() -> i32 {
    const DEFAULT_LEVEL: i32 = 3;
    DEFAULT_LEVEL
}
73
/// Serde default for the `flush_timeout_secs` setting, in seconds.
const fn default_flush_timeout_secs() -> u64 {
    const DEFAULT_SECS: u64 = 2;
    DEFAULT_SECS
}
77
/// Serde default for the `env` setting.
fn default_env() -> String {
    String::from("none")
}
81
/// Configuration for the Datadog trace encoder.
///
/// The serde-visible fields are deserialized from the generic configuration; the
/// `#[serde(skip)]` fields are filled in afterwards by `from_configuration` and
/// `with_environment_provider`.
#[derive(Deserialize)]
pub struct DatadogTraceConfiguration {
    /// Compressor kind, read from `serializer_compressor_kind` (defaults to `zstd`).
    #[serde(
        rename = "serializer_compressor_kind", default = "default_serializer_compressor_kind"
    )]
    compressor_kind: String,

    /// Compression level, read from `serializer_zstd_compressor_level` (defaults to 3).
    #[serde(
        rename = "serializer_zstd_compressor_level",
        default = "default_zstd_compressor_level"
    )]
    zstd_compressor_level: i32,

    /// Flush timeout in seconds (defaults to 2). A value of `0` is mapped to 10 milliseconds
    /// when the encoder is built.
    #[serde(default = "default_flush_timeout_secs")]
    flush_timeout_secs: u64,

    /// Hostname obtained from the environment provider; `None` until
    /// `with_environment_provider` has been called.
    #[serde(skip)]
    default_hostname: Option<String>,

    /// Version string of the form `agent-data-plane/<version>`, set by `from_configuration`.
    #[serde(skip)]
    version: String,

    /// APM settings, loaded by `from_configuration`.
    #[serde(skip)]
    apm_config: ApmConfig,

    /// OTLP trace settings, loaded from `otlp_config.traces` by `from_configuration`.
    #[serde(skip)]
    otlp_traces: TracesConfig,

    /// Environment name (defaults to `none`); handed to the endpoint encoder at build time.
    #[serde(default = "default_env")]
    env: String,
}
126
127impl DatadogTraceConfiguration {
128 pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
130 let mut trace_config: Self = config.as_typed()?;
131
132 let app_details = saluki_metadata::get_app_details();
133 trace_config.version = format!("agent-data-plane/{}", app_details.version().raw());
134
135 trace_config.apm_config = ApmConfig::from_configuration(config)?;
136 trace_config.otlp_traces = config.try_get_typed("otlp_config.traces")?.unwrap_or_default();
137
138 Ok(trace_config)
139 }
140}
141
142impl DatadogTraceConfiguration {
143 pub async fn with_environment_provider<E>(mut self, environment_provider: E) -> Result<Self, GenericError>
145 where
146 E: EnvironmentProvider<Host = BoxedHostProvider>,
147 {
148 let host_provider = environment_provider.host();
149 let hostname = host_provider.get_hostname().await?;
150 self.default_hostname = Some(hostname);
151 Ok(self)
152 }
153}
154
155#[async_trait]
156impl EncoderBuilder for DatadogTraceConfiguration {
157 fn input_event_type(&self) -> EventType {
158 EventType::Trace
159 }
160
161 fn output_payload_type(&self) -> PayloadType {
162 PayloadType::Http
163 }
164
165 async fn build(&self, context: ComponentContext) -> Result<Box<dyn Encoder + Send>, GenericError> {
166 let metrics_builder = MetricsBuilder::from_component_context(&context);
167 let telemetry = ComponentTelemetry::from_builder(&metrics_builder);
168 let compression_scheme = CompressionScheme::new(&self.compressor_kind, self.zstd_compressor_level);
169
170 let default_hostname = self.default_hostname.clone().unwrap_or_default();
171 let default_hostname = MetaString::from(default_hostname);
172
173 let mut trace_rb = RequestBuilder::new(
176 TraceEndpointEncoder::new(
177 default_hostname,
178 self.version.clone(),
179 self.env.clone(),
180 self.apm_config.clone(),
181 self.otlp_traces.clone(),
182 ),
183 compression_scheme,
184 RB_BUFFER_CHUNK_SIZE,
185 )
186 .await?;
187 trace_rb.with_max_inputs_per_payload(MAX_TRACES_PER_PAYLOAD);
188
189 let flush_timeout = match self.flush_timeout_secs {
190 0 => Duration::from_millis(10),
193 secs => Duration::from_secs(secs),
194 };
195
196 Ok(Box::new(DatadogTrace {
197 trace_rb,
198 telemetry,
199 flush_timeout,
200 }))
201 }
202}
203
impl MemoryBounds for DatadogTraceConfiguration {
    /// Declares the memory bounds of the trace encoder.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // Minimum: the component struct plus the two channels created in `run`, each of
        // capacity 8.
        builder
            .minimum()
            .with_single_value::<DatadogTrace>("component struct")
            .with_array::<EventsBuffer>("request builder events channel", 8)
            .with_array::<PayloadsBuffer>("request builder payloads channel", 8);

        // Firm: up to `MAX_TRACES_PER_PAYLOAD` traces for the split re-encode buffer.
        builder
            .firm()
            .with_array::<Trace>("traces split re-encode buffer", MAX_TRACES_PER_PAYLOAD);
    }
}
218
/// Datadog trace encoder component, produced by `DatadogTraceConfiguration::build`.
pub struct DatadogTrace {
    /// Request builder that encodes traces into HTTP requests.
    trace_rb: RequestBuilder<TraceEndpointEncoder>,
    /// Component telemetry handed to the request builder task.
    telemetry: ComponentTelemetry,
    /// Delay after processing an event buffer before pending data is flushed.
    flush_timeout: Duration,
}
224
#[async_trait]
impl Encoder for DatadogTrace {
    /// Runs the encoder until the event stream or the request builder task ends.
    ///
    /// Encoding happens in a separate task (`run_request_builder`) spawned on the global
    /// thread pool. This loop forwards incoming event buffers to that task and dispatches the
    /// payloads it produces.
    ///
    /// # Errors
    ///
    /// Returns an error if an event buffer cannot be sent to the request builder task.
    /// Dispatch failures and request builder task failures are logged, not returned.
    async fn run(mut self: Box<Self>, mut context: EncoderContext) -> Result<(), GenericError> {
        let Self {
            trace_rb,
            telemetry,
            flush_timeout,
        } = *self;

        let mut health = context.take_health_handle();

        // Events flow to the request builder task; payloads flow back from it.
        let (events_tx, events_rx) = mpsc::channel(8);
        let (payloads_tx, mut payloads_rx) = mpsc::channel(8);
        let request_builder_fut = run_request_builder(trace_rb, telemetry, events_rx, payloads_tx, flush_timeout);
        let request_builder_handle = context
            .topology_context()
            .global_thread_pool()
            .spawn_traced_named("dd-traces-request-builder", request_builder_fut);

        health.mark_ready();
        debug!("Datadog Trace encoder started.");

        loop {
            select! {
                // `biased` makes `select!` poll the branches in the order they are written.
                biased;
                _ = health.live() => continue,
                maybe_payload = payloads_rx.recv() => match maybe_payload {
                    Some(payload) => {
                        if let Err(e) = context.dispatcher().dispatch(payload).await {
                            error!("Failed to dispatch payload: {}", e);
                        }
                    }
                    // The payload channel closed: the request builder task dropped its sender.
                    None => break,
                },
                maybe_event_buffer = context.events().next() => match maybe_event_buffer {
                    Some(event_buffer) => events_tx.send(event_buffer).await
                        .error_context("Failed to send event buffer to request builder task.")?,
                    // No more events will arrive.
                    None => break,
                },
            }
        }

        // Dropping the sender closes the events channel, which lets the request builder
        // task's receive loop finish.
        drop(events_tx);

        // Dispatch whatever the request builder still produces until it drops its sender.
        while let Some(payload) = payloads_rx.recv().await {
            if let Err(e) = context.dispatcher().dispatch(payload).await {
                error!("Failed to dispatch payload: {}", e);
            }
        }

        // Outer result: whether the task could be joined; inner result: the task's own outcome.
        match request_builder_handle.await {
            Ok(Ok(())) => debug!("Request builder task stopped."),
            Ok(Err(e)) => error!(error = %e, "Request builder task failed."),
            Err(e) => error!(error = %e, "Request builder task panicked."),
        }

        debug!("Datadog Trace encoder stopped.");

        Ok(())
    }
}
296
297async fn run_request_builder(
298 mut trace_request_builder: RequestBuilder<TraceEndpointEncoder>, telemetry: ComponentTelemetry,
299 mut events_rx: Receiver<EventsBuffer>, payloads_tx: Sender<PayloadsBuffer>, flush_timeout: std::time::Duration,
300) -> Result<(), GenericError> {
301 let mut pending_flush = false;
302 let pending_flush_timeout = sleep(flush_timeout);
303 tokio::pin!(pending_flush_timeout);
304
305 loop {
306 select! {
307 Some(event_buffer) = events_rx.recv() => {
308 for event in event_buffer {
309 let trace = match event.try_into_trace() {
310 Some(trace) => trace,
311 None => continue,
312 };
313 let trace_to_retry = match trace_request_builder.encode(trace).await {
316 Ok(None) => continue,
317 Ok(Some(trace)) => trace,
318 Err(e) => {
319 error!(error = %e, "Failed to encode trace.");
320 telemetry.events_dropped_encoder().increment(1);
321 continue;
322 }
323 };
324
325 let maybe_requests = trace_request_builder.flush().await;
326 if maybe_requests.is_empty() {
327 panic!("builder told us to flush, but gave us nothing");
328 }
329
330 for maybe_request in maybe_requests {
331 match maybe_request {
332 Ok((events, request)) => {
333 let payload_meta = PayloadMetadata::from_event_count(events);
334 let http_payload = HttpPayload::new(payload_meta, request);
335 let payload = Payload::Http(http_payload);
336
337 payloads_tx.send(payload).await
338 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
339 },
340 Err(e) => if e.is_recoverable() {
341 continue;
343 } else {
344 return Err(GenericError::from(e).context("Failed to flush request."));
345 }
346 }
347 }
348
349 if let Err(e) = trace_request_builder.encode(trace_to_retry).await {
351 error!(error = %e, "Failed to encode trace.");
352 telemetry.events_dropped_encoder().increment(1);
353 }
354 }
355
356 debug!("Processed event buffer.");
357
358 if !pending_flush {
360 pending_flush_timeout.as_mut().reset(tokio::time::Instant::now() + flush_timeout);
361 pending_flush = true;
362 }
363 },
364 _ = &mut pending_flush_timeout, if pending_flush => {
365 debug!("Flushing pending request(s).");
366
367 pending_flush = false;
368
369 let maybe_trace_requests = trace_request_builder.flush().await;
372 for maybe_request in maybe_trace_requests {
373 match maybe_request {
374 Ok((events, request)) => {
375 let payload_meta = PayloadMetadata::from_event_count(events);
376 let http_payload = HttpPayload::new(payload_meta, request);
377 let payload = Payload::Http(http_payload);
378
379 payloads_tx.send(payload).await
380 .map_err(|_| generic_error!("Failed to send payload to encoder."))?;
381 },
382 Err(e) => if e.is_recoverable() {
383 continue;
384 } else {
385 return Err(GenericError::from(e).context("Failed to flush request."));
386 }
387 }
388 }
389
390 debug!("All flushed requests sent to I/O task. Waiting for next event buffer...");
391 },
392
393 else => break,
395 }
396 }
397
398 Ok(())
399}
400
/// Endpoint encoder that serializes traces into agent payloads for the traces intake.
#[derive(Debug)]
struct TraceEndpointEncoder {
    /// Scratch writer handed to the agent payload builder on every encode.
    scratch: ScratchWriter<Vec<u8>>,
    /// Hostname passed to `resolve_hostname` as the fallback when no source is derived.
    default_hostname: MetaString,
    /// Owned copy of `default_hostname`, written as the agent payload's host name.
    agent_hostname: String,
    /// Agent version string written into the agent payload.
    version: String,
    /// Environment written into the agent payload.
    env: String,
    /// APM settings; supplies the target and error traces-per-second values.
    apm_config: ApmConfig,
    /// OTLP trace settings; supplies `ignore_missing_datadog_fields` and the sampling percentage.
    otlp_traces: TracesConfig,
}
411
412impl TraceEndpointEncoder {
413 fn new(
414 default_hostname: MetaString, version: String, env: String, apm_config: ApmConfig, otlp_traces: TracesConfig,
415 ) -> Self {
416 Self {
417 scratch: ScratchWriter::new(Vec::with_capacity(8192)),
418 agent_hostname: default_hostname.as_ref().to_string(),
419 default_hostname,
420 version,
421 env,
422 apm_config,
423 otlp_traces,
424 }
425 }
426
427 fn encode_tracer_payload(&mut self, trace: &Trace, output_buffer: &mut Vec<u8>) -> std::io::Result<()> {
428 let sampling_rate = self.sampling_rate();
429 let resource_tags = trace.resource_tags();
430 let first_span = trace.spans().first();
431 let source = tags_to_source(resource_tags);
432
433 let container_id = resolve_container_id(resource_tags, first_span);
435 let lang = get_resource_tag_value(resource_tags, "telemetry.sdk.language");
436 let sdk_version = get_resource_tag_value(resource_tags, "telemetry.sdk.version").unwrap_or("");
437 let tracer_version = format!("otlp-{}", sdk_version);
438 let container_tags = resolve_container_tags(
439 resource_tags,
440 source.as_ref(),
441 self.otlp_traces.ignore_missing_datadog_fields,
442 );
443 let env = resolve_env(resource_tags, self.otlp_traces.ignore_missing_datadog_fields);
444 let hostname = resolve_hostname(
445 resource_tags,
446 source.as_ref(),
447 Some(self.default_hostname.as_ref()),
448 self.otlp_traces.ignore_missing_datadog_fields,
449 );
450 let app_version = resolve_app_version(resource_tags);
451
452 let (priority, dropped_trace, decision_maker, otlp_sr) = match trace.sampling() {
454 Some(sampling) => (
455 sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY),
456 sampling.dropped_trace,
457 sampling.decision_maker.as_deref(),
458 sampling
459 .otlp_sampling_rate
460 .as_ref()
461 .map(|sr| sr.to_string())
462 .unwrap_or_else(|| format!("{:.2}", sampling_rate)),
463 ),
464 None => (DEFAULT_CHUNK_PRIORITY, false, None, format!("{:.2}", sampling_rate)),
465 };
466
467 let mut ap_builder = AgentPayloadBuilder::new(&mut self.scratch);
469
470 ap_builder
471 .host_name(&self.agent_hostname)?
472 .env(&self.env)?
473 .agent_version(&self.version)?
474 .target_tps(self.apm_config.target_traces_per_second())?
475 .error_tps(self.apm_config.errors_per_second())?;
476
477 ap_builder.add_tracer_payloads(|tp| {
478 if let Some(cid) = container_id {
479 tp.container_id(cid)?;
480 }
481 if let Some(l) = lang {
482 tp.language_name(l)?;
483 }
484 tp.tracer_version(&tracer_version)?;
485
486 tp.add_chunks(|chunk| {
488 chunk.priority(priority)?;
489
490 for span in trace.spans() {
491 chunk.add_spans(|s| {
492 s.service(span.service())?
493 .name(span.name())?
494 .resource(span.resource())?
495 .trace_id(span.trace_id())?
496 .span_id(span.span_id())?
497 .parent_id(span.parent_id())?
498 .start(span.start() as i64)?
499 .duration(span.duration() as i64)?
500 .error(span.error())?;
501
502 {
503 let mut meta = s.meta();
504 for (k, v) in span.meta() {
505 meta.write_entry(k.as_ref(), v.as_ref())?;
506 }
507 }
508
509 {
510 let mut metrics = s.metrics();
511 for (k, v) in span.metrics() {
512 metrics.write_entry(k.as_ref(), *v)?;
513 }
514 }
515
516 s.type_(span.span_type())?;
517
518 {
519 let mut ms = s.meta_struct();
520 for (k, v) in span.meta_struct() {
521 ms.write_entry(k.as_ref(), v.as_slice())?;
522 }
523 }
524
525 for link in span.span_links() {
526 s.add_span_links(|sl| {
527 sl.trace_id(link.trace_id())?
528 .trace_id_high(link.trace_id_high())?
529 .span_id(link.span_id())?;
530 {
531 let mut attrs = sl.attributes();
532 for (k, v) in link.attributes() {
533 attrs.write_entry(&**k, &**v)?;
534 }
535 }
536 let tracestate = link.tracestate().to_string();
537 sl.tracestate(tracestate.as_str())?.flags(link.flags())?;
538 Ok(())
539 })?;
540 }
541
542 for event in span.span_events() {
543 s.add_span_events(|se| {
544 se.time_unix_nano(event.time_unix_nano())?.name(event.name())?;
545 {
546 let mut attrs = se.attributes();
547 for (k, v) in event.attributes() {
548 attrs.write_entry(&**k, |av| encode_attribute_value(av, v))?;
549 }
550 }
551 Ok(())
552 })?;
553 }
554
555 Ok(())
556 })?;
557 }
558
559 {
561 let mut tags = chunk.tags();
562 if let Some(dm) = decision_maker {
563 tags.write_entry(TAG_DECISION_MAKER, dm)?;
564 }
565 tags.write_entry(TAG_OTLP_SAMPLING_RATE, otlp_sr.as_str())?;
566 }
567
568 if dropped_trace {
569 chunk.dropped_trace(true)?;
570 }
571
572 Ok(())
573 })?;
574
575 if let Some(ct) = container_tags {
577 let mut tags = tp.tags();
578 tags.write_entry(CONTAINER_TAGS_META_KEY, &*ct)?;
579 }
580
581 if let Some(e) = env {
582 tp.env(e)?;
583 }
584 if let Some(h) = hostname {
585 tp.hostname(h)?;
586 }
587 if let Some(av) = app_version {
588 tp.app_version(av)?;
589 }
590
591 Ok(())
592 })?;
593
594 ap_builder.finish(output_buffer)?;
595
596 Ok(())
597 }
598
599 fn sampling_rate(&self) -> f64 {
600 let rate = self.otlp_traces.probabilistic_sampler.sampling_percentage / 100.0;
601 if rate <= 0.0 || rate >= 1.0 {
602 return 1.0;
603 }
604 rate
605 }
606}
607
608impl EndpointEncoder for TraceEndpointEncoder {
609 type Input = Trace;
610 type EncodeError = std::io::Error;
611 fn encoder_name() -> &'static str {
612 "traces"
613 }
614
615 fn compressed_size_limit(&self) -> usize {
616 DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT
617 }
618
619 fn uncompressed_size_limit(&self) -> usize {
620 DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT
621 }
622
623 fn encode(&mut self, trace: &Self::Input, buffer: &mut Vec<u8>) -> Result<(), Self::EncodeError> {
624 self.encode_tracer_payload(trace, buffer)
625 }
626
627 fn endpoint_uri(&self) -> Uri {
628 PathAndQuery::from_static("/api/v0.2/traces").into()
629 }
630
631 fn endpoint_method(&self) -> Method {
632 Method::POST
633 }
634
635 fn content_type(&self) -> HeaderValue {
636 CONTENT_TYPE_PROTOBUF.clone()
637 }
638}
639
640fn encode_attribute_value<S: ScratchBuffer>(
641 builder: &mut AttributeAnyValueBuilder<'_, S>, value: &AttributeValue,
642) -> std::io::Result<()> {
643 match value {
644 AttributeValue::String(v) => {
645 builder.type_(AttributeAnyValueType::STRING_VALUE)?.string_value(v)?;
646 }
647 AttributeValue::Bool(v) => {
648 builder.type_(AttributeAnyValueType::BOOL_VALUE)?.bool_value(*v)?;
649 }
650 AttributeValue::Int(v) => {
651 builder.type_(AttributeAnyValueType::INT_VALUE)?.int_value(*v)?;
652 }
653 AttributeValue::Double(v) => {
654 builder.type_(AttributeAnyValueType::DOUBLE_VALUE)?.double_value(*v)?;
655 }
656 AttributeValue::Array(values) => {
657 builder.type_(AttributeAnyValueType::ARRAY_VALUE)?.array_value(|arr| {
658 for val in values {
659 arr.add_values(|av| encode_attribute_array_value(av, val))?;
660 }
661 Ok(())
662 })?;
663 }
664 }
665 Ok(())
666}
667
668fn encode_attribute_array_value<S: ScratchBuffer>(
669 builder: &mut AttributeArrayValueBuilder<'_, S>, value: &AttributeScalarValue,
670) -> std::io::Result<()> {
671 match value {
672 AttributeScalarValue::String(v) => {
673 builder.type_(AttributeArrayValueType::STRING_VALUE)?.string_value(v)?;
674 }
675 AttributeScalarValue::Bool(v) => {
676 builder.type_(AttributeArrayValueType::BOOL_VALUE)?.bool_value(*v)?;
677 }
678 AttributeScalarValue::Int(v) => {
679 builder.type_(AttributeArrayValueType::INT_VALUE)?.int_value(*v)?;
680 }
681 AttributeScalarValue::Double(v) => {
682 builder.type_(AttributeArrayValueType::DOUBLE_VALUE)?.double_value(*v)?;
683 }
684 }
685 Ok(())
686}
687
688fn get_resource_tag_value<'a>(resource_tags: &'a SharedTagSet, key: &str) -> Option<&'a str> {
689 resource_tags.get_single_tag(key).and_then(|t| t.value())
690}
691
692fn resolve_hostname<'a>(
693 resource_tags: &'a SharedTagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
694 ignore_missing_fields: bool,
695) -> Option<&'a str> {
696 let mut hostname = match source {
697 Some(src) => match src.kind {
698 OtlpSourceKind::HostnameKind => Some(src.identifier.as_str()),
699 _ => Some(""),
700 },
701 None => default_hostname,
702 };
703
704 if ignore_missing_fields {
705 hostname = Some("");
706 }
707
708 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_HOST) {
709 hostname = Some(value);
710 }
711
712 hostname
713}
714
715fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Option<&str> {
716 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
717 return Some(value);
718 }
719 if ignore_missing_fields {
720 return None;
721 }
722 if let Some(value) = get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_NAME) {
723 return Some(value);
724 }
725 get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
726}
727
728fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
729 for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
730 if let Some(value) = get_resource_tag_value(resource_tags, key) {
731 return Some(value);
732 }
733 }
734 if let Some(span) = first_span {
737 for (k, v) in span.meta() {
738 if k == KEY_DATADOG_CONTAINER_ID || k == K8S_POD_UID {
739 return Some(v.as_ref());
740 }
741 }
742 }
743 None
744}
745
746fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> {
747 if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
748 return Some(value);
749 }
750 get_resource_tag_value(resource_tags, SERVICE_VERSION)
751}
752
753fn resolve_container_tags(
754 resource_tags: &SharedTagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
755) -> Option<MetaString> {
756 if let Some(tags) = get_resource_tag_value(resource_tags, KEY_DATADOG_CONTAINER_TAGS) {
760 if !tags.is_empty() {
761 return Some(MetaString::from(tags));
762 }
763 }
764
765 if ignore_missing_fields {
766 return None;
767 }
768 let mut container_tags = TagSet::default();
769 extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags);
770 let is_fargate_source = source.is_some_and(|src| src.kind == OtlpSourceKind::AwsEcsFargateKind);
771 if container_tags.is_empty() && !is_fargate_source {
772 return None;
773 }
774
775 let mut flattened = flatten_container_tag(container_tags);
776 if is_fargate_source {
777 if let Some(src) = source {
778 append_tags(&mut flattened, &src.tag());
779 }
780 }
781
782 if flattened.is_empty() {
783 None
784 } else {
785 Some(MetaString::from(flattened))
786 }
787}
788
789fn flatten_container_tag(tags: TagSet) -> String {
790 let mut flattened = String::new();
791 for tag in tags {
792 if !flattened.is_empty() {
793 flattened.push(',');
794 }
795 flattened.push_str(tag.as_str());
796 }
797 flattened
798}
799
/// Appends `tags` to `target`, separated by a comma when `target` already has content.
///
/// An empty `tags` leaves `target` untouched.
fn append_tags(target: &mut String, tags: &str) {
    if !tags.is_empty() {
        let needs_separator = !target.is_empty();
        if needs_separator {
            target.push(',');
        }
        target.push_str(tags);
    }
}
808}