LCOV - code coverage report
Current view: top level - datadog - datadog_agent.cpp (source / functions) Hit Total Coverage
Test: filtered.info Lines: 221 280 78.9 %
Date: 2024-01-03 20:30:12 Functions: 20 32 62.5 %

          Line data    Source code
       1             : #include "datadog_agent.h"
       2             : 
       3             : #include <cassert>
       4             : #include <chrono>
       5             : #include <string>
       6             : #include <typeinfo>
       7             : #include <unordered_map>
       8             : #include <unordered_set>
       9             : 
      10             : #include "collector_response.h"
      11             : #include "datadog_agent_config.h"
      12             : #include "dict_writer.h"
      13             : #include "http_client.h"
      14             : #include "json.hpp"
      15             : #include "logger.h"
      16             : #include "msgpack.h"
      17             : #include "span_data.h"
      18             : #include "string_view.h"
      19             : #include "trace_sampler.h"
      20             : #include "tracer.h"
      21             : #include "version.h"
      22             : 
      23             : namespace datadog {
      24             : namespace tracing {
      25             : namespace {
      26             : 
      27             : constexpr StringView traces_api_path = "/v0.4/traces";
                       // ^ appended to the agent URL's path by traces_endpoint().
      28             : constexpr StringView telemetry_v2_path = "/telemetry/proxy/api/v2/apmtelemetry";
                       // ^ appended to the agent URL's path by telemetry_endpoint().
      29             : constexpr StringView remote_configuration_path = "/v0.7/config";
                       // ^ appended to the agent URL's path by
                       //   remote_configuration_endpoint().
      30             : 
      31          36 : void set_content_type_json(DictWriter& headers) {
                         // Request-header callback shared by the telemetry and remote
                         // configuration requests in this file: sets the "Content-Type"
                         // header to "application/json" and nothing else.
      32          36 :   headers.set("Content-Type", "application/json");
      33          36 : }
      34             : 
      35         429 : HTTPClient::URL traces_endpoint(const HTTPClient::URL& agent_url) {
                         // Returns a copy of `agent_url` with `traces_api_path` appended to
                         // its path. `agent_url` itself is not modified.
      36         429 :   auto traces_url = agent_url;
      37         429 :   append(traces_url.path, traces_api_path);
      38         429 :   return traces_url;
      39           0 : }
      40             : 
      41         429 : HTTPClient::URL telemetry_endpoint(const HTTPClient::URL& agent_url) {
                         // Returns a copy of `agent_url` with `telemetry_v2_path` appended
                         // to its path. `agent_url` itself is not modified.
      42         429 :   auto telemetry_v2_url = agent_url;
      43         429 :   append(telemetry_v2_url.path, telemetry_v2_path);
      44         429 :   return telemetry_v2_url;
      45           0 : }
      46             : 
      47         429 : HTTPClient::URL remote_configuration_endpoint(
      48             :     const HTTPClient::URL& agent_url) {
                         // Returns a copy of `agent_url` with `remote_configuration_path`
                         // appended to its path. `agent_url` itself is not modified.
      49         429 :   auto remote_configuration = agent_url;
      50         429 :   append(remote_configuration.path, remote_configuration_path);
      51         429 :   return remote_configuration;
      52           0 : }
      53             : 
      54         418 : Expected<void> msgpack_encode(
      55             :     std::string& destination,
      56             :     const std::vector<DatadogAgent::TraceChunk>& trace_chunks) {
                         // Encodes `trace_chunks` into `destination` by delegating to
                         // msgpack::pack_array. The per-element callback encodes only the
                         // chunk's `spans` (via another msgpack_encode overload declared
                         // elsewhere); the chunk's response handler is not serialized.
                         // Returns whatever pack_array returns.
                         // NOTE(review): presumably pack_array appends to `destination`
                         // and stops at the first element that fails — confirm in msgpack.h.
      57             :   return msgpack::pack_array(destination, trace_chunks,
      58         418 :                              [](auto& destination, const auto& chunk) {
      59         418 :                                return msgpack_encode(destination, chunk.spans);
      60         418 :                              });
      61             : }
      62             : 
      63           9 : std::variant<CollectorResponse, std::string> parse_agent_traces_response(
      64             :     StringView body) try {
                         // Parses the JSON `body` of the Datadog Agent's reply to a trace
                         // submission.
                         //
                         // Returns a CollectorResponse on success: empty when the reply has
                         // no "rate_by_service" property, otherwise holding one Rate per
                         // key of that property.
                         // Returns a std::string diagnostic on failure. Every diagnostic
                         // ends with the offending response body so it can be logged as is.
                         //
                         // This is a function-try-block: any nlohmann::json exception thrown
                         // below (e.g. by parse()) is converted into a diagnostic by the
                         // handler at the end instead of propagating to the caller.
      65          10 :   nlohmann::json response = nlohmann::json::parse(body);
      66             : 
                         // The top-level value must be a JSON object.
      67           8 :   StringView type = response.type_name();
      68           8 :   if (type != "object") {
      69           1 :     std::string message;
      70             :     message +=
      71             :         "Parsing the Datadog Agent's response to traces we sent it failed.  "
      72             :         "The response is expected to be a JSON object, but instead it's a JSON "
      73           1 :         "value with type \"";
      74           1 :     append(message, type);
      75           1 :     message += '\"';
      76           1 :     message += "\nError occurred for response body (begins on next line):\n";
      77           1 :     append(message, body);
      78           1 :     return message;
      79           1 :   }
      80             : 
                         // A reply without sample rates is valid; report it as an empty
                         // CollectorResponse rather than as an error.
      81           7 :   const StringView sample_rates_property = "rate_by_service";
      82           7 :   const auto found = response.find(sample_rates_property);
      83           7 :   if (found == response.end()) {
      84           2 :     return CollectorResponse{};
      85             :   }
      86           5 :   const auto& rates_json = found.value();
      87           5 :   type = rates_json.type_name();
      88           5 :   if (type != "object") {
      89           1 :     std::string message;
      90             :     message +=
      91             :         "Parsing the Datadog Agent's response to traces we sent it failed.  "
      92           1 :         "The \"";
      93           1 :     append(message, sample_rates_property);
      94             :     message +=
      95             :         "\" property of the response is expected to be a JSON object, but "
      96           1 :         "instead it's a JSON value with type \"";
      97           1 :     append(message, type);
      98           1 :     message += '\"';
      99           1 :     message += "\nError occurred for response body (begins on next line):\n";
     100           1 :     append(message, body);
     101           1 :     return message;
     102           1 :   }
     103             : 
                         // Validate each entry. The first invalid one rejects the whole
                         // response, so rates collected so far are discarded.
     104           4 :   std::unordered_map<std::string, Rate> sample_rates;
     105           7 :   for (const auto& [key, value] : rates_json.items()) {
     106           5 :     type = value.type_name();
     107           5 :     if (type != "number") {
     108           1 :       std::string message;
     109             :       message +=
     110             :           "Datadog Agent response to traces included an invalid sample rate "
     111           1 :           "for the key \"";
     112           1 :       message += key;
     113           1 :       message += "\". Rate should be a number, but it's a \"";
     114           1 :       append(message, type);
     115           1 :       message += "\" instead.";
     116           1 :       message += "\nError occurred for response body (begins on next line):\n";
     117           1 :       append(message, body);
     118           1 :       return message;
     119           1 :     }
                           // The value is a JSON number, but Rate::from can still reject it;
                           // its error text is forwarded in the diagnostic.
     120           4 :     auto maybe_rate = Rate::from(value);
     121           4 :     if (auto* error = maybe_rate.if_error()) {
     122           1 :       std::string message;
     123             :       message +=
     124             :           "Datadog Agent response to traces included an invalid sample rate "
     125           1 :           "for the key \"";
     126           1 :       message += key;
     127           1 :       message += "\": ";
     128           1 :       message += error->message;
     129           1 :       message += "\nError occurred for response body (begins on next line):\n";
     130           1 :       append(message, body);
     131           1 :       return message;
     132           1 :     }
     133           3 :     sample_rates.emplace(key, *maybe_rate);
     134          10 :   }
     135           2 :   return CollectorResponse{std::move(sample_rates)};
     136           9 : } catch (const nlohmann::json::exception& error) {
                         // Reached for any JSON library exception raised in the body above.
     137           1 :   std::string message;
     138             :   message +=
     139             :       "Parsing the Datadog Agent's response to traces we sent it failed with a "
     140           1 :       "JSON error: ";
     141           1 :   message += error.what();
     142           1 :   message += "\nError occurred for response body (begins on next line):\n";
     143           1 :   append(message, body);
     144           1 :   return message;
     145           1 : }
     146             : 
     147             : }  // namespace
     148             : 
     149         429 : DatadogAgent::DatadogAgent(
     150             :     const FinalizedDatadogAgentConfig& config,
     151             :     const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
     152             :     const std::shared_ptr<Logger>& logger,
     153         429 :     const TracerSignature& tracer_signature, ConfigManager& config_manager)
     154         429 :     : tracer_telemetry_(tracer_telemetry),
     155         429 :       clock_(config.clock),
     156         429 :       logger_(logger),
     157         429 :       traces_endpoint_(traces_endpoint(config.url)),
     158         429 :       telemetry_endpoint_(telemetry_endpoint(config.url)),
     159         429 :       remote_configuration_endpoint_(remote_configuration_endpoint(config.url)),
     160         429 :       http_client_(config.http_client),
     161         429 :       event_scheduler_(config.event_scheduler),
     162         429 :       cancel_scheduled_flush_(event_scheduler_->schedule_recurring_event(
     163           0 :           config.flush_interval, [this]() { flush(); })),
     164         429 :       flush_interval_(config.flush_interval),
     165         429 :       request_timeout_(config.request_timeout),
     166         429 :       shutdown_timeout_(config.shutdown_timeout),
     167        1287 :       remote_config_(tracer_signature, config_manager) {
                         // Builds the agent from `config` and registers its recurring
                         // tasks with the event scheduler:
                         //   - flush() every `config.flush_interval` (initializer list),
                         //   - telemetry capture/reporting, only if telemetry is enabled,
                         //   - remote configuration polling, unconditionally.
                         // The scheduled callbacks capture `this`; the destructor invokes
                         // the cancel functions stored here.
                         // `logger` and `tracer_telemetry` must be non-null (asserted).
     168         429 :   assert(logger_);
     169         429 :   assert(tracer_telemetry_);
     170         429 :   if (tracer_telemetry_->enabled()) {
     171             :     // Callback for successful telemetry HTTP requests, to examine HTTP
     172             :     // status.
                           // Captures a copy of the logger pointer rather than `this`.
     173          18 :     telemetry_on_response_ = [logger = logger_](
     174             :                                  int response_status,
     175             :                                  const DictReader& /*response_headers*/,
     176             :                                  std::string response_body) {
     177           0 :       if (response_status < 200 || response_status >= 300) {
     178           0 :         logger->log_error([&](auto& stream) {
     179           0 :           stream << "Unexpected telemetry response status " << response_status
     180             :                  << " with body (if any, starts on next line):\n"
     181           0 :                  << response_body;
     182           0 :         });
     183             :       }
     184          18 :     };
     185             : 
     186             :     // Callback for unsuccessful telemetry HTTP requests.
     187          54 :     telemetry_on_error_ = [logger = logger_](Error error) {
     188          36 :       logger->log_error(error.with_prefix(
     189             :           "Error occurred during HTTP request for telemetry: "));
     190          54 :     };
     191             : 
     192             :     // Only schedule this if telemetry is enabled.
     193             :     // Every 10 seconds, have the tracer telemetry capture the metrics
     194             :     // values. Every 60 seconds, also report those values to the datadog
     195             :     // agent.
                           // `n` counts timer ticks; every sixth tick (6 x 10 s) also sends.
     196          72 :     cancel_telemetry_timer_ = event_scheduler_->schedule_recurring_event(
     197          36 :         std::chrono::seconds(10), [this, n = 0]() mutable {
     198           0 :           n++;
     199           0 :           tracer_telemetry_->capture_metrics();
     200           0 :           if (n % 6 == 0) {
     201           0 :             send_heartbeat_and_telemetry();
     202             :           }
     203          18 :         });
     204             :   }
     205             : 
                         // Poll for remote configuration regardless of the telemetry
                         // setting.
     206             :   cancel_remote_configuration_task_ =
     207         858 :       event_scheduler_->schedule_recurring_event(
     208             :           config.remote_configuration_poll_interval,
     209         429 :           [this] { get_and_apply_remote_configuration_updates(); });
     210         429 : }
     211             : 
     212         429 : DatadogAgent::~DatadogAgent() {
                         // Shutdown sequence: cancel the recurring tasks, submit whatever
                         // is still queued, then hand the HTTP client a deadline.
                         // The deadline is computed first, so time spent in the steps
                         // below counts against `shutdown_timeout_`.
     213         429 :   const auto deadline = clock_().tick + shutdown_timeout_;
     214         429 :   cancel_scheduled_flush_();
                         // Final flush of traces queued since the last scheduled flush.
     215         429 :   flush();
     216         429 :   cancel_remote_configuration_task_();
     217         429 :   if (tracer_telemetry_->enabled()) {
     218             :     // This action only needs to occur if tracer telemetry is enabled.
     219          18 :     cancel_telemetry_timer_();
     220          18 :     tracer_telemetry_->capture_metrics();
     221             :     // The app-closing message is bundled with a message containing the
     222             :     // final metric values.
     223          18 :     send_app_closing();
     224             :   }
                         // NOTE(review): presumably drain() waits for in-flight requests
                         // until `deadline` — confirm against the HTTPClient interface.
     225         429 :   http_client_->drain(deadline);
     226         429 : }
     227             : 
     228         418 : Expected<void> DatadogAgent::send(
     229             :     std::vector<std::unique_ptr<SpanData>>&& spans,
     230             :     const std::shared_ptr<TraceSampler>& response_handler) {
                         // Queues one trace chunk (`spans` plus the sampler to notify with
                         // the agent's response) for the next flush(). No HTTP request is
                         // made here. `mutex_` guards `trace_chunks_`, which flush() swaps
                         // out under the same mutex.
                         // Always returns `nullopt`, i.e. no error.
     231         418 :   std::lock_guard<std::mutex> lock(mutex_);
     232         418 :   trace_chunks_.push_back(TraceChunk{std::move(spans), response_handler});
     233         836 :   return nullopt;
     234         418 : }
     235             : 
     236         418 : nlohmann::json DatadogAgent::config_json() const {
                         // Returns a JSON description of this object: a "type" tag and a
                         // "config" object holding the three endpoint URLs (rebuilt as
                         // scheme + "://" + authority + path), the flush/request/shutdown
                         // durations converted to milliseconds, and the config_json() of
                         // the HTTP client and the event scheduler.
     237             :   // clang-format off
     238        2926 :   return nlohmann::json::object({
     239             :     {"type", "datadog::tracing::DatadogAgent"},
     240       17138 :     {"config", nlohmann::json::object({
     241         836 :       {"traces_url", (traces_endpoint_.scheme + "://" + traces_endpoint_.authority + traces_endpoint_.path)},
     242         836 :       {"telemetry_url", (telemetry_endpoint_.scheme + "://" + telemetry_endpoint_.authority + telemetry_endpoint_.path)},
     243         836 :       {"remote_configuration_url", (remote_configuration_endpoint_.scheme + "://" + remote_configuration_endpoint_.authority + remote_configuration_endpoint_.path)},
     244         836 :       {"flush_interval_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_).count() },
     245         836 :       {"request_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(request_timeout_).count() },
     246         836 :       {"shutdown_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(shutdown_timeout_).count() },
     247         836 :       {"http_client", http_client_->config_json()},
     248         836 :       {"event_scheduler", event_scheduler_->config_json()},
     249             :     })},
     250        4180 :   });
     251             :   // clang-format on
     252             : }
     253             : 
     254         429 : void DatadogAgent::flush() {
                         // Sends every trace chunk queued by send() to the agent's traces
                         // endpoint in a single msgpack-encoded HTTP POST. Does nothing
                         // when the queue is empty. Failures are logged, not returned.
     255         429 :   std::vector<TraceChunk> trace_chunks;
     256             :   {
                           // Hold the mutex only for the swap; encoding and the HTTP
                           // request below run without it.
     257         429 :     std::lock_guard<std::mutex> lock(mutex_);
     258             :     using std::swap;
     259         429 :     swap(trace_chunks, trace_chunks_);
     260         429 :   }
     261             : 
     262         429 :   if (trace_chunks.empty()) {
     263          11 :     return;
     264             :   }
     265             : 
     266         418 :   std::string body;
     267         418 :   auto encode_result = msgpack_encode(body, trace_chunks);
     268         418 :   if (auto* error = encode_result.if_error()) {
                           // The chunks were already taken off the queue, so returning
                           // here drops them; they are not re-queued.
     269           0 :     logger_->log_error(*error);
     270           0 :     return;
     271             :   }
     272             : 
     273             :   // One HTTP request to the Agent could possibly involve trace chunks from
     274             :   // multiple tracers, and thus multiple trace samplers might need to have
     275             :   // their rates updated. Unlikely, but possible.
                         // The set de-duplicates samplers shared by several chunks.
     276         418 :   std::unordered_set<std::shared_ptr<TraceSampler>> response_handlers;
     277         836 :   for (auto& chunk : trace_chunks) {
     278         418 :     response_handlers.insert(std::move(chunk.response_handler));
     279             :   }
     280             : 
     281             :   // This is the callback for setting request headers.
     282             :   // It's invoked synchronously (before `post` returns).
                         // That is why capturing `trace_chunks` by reference is fine here.
     283         417 :   auto set_request_headers = [&](DictWriter& headers) {
     284         417 :     headers.set("Content-Type", "application/msgpack");
     285         417 :     headers.set("Datadog-Meta-Lang", "cpp");
     286         417 :     headers.set("Datadog-Meta-Lang-Version", std::to_string(__cplusplus));
     287         417 :     headers.set("Datadog-Meta-Tracer-Version", tracer_version);
     288         417 :     headers.set("X-Datadog-Trace-Count", std::to_string(trace_chunks.size()));
     289         417 :   };
     290             : 
     291             :   // This is the callback for the HTTP response.  It's invoked
     292             :   // asynchronously.
                         // It captures shared_ptr copies (telemetry, logger) and owns the
                         // sampler set; it does not capture `this`.
     293         827 :   auto on_response = [telemetry = tracer_telemetry_,
     294         418 :                       samplers = std::move(response_handlers),
     295         418 :                       logger = logger_](int response_status,
     296             :                                         const DictReader& /*response_headers*/,
     297             :                                         std::string response_body) {
                           // Count the response by status class. Statuses below 100 are
                           // not counted.
     298         409 :     if (response_status >= 500) {
     299         100 :       telemetry->metrics().trace_api.responses_5xx.inc();
     300         309 :     } else if (response_status >= 400) {
     301         100 :       telemetry->metrics().trace_api.responses_4xx.inc();
     302         209 :     } else if (response_status >= 300) {
     303         100 :       telemetry->metrics().trace_api.responses_3xx.inc();
     304         109 :     } else if (response_status >= 200) {
     305         109 :       telemetry->metrics().trace_api.responses_2xx.inc();
     306           0 :     } else if (response_status >= 100) {
     307           0 :       telemetry->metrics().trace_api.responses_1xx.inc();
     308             :     }
                           // Only exactly 200 is processed further. Other 2xx statuses
                           // are counted as 2xx above but still logged as unexpected.
     309         409 :     if (response_status != 200) {
     310         399 :       logger->log_error([&](auto& stream) {
     311         399 :         stream << "Unexpected response status " << response_status
     312         399 :                << " in Datadog Agent response with body of length "
     313         399 :                << response_body.size() << " (starts on next line):\n"
     314         399 :                << response_body;
     315         399 :       });
     316         405 :       return;
     317             :     }
     318             : 
     319          10 :     if (response_body.empty()) {
     320           1 :       logger->log_error([](auto& stream) {
     321           1 :         stream << "Datadog Agent returned response without a body."
     322             :                   " This tracer might be sending batches of traces too "
     323             :                   "frequently";
     324           1 :       });
     325           1 :       return;
     326             :     }
     327             : 
                           // The variant holds either the parsed response or a ready-to-log
                           // error message.
     328           9 :     auto result = parse_agent_traces_response(response_body);
     329           9 :     if (const auto* error_message = std::get_if<std::string>(&result)) {
     330           5 :       logger->log_error(*error_message);
     331           5 :       return;
     332             :     }
     333           4 :     const auto& response = std::get<CollectorResponse>(result);
                           // A chunk may have been queued with a null sampler; skip those.
     334           8 :     for (const auto& sampler : samplers) {
     335           4 :       if (sampler) {
     336           4 :         sampler->handle_collector_response(response);
     337             :       }
     338             :     }
     339         845 :   };
     340             : 
     341             :   // This is the callback for if something goes wrong sending the
     342             :   // request or retrieving the response.  It's invoked
     343             :   // asynchronously.
     344         426 :   auto on_error = [telemetry = tracer_telemetry_,
     345         418 :                    logger = logger_](Error error) {
     346           8 :     telemetry->metrics().trace_api.errors_network.inc();
     347           8 :     logger->log_error(error.with_prefix(
     348             :         "Error occurred during HTTP request for submitting traces: "));
     349         426 :   };
     350             : 
                         // The request counter is incremented before post(), so it also
                         // counts attempts for which post() itself reports an error.
     351         418 :   tracer_telemetry_->metrics().trace_api.requests.inc();
     352             :   auto post_result =
     353         836 :       http_client_->post(traces_endpoint_, std::move(set_request_headers),
     354         836 :                          std::move(body), std::move(on_response),
     355        2090 :                          std::move(on_error), clock_().tick + request_timeout_);
     356         418 :   if (auto* error = post_result.if_error()) {
     357           2 :     logger_->log_error(
     358           2 :         error->with_prefix("Unexpected error submitting traces: "));
     359             :   }
     360         429 : }
     361             : 
     362          18 : void DatadogAgent::send_app_started() {
                         // POSTs the telemetry "app-started" payload to the telemetry
                         // endpoint as JSON, with a deadline of now + request_timeout_.
                         // An error returned by post() itself is logged here; response
                         // and transport errors go to the telemetry callbacks, which the
                         // constructor assigns only when telemetry is enabled.
     363          18 :   auto payload = tracer_telemetry_->app_started();
     364             :   auto post_result =
     365          18 :       http_client_->post(telemetry_endpoint_, set_content_type_json,
     366          18 :                          std::move(payload), telemetry_on_response_,
     367          54 :                          telemetry_on_error_, clock_().tick + request_timeout_);
     368          18 :   if (auto* error = post_result.if_error()) {
     369           0 :     logger_->log_error(error->with_prefix(
     370             :         "Unexpected error submitting telemetry app-started event: "));
     371             :   }
     372          18 : }
     373             : 
     374           0 : void DatadogAgent::send_heartbeat_and_telemetry() {
                         // POSTs the periodic heartbeat-and-telemetry payload to the
                         // telemetry endpoint as JSON, with a deadline of
                         // now + request_timeout_. Called from the 10-second telemetry
                         // timer on every sixth tick. An error returned by post() itself
                         // is logged here.
     375           0 :   auto payload = tracer_telemetry_->heartbeat_and_telemetry();
     376             :   auto post_result =
     377           0 :       http_client_->post(telemetry_endpoint_, set_content_type_json,
     378           0 :                          std::move(payload), telemetry_on_response_,
     379           0 :                          telemetry_on_error_, clock_().tick + request_timeout_);
     380           0 :   if (auto* error = post_result.if_error()) {
     381           0 :     logger_->log_error(error->with_prefix(
     382             :         "Unexpected error submitting telemetry app-heartbeat event: "));
     383             :   }
     384           0 : }
     385             : 
     386          18 : void DatadogAgent::send_app_closing() {
                         // POSTs the telemetry "app-closing" payload to the telemetry
                         // endpoint as JSON, with a deadline of now + request_timeout_.
                         // Called from the destructor when telemetry is enabled. An error
                         // returned by post() itself is logged here.
     387          18 :   auto payload = tracer_telemetry_->app_closing();
     388             :   auto post_result =
     389          18 :       http_client_->post(telemetry_endpoint_, set_content_type_json,
     390          18 :                          std::move(payload), telemetry_on_response_,
     391          54 :                          telemetry_on_error_, clock_().tick + request_timeout_);
     392          18 :   if (auto* error = post_result.if_error()) {
     393           0 :     logger_->log_error(error->with_prefix(
     394             :         "Unexpected error submitting telemetry app-closing event: "));
     395             :   }
     396          18 : }
     397             : 
     398           0 : void DatadogAgent::get_and_apply_remote_configuration_updates() {
                         // One remote-configuration poll: POSTs the payload built by
                         // `remote_config_` to the remote configuration endpoint and, on a
                         // 2xx response with a non-empty JSON body, passes the parsed body
                         // to `remote_config_.process_response`. Failures are logged.
                         //
                         // The response callback captures `this` (it uses `logger_` and
                         // `remote_config_`).
                         // NOTE(review): if the HTTP client can invoke it after this
                         // object is destroyed, that is a dangling access — confirm that
                         // drain() in the destructor rules this out.
     399             :   auto remote_configuration_on_response =
     400           0 :       [this](int response_status, const DictReader& /*response_headers*/,
     401           0 :              std::string response_body) {
     402           0 :         if (response_status < 200 || response_status >= 300) {
     403           0 :           if (response_status == 404) {
     404             :             // If the Datadog Agent doesn't understand remote configuration,
     405             :             // or if remote configuration is disabled in the agent, then it
     406             :             // will return 404. This is not an error.
     407             :             // We'll keep polling, though, because the agent's configuration
     408             :             // might change.
     409           0 :             return;
     410             :           }
     411             : 
     412           0 :           logger_->log_error([&](auto& stream) {
     413           0 :             stream << "Unexpected Remote Configuration status "
     414           0 :                    << response_status
     415             :                    << " with body (if any, starts on next line):\n"
     416           0 :                    << response_body;
     417           0 :           });
     418             : 
     419           0 :           return;
     420             :         }
     421             : 
                               // Parse with exceptions disabled: a malformed body yields a
                               // "discarded" value, checked below, instead of a throw.
     422             :         const auto response_json =
     423             :             nlohmann::json::parse(/* input = */ response_body,
     424             :                                   /* parser_callback = */ nullptr,
     425           0 :                                   /* allow_exceptions = */ false);
     426           0 :         if (response_json.is_discarded()) {
     427           0 :           logger_->log_error([](auto& stream) {
     428           0 :             stream << "Could not parse Remote Configuration response body";
     429           0 :           });
     430           0 :           return;
     431             :         }
     432             : 
     433             :         // TODO(@dgoffredo): When is the parsed JSON object empty?
     434           0 :         if (!response_json.empty()) {
     435             :           // TODO (during Active Configuration): `process_response` should
     436             :           // return a list of configuration update and should be consumed by
     437             :           // telemetry.
     438           0 :           remote_config_.process_response(response_json);
     439             :         }
     440           0 :       };
     441             : 
                         // Transport-level failures: log only. Captures the logger by
                         // value, not `this`.
     442           0 :   auto remote_configuration_on_error = [logger = logger_](Error error) {
     443           0 :     logger->log_error(error.with_prefix(
     444             :         "Error occurred during HTTP request for Remote Configuration: "));
     445           0 :   };
     446             : 
                         // The request body is the JSON text of make_request_payload().
     447           0 :   auto post_result = http_client_->post(
     448           0 :       remote_configuration_endpoint_, set_content_type_json,
     449           0 :       remote_config_.make_request_payload().dump(),
     450             :       remote_configuration_on_response, remote_configuration_on_error,
     451           0 :       clock_().tick + request_timeout_);
     452           0 :   if (auto error = post_result.if_error()) {
     453           0 :     logger_->log_error(
     454           0 :         error->with_prefix("Unexpected error while requesting Remote "
     455             :                            "Configuration updates: "));
     456             :   }
     457           0 : }
     458             : 
     459             : }  // namespace tracing
     460             : }  // namespace datadog

Generated by: LCOV version 1.16