/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_queue_H
#define __TBB_concurrent_queue_H

#include "detail/_concurrent_queue_base.h"
#include "detail/_allocator_traits.h"
#include "detail/_exception.h"
#include "cache_aligned_allocator.h"

namespace tbb {
namespace detail {
namespace d1 {

// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    concurrent_queue() : concurrent_queue(allocator_type()) {}

    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }
    // Move constructors
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    void push(T&& value) {
        internal_push(std::move(value));
    }

    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        return size < 0 ? 0 : size_type(size);
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. Not thread-safe.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }
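
    // Descriptive note (added): pushes and pops are ordered by tickets taken from
    // tail_counter/head_counter; choose(k) selects the internal sub-queue of the shared
    // representation that is responsible for ticket k, so concurrent operations are
    // spread across several slots rather than contending on a single one.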
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, std::forward<Args>(args)...);
    }

    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename A = tbb::cache_aligned_allocator<T>>
concurrent_queue(InputIterator, InputIterator, const A& = A())
-> concurrent_queue<T, A>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

class concurrent_monitor;

template <typename FuncType>
class delegated_function : public delegate_base {
public:
    delegated_function(FuncType& f) : my_func(f) {}

    bool operator()() const override {
        return my_func();
    }

private:
    FuncType &my_func;
}; // class delegated_function

// The concurrent monitor tags for concurrent_bounded_queue.
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
} // namespace d1

namespace r1 {
class concurrent_monitor;

std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                       std::size_t ticket );
void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                     std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1

namespace d1 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
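//
// Illustrative usage sketch (added; not part of the library sources). The item type and
// capacity are arbitrary example choices:
//
//     tbb::concurrent_bounded_queue<int> q;
//     q.set_capacity(16);            // bound the queue; push() now blocks when full
//     q.push(42);                    // blocking enqueue
//     int item;
//     q.pop(item);                   // blocking dequeue
//     bool pushed = q.try_push(7);   // non-blocking enqueue, false if full
//     bool popped = q.try_pop(item); // non-blocking dequeue, false if empty
//     // q.abort() wakes blocked push()/pop() calls with a user_abort exception
//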
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator>;

    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
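        // Descriptive note (added): until set_capacity() is called, the capacity below is
        // effectively "unbounded"; dividing the maximum std::size_t value by the per-item
        // storage size keeps capacity * item_size representable without overflow.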
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue.
    /** Blocks until an item becomes available, and then dequeues it.
        Returns true if successful; false otherwise. */
    bool pop( T& result ) {
        return internal_pop(&result);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. Not thread-safe.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    template <typename... Args>
    void internal_push( Args&&... args ) {
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation);
            });
        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
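        // Descriptive note (added): non-blocking push; claim a slot by advancing
        // tail_counter with a CAS and fail immediately if the queue already holds
        // my_capacity items.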
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had an empty slot with this ticket when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    bool internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
        return true;
    }

    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) {
                    // Queue is empty
                    return false;
                }
                // Queue had an item with this ticket when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;
    std::ptrdiff_t my_capacity;
    std::atomic<unsigned> my_abort_counter;
    queue_representation_type* my_queue_representation;

    r1::concurrent_monitor* my_monitors;
}; // class concurrent_bounded_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename A = tbb::cache_aligned_allocator<T>>
concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
-> concurrent_bounded_queue<T, A>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

} // namespace d1
} // namespace detail

inline namespace v1 {

using detail::d1::concurrent_queue;
using detail::d1::concurrent_bounded_queue;
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_queue_H