/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_monitor_H
#define __TBB_concurrent_monitor_H

#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/detail/_exception.h"
#include "oneapi/tbb/detail/_aligned_space.h"
#include "concurrent_monitor_mutex.h"
#include "semaphore.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

//! Circular doubly-linked list with sentinel
/** head.next points to the front and head.prev points to the back */
class circular_doubly_linked_list_with_sentinel {
public:
    struct base_node {
        base_node* next;
        base_node* prev;

        constexpr base_node(base_node* n, base_node* p) : next(n), prev(p) {}
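        // The default constructor deliberately poisons the links (0xcdcdcdcd presumably mirrors
        // the MSVC debug fill pattern for uninitialized memory) so that accidental use of a node
        // that was never linked into a list fails fast rather than silently.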
        explicit base_node() : next((base_node*)(uintptr_t)0xcdcdcdcd), prev((base_node*)(uintptr_t)0xcdcdcdcd) {}
    };

    // ctor
    constexpr circular_doubly_linked_list_with_sentinel() : count(0), head(&head, &head) {}

    circular_doubly_linked_list_with_sentinel(const circular_doubly_linked_list_with_sentinel&) = delete;
    circular_doubly_linked_list_with_sentinel& operator=(const circular_doubly_linked_list_with_sentinel&) = delete;

    inline std::size_t size() const { return count.load(std::memory_order_relaxed); }
    inline bool empty() const { return size() == 0; }
    inline base_node* front() const { return head.next; }
    inline base_node* last() const { return head.prev; }
    inline const base_node* end() const { return &head; }

    //! add to the back of the list
    inline void add( base_node* n ) {
        count.store(count.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
        n->prev = head.prev;
        n->next = &head;
        head.prev->next = n;
        head.prev = n;
    }

    //! remove node 'n'
    inline void remove( base_node& n ) {
        __TBB_ASSERT(count.load(std::memory_order_relaxed) > 0, "attempt to remove an item from an empty list");
        count.store(count.load( std::memory_order_relaxed ) - 1, std::memory_order_relaxed);
        n.prev->next = n.next;
        n.next->prev = n.prev;
    }

    //! move all elements to 'lst' and initialize the 'this' list
    inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
        const std::size_t l_count = size();
        if (l_count > 0) {
            lst.count.store(l_count, std::memory_order_relaxed);
            lst.head.next = head.next;
            lst.head.prev = head.prev;
            head.next->prev = &lst.head;
            head.prev->next = &lst.head;
            clear();
        }
    }

    void clear() {
        head.next = &head;
        head.prev = &head;
        count.store(0, std::memory_order_relaxed);
    }
private:
    std::atomic<std::size_t> count;
    base_node head;
};
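
// Illustrative sketch (hypothetical snippet, not part of the original header): the sentinel keeps
// the list circular, so an empty list has head.next == head.prev == &head and front() == end().
// Uses the base_list/base_node aliases defined just below; 'waitset' and 'a' are made-up names.
//
//   base_list waitset;
//   base_node a(nullptr, nullptr);
//   assert(waitset.empty() && waitset.front() == waitset.end());  // empty: front() is the sentinel
//   waitset.add(&a);                       // links become head <-> a <-> head; size() == 1
//   assert(waitset.front() == &a && waitset.last() == &a);
//   waitset.remove(a);                     // unlinks 'a'; its own next/prev are left unchanged (stale)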

using base_list = circular_doubly_linked_list_with_sentinel;
using base_node = circular_doubly_linked_list_with_sentinel::base_node;

template <typename Context>
class concurrent_monitor_base;

template <typename Context>
class wait_node : public base_node {
public:

#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    wait_node(Context ctx) : my_context(ctx), my_is_in_list(false) {}
#else
    wait_node(Context ctx) : my_context(ctx) {}
#endif

    virtual ~wait_node() = default;

    virtual void init() {
        __TBB_ASSERT(!my_initialized, nullptr);
        my_initialized = true;
    }

    virtual void wait() = 0;

    virtual void reset() {
        __TBB_ASSERT(my_skipped_wakeup, nullptr);
        my_skipped_wakeup = false;
    }

    virtual void notify() = 0;

protected:
    friend class concurrent_monitor_base<Context>;
    friend class thread_data;

    Context my_context{};
#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    std::atomic<bool> my_is_in_list;
#else
    std::atomic<bool> my_is_in_list{false};
#endif

    bool my_initialized{false};
    bool my_skipped_wakeup{false};
    bool my_aborted{false};
    unsigned my_epoch{0};
};

template <typename Context>
class sleep_node : public wait_node<Context> {
    using base_type = wait_node<Context>;
public:
    using base_type::base_type;

    ~sleep_node() override {
        if (this->my_initialized) {
            if (this->my_skipped_wakeup) semaphore().P();
            semaphore().~binary_semaphore();
        }
    }

    binary_semaphore& semaphore() { return *sema.begin(); }

    void init() override {
        if (!this->my_initialized) {
            new (sema.begin()) binary_semaphore;
            base_type::init();
        }
    }

    void wait() override {
        __TBB_ASSERT(this->my_initialized,
            "Use of commit_wait() without prior prepare_wait()");
        semaphore().P();
        __TBB_ASSERT(!this->my_is_in_list.load(std::memory_order_relaxed), "Still in the queue?");
        if (this->my_aborted)
            throw_exception(exception_id::user_abort);
    }

    void reset() override {
        base_type::reset();
        semaphore().P();
    }

    void notify() override {
        semaphore().V();
    }

private:
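    // Storage for a lazily constructed binary_semaphore: placement-constructed in init() on the
    // first prepare_wait() and destroyed in the destructor. A skipped wakeup is drained with P()
    // first so the semaphore is not destroyed with a pending post.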
    tbb::detail::aligned_space<binary_semaphore> sema;
};

//! concurrent_monitor
/** fine-grained concurrent_monitor implementation */
template <typename Context>
class concurrent_monitor_base {
public:
    //! ctor
    constexpr concurrent_monitor_base() {}
    //! dtor
    ~concurrent_monitor_base() = default;

    concurrent_monitor_base(const concurrent_monitor_base&) = delete;
    concurrent_monitor_base& operator=(const concurrent_monitor_base&) = delete;

    //! prepare wait by inserting 'node' into the wait queue
    void prepare_wait( wait_node<Context>& node) {
        // TODO: consider making even more lazy instantiation of the semaphore, that is only when it is actually needed, e.g. move it in node::wait()
        if (!node.my_initialized) {
            node.init();
        }
        // this is a good place to pump a previously skipped wakeup
        else if (node.my_skipped_wakeup) {
            node.reset();
        }

        node.my_is_in_list.store(true, std::memory_order_relaxed);

        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            node.my_epoch = my_epoch.load(std::memory_order_relaxed);
            my_waitset.add(&node);
        }

        // prepare_wait() must guarantee a Write-Read (store-load) memory barrier: the insertion
        // into the waitset has to become visible before the caller re-reads the waited-for
        // condition. In C++, only a full (sequentially consistent) fence provides this ordering.
        atomic_fence_seq_cst();
    }

    //! Commit the wait if the epoch has not changed; otherwise, cancel the wait.
    /** Returns true if committed, false if canceled. */
    inline bool commit_wait( wait_node<Context>& node ) {
        const bool do_it = node.my_epoch == my_epoch.load(std::memory_order_relaxed);
        // this check is just an optimization
        if (do_it) {
            node.wait();
        } else {
            cancel_wait( node );
        }
        return do_it;
    }

    //! Cancel the wait. Removes the thread from the wait queue if not removed yet.
    void cancel_wait( wait_node<Context>& node ) {
        // a possibly skipped wakeup will be pumped in the following prepare_wait()
        node.my_skipped_wakeup = true;
        // try to remove node from waitset
        // Cancel wait guarantees acquire memory barrier.
        bool in_list = node.my_is_in_list.load(std::memory_order_acquire);
        if (in_list) {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            if (node.my_is_in_list.load(std::memory_order_relaxed)) {
                my_waitset.remove(node);
                // node is removed from waitset, so there will be no wakeup
                node.my_is_in_list.store(false, std::memory_order_relaxed);
                node.my_skipped_wakeup = false;
            }
        }
    }

    //! Wait for the condition checked by 'pred' to be satisfied, using 'node' (and its my_context) as the wait descriptor.
    /** Returns true if the thread actually blocked and was notified;
        false if the predicate became true without a committed wait. */
    template <typename NodeType, typename Pred>
    bool wait(Pred&& pred, NodeType&& node) {
        prepare_wait(node);
        while (!guarded_call(std::forward<Pred>(pred), node)) {
            if (commit_wait(node)) {
                return true;
            }

            prepare_wait(node);
        }

        cancel_wait(node);
        return false;
    }

    //! Notify one thread about the event
    void notify_one() {
        atomic_fence_seq_cst();
        notify_one_relaxed();
    }

    //! Notify one thread about the event. Relaxed version.
    void notify_one_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_node* n;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            n = my_waitset.front();
            if (n != end) {
                my_waitset.remove(*n);

// GCC 12.x-13.x issues a warning here that to_wait_node(n)->my_is_in_list might have size 0, since n is
// a base_node pointer. (This cannot happen, because only wait_node pointers are added to my_waitset.)
#if (__TBB_GCC_VERSION >= 120100 && __TBB_GCC_VERSION < 140000 ) && !__clang__ && !__INTEL_COMPILER
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstringop-overflow"
#endif
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
#if (__TBB_GCC_VERSION >= 120100 && __TBB_GCC_VERSION < 140000 ) && !__clang__ && !__INTEL_COMPILER
#pragma GCC diagnostic pop
#endif
            }
        }

        if (n != end) {
            to_wait_node(n)->notify();
        }
    }

    //! Notify all waiting threads of the event
    void notify_all() {
        atomic_fence_seq_cst();
        notify_all_relaxed();
    }

    //! Notify all waiting threads of the event; Relaxed version
    void notify_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            // TODO: Possible optimization, don't change node state under lock, just do flush
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    //! Notify waiting threads of the event that satisfies the given predicate
    template <typename P>
    void notify( const P& predicate ) {
        atomic_fence_seq_cst();
        notify_relaxed( predicate );
    }

    //! Notify waiting threads of the event that satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template <typename P>
    void notify_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        base_node* nxt;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = nxt) {
                nxt = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    temp.add(n);
                }
            }
        }

        end = temp.end();
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    //! Notify one waiting thread whose context satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template <typename P>
    void notify_one_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_node* tmp = nullptr;
        base_node* next{};
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = next) {
                next = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    tmp = n;
                    break;
                }
            }
        }

        if (tmp) {
            to_wait_node(tmp)->notify();
        }
    }

    //! Abort any sleeping threads at the time of the call
    void abort_all() {
        atomic_fence_seq_cst();
        abort_all_relaxed();
    }

    //! Abort any sleeping threads at the time of the call; Relaxed version
    void abort_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->my_aborted = true;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    void destroy() {
        this->abort_all();
        my_mutex.destroy();
        __TBB_ASSERT(this->my_waitset.empty(), "waitset not empty?");
    }

private:
    template <typename NodeType, typename Pred>
    bool guarded_call(Pred&& predicate, NodeType& node) {
        bool res = false;
        tbb::detail::d0::try_call( [&] {
            res = std::forward<Pred>(predicate)();
        }).on_exception( [&] {
            cancel_wait(node);
        });

        return res;
    }

    concurrent_monitor_mutex my_mutex{};
    base_list my_waitset{};
    std::atomic<unsigned> my_epoch{};

    wait_node<Context>* to_wait_node( base_node* node ) { return static_cast<wait_node<Context>*>(node); }
};

class concurrent_monitor : public concurrent_monitor_base<std::uintptr_t> {
    using base_type = concurrent_monitor_base<std::uintptr_t>;
public:
    using base_type::base_type;

    ~concurrent_monitor() {
        destroy();
    }

    /** per-thread descriptor for concurrent_monitor */
    using thread_context = sleep_node<std::uintptr_t>;
};
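
// Illustrative usage sketch, not part of this header: how a waiter and a notifier are expected to
// pair up through concurrent_monitor. 'mon', 'ready', and the lambdas below are hypothetical
// caller-side names; the real callers live in the scheduler and concurrent container code.
//
//   concurrent_monitor mon;
//   std::atomic<bool> ready{false};
//
//   // Waiting side: thread_context is the per-thread sleep_node; wait() runs the
//   // prepare_wait / guarded predicate check / commit_wait-or-cancel_wait protocol internally.
//   concurrent_monitor::thread_context node{/*context tag*/ 0};
//   mon.wait([&] { return ready.load(std::memory_order_relaxed); }, node);
//
//   // Notifying side: publish the condition first, then notify. notify_one()/notify_all()
//   // issue the full fence themselves; the *_relaxed versions assume the caller already did.
//   ready.store(true, std::memory_order_relaxed);
//   mon.notify_one();
//
//   // Predicate-based notification matches against each waiter's my_context (std::uintptr_t here):
//   mon.notify([](std::uintptr_t ctx) { return ctx == 0; });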

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* __TBB_concurrent_monitor_H */