Line data Source code
1 : #include "datadog_agent.h"
2 :
3 : #include <cassert>
4 : #include <chrono>
5 : #include <string>
6 : #include <typeinfo>
7 : #include <unordered_map>
8 : #include <unordered_set>
9 :
10 : #include "collector_response.h"
11 : #include "datadog_agent_config.h"
12 : #include "dict_writer.h"
13 : #include "http_client.h"
14 : #include "json.hpp"
15 : #include "logger.h"
16 : #include "msgpack.h"
17 : #include "span_data.h"
18 : #include "string_view.h"
19 : #include "trace_sampler.h"
20 : #include "tracer.h"
21 : #include "version.h"
22 :
23 : namespace datadog {
24 : namespace tracing {
25 : namespace {
26 :
27 : constexpr StringView traces_api_path = "/v0.4/traces";
28 : constexpr StringView telemetry_v2_path = "/telemetry/proxy/api/v2/apmtelemetry";
29 : constexpr StringView remote_configuration_path = "/v0.7/config";
30 :
31 36 : void set_content_type_json(DictWriter& headers) {
32 36 : headers.set("Content-Type", "application/json");
33 36 : }
34 :
35 429 : HTTPClient::URL traces_endpoint(const HTTPClient::URL& agent_url) {
36 429 : auto traces_url = agent_url;
37 429 : append(traces_url.path, traces_api_path);
38 429 : return traces_url;
39 0 : }
40 :
41 429 : HTTPClient::URL telemetry_endpoint(const HTTPClient::URL& agent_url) {
42 429 : auto telemetry_v2_url = agent_url;
43 429 : append(telemetry_v2_url.path, telemetry_v2_path);
44 429 : return telemetry_v2_url;
45 0 : }
46 :
47 429 : HTTPClient::URL remote_configuration_endpoint(
48 : const HTTPClient::URL& agent_url) {
49 429 : auto remote_configuration = agent_url;
50 429 : append(remote_configuration.path, remote_configuration_path);
51 429 : return remote_configuration;
52 0 : }
53 :
54 418 : Expected<void> msgpack_encode(
55 : std::string& destination,
56 : const std::vector<DatadogAgent::TraceChunk>& trace_chunks) {
57 : return msgpack::pack_array(destination, trace_chunks,
58 418 : [](auto& destination, const auto& chunk) {
59 418 : return msgpack_encode(destination, chunk.spans);
60 418 : });
61 : }
62 :
63 9 : std::variant<CollectorResponse, std::string> parse_agent_traces_response(
64 : StringView body) try {
65 10 : nlohmann::json response = nlohmann::json::parse(body);
66 :
67 8 : StringView type = response.type_name();
68 8 : if (type != "object") {
69 1 : std::string message;
70 : message +=
71 : "Parsing the Datadog Agent's response to traces we sent it failed. "
72 : "The response is expected to be a JSON object, but instead it's a JSON "
73 1 : "value with type \"";
74 1 : append(message, type);
75 1 : message += '\"';
76 1 : message += "\nError occurred for response body (begins on next line):\n";
77 1 : append(message, body);
78 1 : return message;
79 1 : }
80 :
81 7 : const StringView sample_rates_property = "rate_by_service";
82 7 : const auto found = response.find(sample_rates_property);
83 7 : if (found == response.end()) {
84 2 : return CollectorResponse{};
85 : }
86 5 : const auto& rates_json = found.value();
87 5 : type = rates_json.type_name();
88 5 : if (type != "object") {
89 1 : std::string message;
90 : message +=
91 : "Parsing the Datadog Agent's response to traces we sent it failed. "
92 1 : "The \"";
93 1 : append(message, sample_rates_property);
94 : message +=
95 : "\" property of the response is expected to be a JSON object, but "
96 1 : "instead it's a JSON value with type \"";
97 1 : append(message, type);
98 1 : message += '\"';
99 1 : message += "\nError occurred for response body (begins on next line):\n";
100 1 : append(message, body);
101 1 : return message;
102 1 : }
103 :
104 4 : std::unordered_map<std::string, Rate> sample_rates;
105 7 : for (const auto& [key, value] : rates_json.items()) {
106 5 : type = value.type_name();
107 5 : if (type != "number") {
108 1 : std::string message;
109 : message +=
110 : "Datadog Agent response to traces included an invalid sample rate "
111 1 : "for the key \"";
112 1 : message += key;
113 1 : message += "\". Rate should be a number, but it's a \"";
114 1 : append(message, type);
115 1 : message += "\" instead.";
116 1 : message += "\nError occurred for response body (begins on next line):\n";
117 1 : append(message, body);
118 1 : return message;
119 1 : }
120 4 : auto maybe_rate = Rate::from(value);
121 4 : if (auto* error = maybe_rate.if_error()) {
122 1 : std::string message;
123 : message +=
124 : "Datadog Agent response trace traces included an invalid sample rate "
125 1 : "for the key \"";
126 1 : message += key;
127 1 : message += "\": ";
128 1 : message += error->message;
129 1 : message += "\nError occurred for response body (begins on next line):\n";
130 1 : append(message, body);
131 1 : return message;
132 1 : }
133 3 : sample_rates.emplace(key, *maybe_rate);
134 10 : }
135 2 : return CollectorResponse{std::move(sample_rates)};
136 9 : } catch (const nlohmann::json::exception& error) {
137 1 : std::string message;
138 : message +=
139 : "Parsing the Datadog Agent's response to traces we sent it failed with a "
140 1 : "JSON error: ";
141 1 : message += error.what();
142 1 : message += "\nError occurred for response body (begins on next line):\n";
143 1 : append(message, body);
144 1 : return message;
145 1 : }
146 :
147 : } // namespace
148 :
149 429 : DatadogAgent::DatadogAgent(
150 : const FinalizedDatadogAgentConfig& config,
151 : const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
152 : const std::shared_ptr<Logger>& logger,
153 429 : const TracerSignature& tracer_signature, ConfigManager& config_manager)
154 429 : : tracer_telemetry_(tracer_telemetry),
155 429 : clock_(config.clock),
156 429 : logger_(logger),
157 429 : traces_endpoint_(traces_endpoint(config.url)),
158 429 : telemetry_endpoint_(telemetry_endpoint(config.url)),
159 429 : remote_configuration_endpoint_(remote_configuration_endpoint(config.url)),
160 429 : http_client_(config.http_client),
161 429 : event_scheduler_(config.event_scheduler),
162 429 : cancel_scheduled_flush_(event_scheduler_->schedule_recurring_event(
163 0 : config.flush_interval, [this]() { flush(); })),
164 429 : flush_interval_(config.flush_interval),
165 429 : request_timeout_(config.request_timeout),
166 429 : shutdown_timeout_(config.shutdown_timeout),
167 1287 : remote_config_(tracer_signature, config_manager) {
168 429 : assert(logger_);
169 429 : assert(tracer_telemetry_);
170 429 : if (tracer_telemetry_->enabled()) {
171 : // Callback for successful telemetry HTTP requests, to examine HTTP
172 : // status.
173 18 : telemetry_on_response_ = [logger = logger_](
174 : int response_status,
175 : const DictReader& /*response_headers*/,
176 : std::string response_body) {
177 0 : if (response_status < 200 || response_status >= 300) {
178 0 : logger->log_error([&](auto& stream) {
179 0 : stream << "Unexpected telemetry response status " << response_status
180 : << " with body (if any, starts on next line):\n"
181 0 : << response_body;
182 0 : });
183 : }
184 18 : };
185 :
186 : // Callback for unsuccessful telemetry HTTP requests.
187 54 : telemetry_on_error_ = [logger = logger_](Error error) {
188 36 : logger->log_error(error.with_prefix(
189 : "Error occurred during HTTP request for telemetry: "));
190 54 : };
191 :
192 : // Only schedule this if telemetry is enabled.
193 : // Every 10 seconds, have the tracer telemetry capture the metrics
194 : // values. Every 60 seconds, also report those values to the datadog
195 : // agent.
196 72 : cancel_telemetry_timer_ = event_scheduler_->schedule_recurring_event(
197 36 : std::chrono::seconds(10), [this, n = 0]() mutable {
198 0 : n++;
199 0 : tracer_telemetry_->capture_metrics();
200 0 : if (n % 6 == 0) {
201 0 : send_heartbeat_and_telemetry();
202 : }
203 18 : });
204 : }
205 :
206 : cancel_remote_configuration_task_ =
207 858 : event_scheduler_->schedule_recurring_event(
208 : config.remote_configuration_poll_interval,
209 429 : [this] { get_and_apply_remote_configuration_updates(); });
210 429 : }
211 :
212 429 : DatadogAgent::~DatadogAgent() {
213 429 : const auto deadline = clock_().tick + shutdown_timeout_;
214 429 : cancel_scheduled_flush_();
215 429 : flush();
216 429 : cancel_remote_configuration_task_();
217 429 : if (tracer_telemetry_->enabled()) {
218 : // This action only needs to occur if tracer telemetry is enabled.
219 18 : cancel_telemetry_timer_();
220 18 : tracer_telemetry_->capture_metrics();
221 : // The app-closing message is bundled with a message containing the
222 : // final metric values.
223 18 : send_app_closing();
224 : }
225 429 : http_client_->drain(deadline);
226 429 : }
227 :
228 418 : Expected<void> DatadogAgent::send(
229 : std::vector<std::unique_ptr<SpanData>>&& spans,
230 : const std::shared_ptr<TraceSampler>& response_handler) {
231 418 : std::lock_guard<std::mutex> lock(mutex_);
232 418 : trace_chunks_.push_back(TraceChunk{std::move(spans), response_handler});
233 836 : return nullopt;
234 418 : }
235 :
236 418 : nlohmann::json DatadogAgent::config_json() const {
237 : // clang-format off
238 2926 : return nlohmann::json::object({
239 : {"type", "datadog::tracing::DatadogAgent"},
240 17138 : {"config", nlohmann::json::object({
241 836 : {"traces_url", (traces_endpoint_.scheme + "://" + traces_endpoint_.authority + traces_endpoint_.path)},
242 836 : {"telemetry_url", (telemetry_endpoint_.scheme + "://" + telemetry_endpoint_.authority + telemetry_endpoint_.path)},
243 836 : {"remote_configuration_url", (remote_configuration_endpoint_.scheme + "://" + remote_configuration_endpoint_.authority + remote_configuration_endpoint_.path)},
244 836 : {"flush_interval_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_).count() },
245 836 : {"request_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(request_timeout_).count() },
246 836 : {"shutdown_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(shutdown_timeout_).count() },
247 836 : {"http_client", http_client_->config_json()},
248 836 : {"event_scheduler", event_scheduler_->config_json()},
249 : })},
250 4180 : });
251 : // clang-format on
252 : }
253 :
254 429 : void DatadogAgent::flush() {
255 429 : std::vector<TraceChunk> trace_chunks;
256 : {
257 429 : std::lock_guard<std::mutex> lock(mutex_);
258 : using std::swap;
259 429 : swap(trace_chunks, trace_chunks_);
260 429 : }
261 :
262 429 : if (trace_chunks.empty()) {
263 11 : return;
264 : }
265 :
266 418 : std::string body;
267 418 : auto encode_result = msgpack_encode(body, trace_chunks);
268 418 : if (auto* error = encode_result.if_error()) {
269 0 : logger_->log_error(*error);
270 0 : return;
271 : }
272 :
273 : // One HTTP request to the Agent could possibly involve trace chunks from
274 : // multiple tracers, and thus multiple trace samplers might need to have
275 : // their rates updated. Unlikely, but possible.
276 418 : std::unordered_set<std::shared_ptr<TraceSampler>> response_handlers;
277 836 : for (auto& chunk : trace_chunks) {
278 418 : response_handlers.insert(std::move(chunk.response_handler));
279 : }
280 :
281 : // This is the callback for setting request headers.
282 : // It's invoked synchronously (before `post` returns).
283 417 : auto set_request_headers = [&](DictWriter& headers) {
284 417 : headers.set("Content-Type", "application/msgpack");
285 417 : headers.set("Datadog-Meta-Lang", "cpp");
286 417 : headers.set("Datadog-Meta-Lang-Version", std::to_string(__cplusplus));
287 417 : headers.set("Datadog-Meta-Tracer-Version", tracer_version);
288 417 : headers.set("X-Datadog-Trace-Count", std::to_string(trace_chunks.size()));
289 417 : };
290 :
291 : // This is the callback for the HTTP response. It's invoked
292 : // asynchronously.
293 827 : auto on_response = [telemetry = tracer_telemetry_,
294 418 : samplers = std::move(response_handlers),
295 418 : logger = logger_](int response_status,
296 : const DictReader& /*response_headers*/,
297 : std::string response_body) {
298 409 : if (response_status >= 500) {
299 100 : telemetry->metrics().trace_api.responses_5xx.inc();
300 309 : } else if (response_status >= 400) {
301 100 : telemetry->metrics().trace_api.responses_4xx.inc();
302 209 : } else if (response_status >= 300) {
303 100 : telemetry->metrics().trace_api.responses_3xx.inc();
304 109 : } else if (response_status >= 200) {
305 109 : telemetry->metrics().trace_api.responses_2xx.inc();
306 0 : } else if (response_status >= 100) {
307 0 : telemetry->metrics().trace_api.responses_1xx.inc();
308 : }
309 409 : if (response_status != 200) {
310 399 : logger->log_error([&](auto& stream) {
311 399 : stream << "Unexpected response status " << response_status
312 399 : << " in Datadog Agent response with body of length "
313 399 : << response_body.size() << " (starts on next line):\n"
314 399 : << response_body;
315 399 : });
316 405 : return;
317 : }
318 :
319 10 : if (response_body.empty()) {
320 1 : logger->log_error([](auto& stream) {
321 1 : stream << "Datadog Agent returned response without a body."
322 : " This tracer might be sending batches of traces too "
323 : "frequently";
324 1 : });
325 1 : return;
326 : }
327 :
328 9 : auto result = parse_agent_traces_response(response_body);
329 9 : if (const auto* error_message = std::get_if<std::string>(&result)) {
330 5 : logger->log_error(*error_message);
331 5 : return;
332 : }
333 4 : const auto& response = std::get<CollectorResponse>(result);
334 8 : for (const auto& sampler : samplers) {
335 4 : if (sampler) {
336 4 : sampler->handle_collector_response(response);
337 : }
338 : }
339 845 : };
340 :
341 : // This is the callback for if something goes wrong sending the
342 : // request or retrieving the response. It's invoked
343 : // asynchronously.
344 426 : auto on_error = [telemetry = tracer_telemetry_,
345 418 : logger = logger_](Error error) {
346 8 : telemetry->metrics().trace_api.errors_network.inc();
347 8 : logger->log_error(error.with_prefix(
348 : "Error occurred during HTTP request for submitting traces: "));
349 426 : };
350 :
351 418 : tracer_telemetry_->metrics().trace_api.requests.inc();
352 : auto post_result =
353 836 : http_client_->post(traces_endpoint_, std::move(set_request_headers),
354 836 : std::move(body), std::move(on_response),
355 2090 : std::move(on_error), clock_().tick + request_timeout_);
356 418 : if (auto* error = post_result.if_error()) {
357 2 : logger_->log_error(
358 2 : error->with_prefix("Unexpected error submitting traces: "));
359 : }
360 429 : }
361 :
362 18 : void DatadogAgent::send_app_started() {
363 18 : auto payload = tracer_telemetry_->app_started();
364 : auto post_result =
365 18 : http_client_->post(telemetry_endpoint_, set_content_type_json,
366 18 : std::move(payload), telemetry_on_response_,
367 54 : telemetry_on_error_, clock_().tick + request_timeout_);
368 18 : if (auto* error = post_result.if_error()) {
369 0 : logger_->log_error(error->with_prefix(
370 : "Unexpected error submitting telemetry app-started event: "));
371 : }
372 18 : }
373 :
374 0 : void DatadogAgent::send_heartbeat_and_telemetry() {
375 0 : auto payload = tracer_telemetry_->heartbeat_and_telemetry();
376 : auto post_result =
377 0 : http_client_->post(telemetry_endpoint_, set_content_type_json,
378 0 : std::move(payload), telemetry_on_response_,
379 0 : telemetry_on_error_, clock_().tick + request_timeout_);
380 0 : if (auto* error = post_result.if_error()) {
381 0 : logger_->log_error(error->with_prefix(
382 : "Unexpected error submitting telemetry app-heartbeat event: "));
383 : }
384 0 : }
385 :
386 18 : void DatadogAgent::send_app_closing() {
387 18 : auto payload = tracer_telemetry_->app_closing();
388 : auto post_result =
389 18 : http_client_->post(telemetry_endpoint_, set_content_type_json,
390 18 : std::move(payload), telemetry_on_response_,
391 54 : telemetry_on_error_, clock_().tick + request_timeout_);
392 18 : if (auto* error = post_result.if_error()) {
393 0 : logger_->log_error(error->with_prefix(
394 : "Unexpected error submitting telemetry app-closing event: "));
395 : }
396 18 : }
397 :
398 0 : void DatadogAgent::get_and_apply_remote_configuration_updates() {
399 : auto remote_configuration_on_response =
400 0 : [this](int response_status, const DictReader& /*response_headers*/,
401 0 : std::string response_body) {
402 0 : if (response_status < 200 || response_status >= 300) {
403 0 : if (response_status == 404) {
404 : // If the Datadog Agent doesn't understand remote configuration,
405 : // or if remote configuration is disabled in the agent, then it
406 : // will return 404. This is not an error.
407 : // We'll keep polling, though, because the agent's configuration
408 : // might change.
409 0 : return;
410 : }
411 :
412 0 : logger_->log_error([&](auto& stream) {
413 0 : stream << "Unexpected Remote Configuration status "
414 0 : << response_status
415 : << " with body (if any, starts on next line):\n"
416 0 : << response_body;
417 0 : });
418 :
419 0 : return;
420 : }
421 :
422 : const auto response_json =
423 : nlohmann::json::parse(/* input = */ response_body,
424 : /* parser_callback = */ nullptr,
425 0 : /* allow_exceptions = */ false);
426 0 : if (response_json.is_discarded()) {
427 0 : logger_->log_error([](auto& stream) {
428 0 : stream << "Could not parse Remote Configuration response body";
429 0 : });
430 0 : return;
431 : }
432 :
433 : // TODO(@dgoffredo): When is the parsed JSON object empty?
434 0 : if (!response_json.empty()) {
435 : // TODO (during Active Configuration): `process_response` should
436 : // return a list of configuration update and should be consumed by
437 : // telemetry.
438 0 : remote_config_.process_response(response_json);
439 : }
440 0 : };
441 :
442 0 : auto remote_configuration_on_error = [logger = logger_](Error error) {
443 0 : logger->log_error(error.with_prefix(
444 : "Error occurred during HTTP request for Remote Configuration: "));
445 0 : };
446 :
447 0 : auto post_result = http_client_->post(
448 0 : remote_configuration_endpoint_, set_content_type_json,
449 0 : remote_config_.make_request_payload().dump(),
450 : remote_configuration_on_response, remote_configuration_on_error,
451 0 : clock_().tick + request_timeout_);
452 0 : if (auto error = post_result.if_error()) {
453 0 : logger_->log_error(
454 0 : error->with_prefix("Unexpected error while requesting Remote "
455 : "Configuration updates: "));
456 : }
457 0 : }
458 :
459 : } // namespace tracing
460 : } // namespace datadog
|