1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_concurrent_queue_H 18 #define __TBB_concurrent_queue_H 19 20 #include "detail/_namespace_injection.h" 21 #include "detail/_concurrent_queue_base.h" 22 #include "detail/_allocator_traits.h" 23 #include "detail/_exception.h" 24 #include "detail/_containers_helpers.h" 25 #include "cache_aligned_allocator.h" 26 27 namespace tbb { 28 namespace detail { 29 namespace d2 { 30 31 template <typename QueueRep, typename Allocator> 32 std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) { 33 ticket_type ticket{}; 34 do { 35 // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter` 36 ticket = queue.head_counter.load(std::memory_order_acquire); 37 do { 38 if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty 39 // Queue is empty 40 return { false, ticket }; 41 } 42 // Queue had item with ticket k when we looked. Attempt to get that item. 43 // Another thread snatched the item, retry. 44 } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1)); 45 } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc)); 46 return { true, ticket }; 47 } 48 49 // A high-performance thread-safe non-blocking concurrent queue. 50 // Multiple threads may each push and pop concurrently. 51 // Assignment construction is not allowed. 52 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>> 53 class concurrent_queue { 54 using allocator_traits_type = tbb::detail::allocator_traits<Allocator>; 55 using queue_representation_type = concurrent_queue_rep<T, Allocator>; 56 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>; 57 using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>; 58 public: 59 using size_type = std::size_t; 60 using value_type = T; 61 using reference = T&; 62 using const_reference = const T&; 63 using difference_type = std::ptrdiff_t; 64 65 using allocator_type = Allocator; 66 using pointer = typename allocator_traits_type::pointer; 67 using const_pointer = typename allocator_traits_type::const_pointer; 68 69 using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>; 70 using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>; 71 72 concurrent_queue() : concurrent_queue(allocator_type()) {} 73 74 explicit concurrent_queue(const allocator_type& a) : 75 my_allocator(a), my_queue_representation(nullptr) 76 { 77 my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type))); 78 queue_allocator_traits::construct(my_allocator, my_queue_representation); 79 80 __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" ); 81 __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" ); 82 __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" ); 83 __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" ); 84 } 85 86 template <typename InputIterator> 87 concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : 88 concurrent_queue(a) 89 { 90 for (; begin != end; ++begin) 91 push(*begin); 92 } 93 94 concurrent_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ) : 95 concurrent_queue(init.begin(), init.end(), alloc) 96 {} 97 98 concurrent_queue(const concurrent_queue& src, const allocator_type& a) : 99 concurrent_queue(a) 100 { 101 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item); 102 } 103 104 concurrent_queue(const concurrent_queue& src) : 105 concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator())) 106 { 107 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item); 108 } 109 110 // Move constructors 111 concurrent_queue(concurrent_queue&& src) : 112 concurrent_queue(std::move(src.my_allocator)) 113 { 114 internal_swap(src); 115 } 116 117 concurrent_queue(concurrent_queue&& src, const allocator_type& a) : 118 concurrent_queue(a) 119 { 120 // checking that memory allocated by one instance of allocator can be deallocated 121 // with another 122 if (my_allocator == src.my_allocator) { 123 internal_swap(src); 124 } else { 125 // allocators are different => performing per-element move 126 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item); 127 src.clear(); 128 } 129 } 130 131 // Destroy queue 132 ~concurrent_queue() { 133 clear(); 134 my_queue_representation->clear(my_allocator); 135 queue_allocator_traits::destroy(my_allocator, my_queue_representation); 136 r1::cache_aligned_deallocate(my_queue_representation); 137 } 138 139 concurrent_queue& operator=( const concurrent_queue& other ) { 140 //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment 141 if (my_queue_representation != other.my_queue_representation) { 142 clear(); 143 my_allocator = other.my_allocator; 144 my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item); 145 } 146 return *this; 147 } 148 149 concurrent_queue& operator=( concurrent_queue&& other ) { 150 //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment 151 if (my_queue_representation != other.my_queue_representation) { 152 clear(); 153 if (my_allocator == other.my_allocator) { 154 internal_swap(other); 155 } else { 156 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item); 157 other.clear(); 158 my_allocator = std::move(other.my_allocator); 159 } 160 } 161 return *this; 162 } 163 164 concurrent_queue& operator=( std::initializer_list<value_type> init ) { 165 assign(init); 166 return *this; 167 } 168 169 template <typename InputIterator> 170 void assign( InputIterator first, InputIterator last ) { 171 concurrent_queue src(first, last); 172 clear(); 173 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item); 174 } 175 176 void assign( std::initializer_list<value_type> init ) { 177 assign(init.begin(), init.end()); 178 } 179 180 void swap ( concurrent_queue& other ) { 181 //TODO: implement support for std::allocator_traits::propagate_on_container_swap 182 __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators"); 183 internal_swap(other); 184 } 185 186 // Enqueue an item at tail of queue. 187 void push(const T& value) { 188 internal_push(value); 189 } 190 191 void push(T&& value) { 192 internal_push(std::move(value)); 193 } 194 195 template <typename... Args> 196 void emplace( Args&&... args ) { 197 internal_push(std::forward<Args>(args)...); 198 } 199 200 // Attempt to dequeue an item from head of queue. 201 /** Does not wait for item to become available. 202 Returns true if successful; false otherwise. */ 203 bool try_pop( T& result ) { 204 return internal_try_pop(&result); 205 } 206 207 // Return the number of items in the queue; thread unsafe 208 size_type unsafe_size() const { 209 std::ptrdiff_t size = my_queue_representation->size(); 210 return size < 0 ? 0 : size_type(size); 211 } 212 213 // Equivalent to size()==0. 214 __TBB_nodiscard bool empty() const { 215 return my_queue_representation->empty(); 216 } 217 218 // Clear the queue. not thread-safe. 219 void clear() { 220 my_queue_representation->clear(my_allocator); 221 } 222 223 // Return allocator object 224 allocator_type get_allocator() const { return my_allocator; } 225 226 //------------------------------------------------------------------------ 227 // The iterators are intended only for debugging. They are slow and not thread safe. 228 //------------------------------------------------------------------------ 229 230 iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); } 231 iterator unsafe_end() { return iterator(); } 232 const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); } 233 const_iterator unsafe_end() const { return const_iterator(); } 234 const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); } 235 const_iterator unsafe_cend() const { return const_iterator(); } 236 237 private: 238 void internal_swap(concurrent_queue& src) { 239 using std::swap; 240 swap(my_queue_representation, src.my_queue_representation); 241 } 242 243 template <typename... Args> 244 void internal_push( Args&&... args ) { 245 ticket_type k = my_queue_representation->tail_counter++; 246 my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...); 247 } 248 249 bool internal_try_pop( void* dst ) { 250 return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first; 251 } 252 253 template <typename Container, typename Value, typename A> 254 friend class concurrent_queue_iterator; 255 256 static void copy_construct_item(T* location, const void* src) { 257 // TODO: use allocator_traits for copy construction 258 new (location) value_type(*static_cast<const value_type*>(src)); 259 // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src)); 260 } 261 262 static void move_construct_item(T* location, const void* src) { 263 // TODO: use allocator_traits for move construction 264 new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src)))); 265 } 266 267 queue_allocator_type my_allocator; 268 queue_representation_type* my_queue_representation; 269 270 friend void swap( concurrent_queue& lhs, concurrent_queue& rhs ) { 271 lhs.swap(rhs); 272 } 273 274 friend bool operator==( const concurrent_queue& lhs, const concurrent_queue& rhs ) { 275 return lhs.unsafe_size() == rhs.unsafe_size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin()); 276 } 277 278 #if !__TBB_CPP20_COMPARISONS_PRESENT 279 friend bool operator!=( const concurrent_queue& lhs, const concurrent_queue& rhs ) { 280 return !(lhs == rhs); 281 } 282 #endif // __TBB_CPP20_COMPARISONS_PRESENT 283 }; // class concurrent_queue 284 285 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 286 // Deduction guide for the constructor from two iterators 287 template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>, 288 typename = std::enable_if_t<is_input_iterator_v<It>>, 289 typename = std::enable_if_t<is_allocator_v<Alloc>>> 290 concurrent_queue( It, It, Alloc = Alloc() ) 291 -> concurrent_queue<iterator_value_t<It>, Alloc>; 292 293 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */ 294 295 class concurrent_monitor; 296 297 // The concurrent monitor tags for concurrent_bounded_queue. 298 static constexpr std::size_t cbq_slots_avail_tag = 0; 299 static constexpr std::size_t cbq_items_avail_tag = 1; 300 } // namespace d2 301 302 303 namespace r1 { 304 class concurrent_monitor; 305 306 TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size ); 307 TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size ); 308 TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors ); 309 TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag 310 , std::size_t ticket ); 311 TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag, 312 std::ptrdiff_t target, d1::delegate_base& predicate ); 313 } // namespace r1 314 315 316 namespace d2 { 317 // A high-performance thread-safe blocking concurrent bounded queue. 318 // Supports boundedness and blocking semantics. 319 // Multiple threads may each push and pop concurrently. 320 // Assignment construction is not allowed. 321 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>> 322 class concurrent_bounded_queue { 323 using allocator_traits_type = tbb::detail::allocator_traits<Allocator>; 324 using queue_representation_type = concurrent_queue_rep<T, Allocator>; 325 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>; 326 using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>; 327 328 template <typename FuncType> 329 void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) { 330 d1::delegated_function<FuncType> func(pred); 331 r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func); 332 } 333 public: 334 using size_type = std::ptrdiff_t; 335 using value_type = T; 336 using reference = T&; 337 using const_reference = const T&; 338 using difference_type = std::ptrdiff_t; 339 340 using allocator_type = Allocator; 341 using pointer = typename allocator_traits_type::pointer; 342 using const_pointer = typename allocator_traits_type::const_pointer; 343 344 using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>; 345 using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ; 346 347 concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {} 348 349 explicit concurrent_bounded_queue( const allocator_type& a ) : 350 my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr) 351 { 352 my_queue_representation = reinterpret_cast<queue_representation_type*>( 353 r1::allocate_bounded_queue_rep(sizeof(queue_representation_type))); 354 my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1); 355 queue_allocator_traits::construct(my_allocator, my_queue_representation); 356 my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2); 357 358 __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" ); 359 __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" ); 360 __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" ); 361 __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" ); 362 } 363 364 template <typename InputIterator> 365 concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) : 366 concurrent_bounded_queue(a) 367 { 368 for (; begin != end; ++begin) 369 push(*begin); 370 } 371 372 concurrent_bounded_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ): 373 concurrent_bounded_queue(init.begin(), init.end(), alloc) 374 {} 375 376 concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) : 377 concurrent_bounded_queue(a) 378 { 379 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item); 380 } 381 382 concurrent_bounded_queue( const concurrent_bounded_queue& src ) : 383 concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator())) 384 { 385 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item); 386 } 387 388 // Move constructors 389 concurrent_bounded_queue( concurrent_bounded_queue&& src ) : 390 concurrent_bounded_queue(std::move(src.my_allocator)) 391 { 392 internal_swap(src); 393 } 394 395 concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) : 396 concurrent_bounded_queue(a) 397 { 398 // checking that memory allocated by one instance of allocator can be deallocated 399 // with another 400 if (my_allocator == src.my_allocator) { 401 internal_swap(src); 402 } else { 403 // allocators are different => performing per-element move 404 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item); 405 src.clear(); 406 } 407 } 408 409 // Destroy queue 410 ~concurrent_bounded_queue() { 411 clear(); 412 my_queue_representation->clear(my_allocator); 413 queue_allocator_traits::destroy(my_allocator, my_queue_representation); 414 r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation), 415 sizeof(queue_representation_type)); 416 } 417 418 concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) { 419 //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment 420 if (my_queue_representation != other.my_queue_representation) { 421 clear(); 422 my_allocator = other.my_allocator; 423 my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item); 424 } 425 return *this; 426 } 427 428 concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) { 429 //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment 430 if (my_queue_representation != other.my_queue_representation) { 431 clear(); 432 if (my_allocator == other.my_allocator) { 433 internal_swap(other); 434 } else { 435 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item); 436 other.clear(); 437 my_allocator = std::move(other.my_allocator); 438 } 439 } 440 return *this; 441 } 442 443 concurrent_bounded_queue& operator=( std::initializer_list<value_type> init ) { 444 assign(init); 445 return *this; 446 } 447 448 template <typename InputIterator> 449 void assign( InputIterator first, InputIterator last ) { 450 concurrent_bounded_queue src(first, last); 451 clear(); 452 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item); 453 } 454 455 void assign( std::initializer_list<value_type> init ) { 456 assign(init.begin(), init.end()); 457 } 458 459 void swap ( concurrent_bounded_queue& other ) { 460 //TODO: implement support for std::allocator_traits::propagate_on_container_swap 461 __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators"); 462 internal_swap(other); 463 } 464 465 // Enqueue an item at tail of queue. 466 void push( const T& value ) { 467 internal_push(value); 468 } 469 470 void push( T&& value ) { 471 internal_push(std::move(value)); 472 } 473 474 // Enqueue an item at tail of queue if queue is not already full. 475 // Does not wait for queue to become not full. 476 // Returns true if item is pushed; false if queue was already full. 477 bool try_push( const T& value ) { 478 return internal_push_if_not_full(value); 479 } 480 481 bool try_push( T&& value ) { 482 return internal_push_if_not_full(std::move(value)); 483 } 484 485 template <typename... Args> 486 void emplace( Args&&... args ) { 487 internal_push(std::forward<Args>(args)...); 488 } 489 490 template <typename... Args> 491 bool try_emplace( Args&&... args ) { 492 return internal_push_if_not_full(std::forward<Args>(args)...); 493 } 494 495 // Attempt to dequeue an item from head of queue. 496 void pop( T& result ) { 497 internal_pop(&result); 498 } 499 500 /** Does not wait for item to become available. 501 Returns true if successful; false otherwise. */ 502 bool try_pop( T& result ) { 503 return internal_pop_if_present(&result); 504 } 505 506 void abort() { 507 internal_abort(); 508 } 509 510 // Return the number of items in the queue; thread unsafe 511 std::ptrdiff_t size() const { 512 return my_queue_representation->size(); 513 } 514 515 void set_capacity( size_type new_capacity ) { 516 std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity; 517 my_capacity = c; 518 } 519 520 size_type capacity() const { 521 return my_capacity; 522 } 523 524 // Equivalent to size()==0. 525 __TBB_nodiscard bool empty() const { 526 return my_queue_representation->empty(); 527 } 528 529 // Clear the queue. not thread-safe. 530 void clear() { 531 my_queue_representation->clear(my_allocator); 532 } 533 534 // Return allocator object 535 allocator_type get_allocator() const { return my_allocator; } 536 537 //------------------------------------------------------------------------ 538 // The iterators are intended only for debugging. They are slow and not thread safe. 539 //------------------------------------------------------------------------ 540 541 iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); } 542 iterator unsafe_end() { return iterator(); } 543 const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); } 544 const_iterator unsafe_end() const { return const_iterator(); } 545 const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); } 546 const_iterator unsafe_cend() const { return const_iterator(); } 547 548 private: 549 void internal_swap( concurrent_bounded_queue& src ) { 550 std::swap(my_queue_representation, src.my_queue_representation); 551 std::swap(my_monitors, src.my_monitors); 552 } 553 554 static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2); 555 556 template <typename... Args> 557 void internal_push( Args&&... args ) { 558 unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed); 559 ticket_type ticket = my_queue_representation->tail_counter++; 560 std::ptrdiff_t target = ticket - my_capacity; 561 562 if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full 563 auto pred = [&] { 564 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) { 565 throw_exception(exception_id::user_abort); 566 } 567 568 return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target; 569 }; 570 571 try_call( [&] { 572 internal_wait(my_monitors, cbq_slots_avail_tag, target, pred); 573 }).on_exception( [&] { 574 my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator); 575 }); 576 577 } 578 __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr); 579 my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...); 580 r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket); 581 } 582 583 template <typename... Args> 584 bool internal_push_if_not_full( Args&&... args ) { 585 ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed); 586 do { 587 if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) { 588 // Queue is full 589 return false; 590 } 591 // Queue had empty slot with ticket k when we looked. Attempt to claim that slot. 592 // Another thread claimed the slot, so retry. 593 } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1)); 594 595 my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...); 596 r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket); 597 return true; 598 } 599 600 void internal_pop( void* dst ) { 601 std::ptrdiff_t target; 602 // This loop is a single pop operation; abort_counter should not be re-read inside 603 unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed); 604 605 do { 606 target = my_queue_representation->head_counter++; 607 if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) { 608 auto pred = [&] { 609 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) { 610 throw_exception(exception_id::user_abort); 611 } 612 613 return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target; 614 }; 615 616 try_call( [&] { 617 internal_wait(my_monitors, cbq_items_avail_tag, target, pred); 618 }).on_exception( [&] { 619 my_queue_representation->head_counter--; 620 }); 621 } 622 __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr); 623 } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator)); 624 625 r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target); 626 } 627 628 bool internal_pop_if_present( void* dst ) { 629 bool present{}; 630 ticket_type ticket{}; 631 std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator); 632 633 if (present) { 634 r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket); 635 } 636 return present; 637 } 638 639 void internal_abort() { 640 ++my_abort_counter; 641 r1::abort_bounded_queue_monitors(my_monitors); 642 } 643 644 static void copy_construct_item(T* location, const void* src) { 645 // TODO: use allocator_traits for copy construction 646 new (location) value_type(*static_cast<const value_type*>(src)); 647 } 648 649 static void move_construct_item(T* location, const void* src) { 650 // TODO: use allocator_traits for move construction 651 new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src)))); 652 } 653 654 template <typename Container, typename Value, typename A> 655 friend class concurrent_queue_iterator; 656 657 queue_allocator_type my_allocator; 658 std::ptrdiff_t my_capacity; 659 std::atomic<unsigned> my_abort_counter; 660 queue_representation_type* my_queue_representation; 661 662 r1::concurrent_monitor* my_monitors; 663 664 friend void swap( concurrent_bounded_queue& lhs, concurrent_bounded_queue& rhs ) { 665 lhs.swap(rhs); 666 } 667 668 friend bool operator==( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) { 669 return lhs.size() == rhs.size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin()); 670 } 671 672 #if !__TBB_CPP20_COMPARISONS_PRESENT 673 friend bool operator!=( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) { 674 return !(lhs == rhs); 675 } 676 #endif // __TBB_CPP20_COMPARISONS_PRESENT 677 }; // class concurrent_bounded_queue 678 679 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 680 // Deduction guide for the constructor from two iterators 681 template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>> 682 concurrent_bounded_queue( It, It, Alloc = Alloc() ) 683 -> concurrent_bounded_queue<iterator_value_t<It>, Alloc>; 684 685 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */ 686 687 } //namespace d2 688 } // namespace detail 689 690 inline namespace v1 { 691 692 using detail::d2::concurrent_queue; 693 using detail::d2::concurrent_bounded_queue; 694 using detail::r1::user_abort; 695 using detail::r1::bad_last_alloc; 696 697 } // inline namespace v1 698 } // namespace tbb 699 700 #endif // __TBB_concurrent_queue_H 701