LCOV - code coverage report
Current view: top level - datadog - trace_segment.cpp (source / functions) Hit Total Coverage
Test: filtered.info Lines: 188 193 97.4 %
Date: 2024-01-03 20:30:12 Functions: 16 16 100.0 %

          Line data    Source code
       1             : #include "trace_segment.h"
       2             : 
       3             : #include <cassert>
       4             : #include <string>
       5             : #include <unordered_map>
       6             : #include <utility>
       7             : #include <vector>
       8             : 
       9             : #include "collector.h"
      10             : #include "collector_response.h"
      11             : #include "dict_writer.h"
      12             : #include "error.h"
      13             : #include "hex.h"
      14             : #include "logger.h"
      15             : #include "optional.h"
      16             : #include "platform_util.h"
      17             : #include "random.h"
      18             : #include "span_data.h"
      19             : #include "span_defaults.h"
      20             : #include "span_sampler.h"
      21             : #include "tag_propagation.h"
      22             : #include "tags.h"
      23             : #include "trace_sampler.h"
      24             : #include "w3c_propagation.h"
      25             : 
      26             : namespace datadog {
      27             : namespace tracing {
      28             : namespace {
      29             : 
      30             : struct Cache {
      31             :   static int process_id;
      32             : 
      33           1 :   static void recalculate_values() { process_id = get_process_id(); }
      34             : 
      35           1 :   Cache() {
      36           1 :     recalculate_values();
      37           1 :     at_fork_in_child(&recalculate_values);
      38           1 :   }
      39             : };
      40             : 
      41             : int Cache::process_id;
      42             : 
      43             : // `cache_singleton` exists solely to invoke `Cache`'s constructor.
      44             : // All data members are static, so use e.g. `Cache::process_id` instead of
      45             : // `cache_singleton.process_id`.
      46             : Cache cache_singleton;
      47             : 
      48             : // Encode the specified `trace_tags`. If the encoded value is not longer than
      49             : // the specified `tags_header_max_size`, then set it as the "x-datadog-tags"
      50             : // header using the specified `writer`. If the encoded value is oversized, then
      51             : // write a diagnostic to the specified `logger` and set a propagation error tag
      52             : // on the specified `local_root_tags`.
      53          12 : void inject_trace_tags(
      54             :     DictWriter& writer,
      55             :     const std::vector<std::pair<std::string, std::string>>& trace_tags,
      56             :     std::size_t tags_header_max_size,
      57             :     std::unordered_map<std::string, std::string>& local_root_tags,
      58             :     Logger& logger) {
      59          12 :   const std::string encoded_trace_tags = encode_tags(trace_tags);
      60             : 
      61          12 :   if (encoded_trace_tags.size() > tags_header_max_size) {
      62           1 :     std::string message;
      63             :     message +=
      64             :         "Serialized x-datadog-tags header value is too large.  The configured "
      65           1 :         "maximum size is ";
      66           1 :     message += std::to_string(tags_header_max_size);
      67           1 :     message += " bytes, but the encoded value is ";
      68           1 :     message += std::to_string(encoded_trace_tags.size());
      69           1 :     message += " bytes.";
      70           1 :     logger.log_error(message);
      71           1 :     local_root_tags[tags::internal::propagation_error] = "inject_max_size";
      72          12 :   } else if (!encoded_trace_tags.empty()) {
      73           9 :     writer.set("x-datadog-tags", encoded_trace_tags);
      74             :   }
      75          12 : }
      76             : 
      77             : }  // namespace
      78             : 
      79       82875 : TraceSegment::TraceSegment(
      80             :     const std::shared_ptr<Logger>& logger,
      81             :     const std::shared_ptr<Collector>& collector,
      82             :     const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
      83             :     const std::shared_ptr<TraceSampler>& trace_sampler,
      84             :     const std::shared_ptr<SpanSampler>& span_sampler,
      85             :     const std::shared_ptr<const SpanDefaults>& defaults,
      86             :     const RuntimeID& runtime_id,
      87             :     const std::vector<PropagationStyle>& injection_styles,
      88             :     const Optional<std::string>& hostname, Optional<std::string> origin,
      89             :     std::size_t tags_header_max_size,
      90             :     std::vector<std::pair<std::string, std::string>> trace_tags,
      91             :     Optional<SamplingDecision> sampling_decision,
      92             :     Optional<std::string> additional_w3c_tracestate,
      93             :     Optional<std::string> additional_datadog_w3c_tracestate,
      94       82875 :     std::unique_ptr<SpanData> local_root)
      95       82875 :     : logger_(logger),
      96       82875 :       collector_(collector),
      97       82875 :       tracer_telemetry_(tracer_telemetry),
      98       82875 :       trace_sampler_(trace_sampler),
      99       82875 :       span_sampler_(span_sampler),
     100       82875 :       defaults_(defaults),
     101       82875 :       runtime_id_(runtime_id),
     102       82875 :       injection_styles_(injection_styles),
     103       82875 :       hostname_(hostname),
     104       82875 :       origin_(std::move(origin)),
     105       82875 :       tags_header_max_size_(tags_header_max_size),
     106       82875 :       trace_tags_(std::move(trace_tags)),
     107       82875 :       num_finished_spans_(0),
     108       82875 :       sampling_decision_(std::move(sampling_decision)),
     109       82875 :       additional_w3c_tracestate_(std::move(additional_w3c_tracestate)),
     110       82875 :       additional_datadog_w3c_tracestate_(
     111      248625 :           std::move(additional_datadog_w3c_tracestate)) {
     112       82875 :   assert(logger_);
     113       82875 :   assert(collector_);
     114       82875 :   assert(tracer_telemetry_);
     115       82875 :   assert(trace_sampler_);
     116       82875 :   assert(span_sampler_);
     117       82875 :   assert(defaults_);
     118             : 
     119       82875 :   register_span(std::move(local_root));
     120       82875 : }
     121             : 
     122          42 : const SpanDefaults& TraceSegment::defaults() const { return *defaults_; }
     123             : 
     124           4 : const Optional<std::string>& TraceSegment::hostname() const {
     125           4 :   return hostname_;
     126             : }
     127             : 
     128           1 : const Optional<std::string>& TraceSegment::origin() const { return origin_; }
     129             : 
     130          34 : Optional<SamplingDecision> TraceSegment::sampling_decision() const {
     131             :   // `sampling_decision_` can change, so we need a lock.
     132          34 :   std::lock_guard<std::mutex> lock(mutex_);
     133          34 :   return sampling_decision_;
     134          34 : }
     135             : 
     136           1 : Logger& TraceSegment::logger() const { return *logger_; }
     137             : 
     138       82916 : void TraceSegment::register_span(std::unique_ptr<SpanData> span) {
     139       82916 :   tracer_telemetry_->metrics().tracer.spans_created.inc();
     140             : 
     141       82916 :   std::lock_guard<std::mutex> lock(mutex_);
     142       82916 :   assert(spans_.empty() || num_finished_spans_ < spans_.size());
     143       82916 :   spans_.emplace_back(std::move(span));
     144       82916 : }
     145             : 
     146       82916 : void TraceSegment::span_finished() {
     147             :   {
     148       82916 :     tracer_telemetry_->metrics().tracer.spans_finished.inc();
     149       82916 :     std::lock_guard<std::mutex> lock(mutex_);
     150       82916 :     ++num_finished_spans_;
     151       82916 :     assert(num_finished_spans_ <= spans_.size());
     152       82916 :     if (num_finished_spans_ < spans_.size()) {
     153          41 :       return;
     154             :     }
     155       82916 :   }
     156             :   // We don't need the lock anymore.  There's nobody left to call our methods.
     157             :   // On the other hand, there's nobody left to contend for the mutex, so it
     158             :   // doesn't make any difference.
     159       82875 :   make_sampling_decision_if_null();
     160       82875 :   assert(sampling_decision_);
     161             : 
     162             :   // All of our spans are finished.  Run the span sampler, finalize the spans,
     163             :   // and then send the spans to the collector.
     164       82875 :   if (sampling_decision_->priority <= 0) {
     165             :     // Span sampling happens when the trace is dropped.
     166       74458 :     for (const auto& span_ptr : spans_) {
     167       37238 :       SpanData& span = *span_ptr;
     168       37238 :       auto* rule = span_sampler_->match(span);
     169       37238 :       if (!rule) {
     170       36127 :         continue;
     171             :       }
     172        2012 :       const SamplingDecision decision = rule->decide(span);
     173        2012 :       if (decision.priority <= 0) {
     174         901 :         continue;
     175             :       }
     176        2222 :       span.numeric_tags[tags::internal::span_sampling_mechanism] =
     177        1111 :           *decision.mechanism;
     178        1111 :       span.numeric_tags[tags::internal::span_sampling_rule_rate] =
     179        1111 :           *decision.configured_rate;
     180        1111 :       if (decision.limiter_max_per_second) {
     181         100 :         span.numeric_tags[tags::internal::span_sampling_limit] =
     182         100 :             *decision.limiter_max_per_second;
     183             :       }
     184             :     }
     185             :   }
     186             : 
     187       82875 :   const SamplingDecision& decision = *sampling_decision_;
     188             : 
     189       82875 :   auto& local_root = *spans_.front();
     190       82875 :   local_root.tags.insert(trace_tags_.begin(), trace_tags_.end());
     191      165750 :   local_root.numeric_tags[tags::internal::sampling_priority] =
     192       82875 :       decision.priority;
     193       82875 :   if (hostname_) {
     194           3 :     local_root.tags[tags::internal::hostname] = *hostname_;
     195             :   }
     196       82875 :   if (decision.origin == SamplingDecision::Origin::LOCAL) {
     197      145665 :     if (decision.mechanism == int(SamplingMechanism::AGENT_RATE) ||
     198      145665 :         decision.mechanism == int(SamplingMechanism::DEFAULT)) {
     199       30494 :       local_root.numeric_tags[tags::internal::agent_sample_rate] =
     200       30494 :           *decision.configured_rate;
     201       52338 :     } else if (decision.mechanism == int(SamplingMechanism::RULE)) {
     202       52327 :       local_root.numeric_tags[tags::internal::rule_sample_rate] =
     203       52327 :           *decision.configured_rate;
     204       52327 :       if (decision.limiter_effective_rate) {
     205       25316 :         local_root.numeric_tags[tags::internal::rule_limiter_sample_rate] =
     206       25316 :             *decision.limiter_effective_rate;
     207             :       }
     208             :     }
     209             :   }
     210             : 
     211             :   // Some tags are repeated on all spans.
     212      165791 :   for (const auto& span_ptr : spans_) {
     213       82916 :     SpanData& span = *span_ptr;
     214       82916 :     if (origin_) {
     215          36 :       span.tags[tags::internal::origin] = *origin_;
     216             :     }
     217       82916 :     span.numeric_tags[tags::internal::process_id] = Cache::process_id;
     218       82916 :     span.tags[tags::internal::language] = "cpp";
     219       82916 :     span.tags[tags::internal::runtime_id] = runtime_id_.string();
     220             :   }
     221             : 
     222       82875 :   const auto result = collector_->send(std::move(spans_), trace_sampler_);
     223       82875 :   if (auto* error = result.if_error()) {
     224           2 :     logger_->log_error(
     225           2 :         error->with_prefix("Error sending spans to collector: "));
     226             :   }
     227             : 
     228       82875 :   tracer_telemetry_->metrics().tracer.trace_segments_closed.inc();
     229       82875 : }
     230             : 
     231          11 : void TraceSegment::override_sampling_priority(int priority) {
     232          11 :   SamplingDecision decision;
     233          11 :   decision.priority = priority;
     234          11 :   decision.mechanism = int(SamplingMechanism::MANUAL);
     235          11 :   decision.origin = SamplingDecision::Origin::LOCAL;
     236             : 
     237          11 :   std::lock_guard<std::mutex> lock(mutex_);
     238          11 :   sampling_decision_ = decision;
     239          11 :   update_decision_maker_trace_tag();
     240          11 : }
     241             : 
     242       82906 : void TraceSegment::make_sampling_decision_if_null() {
     243             :   // Depending on the context, `mutex_` might need already to be locked.
     244             : 
     245       82906 :   if (sampling_decision_) {
     246          85 :     return;
     247             :   }
     248             : 
     249       82821 :   const SpanData& local_root = *spans_.front();
     250       82821 :   sampling_decision_ = trace_sampler_->decide(local_root);
     251             : 
     252       82821 :   update_decision_maker_trace_tag();
     253             : }
     254             : 
     255       82832 : void TraceSegment::update_decision_maker_trace_tag() {
     256             :   // Depending on the context, `mutex_` might need already to be locked.
     257             : 
     258       82832 :   assert(sampling_decision_);
     259             : 
     260             :   // Note that `found` might be erased below (in case you refactor this code).
     261       82832 :   const auto found = std::find_if(
     262       92809 :       trace_tags_.begin(), trace_tags_.end(), [](const auto& entry) {
     263       92809 :         return entry.first == tags::internal::decision_maker;
     264             :       });
     265             : 
     266       82832 :   if (sampling_decision_->priority <= 0) {
     267       37197 :     if (found != trace_tags_.end()) {
     268           0 :       trace_tags_.erase(found);
     269             :     }
     270       37197 :     return;
     271             :   }
     272             : 
     273             :   // Note that `value` is moved-from below (in case you refactor this code).
     274       45635 :   auto value = "-" + std::to_string(*sampling_decision_->mechanism);
     275       45635 :   if (found == trace_tags_.end()) {
     276       45635 :     trace_tags_.emplace_back(tags::internal::decision_maker, std::move(value));
     277             :   } else {
     278           0 :     found->second = std::move(value);
     279             :   }
     280       45635 : }
     281             : 
     282          32 : void TraceSegment::inject(DictWriter& writer, const SpanData& span) {
     283             :   // If the only injection style is `NONE`, then don't do anything.
     284          56 :   if (injection_styles_.size() == 1 &&
     285          24 :       injection_styles_[0] == PropagationStyle::NONE) {
     286           1 :     return;
     287             :   }
     288             : 
     289             :   // The sampling priority can change (it can be overridden on another thread),
     290             :   // and trace tags might change when that happens ("_dd.p.dm").
     291             :   // So, we lock here, make a sampling decision if necessary, and then copy the
     292             :   // decision and trace tags before unlocking.
     293             :   int sampling_priority;
     294          31 :   std::vector<std::pair<std::string, std::string>> trace_tags;
     295             :   {
     296          31 :     std::lock_guard<std::mutex> lock(mutex_);
     297          31 :     make_sampling_decision_if_null();
     298          31 :     assert(sampling_decision_);
     299          31 :     sampling_priority = sampling_decision_->priority;
     300          31 :     trace_tags = trace_tags_;
     301          31 :   }
     302             : 
     303          71 :   for (const auto style : injection_styles_) {
     304          40 :     switch (style) {
     305           8 :       case PropagationStyle::DATADOG:
     306           8 :         writer.set("x-datadog-trace-id", std::to_string(span.trace_id.low));
     307           8 :         writer.set("x-datadog-parent-id", std::to_string(span.span_id));
     308           8 :         writer.set("x-datadog-sampling-priority",
     309          16 :                    std::to_string(sampling_priority));
     310           8 :         if (origin_) {
     311           2 :           writer.set("x-datadog-origin", *origin_);
     312             :         }
     313           8 :         inject_trace_tags(writer, trace_tags, tags_header_max_size_,
     314           8 :                           spans_.front()->tags, *logger_);
     315           8 :         break;
     316           4 :       case PropagationStyle::B3:
     317           4 :         if (span.trace_id.high) {
     318           1 :           writer.set("x-b3-traceid", span.trace_id.hex_padded());
     319             :         } else {
     320           3 :           writer.set("x-b3-traceid", hex_padded(span.trace_id.low));
     321             :         }
     322           4 :         writer.set("x-b3-spanid", hex_padded(span.span_id));
     323           4 :         writer.set("x-b3-sampled", std::to_string(int(sampling_priority > 0)));
     324           4 :         if (origin_) {
     325           2 :           writer.set("x-datadog-origin", *origin_);
     326             :         }
     327           4 :         inject_trace_tags(writer, trace_tags, tags_header_max_size_,
     328           4 :                           spans_.front()->tags, *logger_);
     329           4 :         break;
     330          28 :       case PropagationStyle::W3C:
     331          28 :         writer.set(
     332             :             "traceparent",
     333          56 :             encode_traceparent(span.trace_id, span.span_id, sampling_priority));
     334          28 :         writer.set("tracestate",
     335          28 :                    encode_tracestate(sampling_priority, origin_, trace_tags,
     336          28 :                                      additional_datadog_w3c_tracestate_,
     337          28 :                                      additional_w3c_tracestate_));
     338          28 :         break;
     339           0 :       default:
     340           0 :         assert(style == PropagationStyle::NONE);
     341           0 :         break;
     342             :     }
     343             :   }
     344          31 : }
     345             : 
     346             : }  // namespace tracing
     347             : }  // namespace datadog

Generated by: LCOV version 1.16