/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_monitor_H
#define __TBB_concurrent_monitor_H

#include "oneapi/tbb/spin_mutex.h"
#include "oneapi/tbb/detail/_exception.h"
#include "oneapi/tbb/detail/_aligned_space.h"
#include "concurrent_monitor_mutex.h"
#include "semaphore.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

//! Circular doubly-linked list with sentinel
/** head.next points to the front and head.prev points to the back */
class circular_doubly_linked_list_with_sentinel {
public:
    struct base_node {
        base_node* next;
        base_node* prev;

        constexpr base_node(base_node* n, base_node* p) : next(n), prev(p) {}
        explicit base_node() : next((base_node*)(uintptr_t)0xcdcdcdcd), prev((base_node*)(uintptr_t)0xcdcdcdcd) {}
    };

    // ctor
    constexpr circular_doubly_linked_list_with_sentinel() : count(0), head(&head, &head) {}

    circular_doubly_linked_list_with_sentinel(const circular_doubly_linked_list_with_sentinel&) = delete;
    circular_doubly_linked_list_with_sentinel& operator=(const circular_doubly_linked_list_with_sentinel&) = delete;

    inline std::size_t size() const { return count.load(std::memory_order_relaxed); }
    inline bool empty() const { return size() == 0; }
    inline base_node* front() const { return head.next; }
    inline base_node* last() const { return head.prev; }
    inline const base_node* end() const { return &head; }

    //! add to the back of the list
    inline void add( base_node* n ) {
        count.store(count.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
        n->prev = head.prev;
        n->next = &head;
        head.prev->next = n;
        head.prev = n;
    }

    //! remove node 'n'
    inline void remove( base_node& n ) {
        __TBB_ASSERT(count.load(std::memory_order_relaxed) > 0, "attempt to remove an item from an empty list");
        count.store(count.load( std::memory_order_relaxed ) - 1, std::memory_order_relaxed);
        n.prev->next = n.next;
        n.next->prev = n.prev;
    }
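
    // The list is intrusive: callers own the base_node objects and the list merely
    // links them in place. An empty list satisfies head.next == head.prev == &head,
    // so front() and last() return end() when there are no waiters, and add() links
    // the node directly in front of the sentinel, keeping head.next at the oldest
    // entry and head.prev at the most recently added one.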

    //! move all elements to 'lst' and initialize the 'this' list
    inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) {
        const std::size_t l_count = size();
        if (l_count > 0) {
            lst.count.store(l_count, std::memory_order_relaxed);
            lst.head.next = head.next;
            lst.head.prev = head.prev;
            head.next->prev = &lst.head;
            head.prev->next = &lst.head;
            clear();
        }
    }

    void clear() {
        head.next = &head;
        head.prev = &head;
        count.store(0, std::memory_order_relaxed);
    }
private:
    std::atomic<std::size_t> count;
    base_node head;
};

using base_list = circular_doubly_linked_list_with_sentinel;
using base_node = circular_doubly_linked_list_with_sentinel::base_node;

template <typename Context>
class concurrent_monitor_base;

template <typename Context>
class wait_node : public base_node {
public:

#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    wait_node(Context ctx) : my_context(ctx), my_is_in_list(false) {}
#else
    wait_node(Context ctx) : my_context(ctx) {}
#endif

    virtual ~wait_node() = default;

    virtual void init() {
        __TBB_ASSERT(!my_initialized, nullptr);
        my_initialized = true;
    }

    virtual void wait() = 0;

    virtual void reset() {
        __TBB_ASSERT(my_skipped_wakeup, nullptr);
        my_skipped_wakeup = false;
    }

    virtual void notify() = 0;

protected:
    friend class concurrent_monitor_base<Context>;
    friend class thread_data;

    Context my_context{};
#if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900
    std::atomic<bool> my_is_in_list;
#else
    std::atomic<bool> my_is_in_list{false};
#endif

    bool my_initialized{false};
    bool my_skipped_wakeup{false};
    bool my_aborted{false};
    unsigned my_epoch{0};
};

template <typename Context>
class sleep_node : public wait_node<Context> {
    using base_type = wait_node<Context>;
public:
    using base_type::base_type;

    ~sleep_node() override {
        if (this->my_initialized) {
            if (this->my_skipped_wakeup) semaphore().P();
            semaphore().~binary_semaphore();
        }
    }

    binary_semaphore& semaphore() { return *sema.begin(); }

    void init() override {
        if (!this->my_initialized) {
            new (sema.begin()) binary_semaphore;
            base_type::init();
        }
    }

    void wait() override {
        __TBB_ASSERT(this->my_initialized,
                     "Use of commit_wait() without prior prepare_wait()");
        semaphore().P();
        __TBB_ASSERT(!this->my_is_in_list.load(std::memory_order_relaxed), "Still in the queue?");
        if (this->my_aborted)
            throw_exception(exception_id::user_abort);
    }

    void reset() override {
        base_type::reset();
        semaphore().P();
    }

    void notify() override {
        semaphore().V();
    }

private:
    tbb::detail::aligned_space<binary_semaphore> sema;
};
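
// A note on sleep_node's semaphore lifetime: the binary_semaphore lives in aligned_space
// and is only placement-constructed in init(), i.e. the first time the owning thread
// actually prepares to wait. If cancel_wait() finds that a notifier has already dequeued
// the node, the pending V() is remembered via my_skipped_wakeup and consumed by a matching
// P() either in reset() on the next prepare_wait() or in the destructor, so the semaphore
// is never destroyed with an unconsumed token.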

//! concurrent_monitor
/** fine-grained concurrent_monitor implementation */
template <typename Context>
class concurrent_monitor_base {
public:
    //! ctor
    constexpr concurrent_monitor_base() {}
    //! dtor
    ~concurrent_monitor_base() = default;

    concurrent_monitor_base(const concurrent_monitor_base&) = delete;
    concurrent_monitor_base& operator=(const concurrent_monitor_base&) = delete;

    //! prepare wait by inserting 'node' into the wait queue
    void prepare_wait( wait_node<Context>& node) {
        // TODO: consider making the semaphore instantiation even lazier, i.e. only when
        // it is actually needed, e.g. by moving it into node::wait()
        if (!node.my_initialized) {
            node.init();
        }
        // this is a good place to pump a previously skipped wakeup
        else if (node.my_skipped_wakeup) {
            node.reset();
        }

        node.my_is_in_list.store(true, std::memory_order_relaxed);

        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            node.my_epoch = my_epoch.load(std::memory_order_relaxed);
            my_waitset.add(&node);
        }

        // Prepare wait guarantees a Write-Read memory barrier.
        // In C++, only a full fence covers this type of barrier.
        atomic_fence_seq_cst();
    }

    //! Commit wait if event count has not changed; otherwise, cancel wait.
    /** Returns true if committed, false if canceled. */
    inline bool commit_wait( wait_node<Context>& node ) {
        const bool do_it = node.my_epoch == my_epoch.load(std::memory_order_relaxed);
        // this check is just an optimization
        if (do_it) {
            node.wait();
        } else {
            cancel_wait( node );
        }
        return do_it;
    }

    //! Cancel the wait. Removes the thread from the wait queue if not removed yet.
    void cancel_wait( wait_node<Context>& node ) {
        // possible skipped wakeup will be pumped in the following prepare_wait()
        node.my_skipped_wakeup = true;
        // try to remove node from waitset
        // Cancel wait guarantees acquire memory barrier.
        bool in_list = node.my_is_in_list.load(std::memory_order_acquire);
        if (in_list) {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            if (node.my_is_in_list.load(std::memory_order_relaxed)) {
                my_waitset.remove(node);
                // node is removed from waitset, so there will be no wakeup
                node.my_is_in_list.store(false, std::memory_order_relaxed);
                node.my_skipped_wakeup = false;
            }
        }
    }

    //! Wait for a condition to be satisfied with waiting-on my_context
    template <typename NodeType, typename Pred>
    bool wait(Pred&& pred, NodeType&& node) {
        prepare_wait(node);
        while (!guarded_call(std::forward<Pred>(pred), node)) {
            if (commit_wait(node)) {
                return true;
            }

            prepare_wait(node);
        }

        cancel_wait(node);
        return false;
    }

    //! Notify one thread about the event
    void notify_one() {
        atomic_fence_seq_cst();
        notify_one_relaxed();
    }

    //! Notify one thread about the event. Relaxed version.
    void notify_one_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_node* n;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            n = my_waitset.front();
            if (n != end) {
                my_waitset.remove(*n);

                // GCC 12.x-13.x issues a warning here that to_wait_node(n)->my_is_in_list might have size 0,
                // since n is a base_node pointer. (This cannot happen, because only wait_node pointers are
                // added to my_waitset.)
#if (__TBB_GCC_VERSION >= 120100 && __TBB_GCC_VERSION < 140000 ) && !__clang__ && !__INTEL_COMPILER
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstringop-overflow"
#endif
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
#if (__TBB_GCC_VERSION >= 120100 && __TBB_GCC_VERSION < 140000 ) && !__clang__ && !__INTEL_COMPILER
#pragma GCC diagnostic pop
#endif
            }
        }

        if (n != end) {
            to_wait_node(n)->notify();
        }
    }
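
    // How the protocol avoids lost wakeups: a waiter enqueues itself in prepare_wait()
    // and only afterwards re-checks its predicate (see wait() above), while a notifier
    // updates the shared state before inspecting the waitset; the full fences in
    // prepare_wait() and notify_one() are what let at least one of the two sides observe
    // the other. The epoch snapshot taken in prepare_wait() lets commit_wait() skip
    // blocking when some notification has already advanced my_epoch in the meantime.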

    //! Notify all waiting threads of the event
    void notify_all() {
        atomic_fence_seq_cst();
        notify_all_relaxed();
    }

    //! Notify all waiting threads of the event; Relaxed version
    void notify_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            // TODO: Possible optimization, don't change node state under lock, just do flush
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    //! Notify waiting threads of the event that satisfies the given predicate
    template <typename P>
    void notify( const P& predicate ) {
        atomic_fence_seq_cst();
        notify_relaxed( predicate );
    }

    //! Notify waiting threads of the event that satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template<typename P>
    void notify_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        base_node* nxt;
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = nxt) {
                nxt = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    temp.add(n);
                }
            }
        }

        end = temp.end();
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }
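
    // Illustrative use of the predicate overloads (`monitor` and `tag` are hypothetical
    // names, not part of this header): for concurrent_monitor the context is the
    // std::uintptr_t value a waiter passed to its thread_context, so a notifier can
    // wake only the waiters registered under a particular tag:
    //
    //     monitor.notify([tag] (std::uintptr_t ctx) { return ctx == tag; });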

    //! Notify waiting threads of the event that satisfies the given predicate;
    //! the predicate is called under the lock. Relaxed version.
    template<typename P>
    void notify_one_relaxed( const P& predicate ) {
        if (my_waitset.empty()) {
            return;
        }

        base_node* tmp = nullptr;
        base_node* next{};
        const base_node* end = my_waitset.end();
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed);
            for (base_node* n = my_waitset.last(); n != end; n = next) {
                next = n->prev;
                auto* node = static_cast<wait_node<Context>*>(n);
                if (predicate(node->my_context)) {
                    my_waitset.remove(*n);
                    node->my_is_in_list.store(false, std::memory_order_relaxed);
                    tmp = n;
                    break;
                }
            }
        }

        if (tmp) {
            to_wait_node(tmp)->notify();
        }
    }

    //! Abort any sleeping threads at the time of the call
    void abort_all() {
        atomic_fence_seq_cst();
        abort_all_relaxed();
    }

    //! Abort any sleeping threads at the time of the call; Relaxed version
    void abort_all_relaxed() {
        if (my_waitset.empty()) {
            return;
        }

        base_list temp;
        const base_node* end;
        {
            concurrent_monitor_mutex::scoped_lock l(my_mutex);
            my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
            my_waitset.flush_to(temp);
            end = temp.end();
            for (base_node* n = temp.front(); n != end; n = n->next) {
                to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed);
            }
        }

        base_node* nxt;
        for (base_node* n = temp.front(); n != end; n = nxt) {
            nxt = n->next;
            to_wait_node(n)->my_aborted = true;
            to_wait_node(n)->notify();
        }
#if TBB_USE_ASSERT
        temp.clear();
#endif
    }

    void destroy() {
        this->abort_all();
        my_mutex.destroy();
        __TBB_ASSERT(this->my_waitset.empty(), "waitset not empty?");
    }

private:
    template <typename NodeType, typename Pred>
    bool guarded_call(Pred&& predicate, NodeType& node) {
        bool res = false;
        tbb::detail::d0::try_call( [&] {
            res = std::forward<Pred>(predicate)();
        }).on_exception( [&] {
            cancel_wait(node);
        });

        return res;
    }

    concurrent_monitor_mutex my_mutex{};
    base_list my_waitset{};
    std::atomic<unsigned> my_epoch{};

    wait_node<Context>* to_wait_node( base_node* node ) { return static_cast<wait_node<Context>*>(node); }
};

class concurrent_monitor : public concurrent_monitor_base<std::uintptr_t> {
    using base_type = concurrent_monitor_base<std::uintptr_t>;
public:
    using base_type::base_type;

    ~concurrent_monitor() {
        destroy();
    }

    /** per-thread descriptor for concurrent_monitor */
    using thread_context = sleep_node<std::uintptr_t>;
};

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* __TBB_concurrent_monitor_H */
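
/*
    Usage sketch (illustrative only; `my_monitor`, `my_flag`, and `waiter_tag` are
    hypothetical names, not part of this header). The wait() member wraps the
    prepare_wait()/commit_wait()/cancel_wait() protocol shown above:

        tbb::detail::r1::concurrent_monitor my_monitor;
        std::atomic<bool> my_flag{false};

        // Waiting side: blocks until notified, or returns immediately
        // if the predicate already holds.
        tbb::detail::r1::concurrent_monitor::thread_context node{waiter_tag};
        my_monitor.wait([&] { return my_flag.load(std::memory_order_relaxed); }, node);

        // Notifying side: publish the state change, then wake a waiter.
        my_flag.store(true, std::memory_order_relaxed);
        my_monitor.notify_one();
*/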