Line data Source code
1 : #include "threaded_event_scheduler.h" 2 : 3 : #include <thread> 4 : 5 : #include "json.hpp" 6 : 7 : namespace datadog { 8 : namespace tracing { 9 : 10 56 : ThreadedEventScheduler::EventConfig::EventConfig( 11 : std::function<void()> callback, 12 56 : std::chrono::steady_clock::duration interval) 13 56 : : callback(callback), interval(interval), cancelled(false) {} 14 : 15 37 : bool ThreadedEventScheduler::GreaterThan::operator()( 16 : const ScheduledRun& left, const ScheduledRun& right) const { 17 37 : return left.when > right.when; 18 : } 19 : 20 208 : ThreadedEventScheduler::ThreadedEventScheduler() 21 208 : : running_current_(false), 22 208 : shutting_down_(false), 23 416 : dispatcher_([this]() { run(); }) {} 24 : 25 208 : ThreadedEventScheduler::~ThreadedEventScheduler() { 26 : { 27 208 : std::lock_guard guard(mutex_); 28 208 : shutting_down_ = true; 29 208 : schedule_or_shutdown_.notify_one(); 30 208 : } 31 208 : dispatcher_.join(); 32 208 : } 33 : 34 56 : EventScheduler::Cancel ThreadedEventScheduler::schedule_recurring_event( 35 : std::chrono::steady_clock::duration interval, 36 : std::function<void()> callback) { 37 56 : const auto now = std::chrono::steady_clock::now(); 38 56 : auto config = std::make_shared<EventConfig>(std::move(callback), interval); 39 : 40 : { 41 56 : std::lock_guard<std::mutex> guard(mutex_); 42 56 : upcoming_.push(ScheduledRun{now + interval, config}); 43 56 : schedule_or_shutdown_.notify_one(); 44 56 : } 45 : 46 : // Return a cancellation function. 47 224 : return [this, config = std::move(config)]() mutable { 48 56 : if (!config) { 49 0 : return; 50 : } 51 : 52 56 : std::unique_lock<std::mutex> lock(mutex_); 53 56 : config->cancelled = true; 54 56 : current_done_.wait(lock, [this, &config]() { 55 56 : return !running_current_ || current_.config != config; 56 : }); 57 56 : config.reset(); 58 168 : }; 59 56 : } 60 : 61 8 : nlohmann::json ThreadedEventScheduler::config_json() const { 62 32 : return nlohmann::json::object( 63 40 : {{"type", "datadog::tracing::ThreadedEventScheduler"}}); 64 : } 65 : 66 208 : void ThreadedEventScheduler::run() { 67 208 : std::unique_lock<std::mutex> lock(mutex_); 68 : 69 : for (;;) { 70 210 : schedule_or_shutdown_.wait( 71 409 : lock, [this]() { return shutting_down_ || !upcoming_.empty(); }); 72 210 : if (shutting_down_) { 73 190 : return; 74 : } 75 : 76 20 : current_ = upcoming_.top(); 77 : 78 20 : if (current_.config->cancelled) { 79 1 : upcoming_.pop(); 80 1 : continue; 81 : } 82 : 83 : const bool changed = 84 19 : schedule_or_shutdown_.wait_until(lock, current_.when, [this]() { 85 59 : return shutting_down_ || upcoming_.top().config != current_.config; 86 : }); 87 : 88 19 : if (shutting_down_) { 89 18 : return; 90 : } 91 : 92 1 : if (changed) { 93 : // A more recent event has been scheduled. 94 0 : continue; 95 : } 96 : 97 : // We waited for `current_` and now it's its turn. 98 1 : upcoming_.pop(); 99 1 : if (current_.config->cancelled) { 100 1 : continue; 101 : } 102 : 103 0 : upcoming_.push(ScheduledRun{current_.when + current_.config->interval, 104 0 : current_.config}); 105 0 : running_current_ = true; 106 0 : lock.unlock(); 107 0 : current_.config->callback(); 108 0 : lock.lock(); 109 0 : running_current_ = false; 110 0 : current_done_.notify_all(); 111 2 : } 112 208 : } 113 : 114 : } // namespace tracing 115 : } // namespace datadog