Line data Source code
1 : #include "trace_segment.h"
2 :
#include <algorithm>
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
8 :
9 : #include "collector.h"
10 : #include "collector_response.h"
11 : #include "dict_writer.h"
12 : #include "error.h"
13 : #include "hex.h"
14 : #include "logger.h"
15 : #include "optional.h"
16 : #include "platform_util.h"
17 : #include "random.h"
18 : #include "span_data.h"
19 : #include "span_defaults.h"
20 : #include "span_sampler.h"
21 : #include "tag_propagation.h"
22 : #include "tags.h"
23 : #include "trace_sampler.h"
24 : #include "w3c_propagation.h"
25 :
26 : namespace datadog {
27 : namespace tracing {
28 : namespace {
29 :
30 : struct Cache {
31 : static int process_id;
32 :
33 1 : static void recalculate_values() { process_id = get_process_id(); }
34 :
35 1 : Cache() {
36 1 : recalculate_values();
37 1 : at_fork_in_child(&recalculate_values);
38 1 : }
39 : };
40 :
41 : int Cache::process_id;
42 :
43 : // `cache_singleton` exists solely to invoke `Cache`'s constructor.
44 : // All data members are static, so use e.g. `Cache::process_id` instead of
45 : // `cache_singleton.process_id`.
46 : Cache cache_singleton;
47 :
48 : // Encode the specified `trace_tags`. If the encoded value is not longer than
49 : // the specified `tags_header_max_size`, then set it as the "x-datadog-tags"
50 : // header using the specified `writer`. If the encoded value is oversized, then
51 : // write a diagnostic to the specified `logger` and set a propagation error tag
52 : // on the specified `local_root_tags`.
53 12 : void inject_trace_tags(
54 : DictWriter& writer,
55 : const std::vector<std::pair<std::string, std::string>>& trace_tags,
56 : std::size_t tags_header_max_size,
57 : std::unordered_map<std::string, std::string>& local_root_tags,
58 : Logger& logger) {
59 12 : const std::string encoded_trace_tags = encode_tags(trace_tags);
60 :
61 12 : if (encoded_trace_tags.size() > tags_header_max_size) {
62 1 : std::string message;
63 : message +=
64 : "Serialized x-datadog-tags header value is too large. The configured "
65 1 : "maximum size is ";
66 1 : message += std::to_string(tags_header_max_size);
67 1 : message += " bytes, but the encoded value is ";
68 1 : message += std::to_string(encoded_trace_tags.size());
69 1 : message += " bytes.";
70 1 : logger.log_error(message);
71 1 : local_root_tags[tags::internal::propagation_error] = "inject_max_size";
72 12 : } else if (!encoded_trace_tags.empty()) {
73 9 : writer.set("x-datadog-tags", encoded_trace_tags);
74 : }
75 12 : }
76 :
77 : } // namespace
78 :
79 82875 : TraceSegment::TraceSegment(
80 : const std::shared_ptr<Logger>& logger,
81 : const std::shared_ptr<Collector>& collector,
82 : const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
83 : const std::shared_ptr<TraceSampler>& trace_sampler,
84 : const std::shared_ptr<SpanSampler>& span_sampler,
85 : const std::shared_ptr<const SpanDefaults>& defaults,
86 : const RuntimeID& runtime_id,
87 : const std::vector<PropagationStyle>& injection_styles,
88 : const Optional<std::string>& hostname, Optional<std::string> origin,
89 : std::size_t tags_header_max_size,
90 : std::vector<std::pair<std::string, std::string>> trace_tags,
91 : Optional<SamplingDecision> sampling_decision,
92 : Optional<std::string> additional_w3c_tracestate,
93 : Optional<std::string> additional_datadog_w3c_tracestate,
94 82875 : std::unique_ptr<SpanData> local_root)
95 82875 : : logger_(logger),
96 82875 : collector_(collector),
97 82875 : tracer_telemetry_(tracer_telemetry),
98 82875 : trace_sampler_(trace_sampler),
99 82875 : span_sampler_(span_sampler),
100 82875 : defaults_(defaults),
101 82875 : runtime_id_(runtime_id),
102 82875 : injection_styles_(injection_styles),
103 82875 : hostname_(hostname),
104 82875 : origin_(std::move(origin)),
105 82875 : tags_header_max_size_(tags_header_max_size),
106 82875 : trace_tags_(std::move(trace_tags)),
107 82875 : num_finished_spans_(0),
108 82875 : sampling_decision_(std::move(sampling_decision)),
109 82875 : additional_w3c_tracestate_(std::move(additional_w3c_tracestate)),
110 82875 : additional_datadog_w3c_tracestate_(
111 248625 : std::move(additional_datadog_w3c_tracestate)) {
112 82875 : assert(logger_);
113 82875 : assert(collector_);
114 82875 : assert(tracer_telemetry_);
115 82875 : assert(trace_sampler_);
116 82875 : assert(span_sampler_);
117 82875 : assert(defaults_);
118 :
119 82875 : register_span(std::move(local_root));
120 82875 : }
121 :
122 42 : const SpanDefaults& TraceSegment::defaults() const { return *defaults_; }
123 :
124 4 : const Optional<std::string>& TraceSegment::hostname() const {
125 4 : return hostname_;
126 : }
127 :
128 1 : const Optional<std::string>& TraceSegment::origin() const { return origin_; }
129 :
130 34 : Optional<SamplingDecision> TraceSegment::sampling_decision() const {
131 : // `sampling_decision_` can change, so we need a lock.
132 34 : std::lock_guard<std::mutex> lock(mutex_);
133 34 : return sampling_decision_;
134 34 : }
135 :
136 1 : Logger& TraceSegment::logger() const { return *logger_; }
137 :
138 82916 : void TraceSegment::register_span(std::unique_ptr<SpanData> span) {
139 82916 : tracer_telemetry_->metrics().tracer.spans_created.inc();
140 :
141 82916 : std::lock_guard<std::mutex> lock(mutex_);
142 82916 : assert(spans_.empty() || num_finished_spans_ < spans_.size());
143 82916 : spans_.emplace_back(std::move(span));
144 82916 : }
145 :
146 82916 : void TraceSegment::span_finished() {
147 : {
148 82916 : tracer_telemetry_->metrics().tracer.spans_finished.inc();
149 82916 : std::lock_guard<std::mutex> lock(mutex_);
150 82916 : ++num_finished_spans_;
151 82916 : assert(num_finished_spans_ <= spans_.size());
152 82916 : if (num_finished_spans_ < spans_.size()) {
153 41 : return;
154 : }
155 82916 : }
156 : // We don't need the lock anymore. There's nobody left to call our methods.
157 : // On the other hand, there's nobody left to contend for the mutex, so it
158 : // doesn't make any difference.
159 82875 : make_sampling_decision_if_null();
160 82875 : assert(sampling_decision_);
161 :
162 : // All of our spans are finished. Run the span sampler, finalize the spans,
163 : // and then send the spans to the collector.
164 82875 : if (sampling_decision_->priority <= 0) {
165 : // Span sampling happens when the trace is dropped.
166 74458 : for (const auto& span_ptr : spans_) {
167 37238 : SpanData& span = *span_ptr;
168 37238 : auto* rule = span_sampler_->match(span);
169 37238 : if (!rule) {
170 36127 : continue;
171 : }
172 2012 : const SamplingDecision decision = rule->decide(span);
173 2012 : if (decision.priority <= 0) {
174 901 : continue;
175 : }
176 2222 : span.numeric_tags[tags::internal::span_sampling_mechanism] =
177 1111 : *decision.mechanism;
178 1111 : span.numeric_tags[tags::internal::span_sampling_rule_rate] =
179 1111 : *decision.configured_rate;
180 1111 : if (decision.limiter_max_per_second) {
181 100 : span.numeric_tags[tags::internal::span_sampling_limit] =
182 100 : *decision.limiter_max_per_second;
183 : }
184 : }
185 : }
186 :
187 82875 : const SamplingDecision& decision = *sampling_decision_;
188 :
189 82875 : auto& local_root = *spans_.front();
190 82875 : local_root.tags.insert(trace_tags_.begin(), trace_tags_.end());
191 165750 : local_root.numeric_tags[tags::internal::sampling_priority] =
192 82875 : decision.priority;
193 82875 : if (hostname_) {
194 3 : local_root.tags[tags::internal::hostname] = *hostname_;
195 : }
196 82875 : if (decision.origin == SamplingDecision::Origin::LOCAL) {
197 145665 : if (decision.mechanism == int(SamplingMechanism::AGENT_RATE) ||
198 145665 : decision.mechanism == int(SamplingMechanism::DEFAULT)) {
199 30494 : local_root.numeric_tags[tags::internal::agent_sample_rate] =
200 30494 : *decision.configured_rate;
201 52338 : } else if (decision.mechanism == int(SamplingMechanism::RULE)) {
202 52327 : local_root.numeric_tags[tags::internal::rule_sample_rate] =
203 52327 : *decision.configured_rate;
204 52327 : if (decision.limiter_effective_rate) {
205 25316 : local_root.numeric_tags[tags::internal::rule_limiter_sample_rate] =
206 25316 : *decision.limiter_effective_rate;
207 : }
208 : }
209 : }
210 :
211 : // Some tags are repeated on all spans.
212 165791 : for (const auto& span_ptr : spans_) {
213 82916 : SpanData& span = *span_ptr;
214 82916 : if (origin_) {
215 36 : span.tags[tags::internal::origin] = *origin_;
216 : }
217 82916 : span.numeric_tags[tags::internal::process_id] = Cache::process_id;
218 82916 : span.tags[tags::internal::language] = "cpp";
219 82916 : span.tags[tags::internal::runtime_id] = runtime_id_.string();
220 : }
221 :
222 82875 : const auto result = collector_->send(std::move(spans_), trace_sampler_);
223 82875 : if (auto* error = result.if_error()) {
224 2 : logger_->log_error(
225 2 : error->with_prefix("Error sending spans to collector: "));
226 : }
227 :
228 82875 : tracer_telemetry_->metrics().tracer.trace_segments_closed.inc();
229 82875 : }
230 :
231 11 : void TraceSegment::override_sampling_priority(int priority) {
232 11 : SamplingDecision decision;
233 11 : decision.priority = priority;
234 11 : decision.mechanism = int(SamplingMechanism::MANUAL);
235 11 : decision.origin = SamplingDecision::Origin::LOCAL;
236 :
237 11 : std::lock_guard<std::mutex> lock(mutex_);
238 11 : sampling_decision_ = decision;
239 11 : update_decision_maker_trace_tag();
240 11 : }
241 :
242 82906 : void TraceSegment::make_sampling_decision_if_null() {
243 : // Depending on the context, `mutex_` might need already to be locked.
244 :
245 82906 : if (sampling_decision_) {
246 85 : return;
247 : }
248 :
249 82821 : const SpanData& local_root = *spans_.front();
250 82821 : sampling_decision_ = trace_sampler_->decide(local_root);
251 :
252 82821 : update_decision_maker_trace_tag();
253 : }
254 :
255 82832 : void TraceSegment::update_decision_maker_trace_tag() {
256 : // Depending on the context, `mutex_` might need already to be locked.
257 :
258 82832 : assert(sampling_decision_);
259 :
260 : // Note that `found` might be erased below (in case you refactor this code).
261 82832 : const auto found = std::find_if(
262 92809 : trace_tags_.begin(), trace_tags_.end(), [](const auto& entry) {
263 92809 : return entry.first == tags::internal::decision_maker;
264 : });
265 :
266 82832 : if (sampling_decision_->priority <= 0) {
267 37197 : if (found != trace_tags_.end()) {
268 0 : trace_tags_.erase(found);
269 : }
270 37197 : return;
271 : }
272 :
273 : // Note that `value` is moved-from below (in case you refactor this code).
274 45635 : auto value = "-" + std::to_string(*sampling_decision_->mechanism);
275 45635 : if (found == trace_tags_.end()) {
276 45635 : trace_tags_.emplace_back(tags::internal::decision_maker, std::move(value));
277 : } else {
278 0 : found->second = std::move(value);
279 : }
280 45635 : }
281 :
282 32 : void TraceSegment::inject(DictWriter& writer, const SpanData& span) {
283 : // If the only injection style is `NONE`, then don't do anything.
284 56 : if (injection_styles_.size() == 1 &&
285 24 : injection_styles_[0] == PropagationStyle::NONE) {
286 1 : return;
287 : }
288 :
289 : // The sampling priority can change (it can be overridden on another thread),
290 : // and trace tags might change when that happens ("_dd.p.dm").
291 : // So, we lock here, make a sampling decision if necessary, and then copy the
292 : // decision and trace tags before unlocking.
293 : int sampling_priority;
294 31 : std::vector<std::pair<std::string, std::string>> trace_tags;
295 : {
296 31 : std::lock_guard<std::mutex> lock(mutex_);
297 31 : make_sampling_decision_if_null();
298 31 : assert(sampling_decision_);
299 31 : sampling_priority = sampling_decision_->priority;
300 31 : trace_tags = trace_tags_;
301 31 : }
302 :
303 71 : for (const auto style : injection_styles_) {
304 40 : switch (style) {
305 8 : case PropagationStyle::DATADOG:
306 8 : writer.set("x-datadog-trace-id", std::to_string(span.trace_id.low));
307 8 : writer.set("x-datadog-parent-id", std::to_string(span.span_id));
308 8 : writer.set("x-datadog-sampling-priority",
309 16 : std::to_string(sampling_priority));
310 8 : if (origin_) {
311 2 : writer.set("x-datadog-origin", *origin_);
312 : }
313 8 : inject_trace_tags(writer, trace_tags, tags_header_max_size_,
314 8 : spans_.front()->tags, *logger_);
315 8 : break;
316 4 : case PropagationStyle::B3:
317 4 : if (span.trace_id.high) {
318 1 : writer.set("x-b3-traceid", span.trace_id.hex_padded());
319 : } else {
320 3 : writer.set("x-b3-traceid", hex_padded(span.trace_id.low));
321 : }
322 4 : writer.set("x-b3-spanid", hex_padded(span.span_id));
323 4 : writer.set("x-b3-sampled", std::to_string(int(sampling_priority > 0)));
324 4 : if (origin_) {
325 2 : writer.set("x-datadog-origin", *origin_);
326 : }
327 4 : inject_trace_tags(writer, trace_tags, tags_header_max_size_,
328 4 : spans_.front()->tags, *logger_);
329 4 : break;
330 28 : case PropagationStyle::W3C:
331 28 : writer.set(
332 : "traceparent",
333 56 : encode_traceparent(span.trace_id, span.span_id, sampling_priority));
334 28 : writer.set("tracestate",
335 28 : encode_tracestate(sampling_priority, origin_, trace_tags,
336 28 : additional_datadog_w3c_tracestate_,
337 28 : additional_w3c_tracestate_));
338 28 : break;
339 0 : default:
340 0 : assert(style == PropagationStyle::NONE);
341 0 : break;
342 : }
343 : }
344 31 : }
345 :
346 : } // namespace tracing
347 : } // namespace datadog
|