1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_concurrent_monitor_H 18 #define __TBB_concurrent_monitor_H 19 20 #include "oneapi/tbb/spin_mutex.h" 21 #include "oneapi/tbb/detail/_exception.h" 22 #include "oneapi/tbb/detail/_aligned_space.h" 23 #include "concurrent_monitor_mutex.h" 24 #include "semaphore.h" 25 26 #include <atomic> 27 28 namespace tbb { 29 namespace detail { 30 namespace r1 { 31 32 //! Circular doubly-linked list with sentinel 33 /** head.next points to the front and head.prev points to the back */ 34 class circular_doubly_linked_list_with_sentinel { 35 public: 36 struct base_node { 37 base_node* next; 38 base_node* prev; 39 40 constexpr base_node(base_node* n, base_node* p) : next(n), prev(p) {} 41 explicit base_node() : next((base_node*)(uintptr_t)0xcdcdcdcd), prev((base_node*)(uintptr_t)0xcdcdcdcd) {} 42 }; 43 44 // ctor 45 constexpr circular_doubly_linked_list_with_sentinel() : count(0), head(&head, &head) {} 46 47 circular_doubly_linked_list_with_sentinel(const circular_doubly_linked_list_with_sentinel&) = delete; 48 circular_doubly_linked_list_with_sentinel& operator=(const circular_doubly_linked_list_with_sentinel&) = delete; 49 50 inline std::size_t size() const { return count.load(std::memory_order_relaxed); } 51 inline bool empty() const { return size() == 0; } 52 inline base_node* front() const { return head.next; } 53 inline base_node* last() const { return head.prev; } 54 inline const base_node* end() const { return &head; } 55 56 //! add to the back of the list 57 inline void add( base_node* n ) { 58 count.store(count.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); 59 n->prev = head.prev; 60 n->next = &head; 61 head.prev->next = n; 62 head.prev = n; 63 } 64 65 //! remove node 'n' 66 inline void remove( base_node& n ) { 67 __TBB_ASSERT(count.load(std::memory_order_relaxed) > 0, "attempt to remove an item from an empty list"); 68 count.store(count.load( std::memory_order_relaxed ) - 1, std::memory_order_relaxed); 69 n.prev->next = n.next; 70 n.next->prev = n.prev; 71 } 72 73 //! move all elements to 'lst' and initialize the 'this' list 74 inline void flush_to( circular_doubly_linked_list_with_sentinel& lst ) { 75 const std::size_t l_count = size(); 76 if (l_count > 0) { 77 lst.count.store(l_count, std::memory_order_relaxed); 78 lst.head.next = head.next; 79 lst.head.prev = head.prev; 80 head.next->prev = &lst.head; 81 head.prev->next = &lst.head; 82 clear(); 83 } 84 } 85 86 void clear() { 87 head.next = &head; 88 head.prev = &head; 89 count.store(0, std::memory_order_relaxed); 90 } 91 private: 92 std::atomic<std::size_t> count; 93 base_node head; 94 }; 95 96 using base_list = circular_doubly_linked_list_with_sentinel; 97 using base_node = circular_doubly_linked_list_with_sentinel::base_node; 98 99 template <typename Context> 100 class concurrent_monitor_base; 101 102 template <typename Context> 103 class wait_node : public base_node { 104 public: 105 106 #if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900 107 wait_node(Context ctx) : my_context(ctx), my_is_in_list(false) {} 108 #else 109 wait_node(Context ctx) : my_context(ctx) {} 110 #endif 111 112 virtual ~wait_node() = default; 113 114 virtual void init() { 115 __TBB_ASSERT(!my_initialized, nullptr); 116 my_initialized = true; 117 } 118 119 virtual void wait() = 0; 120 121 virtual void reset() { 122 __TBB_ASSERT(my_skipped_wakeup, nullptr); 123 my_skipped_wakeup = false; 124 } 125 126 virtual void notify() = 0; 127 128 protected: 129 friend class concurrent_monitor_base<Context>; 130 friend class thread_data; 131 132 Context my_context{}; 133 #if __TBB_GLIBCXX_VERSION >= 40800 && __TBB_GLIBCXX_VERSION < 40900 134 std::atomic<bool> my_is_in_list; 135 #else 136 std::atomic<bool> my_is_in_list{false}; 137 #endif 138 139 bool my_initialized{false}; 140 bool my_skipped_wakeup{false}; 141 bool my_aborted{false}; 142 unsigned my_epoch{0}; 143 }; 144 145 template <typename Context> 146 class sleep_node : public wait_node<Context> { 147 using base_type = wait_node<Context>; 148 public: 149 using base_type::base_type; 150 151 ~sleep_node() override { 152 if (this->my_initialized) { 153 if (this->my_skipped_wakeup) semaphore().P(); 154 semaphore().~binary_semaphore(); 155 } 156 } 157 158 binary_semaphore& semaphore() { return *sema.begin(); } 159 160 void init() override { 161 if (!this->my_initialized) { 162 new (sema.begin()) binary_semaphore; 163 base_type::init(); 164 } 165 } 166 167 void wait() override { 168 __TBB_ASSERT(this->my_initialized, 169 "Use of commit_wait() without prior prepare_wait()"); 170 semaphore().P(); 171 __TBB_ASSERT(!this->my_is_in_list.load(std::memory_order_relaxed), "Still in the queue?"); 172 if (this->my_aborted) 173 throw_exception(exception_id::user_abort); 174 } 175 176 void reset() override { 177 base_type::reset(); 178 semaphore().P(); 179 } 180 181 void notify() override { 182 semaphore().V(); 183 } 184 185 private: 186 tbb::detail::aligned_space<binary_semaphore> sema; 187 }; 188 189 //! concurrent_monitor 190 /** fine-grained concurrent_monitor implementation */ 191 template <typename Context> 192 class concurrent_monitor_base { 193 public: 194 //! ctor 195 constexpr concurrent_monitor_base() {} 196 //! dtor 197 ~concurrent_monitor_base() = default; 198 199 concurrent_monitor_base(const concurrent_monitor_base&) = delete; 200 concurrent_monitor_base& operator=(const concurrent_monitor_base&) = delete; 201 202 //! prepare wait by inserting 'thr' into the wait queue 203 void prepare_wait( wait_node<Context>& node) { 204 // TODO: consider making even more lazy instantiation of the semaphore, that is only when it is actually needed, e.g. move it in node::wait() 205 if (!node.my_initialized) { 206 node.init(); 207 } 208 // this is good place to pump previous skipped wakeup 209 else if (node.my_skipped_wakeup) { 210 node.reset(); 211 } 212 213 node.my_is_in_list.store(true, std::memory_order_relaxed); 214 215 { 216 concurrent_monitor_mutex::scoped_lock l(my_mutex); 217 node.my_epoch = my_epoch.load(std::memory_order_relaxed); 218 my_waitset.add(&node); 219 } 220 221 // Prepare wait guarantees Write Read memory barrier. 222 // In C++ only full fence covers this type of barrier. 223 atomic_fence_seq_cst(); 224 } 225 226 //! Commit wait if event count has not changed; otherwise, cancel wait. 227 /** Returns true if committed, false if canceled. */ 228 inline bool commit_wait( wait_node<Context>& node ) { 229 const bool do_it = node.my_epoch == my_epoch.load(std::memory_order_relaxed); 230 // this check is just an optimization 231 if (do_it) { 232 node.wait(); 233 } else { 234 cancel_wait( node ); 235 } 236 return do_it; 237 } 238 239 //! Cancel the wait. Removes the thread from the wait queue if not removed yet. 240 void cancel_wait( wait_node<Context>& node ) { 241 // possible skipped wakeup will be pumped in the following prepare_wait() 242 node.my_skipped_wakeup = true; 243 // try to remove node from waitset 244 // Cancel wait guarantees acquire memory barrier. 245 bool in_list = node.my_is_in_list.load(std::memory_order_acquire); 246 if (in_list) { 247 concurrent_monitor_mutex::scoped_lock l(my_mutex); 248 if (node.my_is_in_list.load(std::memory_order_relaxed)) { 249 my_waitset.remove(node); 250 // node is removed from waitset, so there will be no wakeup 251 node.my_is_in_list.store(false, std::memory_order_relaxed); 252 node.my_skipped_wakeup = false; 253 } 254 } 255 } 256 257 //! Wait for a condition to be satisfied with waiting-on my_context 258 template <typename NodeType, typename Pred> 259 bool wait(Pred&& pred, NodeType&& node) { 260 prepare_wait(node); 261 while (!guarded_call(std::forward<Pred>(pred), node)) { 262 if (commit_wait(node)) { 263 return true; 264 } 265 266 prepare_wait(node); 267 } 268 269 cancel_wait(node); 270 return false; 271 } 272 273 //! Notify one thread about the event 274 void notify_one() { 275 atomic_fence_seq_cst(); 276 notify_one_relaxed(); 277 } 278 279 //! Notify one thread about the event. Relaxed version. 280 void notify_one_relaxed() { 281 if (my_waitset.empty()) { 282 return; 283 } 284 285 base_node* n; 286 const base_node* end = my_waitset.end(); 287 { 288 concurrent_monitor_mutex::scoped_lock l(my_mutex); 289 my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); 290 n = my_waitset.front(); 291 if (n != end) { 292 my_waitset.remove(*n); 293 to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed); 294 } 295 } 296 297 if (n != end) { 298 to_wait_node(n)->notify(); 299 } 300 } 301 302 //! Notify all waiting threads of the event 303 void notify_all() { 304 atomic_fence_seq_cst(); 305 notify_all_relaxed(); 306 } 307 308 // ! Notify all waiting threads of the event; Relaxed version 309 void notify_all_relaxed() { 310 if (my_waitset.empty()) { 311 return; 312 } 313 314 base_list temp; 315 const base_node* end; 316 { 317 concurrent_monitor_mutex::scoped_lock l(my_mutex); 318 my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); 319 // TODO: Possible optimization, don't change node state under lock, just do flush 320 my_waitset.flush_to(temp); 321 end = temp.end(); 322 for (base_node* n = temp.front(); n != end; n = n->next) { 323 to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed); 324 } 325 } 326 327 base_node* nxt; 328 for (base_node* n = temp.front(); n != end; n=nxt) { 329 nxt = n->next; 330 to_wait_node(n)->notify(); 331 } 332 #if TBB_USE_ASSERT 333 temp.clear(); 334 #endif 335 } 336 337 //! Notify waiting threads of the event that satisfies the given predicate 338 template <typename P> 339 void notify( const P& predicate ) { 340 atomic_fence_seq_cst(); 341 notify_relaxed( predicate ); 342 } 343 344 //! Notify waiting threads of the event that satisfies the given predicate; 345 //! the predicate is called under the lock. Relaxed version. 346 template<typename P> 347 void notify_relaxed( const P& predicate ) { 348 if (my_waitset.empty()) { 349 return; 350 } 351 352 base_list temp; 353 base_node* nxt; 354 const base_node* end = my_waitset.end(); 355 { 356 concurrent_monitor_mutex::scoped_lock l(my_mutex); 357 my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed); 358 for (base_node* n = my_waitset.last(); n != end; n = nxt) { 359 nxt = n->prev; 360 auto* node = static_cast<wait_node<Context>*>(n); 361 if (predicate(node->my_context)) { 362 my_waitset.remove(*n); 363 node->my_is_in_list.store(false, std::memory_order_relaxed); 364 temp.add(n); 365 } 366 } 367 } 368 369 end = temp.end(); 370 for (base_node* n=temp.front(); n != end; n = nxt) { 371 nxt = n->next; 372 to_wait_node(n)->notify(); 373 } 374 #if TBB_USE_ASSERT 375 temp.clear(); 376 #endif 377 } 378 379 //! Notify waiting threads of the event that satisfies the given predicate; 380 //! the predicate is called under the lock. Relaxed version. 381 template<typename P> 382 void notify_one_relaxed( const P& predicate ) { 383 if (my_waitset.empty()) { 384 return; 385 } 386 387 base_node* tmp = nullptr; 388 base_node* next{}; 389 const base_node* end = my_waitset.end(); 390 { 391 concurrent_monitor_mutex::scoped_lock l(my_mutex); 392 my_epoch.store(my_epoch.load( std::memory_order_relaxed ) + 1, std::memory_order_relaxed); 393 for (base_node* n = my_waitset.last(); n != end; n = next) { 394 next = n->prev; 395 auto* node = static_cast<wait_node<Context>*>(n); 396 if (predicate(node->my_context)) { 397 my_waitset.remove(*n); 398 node->my_is_in_list.store(false, std::memory_order_relaxed); 399 tmp = n; 400 break; 401 } 402 } 403 } 404 405 if (tmp) { 406 to_wait_node(tmp)->notify(); 407 } 408 } 409 410 //! Abort any sleeping threads at the time of the call 411 void abort_all() { 412 atomic_fence_seq_cst(); 413 abort_all_relaxed(); 414 } 415 416 //! Abort any sleeping threads at the time of the call; Relaxed version 417 void abort_all_relaxed() { 418 if (my_waitset.empty()) { 419 return; 420 } 421 422 base_list temp; 423 const base_node* end; 424 { 425 concurrent_monitor_mutex::scoped_lock l(my_mutex); 426 my_epoch.store(my_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); 427 my_waitset.flush_to(temp); 428 end = temp.end(); 429 for (base_node* n = temp.front(); n != end; n = n->next) { 430 to_wait_node(n)->my_is_in_list.store(false, std::memory_order_relaxed); 431 } 432 } 433 434 base_node* nxt; 435 for (base_node* n = temp.front(); n != end; n = nxt) { 436 nxt = n->next; 437 to_wait_node(n)->my_aborted = true; 438 to_wait_node(n)->notify(); 439 } 440 #if TBB_USE_ASSERT 441 temp.clear(); 442 #endif 443 } 444 445 void destroy() { 446 this->abort_all(); 447 my_mutex.destroy(); 448 __TBB_ASSERT(this->my_waitset.empty(), "waitset not empty?"); 449 } 450 451 private: 452 template <typename NodeType, typename Pred> 453 bool guarded_call(Pred&& predicate, NodeType& node) { 454 bool res = false; 455 tbb::detail::d0::try_call( [&] { 456 res = std::forward<Pred>(predicate)(); 457 }).on_exception( [&] { 458 cancel_wait(node); 459 }); 460 461 return res; 462 } 463 464 concurrent_monitor_mutex my_mutex{}; 465 base_list my_waitset{}; 466 std::atomic<unsigned> my_epoch{}; 467 468 wait_node<Context>* to_wait_node( base_node* node ) { return static_cast<wait_node<Context>*>(node); } 469 }; 470 471 class concurrent_monitor : public concurrent_monitor_base<std::uintptr_t> { 472 using base_type = concurrent_monitor_base<std::uintptr_t>; 473 public: 474 using base_type::base_type; 475 476 ~concurrent_monitor() { 477 destroy(); 478 } 479 480 /** per-thread descriptor for concurrent_monitor */ 481 using thread_context = sleep_node<std::uintptr_t>; 482 }; 483 484 } // namespace r1 485 } // namespace detail 486 } // namespace tbb 487 488 #endif /* __TBB_concurrent_monitor_H */ 489