1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_concurrent_queue_H 18 #define __TBB_concurrent_queue_H 19 20 #include "detail/_namespace_injection.h" 21 #include "detail/_concurrent_queue_base.h" 22 #include "detail/_allocator_traits.h" 23 #include "detail/_exception.h" 24 #include "cache_aligned_allocator.h" 25 26 namespace tbb { 27 namespace detail { 28 namespace d1 { 29 30 // A high-performance thread-safe non-blocking concurrent queue. 31 // Multiple threads may each push and pop concurrently. 32 // Assignment construction is not allowed. 
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // The shared representation is allocated through an allocator rebound to its own type.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    // Construct an empty queue with a default-constructed allocator.
    concurrent_queue() : concurrent_queue(allocator_type()) {}

    // Construct an empty queue. The shared representation is cache-aligned so that
    // head_counter, tail_counter and the micro-queue array land on separate cache lines
    // (verified by the alignment asserts below).
    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue containing copies of [begin, end), pushed in iteration order.
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with explicit allocator. Not thread-safe with respect to src.
    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Copy constructor. The allocator is obtained via
    // select_on_container_copy_construction, per the usual allocator-aware container rules.
    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals src's representation by swapping; src is left owning the freshly
    // constructed (empty) representation.
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    // Move constructor with explicit allocator.
    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue. Pops any remaining items first (clear()), then destroys and
    // deallocates the shared representation.
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    // Enqueue an item at tail of queue (move overload).
    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Construct an item in place at tail of queue from args.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe.
    // A concurrent pop can make head temporarily exceed tail, producing a negative
    // raw size; that case is clamped to 0 here.
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        return size < 0 ? 0 : size_type(size);
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    // NOTE(review): pops items one by one into a default-constructed temporary, so T
    // must be default-constructible for clear() (and therefore the destructor) to compile.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swap only the representation pointers; allocators are not swapped
    // (callers guarantee allocator compatibility before calling).
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Claim a unique ticket by atomically bumping tail_counter, then construct the item
    // in the slot that ticket maps to (choose(k) selects the micro-queue for ticket k --
    // defined in detail/_concurrent_queue_base.h).
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, std::forward<Args>(args)...);
    }

    // Lock-free pop: claim head ticket k via CAS, then consume the item it maps to.
    // Returns false only when the queue is observed empty.
    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
            // Outer retry: the ticket was claimed but the item could not be popped
            // (micro-queue level contention); re-read head and try again.
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    // Element copy helper passed to queue_representation_type::assign.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    // Element move helper passed to queue_representation_type::assign.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;          // rebound allocator for the representation
    queue_representation_type* my_queue_representation; // shared lock-free state (cache-aligned)
}; // class concurrent_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename A = tbb::cache_aligned_allocator<T>>
concurrent_queue(InputIterator, InputIterator, const A& = A())
->concurrent_queue<T, A>;
#endif /*
__TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

class concurrent_monitor;

// Adapts an arbitrary callable (typically a lambda predicate) to the
// delegate_base interface so it can cross the ABI boundary into the r1 layer.
// Holds the callable by reference: the delegated_function must not outlive it.
template <typename FuncType>
class delegated_function : public delegate_base {
public:
    delegated_function(FuncType& f) : my_func(f) {}

    bool operator()() const override {
        return my_func();
    }

private:
    FuncType &my_func;
}; // class delegated_function

// The concurrent monitor tags for concurrent_bounded_queue.
// Tag 0: waiters for a free slot (blocked pushers); tag 1: waiters for an item
// (blocked poppers).
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
} // namespace d1


namespace r1 {
class concurrent_monitor;

// Exported entry points implemented in the TBB binary. The bounded-queue
// representation and its monitors are carved out of a single allocation
// (see concurrent_bounded_queue's constructor, which places the monitors
// directly after the representation).
std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
// Wakes all waiters with a pending user_abort (callers' wait predicates throw).
void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                     , std::size_t ticket );
// Blocks until the predicate releases the waiter; may propagate an exception
// thrown by the predicate (e.g. user_abort).
void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                     std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1


