1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_concurrent_queue_H 18 #define __TBB_concurrent_queue_H 19 20 #include "detail/_namespace_injection.h" 21 #include "detail/_concurrent_queue_base.h" 22 #include "detail/_allocator_traits.h" 23 #include "detail/_exception.h" 24 #include "detail/_containers_helpers.h" 25 #include "cache_aligned_allocator.h" 26 27 namespace tbb { 28 namespace detail { 29 namespace d1 { 30 31 // A high-performance thread-safe non-blocking concurrent queue. 32 // Multiple threads may each push and pop concurrently. 33 // Assignment construction is not allowed. 
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Allocator rebound to allocate/construct the shared representation object itself.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    concurrent_queue() : concurrent_queue(allocator_type()) {}

    // Construct an empty queue. The representation is placed in cache-aligned
    // memory; the asserts verify that the hot fields (head/tail counters and
    // the array member) each start on a max_nfs_size boundary, avoiding false
    // sharing between producers and consumers.
    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue holding copies of the elements in [begin, end).
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy-construct with an explicitly supplied allocator.
    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        // Steal src's representation wholesale; src is left with the freshly
        // constructed empty representation.
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Construct an item in place at the tail of the queue.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        // Concurrent pops can make the computed size transiently negative;
        // clamp to zero because size_type is unsigned.
        return size < 0 ? 0 : size_type(size);
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        // Drain by repeated pops so each element is properly moved out/destroyed.
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swaps only the representation pointer; the allocators themselves are not
    // exchanged, so callers must ensure the two allocators are interchangeable.
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Claim a ticket by atomically bumping tail_counter, then construct the
    // item in the sub-queue selected by choose(k) for that ticket.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, std::forward<Args>(args)...);
    }

    // Lock-free pop: claim head ticket k via CAS, then try to extract the item
    // for that ticket. The outer loop retries with a fresh ticket if the
    // claimed slot's pop fails.
    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    // Element-wise copy used by assign(); placement-new into raw queue storage.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    // Element-wise move used by assign(); src is cast away from const because
    // the callback signature is shared with the copying variant.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_queue( It, It, Alloc = Alloc() )
-> concurrent_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

class concurrent_monitor;

// Adapts an arbitrary callable to the delegate_base interface so predicates
// can be passed across the exported r1 entry points declared below.
template <typename FuncType>
class delegated_function : public delegate_base {
public:
    delegated_function(FuncType& f) : my_func(f) {}

    bool operator()() const override {
        return my_func();
    }

private:
    FuncType &my_func; // non-owning: the callable must outlive this wrapper
}; // class delegated_function

// The concurrent monitor tags for concurrent_bounded_queue.
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
} // namespace d1


namespace r1 {
class concurrent_monitor;

// Exported (binary-interface) helpers implementing the blocking machinery for
// concurrent_bounded_queue; definitions live in the TBB runtime library.
std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                     , std::size_t ticket );
void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                     std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1


namespace d1 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Allocator rebound to allocate/construct the shared representation object itself.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    // Block on the monitor identified by monitor_tag; the wait is controlled by
    // pred, wrapped into a delegate for the exported r1 entry point.
    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    // NOTE: signed, unlike concurrent_queue::size_type — set_capacity() maps
    // negative values to the "infinite" capacity sentinel.
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;

    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    // Construct an empty queue. allocate_bounded_queue_rep returns storage for
    // the representation plus the monitor objects, which live immediately
    // after the representation (hence my_queue_representation + 1).
    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
        // Default capacity: effectively unbounded, scaled down by the per-item
        // footprint so that capacity * item_size cannot overflow std::size_t.
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue holding copies of the elements in [begin, end).
    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy-construct with an explicitly supplied allocator.
    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        // Steal src's representation and monitors; src keeps the freshly
        // constructed empty state.
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue. Blocks while the queue is full.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    // Construct an item in place at the tail of the queue; blocks while full.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Non-blocking in-place construction; returns false if the queue was full.
    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue, blocking until one is available.
    // Returns true on success; throws user_abort if abort() intervenes.
    bool pop( T& result ) {
        return internal_pop(&result);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    // Wake all threads blocked in push()/pop(); they observe the bumped abort
    // counter and throw user_abort.
    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    // Set the maximum number of items; a negative value means unbounded.
    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        // Drain by repeated non-blocking pops.
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swaps representation and monitor pointers only; allocators are not
    // exchanged, so callers must ensure the allocators are interchangeable.
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    // Sentinel used by set_capacity() for "unbounded".
    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    // Blocking push: claim a tail ticket, wait on the slots-available monitor
    // while the queue is full, construct the item, then notify consumers.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        // Abort counter is sampled once up front; a later mismatch inside the
        // wait predicate means abort() was called and user_abort is thrown.
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        // There is room for this ticket once head_counter has advanced past target.
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                // Roll back the claimed ticket so consumers do not wait on a
                // slot that will never be filled.
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    // Non-blocking push: CAS-claim a tail ticket only while the queue is below
    // capacity; returns false instead of waiting when full.
    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    // Blocking pop: claim a head ticket, wait on the items-available monitor
    // while the queue is empty, extract the item, then notify producers.
    bool internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    // Undo the head-ticket claim on abort/exception.
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
        return true;
    }

    // Non-blocking pop: same nested CAS scheme as concurrent_queue's
    // internal_try_pop, plus a producer notification on success.
    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
                    // Queue is empty
                    return false;
                }
                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    // Bump the abort epoch and wake every waiter on both monitors.
    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    // Element-wise copy used by assign(); placement-new into raw queue storage.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    // Element-wise move used by assign(); src is cast away from const because
    // the callback signature is shared with the copying variant.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;
    std::ptrdiff_t my_capacity;          // current bound; see set_capacity()
    std::atomic<unsigned> my_abort_counter; // epoch bumped by abort()
    queue_representation_type* my_queue_representation;

    r1::concurrent_monitor* my_monitors; // lives in the same allocation, after the representation
}; // class concurrent_bounded_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
concurrent_bounded_queue( It, It, Alloc = Alloc() )
-> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

} // namespace d1
} // namespace detail

inline namespace v1 {

using detail::d1::concurrent_queue;
using detail::d1::concurrent_bounded_queue;
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_queue_H