LCOV - code coverage report
Current view: top level - datadog - threaded_event_scheduler.cpp (source / functions) Hit Total Coverage
Test: filtered.info Lines: 55 65 84.6 %
Date: 2024-01-03 20:30:12 Functions: 12 13 92.3 %

          Line data    Source code
       1             : #include "threaded_event_scheduler.h"
       2             : 
       3             : #include <thread>
       4             : 
       5             : #include "json.hpp"
       6             : 
       7             : namespace datadog {
       8             : namespace tracing {
       9             : 
      10          56 : ThreadedEventScheduler::EventConfig::EventConfig(
      11             :     std::function<void()> callback,
      12          56 :     std::chrono::steady_clock::duration interval)
      13          56 :     : callback(callback), interval(interval), cancelled(false) {}
      14             : 
      15          37 : bool ThreadedEventScheduler::GreaterThan::operator()(
      16             :     const ScheduledRun& left, const ScheduledRun& right) const {
      17          37 :   return left.when > right.when;
      18             : }
      19             : 
      20         208 : ThreadedEventScheduler::ThreadedEventScheduler()
      21         208 :     : running_current_(false),
      22         208 :       shutting_down_(false),
      23         416 :       dispatcher_([this]() { run(); }) {}
      24             : 
      25         208 : ThreadedEventScheduler::~ThreadedEventScheduler() {
      26             :   {
      27         208 :     std::lock_guard guard(mutex_);
      28         208 :     shutting_down_ = true;
      29         208 :     schedule_or_shutdown_.notify_one();
      30         208 :   }
      31         208 :   dispatcher_.join();
      32         208 : }
      33             : 
      34          56 : EventScheduler::Cancel ThreadedEventScheduler::schedule_recurring_event(
      35             :     std::chrono::steady_clock::duration interval,
      36             :     std::function<void()> callback) {
      37          56 :   const auto now = std::chrono::steady_clock::now();
      38          56 :   auto config = std::make_shared<EventConfig>(std::move(callback), interval);
      39             : 
      40             :   {
      41          56 :     std::lock_guard<std::mutex> guard(mutex_);
      42          56 :     upcoming_.push(ScheduledRun{now + interval, config});
      43          56 :     schedule_or_shutdown_.notify_one();
      44          56 :   }
      45             : 
      46             :   // Return a cancellation function.
      47         224 :   return [this, config = std::move(config)]() mutable {
      48          56 :     if (!config) {
      49           0 :       return;
      50             :     }
      51             : 
      52          56 :     std::unique_lock<std::mutex> lock(mutex_);
      53          56 :     config->cancelled = true;
      54          56 :     current_done_.wait(lock, [this, &config]() {
      55          56 :       return !running_current_ || current_.config != config;
      56             :     });
      57          56 :     config.reset();
      58         168 :   };
      59          56 : }
      60             : 
      61           8 : nlohmann::json ThreadedEventScheduler::config_json() const {
      62          32 :   return nlohmann::json::object(
      63          40 :       {{"type", "datadog::tracing::ThreadedEventScheduler"}});
      64             : }
      65             : 
      66         208 : void ThreadedEventScheduler::run() {
      67         208 :   std::unique_lock<std::mutex> lock(mutex_);
      68             : 
      69             :   for (;;) {
      70         210 :     schedule_or_shutdown_.wait(
      71         409 :         lock, [this]() { return shutting_down_ || !upcoming_.empty(); });
      72         210 :     if (shutting_down_) {
      73         190 :       return;
      74             :     }
      75             : 
      76          20 :     current_ = upcoming_.top();
      77             : 
      78          20 :     if (current_.config->cancelled) {
      79           1 :       upcoming_.pop();
      80           1 :       continue;
      81             :     }
      82             : 
      83             :     const bool changed =
      84          19 :         schedule_or_shutdown_.wait_until(lock, current_.when, [this]() {
      85          59 :           return shutting_down_ || upcoming_.top().config != current_.config;
      86             :         });
      87             : 
      88          19 :     if (shutting_down_) {
      89          18 :       return;
      90             :     }
      91             : 
      92           1 :     if (changed) {
      93             :       // A more recent event has been scheduled.
      94           0 :       continue;
      95             :     }
      96             : 
      97             :     // We waited for `current_` and now it's its turn.
      98           1 :     upcoming_.pop();
      99           1 :     if (current_.config->cancelled) {
     100           1 :       continue;
     101             :     }
     102             : 
     103           0 :     upcoming_.push(ScheduledRun{current_.when + current_.config->interval,
     104           0 :                                 current_.config});
     105           0 :     running_current_ = true;
     106           0 :     lock.unlock();
     107           0 :     current_.config->callback();
     108           0 :     lock.lock();
     109           0 :     running_current_ = false;
     110           0 :     current_done_.notify_all();
     111           2 :   }
     112         208 : }
     113             : 
     114             : }  // namespace tracing
     115             : }  // namespace datadog

Generated by: LCOV version 1.16