xref: /oneTBB/src/tbb/concurrent_monitor.h (revision 3e352b48)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_monitor_H
18 #define __TBB_concurrent_monitor_H
19 
20 #include "oneapi/tbb/spin_mutex.h"
21 #include "oneapi/tbb/detail/_exception.h"
22 #include "oneapi/tbb/detail/_aligned_space.h"
23 #include "concurrent_monitor_mutex.h"
24 #include "semaphore.h"
25 
26 #include <atomic>
27 
28 namespace tbb {
29 namespace detail {
30 namespace r1 {
31 
//! Circular doubly-linked list with sentinel
/** The sentinel 'head' is always part of the ring: head.next points to the front
    element and head.prev points to the back element. An empty list is a sentinel
    that is linked to itself.

    The list is intrusive: it links caller-provided nodes and never allocates or
    frees them. It does no locking of its own; only the element counter is atomic
    (always accessed with relaxed ordering), the links are plain pointers. */
class circular_doubly_linked_list_with_sentinel {
public:
    struct base_node {
        base_node* next;
        base_node* prev;

        constexpr base_node(base_node* n, base_node* p) : next(n), prev(p) {}
        //! Creates an unlinked node whose links hold a recognizable poison pattern
        explicit base_node()
            : next(reinterpret_cast<base_node*>(static_cast<std::uintptr_t>(0xcdcdcdcd)))
            , prev(reinterpret_cast<base_node*>(static_cast<std::uintptr_t>(0xcdcdcdcd))) {}
    };

    //! Creates an empty list (the sentinel points to itself in both directions)
    constexpr circular_doubly_linked_list_with_sentinel() : count(0), head(&head, &head) {}

    circular_doubly_linked_list_with_sentinel(const circular_doubly_linked_list_with_sentinel&) = delete;
    circular_doubly_linked_list_with_sentinel& operator=(const circular_doubly_linked_list_with_sentinel&) = delete;

    //! Number of linked elements (relaxed load of the counter)
    std::size_t size() const { return count.load(std::memory_order_relaxed); }
    //! True if the counter is zero
    bool empty() const { return size() == 0; }
    //! First element, or end() if the list is empty
    base_node* front() const { return head.next; }
    //! Last element, or end() if the list is empty
    base_node* last() const { return head.prev; }
    //! Address of the sentinel; marks the end of a traversal in either direction
    const base_node* end() const { return &head; }

    //! add to the back of the list
    void add( base_node* n ) {
        count.store(count.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
        n->prev = head.prev;
        n->next = &head;
        head.prev->next = n;
        head.prev = n;
    }

    //! remove node 'n'
    /** Only the neighbours are relinked; the links stored in 'n' itself are left as they were. */
    void remove( base_node& n ) {
        __TBB_ASSERT(count.load(std::memory_order_relaxed) > 0, "attempt to remove an item from an empty list");
        count.store(count.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
        n.prev->next = n.next;
        n.next->prev = n.prev;
    }

    //! move all elements to 'lst' and initialize the 'this' list
    /** 'lst' must be a different, empty list: its previous content is not preserved.
        If this list is empty, 'lst' is left untouched. */
    void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
        __TBB_ASSERT(&lst != this, "attempt to flush a list into itself");
        __TBB_ASSERT(lst.empty(), "attempt to flush into a non-empty list");
        const std::size_t l_count = size();
        if (l_count > 0) {
            lst.count.store(l_count, std::memory_order_relaxed);
            lst.head.next = head.next;
            lst.head.prev = head.prev;
            // Re-target the outermost elements from our sentinel to the sentinel of 'lst'.
            head.next->prev = &lst.head;
            head.prev->next = &lst.head;
            clear();
        }
    }

    //! Reset to the empty state; nodes that were linked are not modified
    void clear() {
        head.next = &head;
        head.prev = &head;
        count.store(0, std::memory_order_relaxed);
    }
private:
    std::atomic<std::size_t> count;
    base_node head;
};
95 
96 using base_list = circular_doubly_linked_list_with_sentinel;
97 using base_node = circular_doubly_linked_list_with_sentinel::base_node;
98 
99 template <typename Context>
100 class concurrent_monitor_base;
101 
102 template <typename Context>
103 class wait_node : public base_node {
104 public:
105 
106 #if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
107     wait_node(Context ctx) : my_context(ctx), my_is_in_list(false) {}
108 #else
109     wait_node(Context ctx) : my_context(ctx) {}
110 #endif
111 
112     virtual ~wait_node() = default;
113 
114     virtual void init() {
115         __TBB_ASSERT(!my_initialized, nullptr);
116         my_initialized = true;
117     }
118 
119     virtual void wait() = 0;
120 
121     virtual void reset() {
122         __TBB_ASSERT(my_skipped_wakeup, nullptr);
123         my_skipped_wakeup = false;
124     }
125 
126     virtual void notify() = 0;
127 
128 protected:
129     friend class concurrent_monitor_base<Context>;
130     friend class thread_data;
131 
132     Context my_context{};
133 #if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
134     std::atomic<bool> my_is_in_list;
135 #else
136     std::atomic<bool> my_is_in_list{false};
137 #endif
138 
139     bool my_initialized{false};
140     bool my_skipped_wakeup{false};
141     bool my_aborted{false};
142     unsigned my_epoch{0};
143 };
144 
145 template <typename Context>
146 class sleep_node : public wait_node<Context> {
147     using base_type = wait_node<Context>;
148 public:
149     using base_type::base_type;
150 
151     ~sleep_node() override {
152         if (this->my_initialized) {
153             if (this->my_skipped_wakeup) semaphore().P();
154             semaphore().~binary_semaphore();
155         }
156     }
157 
158     binary_semaphore& semaphore() { return *sema.begin(); }
159 
160     void init() override {
161         if (!this->my_initialized) {
162             new (sema.begin()) binary_semaphore;
163             base_type::init();
164         }
165     }
166 
167     void wait() override {
168         __TBB_ASSERT(this->my_initialized,
169             "Use of commit_wait() without prior prepare_wait()");
170         semaphore().P();
171         __TBB_ASSERT(!this->my_is_in_list.load(std::memory_order_relaxed), "Still in the queue?");
172         if (this->my_aborted)
173             throw_exception(exception_id::user_abort);
174     }
175 
176     void reset() override {
177         base_type::reset();
178         semaphore().P();
179     }
180 
181     void notify() override {
182         semaphore().V();
183     }
184 
185 private:
186     tbb::detail::aligned_space<binary_semaphore> sema;
187 };
188 
189 //! concurrent_monitor
190 /** fine-grained concurrent_monitor implementation */
191 template <typename Context>
192 class concurrent_monitor_base {
193 public:
194     //! ctor
195     constexpr concurrent_monitor_base() {}
196     //! dtor
197     ~concurrent_monitor_base() = default;
198 
199     concurrent_monitor_base(const concurrent_monitor_base&) = delete;
200     concurrent_monitor_base& operator=(const concurrent_monitor_base&) = delete;
201 
202     //! prepare wait by inserting 'thr' into the wait queue
203     void prepare_wait( wait_node<Context>& node) {
204         // TODO: consider making even more lazy instantiation of the semaphore, that is only when it is actually needed, e.g. move it in node::wait()
205         if (!node.my_initialized) {
206             node.init();
207         }
208         // this is good place to pump previous skipped wakeup
209         else if (node.my_skipped_wakeup) {
210             node.reset();
211         }
212 
213         node.my_is_in_list.store(true, std::memory_order_relaxed);
214 
215         {
216             concurrent_monitor_mutex::scoped_lock l(my_mutex);
217             node.my_epoch = my_epoch.load(std::memory_order_relaxed);
218             my_waitset.add(&node);
219         }
220 
221         // Prepare wait guarantees Write Read memory barrier.
222         // In C++ only full fence covers this type of barrier.
223         atomic_fence_seq_cst();
224     }
225 
226     //! Commit wait if event count has not changed; otherwise, cancel wait.
227     /** Returns true if committed, false if canceled. */
228     inline bool commit_wait( wait_node<Context>& node ) {
229         const bool do_it = node.my_epoch == my_epoch.load(std::memory_order_relaxed);
230         // this check is just an optimization
231         if (do_it) {
232            node.wait();
233         } else {
234             cancel_wait( node );
235         }
236         return do_it;
237     }
238 
239     //! Cancel the wait. Removes the thread from the wait queue if not removed yet.
240     void cancel_wait( wait_node<Context>& node ) {
241         // possible skipped wakeup will be pumped in the following prepare_wait()
242         node.my_skipped_wakeup = true;
243         // try to remove node from waitset
244         // Cancel wait guarantees acquire memory barrier.
245         bool in_list = node.my_is_in_list.load(std::memory_order_acquire);
246         if (in_list) {
247             concurrent_monitor_mutex::scoped_lock l(my_mutex);
248             if (node.my_is_in_list.load(std::memory_order_relaxed)) {
249                 my_waitset.remove(node);
250                 // node is removed from waitset, so there will be no wakeup
251                 node.my_is_in_list.store(false, std::memory_order_relaxed);
252                 node.my_skipped_wakeup = false;
253             }
254         }
255     }
256 
257     //! Wait for a condition to be satisfied with waiting-on my_context
258     template <typename NodeType, typename Pred>
259     bool wait(Pred&& pred, NodeType&& node) {
260         prepare_wait(node);
261         while (!guarded_call(std::forward<Pred>(pred), node)) {
262             if (commit_wait(node)) {
263                 return true;
264             }
265 
266             prepare_wait(node);
267         }
268 
269         cancel_wait(node);
270         return false;
271     }
272 
273     //! Notify one thread about the event
274     void notify_one() {
275         atomic_fence_seq_cst();
276         notify_one_relaxed();
277     }
278 
279     //! Notify one thread about the event. Relaxed version.
280     void notify_one_relaxed() {
281         if (my_waitset.empty()) {
282             return;
283         }
284 
285         base_node* n;
286         const base_node* end = my_waitset.end();
287         {
288             concurrent_monitor_mutex::scoped_lock l(my_mutex);
289             my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
290             n = my_waitset.front();
291             if (n != end) {
292                 my_waitset.remove(*n);
293                 to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
294             }
295         }
296 
297         if (n != end) {
298             to_wait_node(n)->notify();
299         }
300     }
301 
302     //! Notify all waiting threads of the event
303     void notify_all() {
304         atomic_fence_seq_cst();
305         notify_all_relaxed();
306     }
307 
308     // ! Notify all waiting threads of the event; Relaxed version
309     void notify_all_relaxed() {
310         if (my_waitset.empty()) {
311             return;
312         }
313 
314         base_list temp;
315         const base_node* end;
316         {
317             concurrent_monitor_mutex::scoped_lock l(my_mutex);
318             my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
319             // TODO: Possible optimization, don't change node state under lock, just do flush
320             my_waitset.flush_to(temp);
321             end = temp.end();
322             for (base_node* n = temp.front(); n != end; n = n->next) {
323                 to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
324             }
325         }
326 
327         base_node* nxt;
328         for (base_node* n = temp.front(); n != end; n=nxt) {
329             nxt = n->next;
330             to_wait_node(n)->notify();
331         }
332 #if TBB_USE_ASSERT
333         temp.clear();
334 #endif
335     }
336 
337     //! Notify waiting threads of the event that satisfies the given predicate
338     template <typename P>
339     void notify( const P& predicate ) {
340         atomic_fence_seq_cst();
341         notify_relaxed( predicate );
342     }
343 
344     //! Notify waiting threads of the event that satisfies the given predicate;
345     //! the predicate is called under the lock. Relaxed version.
346     template<typename P>
347     void notify_relaxed( const P& predicate ) {
348         if (my_waitset.empty()) {
349             return;
350         }
351 
352         base_list temp;
353         base_node* nxt;
354         const base_node* end = my_waitset.end();
355         {
356             concurrent_monitor_mutex::scoped_lock l(my_mutex);
357             my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed);
358             for (base_node* n = my_waitset.last(); n != end; n = nxt) {
359                 nxt = n->prev;
360                 auto* node = static_cast<wait_node<Context>*>(n);
361                 if (predicate(node->my_context)) {
362                     my_waitset.remove(*n);
363                     node->my_is_in_list.store(false, std::memory_order_relaxed);
364                     temp.add(n);
365                 }
366             }
367         }
368 
369         end = temp.end();
370         for (base_node* n=temp.front(); n != end; n = nxt) {
371             nxt = n->next;
372             to_wait_node(n)->notify();
373         }
374 #if TBB_USE_ASSERT
375         temp.clear();
376 #endif
377     }
378 
379     //! Notify waiting threads of the event that satisfies the given predicate;
380     //! the predicate is called under the lock. Relaxed version.
381     template<typename P>
382     void notify_one_relaxed( const P& predicate ) {
383         if (my_waitset.empty()) {
384             return;
385         }
386 
387         base_node* tmp = nullptr;
388         base_node* next{};
389         const base_node* end = my_waitset.end();
390         {
391             concurrent_monitor_mutex::scoped_lock l(my_mutex);
392             my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed);
393             for (base_node* n = my_waitset.last(); n != end; n = next) {
394                 next = n->prev;
395                 auto* node = static_cast<wait_node<Context>*>(n);
396                 if (predicate(node->my_context)) {
397                     my_waitset.remove(*n);
398                     node->my_is_in_list.store(false, std::memory_order_relaxed);
399                     tmp = n;
400                     break;
401                 }
402             }
403         }
404 
405         if (tmp) {
406             to_wait_node(tmp)->notify();
407         }
408     }
409 
410     //! Abort any sleeping threads at the time of the call
411     void abort_all() {
412         atomic_fence_seq_cst();
413         abort_all_relaxed();
414     }
415 
416     //! Abort any sleeping threads at the time of the call; Relaxed version
417     void abort_all_relaxed() {
418         if (my_waitset.empty()) {
419             return;
420         }
421 
422         base_list temp;
423         const base_node* end;
424         {
425             concurrent_monitor_mutex::scoped_lock l(my_mutex);
426             my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
427             my_waitset.flush_to(temp);
428             end = temp.end();
429             for (base_node* n = temp.front(); n != end; n = n->next) {
430                 to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
431             }
432         }
433 
434         base_node* nxt;
435         for (base_node* n = temp.front(); n != end; n = nxt) {
436             nxt = n->next;
437             to_wait_node(n)->my_aborted = true;
438             to_wait_node(n)->notify();
439         }
440 #if TBB_USE_ASSERT
441         temp.clear();
442 #endif
443     }
444 
445     void destroy() {
446         this->abort_all();
447         my_mutex.destroy();
448         __TBB_ASSERT(this->my_waitset.empty(), "waitset not empty?");
449     }
450 
451 private:
452     template <typename NodeType, typename Pred>
453     bool guarded_call(Pred&& predicate, NodeType& node) {
454         bool res = false;
455         tbb::detail::d0::try_call( [&] {
456             res = std::forward<Pred>(predicate)();
457         }).on_exception( [&] {
458             cancel_wait(node);
459         });
460 
461         return res;
462     }
463 
464     concurrent_monitor_mutex my_mutex{};
465     base_list my_waitset{};
466     std::atomic<unsigned> my_epoch{};
467 
468     wait_node<Context>* to_wait_node( base_node* node ) { return static_cast<wait_node<Context>*>(node); }
469 };
470 
471 class concurrent_monitor : public concurrent_monitor_base<std::uintptr_t> {
472     using base_type = concurrent_monitor_base<std::uintptr_t>;
473 public:
474     using base_type::base_type;
475 
476     ~concurrent_monitor() {
477         destroy();
478     }
479 
480     /** per-thread descriptor for concurrent_monitor */
481     using thread_context = sleep_node<std::uintptr_t>;
482 };
483 
484 } // namespace r1
485 } // namespace detail
486 } // namespace tbb
487 
488 #endif /* __TBB_concurrent_monitor_H */
489