/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_queue_H
#define __TBB_concurrent_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_concurrent_queue_base.h"
#include "detail/_allocator_traits.h"
#include "detail/_exception.h"
#include "detail/_containers_helpers.h"
#include "cache_aligned_allocator.h"

namespace tbb {
namespace detail {
namespace d2 {

// Attempt to dequeue one element from `queue` into the raw storage pointed to by `dst`.
// Returns {true, ticket} on success, where `ticket` identifies the popped slot,
// or {false, ticket} if the queue was observed empty.
// Shared by concurrent_queue::try_pop and concurrent_bounded_queue::try_pop.
template <typename QueueRep, typename Allocator>
std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
    ticket_type ticket{};
    do {
        // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
        ticket = queue.head_counter.load(std::memory_order_acquire);
        do {
            if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
                // Queue is empty
                return { false, ticket };
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            // Another thread snatched the item, retry.
            // (On CAS failure `ticket` is refreshed with the current head_counter value.)
        } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
        // Ticket claimed; if the micro-queue pop reports failure for this ticket,
        // restart the whole claim loop from a fresh head_counter read.
    } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
    return { true, ticket };
}

// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Allocator rebound to allocate the shared queue representation object.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    // Construct an empty queue with a default-constructed allocator.
    concurrent_queue() : concurrent_queue(allocator_type()) {}

    // Construct an empty queue; the representation is cache-aligned so that
    // head_counter, tail_counter and the micro-queue array land on separate lines.
    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue holding copies of [begin, end), pushed in iteration order.
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicit allocator. Not safe to call concurrently with
    // modifications of `src`.
    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Copy constructor; allocator obtained via select_on_container_copy_construction.
    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Move constructors
    // Steals the representation of `src`, leaving `src` with a fresh empty one.
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear(my_allocator);
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    // Enqueue an item at tail of queue (move overload).
    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Construct an item in place at the tail of the queue.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    // A concurrent size() can be transiently negative internally; clamp to 0.
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        return size < 0 ? 0 : size_type(size);
    }

    // Equivalent to size()==0.
    __TBB_nodiscard bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        my_queue_representation->clear(my_allocator);
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swap only the representation pointer; the allocator is handled by the callers.
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Claim a ticket by bumping tail_counter, then construct the element in the
    // micro-queue selected by that ticket.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
    }

    // Non-blocking pop into raw storage `dst`; returns false if the queue was empty.
    bool internal_try_pop( void* dst ) {
        return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    // Copy-construct a T at `location` from the element pointed to by `src`.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    // Move-construct a T at `location` from the element pointed to by `src`.
    // The const_cast is needed because the callback signature passes `src` as const void*.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_queue( It, It, Alloc = Alloc() )
-> concurrent_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

class concurrent_monitor;

// The concurrent monitor tags for concurrent_bounded_queue.
// Monitor indices: slot 0 signals "a slot became available" (for blocked pushers),
// slot 1 signals "an item became available" (for blocked poppers).
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
} // namespace d2


namespace r1 {
class concurrent_monitor;

// Exported helpers implemented in the TBB binary: allocation/deallocation of the
// bounded-queue representation (including its monitor pair) and blocking
// wait/notify/abort operations on those monitors.
TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                                , std::size_t ticket );
TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                                std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1


namespace d2 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Allocator rebound to allocate the shared queue representation object.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    // Block on the monitor selected by `monitor_tag` until `pred` returns false
    // (the predicate is wrapped into a delegate for the exported wait entry point).
    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        d1::delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    // Note: size_type is signed here (unlike concurrent_queue) so that size() can
    // reflect pending pop operations.
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator>;

    // Construct an empty queue with a default-constructed allocator.
    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    // Construct an empty queue. The representation and its two monitors are
    // allocated as one block; the monitors live immediately after the representation.
    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation);
        // Default capacity: effectively unbounded, limited only by addressable memory.
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue holding copies of [begin, end), pushed in iteration order.
    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicit allocator. Not safe to call concurrently with
    // modifications of `src`.
    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Copy constructor; allocator obtained via select_on_container_copy_construction.
    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Move constructors
    // Steals the representation and monitors of `src`.
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear(my_allocator);
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue; blocks while the queue is at capacity.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    // Construct an item in place at the tail; blocks while the queue is at capacity.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Non-blocking in-place construction; returns false if the queue was full.
    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue; blocks until an item is available
    // (or the wait is aborted, in which case a user_abort exception is thrown).
    void pop( T& result ) {
        internal_pop(&result);
    }

    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    // Wake all threads blocked in push()/pop(); they observe the abort-counter change
    // and throw user_abort.
    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    // May return a negative value when pops are pending on an empty queue.
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    // Set the capacity; any negative value means "unbounded".
    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    __TBB_nodiscard bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        my_queue_representation->clear(my_allocator);
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swap representation and monitor pointers; allocator/capacity handled by callers.
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    // Blocking push: claim a ticket unconditionally, then wait on the slots-available
    // monitor while the queue is full. The abort counter is sampled once up front so a
    // concurrent abort() is detected by the wait predicate.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        // The push may proceed once head_counter has advanced past `target`.
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            // On exception (user_abort), the already-claimed ticket must be released
            // so that poppers do not wait forever on a slot that was never filled.
            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
        // Wake a popper potentially waiting for this ticket.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    // Non-blocking push: claim a ticket with CAS only when a free slot is observed.
    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
            // (On CAS failure `ticket` is refreshed with the current tail_counter value.)
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
        // Wake a popper potentially waiting for this ticket.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    // Blocking pop: claim a head ticket, wait on the items-available monitor while the
    // queue is empty, then pop; retries with a new ticket if the micro-queue pop fails.
    void internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                // On exception (user_abort), give back the claimed head ticket.
                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));

        // Wake a pusher potentially waiting for this freed slot.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
    }

    // Non-blocking pop; on success, notify pushers that a slot was freed.
    bool internal_pop_if_present( void* dst ) {
        bool present{};
        ticket_type ticket{};
        std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);

        if (present) {
            r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        }
        return present;
    }

    // Bump the abort counter first so that woken waiters observe the change and throw.
    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    // Copy-construct a T at `location` from the element pointed to by `src`.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    // Move-construct a T at `location` from the element pointed to by `src`.
    // The const_cast is needed because the callback signature passes `src` as const void*.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;
    std::ptrdiff_t my_capacity;
    std::atomic<unsigned> my_abort_counter;
    queue_representation_type* my_queue_representation;

    // Points into the same allocation as my_queue_representation (rep + 1);
    // indexed by cbq_slots_avail_tag / cbq_items_avail_tag.
    r1::concurrent_monitor* my_monitors;
}; // class concurrent_bounded_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
concurrent_bounded_queue( It, It, Alloc = Alloc() )
-> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

} // namespace d2
} // namespace detail

inline namespace v1 {

using detail::d2::concurrent_queue;
using detail::d2::concurrent_bounded_queue;
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_queue_H