1 //  Portions Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Borrowed from
7 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
8 // Timer Queue
9 //
10 // License
11 //
12 // The source code in this article is licensed under the CC0 license, so feel
13 // free to copy, modify, share, do whatever you want with it.
14 // No attribution is required, but Ill be happy if you do.
15 // CC0 license
16 
17 // The person who associated a work with this deed has dedicated the work to the
18 // public domain by waiving all of his or her rights to the work worldwide
19 // under copyright law, including all related and neighboring rights, to the
20 // extent allowed by law.  You can copy, modify, distribute and perform the
21 // work, even for commercial purposes, all without asking permission.
22 
23 #pragma once
24 
25 #include <assert.h>
26 #include <chrono>
27 #include <condition_variable>
28 #include <functional>
29 #include <queue>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include "port/port.h"
35 #include "test_util/sync_point.h"
36 
37 // Allows execution of handlers at a specified time in the future
38 // Guarantees:
39 //  - All handlers are executed ONCE, even if cancelled (aborted parameter will
40 // be set to true)
41 //      - If TimerQueue is destroyed, it will cancel all handlers.
42 //  - Handlers are ALWAYS executed in the Timer Queue worker thread.
43 //  - Handlers execution order is NOT guaranteed
44 //
45 ////////////////////////////////////////////////////////////////////////////////
46 // borrowed from
47 // http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
48 class TimerQueue {
49  public:
TimerQueue()50   TimerQueue() : m_th(&TimerQueue::run, this) {}
51 
~TimerQueue()52   ~TimerQueue() { shutdown(); }
53 
54   // This function is not thread-safe.
shutdown()55   void shutdown() {
56     if (closed_) {
57       return;
58     }
59     cancelAll();
60     // Abusing the timer queue to trigger the shutdown.
61     add(0, [this](bool) {
62       m_finish = true;
63       return std::make_pair(false, 0);
64     });
65     m_th.join();
66     closed_ = true;
67   }
68 
69   // Adds a new timer
70   // \return
71   //  Returns the ID of the new timer. You can use this ID to cancel the
72   // timer
add(int64_t milliseconds,std::function<std::pair<bool,int64_t> (bool)> handler)73   uint64_t add(int64_t milliseconds,
74                std::function<std::pair<bool, int64_t>(bool)> handler) {
75     WorkItem item;
76     Clock::time_point tp = Clock::now();
77     item.end = tp + std::chrono::milliseconds(milliseconds);
78     TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
79     item.period = milliseconds;
80     item.handler = std::move(handler);
81 
82     std::unique_lock<std::mutex> lk(m_mtx);
83     uint64_t id = ++m_idcounter;
84     item.id = id;
85     m_items.push(std::move(item));
86 
87     // Something changed, so wake up timer thread
88     m_checkWork.notify_one();
89     return id;
90   }
91 
92   // Cancels the specified timer
93   // \return
94   //  1 if the timer was cancelled.
95   //  0 if you were too late to cancel (or the timer ID was never valid to
96   // start with)
cancel(uint64_t id)97   size_t cancel(uint64_t id) {
98     // Instead of removing the item from the container (thus breaking the
99     // heap integrity), we set the item as having no handler, and put
100     // that handler on a new item at the top for immediate execution
101     // The timer thread will then ignore the original item, since it has no
102     // handler.
103     std::unique_lock<std::mutex> lk(m_mtx);
104     for (auto&& item : m_items.getContainer()) {
105       if (item.id == id && item.handler) {
106         WorkItem newItem;
107         // Zero time, so it stays at the top for immediate execution
108         newItem.end = Clock::time_point();
109         newItem.id = 0;  // Means it is a canceled item
110         // Move the handler from item to newitem (thus clearing item)
111         newItem.handler = std::move(item.handler);
112         m_items.push(std::move(newItem));
113 
114         // Something changed, so wake up timer thread
115         m_checkWork.notify_one();
116         return 1;
117       }
118     }
119     return 0;
120   }
121 
122   // Cancels all timers
123   // \return
124   //  The number of timers cancelled
cancelAll()125   size_t cancelAll() {
126     // Setting all "end" to 0 (for immediate execution) is ok,
127     // since it maintains the heap integrity
128     std::unique_lock<std::mutex> lk(m_mtx);
129     m_cancel = true;
130     for (auto&& item : m_items.getContainer()) {
131       if (item.id && item.handler) {
132         item.end = Clock::time_point();
133         item.id = 0;
134       }
135     }
136     auto ret = m_items.size();
137 
138     m_checkWork.notify_one();
139     return ret;
140   }
141 
142  private:
143   using Clock = std::chrono::steady_clock;
144   TimerQueue(const TimerQueue&) = delete;
145   TimerQueue& operator=(const TimerQueue&) = delete;
146 
run()147   void run() {
148     std::unique_lock<std::mutex> lk(m_mtx);
149     while (!m_finish) {
150       auto end = calcWaitTime_lock();
151       if (end.first) {
152         // Timers found, so wait until it expires (or something else
153         // changes)
154         m_checkWork.wait_until(lk, end.second);
155       } else {
156         // No timers exist, so wait forever until something changes
157         m_checkWork.wait(lk);
158       }
159 
160       // Check and execute as much work as possible, such as, all expired
161       // timers
162       checkWork(&lk);
163     }
164 
165     // If we are shutting down, we should not have any items left,
166     // since the shutdown cancels all items
167     assert(m_items.size() == 0);
168   }
169 
calcWaitTime_lock()170   std::pair<bool, Clock::time_point> calcWaitTime_lock() {
171     while (m_items.size()) {
172       if (m_items.top().handler) {
173         // Item present, so return the new wait time
174         return std::make_pair(true, m_items.top().end);
175       } else {
176         // Discard empty handlers (they were cancelled)
177         m_items.pop();
178       }
179     }
180 
181     // No items found, so return no wait time (causes the thread to wait
182     // indefinitely)
183     return std::make_pair(false, Clock::time_point());
184   }
185 
checkWork(std::unique_lock<std::mutex> * lk)186   void checkWork(std::unique_lock<std::mutex>* lk) {
187     while (m_items.size() && m_items.top().end <= Clock::now()) {
188       WorkItem item(m_items.top());
189       m_items.pop();
190 
191       if (item.handler) {
192         (*lk).unlock();
193         auto reschedule_pair = item.handler(item.id == 0);
194         (*lk).lock();
195         if (!m_cancel && reschedule_pair.first) {
196           int64_t new_period = (reschedule_pair.second == -1)
197                                    ? item.period
198                                    : reschedule_pair.second;
199 
200           item.period = new_period;
201           item.end = Clock::now() + std::chrono::milliseconds(new_period);
202           m_items.push(std::move(item));
203         }
204       }
205     }
206   }
207 
208   bool m_finish = false;
209   bool m_cancel = false;
210   uint64_t m_idcounter = 0;
211   std::condition_variable m_checkWork;
212 
213   struct WorkItem {
214     Clock::time_point end;
215     int64_t period;
216     uint64_t id;  // id==0 means it was cancelled
217     std::function<std::pair<bool, int64_t>(bool)> handler;
218     bool operator>(const WorkItem& other) const { return end > other.end; }
219   };
220 
221   std::mutex m_mtx;
222   // Inheriting from priority_queue, so we can access the internal container
223   class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
224                                            std::greater<WorkItem>> {
225    public:
getContainer()226     std::vector<WorkItem>& getContainer() { return this->c; }
227   } m_items;
228   ROCKSDB_NAMESPACE::port::Thread m_th;
229   bool closed_ = false;
230 };
231