/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_queue_H
#define __TBB_concurrent_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_concurrent_queue_base.h"
#include "detail/_allocator_traits.h"
#include "detail/_exception.h"
#include "detail/_containers_helpers.h"
#include "cache_aligned_allocator.h"

namespace tbb {
namespace detail {
namespace d2 {

// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    concurrent_queue() : concurrent_queue(allocator_type()) {}

    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
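        // Per-element copy of src's contents into the freshly constructed representation.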
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Move constructors
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // Check that memory allocated by one instance of the allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // Allocators are different => perform a per-element move
            my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear(my_allocator);
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    void push(T&& value) {
        internal_push(std::move(value));
    }

    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        return size < 0 ? 0 : size_type(size);
    }

    // Equivalent to size()==0.
    __TBB_nodiscard bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. Not thread-safe.
    void clear() {
        my_queue_representation->clear(my_allocator);
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    template <typename... Args>
    void internal_push( Args&&... args ) {
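        // Atomically claim the next ticket by incrementing the tail counter, then construct
        // the element in place in the micro-queue that the ticket maps to.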
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
    }

    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_queue( It, It, Alloc = Alloc() )
-> concurrent_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

class concurrent_monitor;

// The concurrent monitor tags for concurrent_bounded_queue.
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
} // namespace d2


namespace r1 {
    class concurrent_monitor;

    TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
    TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
    TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
    TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                                    , std::size_t ticket );
    TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                                    std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1


namespace d2 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
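//
// Illustrative usage sketch (not part of this header; names and the capacity value below
// are arbitrary examples):
//
//     tbb::concurrent_bounded_queue<int> queue;
//     queue.set_capacity(16);    // push() now blocks while 16 items are stored
//     queue.push(1);             // blocking push
//     int item;
//     queue.pop(item);           // blocking pop: waits until an item is available
//     if (queue.try_pop(item))   // non-blocking pop: returns false here, queue is empty
//         /* ... */;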
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        d1::delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator>;

    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation);
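        // Default capacity is huge (SIZE_MAX divided by at least the item size), making the
        // queue effectively unbounded until set_capacity() is called.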
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ?
            queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Move constructors
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // Check that memory allocated by one instance of the allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // Allocators are different => perform a per-element move
            my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear(my_allocator);
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue; blocks until an item becomes available.
    void pop( T& result ) {
        internal_pop(&result);
    }

    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    __TBB_nodiscard bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. Not thread-safe.
    void clear() {
        my_queue_representation->clear(my_allocator);
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    template <typename... Args>
    void internal_push( Args&&... args ) {
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
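        // Try to claim a slot by advancing the tail counter with a CAS; bail out without
        // blocking if the queue already holds my_capacity items.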
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had an empty slot with this ticket when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    void internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
    }

    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) {
                    // Queue is empty
                    return false;
                }
                // Queue had an item with this ticket when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;
    std::ptrdiff_t my_capacity;
    std::atomic<unsigned> my_abort_counter;
    queue_representation_type* my_queue_representation;

    r1::concurrent_monitor* my_monitors;
}; // class concurrent_bounded_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
concurrent_bounded_queue( It, It, Alloc = Alloc() )
-> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */

} // namespace d2
} // namespace detail

inline namespace v1 {

using detail::d2::concurrent_queue;
using detail::d2::concurrent_bounded_queue;
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_queue_H