namespace d1 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    // Blocks the caller on the tagged monitor until pred releases it.
    // The predicate is wrapped in a delegated_function so it can be passed
    // across the ABI boundary to r1::wait_bounded_queue_monitor.
    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    // Note: size_type is signed here (unlike concurrent_queue) so that a negative
    // capacity argument can mean "unbounded"; see set_capacity().
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;

    // Construct an empty queue with a default-constructed allocator.
    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    // Construct an empty queue. The representation and its monitors come from one
    // allocation: the monitors are placed immediately after the representation
    // (my_queue_representation + 1). The initial capacity is effectively unbounded
    // (max addressable items for this item size).
    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue containing copies of [begin, end), pushed in iteration order.
    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with explicit allocator. Not thread-safe with respect to src.
    // NOTE(review): src's capacity is not copied; the new queue keeps the default
    // (unbounded) capacity -- confirm this matches the documented contract.
    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Copy constructor; allocator obtained via select_on_container_copy_construction.
    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals src's representation and monitors by swapping with the freshly
    // constructed (empty) ones.
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    // Move constructor with explicit allocator.
    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue. Pops remaining items, then destroys and releases the combined
    // representation+monitors allocation.
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue; blocks while the queue is full.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    // Construct an item in place at tail of queue; blocks while the queue is full.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Non-blocking emplace: returns false if the queue was full.
    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue.
    /** Waits until an item becomes available (unlike try_pop).
        Returns true if an item was dequeued; throws user_abort if abort()
        is called while waiting. */
    bool pop( T& result ) {
        return internal_pop(&result);
    }

    // Attempt to dequeue an item from head of queue.
    // Does not wait for item to become available; returns false if the queue is empty.
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    // Abort all pending waiting operations: every thread blocked in push()/pop()
    // (and any that starts waiting before the next abort) throws user_abort.
    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    // Set the maximum number of items. A negative value means unbounded
    // (infinite_capacity). Not synchronized with concurrent push/pop.
    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    // NOTE(review): requires T to be default-constructible, as items are popped
    // into a default-constructed temporary.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swap representation and monitor pointers (allocator compatibility is
    // checked by callers). Capacity and abort counter are intentionally not swapped
    // by this helper.
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    // Largest representable capacity: PTRDIFF_MAX / 2 (headroom avoids overflow in
    // the signed head/tail difference arithmetic below).
    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    // Blocking push. Claims a tail ticket unconditionally; if the claimed slot is
    // beyond capacity (head <= ticket - capacity), waits on the slots-available
    // monitor until a popper frees a slot. The wait predicate re-checks the abort
    // counter and throws user_abort if abort() intervened; on any exception the
    // claimed ticket is revoked via abort_push so poppers are not stranded.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                // Roll back the claimed ticket so waiting poppers can skip it.
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        // Wake a popper that may be waiting for this ticket's item.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    // Non-blocking push: claim a tail ticket via CAS only while a free slot is
    // observed; otherwise report the queue full.
    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    // Blocking pop. Claims a head ticket unconditionally; if no item with that
    // ticket exists yet (tail <= ticket), waits on the items-available monitor.
    // The predicate throws user_abort if abort() intervened; on exception the
    // head claim is rolled back (head_counter--).
    bool internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
            // Retry if the item at this ticket could not be popped (e.g. its push
            // was aborted); a new head ticket is claimed on the next iteration.
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation));

        // Wake a pusher that may be waiting for the slot this pop freed.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
        return true;
    }

    // Non-blocking pop: same CAS-claim protocol as concurrent_queue::internal_try_pop,
    // plus a slots-available notification for blocked pushers on success.
    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
                    // Queue is empty
                    return false;
                }
                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    // Bump the abort epoch, then wake all monitor waiters; their predicates observe
    // the changed counter and throw user_abort.
    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    // Element copy helper passed to queue_representation_type::assign.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    // Element move helper passed to queue_representation_type::assign.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;          // rebound allocator for the representation
    std::ptrdiff_t my_capacity;                 // maximum item count (infinite_capacity = unbounded)
    std::atomic<unsigned> my_abort_counter;     // abort epoch; waits compare against their start value
    queue_representation_type* my_queue_representation; // shared state (cache-aligned)

    r1::concurrent_monitor* my_monitors;        // [slots_avail, items_avail], co-allocated after the rep
}; // class concurrent_bounded_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename A = tbb::cache_aligned_allocator<T>>
concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
->concurrent_bounded_queue<T, A>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

} // namespace d1
} // namespace detail

inline namespace v1 {

using detail::d1::concurrent_queue;
using detail::d1::concurrent_bounded_queue;
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
} // namespace tbb
#endif // __TBB_concurrent_queue_H