// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Borrowed from
// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
// Timer Queue
//
// License
//
// The source code in this article is licensed under the CC0 license, so feel
// free to copy, modify, share, do whatever you want with it.
// No attribution is required, but I'll be happy if you do.
// CC0 license

// The person who associated a work with this deed has dedicated the work to the
// public domain by waiving all of his or her rights to the work worldwide
// under copyright law, including all related and neighboring rights, to the
// extent allowed by law. You can copy, modify, distribute and perform the
// work, even for commercial purposes, all without asking permission.

#pragma once

#include <assert.h>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

#include "port/port.h"
#include "test_util/sync_point.h"

// Allows execution of handlers at a specified time in the future
// Guarantees:
// - All handlers are executed ONCE, even if cancelled (aborted parameter will
// be set to true)
// - If TimerQueue is destroyed, it will cancel all handlers.
// - Handlers are ALWAYS executed in the Timer Queue worker thread.
43 // - Handlers execution order is NOT guaranteed 44 // 45 //////////////////////////////////////////////////////////////////////////////// 46 // borrowed from 47 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ 48 class TimerQueue { 49 public: TimerQueue()50 TimerQueue() : m_th(&TimerQueue::run, this) {} 51 ~TimerQueue()52 ~TimerQueue() { shutdown(); } 53 54 // This function is not thread-safe. shutdown()55 void shutdown() { 56 if (closed_) { 57 return; 58 } 59 cancelAll(); 60 // Abusing the timer queue to trigger the shutdown. 61 add(0, [this](bool) { 62 m_finish = true; 63 return std::make_pair(false, 0); 64 }); 65 m_th.join(); 66 closed_ = true; 67 } 68 69 // Adds a new timer 70 // \return 71 // Returns the ID of the new timer. You can use this ID to cancel the 72 // timer add(int64_t milliseconds,std::function<std::pair<bool,int64_t> (bool)> handler)73 uint64_t add(int64_t milliseconds, 74 std::function<std::pair<bool, int64_t>(bool)> handler) { 75 WorkItem item; 76 Clock::time_point tp = Clock::now(); 77 item.end = tp + std::chrono::milliseconds(milliseconds); 78 TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end); 79 item.period = milliseconds; 80 item.handler = std::move(handler); 81 82 std::unique_lock<std::mutex> lk(m_mtx); 83 uint64_t id = ++m_idcounter; 84 item.id = id; 85 m_items.push(std::move(item)); 86 87 // Something changed, so wake up timer thread 88 m_checkWork.notify_one(); 89 return id; 90 } 91 92 // Cancels the specified timer 93 // \return 94 // 1 if the timer was cancelled. 
95 // 0 if you were too late to cancel (or the timer ID was never valid to 96 // start with) cancel(uint64_t id)97 size_t cancel(uint64_t id) { 98 // Instead of removing the item from the container (thus breaking the 99 // heap integrity), we set the item as having no handler, and put 100 // that handler on a new item at the top for immediate execution 101 // The timer thread will then ignore the original item, since it has no 102 // handler. 103 std::unique_lock<std::mutex> lk(m_mtx); 104 for (auto&& item : m_items.getContainer()) { 105 if (item.id == id && item.handler) { 106 WorkItem newItem; 107 // Zero time, so it stays at the top for immediate execution 108 newItem.end = Clock::time_point(); 109 newItem.id = 0; // Means it is a canceled item 110 // Move the handler from item to newitem (thus clearing item) 111 newItem.handler = std::move(item.handler); 112 m_items.push(std::move(newItem)); 113 114 // Something changed, so wake up timer thread 115 m_checkWork.notify_one(); 116 return 1; 117 } 118 } 119 return 0; 120 } 121 122 // Cancels all timers 123 // \return 124 // The number of timers cancelled cancelAll()125 size_t cancelAll() { 126 // Setting all "end" to 0 (for immediate execution) is ok, 127 // since it maintains the heap integrity 128 std::unique_lock<std::mutex> lk(m_mtx); 129 m_cancel = true; 130 for (auto&& item : m_items.getContainer()) { 131 if (item.id && item.handler) { 132 item.end = Clock::time_point(); 133 item.id = 0; 134 } 135 } 136 auto ret = m_items.size(); 137 138 m_checkWork.notify_one(); 139 return ret; 140 } 141 142 private: 143 using Clock = std::chrono::steady_clock; 144 TimerQueue(const TimerQueue&) = delete; 145 TimerQueue& operator=(const TimerQueue&) = delete; 146 run()147 void run() { 148 std::unique_lock<std::mutex> lk(m_mtx); 149 while (!m_finish) { 150 auto end = calcWaitTime_lock(); 151 if (end.first) { 152 // Timers found, so wait until it expires (or something else 153 // changes) 154 m_checkWork.wait_until(lk, 
end.second); 155 } else { 156 // No timers exist, so wait forever until something changes 157 m_checkWork.wait(lk); 158 } 159 160 // Check and execute as much work as possible, such as, all expired 161 // timers 162 checkWork(&lk); 163 } 164 165 // If we are shutting down, we should not have any items left, 166 // since the shutdown cancels all items 167 assert(m_items.size() == 0); 168 } 169 calcWaitTime_lock()170 std::pair<bool, Clock::time_point> calcWaitTime_lock() { 171 while (m_items.size()) { 172 if (m_items.top().handler) { 173 // Item present, so return the new wait time 174 return std::make_pair(true, m_items.top().end); 175 } else { 176 // Discard empty handlers (they were cancelled) 177 m_items.pop(); 178 } 179 } 180 181 // No items found, so return no wait time (causes the thread to wait 182 // indefinitely) 183 return std::make_pair(false, Clock::time_point()); 184 } 185 checkWork(std::unique_lock<std::mutex> * lk)186 void checkWork(std::unique_lock<std::mutex>* lk) { 187 while (m_items.size() && m_items.top().end <= Clock::now()) { 188 WorkItem item(m_items.top()); 189 m_items.pop(); 190 191 if (item.handler) { 192 (*lk).unlock(); 193 auto reschedule_pair = item.handler(item.id == 0); 194 (*lk).lock(); 195 if (!m_cancel && reschedule_pair.first) { 196 int64_t new_period = (reschedule_pair.second == -1) 197 ? 
item.period 198 : reschedule_pair.second; 199 200 item.period = new_period; 201 item.end = Clock::now() + std::chrono::milliseconds(new_period); 202 m_items.push(std::move(item)); 203 } 204 } 205 } 206 } 207 208 bool m_finish = false; 209 bool m_cancel = false; 210 uint64_t m_idcounter = 0; 211 std::condition_variable m_checkWork; 212 213 struct WorkItem { 214 Clock::time_point end; 215 int64_t period; 216 uint64_t id; // id==0 means it was cancelled 217 std::function<std::pair<bool, int64_t>(bool)> handler; 218 bool operator>(const WorkItem& other) const { return end > other.end; } 219 }; 220 221 std::mutex m_mtx; 222 // Inheriting from priority_queue, so we can access the internal container 223 class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>, 224 std::greater<WorkItem>> { 225 public: getContainer()226 std::vector<WorkItem>& getContainer() { return this->c; } 227 } m_items; 228 ROCKSDB_NAMESPACE::port::Thread m_th; 229 bool closed_ = false; 230 }; 231