1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19
20 #include "detail/_namespace_injection.h"
21 #include "detail/_concurrent_queue_base.h"
22 #include "detail/_allocator_traits.h"
23 #include "detail/_exception.h"
24 #include "detail/_containers_helpers.h"
25 #include "cache_aligned_allocator.h"
26
27 namespace tbb {
28 namespace detail {
29 namespace d2 {
30
31 template <typename QueueRep, typename Allocator>
internal_try_pop_impl(void * dst,QueueRep & queue,Allocator & alloc)32 std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
33 ticket_type ticket{};
34 do {
35 // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
36 ticket = queue.head_counter.load(std::memory_order_acquire);
37 do {
38 if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
39 // Queue is empty
40 return { false, ticket };
41 }
42 // Queue had item with ticket k when we looked. Attempt to get that item.
43 // Another thread snatched the item, retry.
44 } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
45 } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
46 return { true, ticket };
47 }
48
49 // A high-performance thread-safe non-blocking concurrent queue.
50 // Multiple threads may each push and pop concurrently.
// Copy and move construction/assignment are provided; assignment calls clear(), which is not thread-safe.
52 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
53 class concurrent_queue {
54 using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
55 using queue_representation_type = concurrent_queue_rep<T, Allocator>;
56 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
57 using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
58 public:
59 using size_type = std::size_t;
60 using value_type = T;
61 using reference = T&;
62 using const_reference = const T&;
63 using difference_type = std::ptrdiff_t;
64
65 using allocator_type = Allocator;
66 using pointer = typename allocator_traits_type::pointer;
67 using const_pointer = typename allocator_traits_type::const_pointer;
68
69 using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
70 using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;
71
concurrent_queue()72 concurrent_queue() : concurrent_queue(allocator_type()) {}
73
concurrent_queue(const allocator_type & a)74 explicit concurrent_queue(const allocator_type& a) :
75 my_allocator(a), my_queue_representation(nullptr)
76 {
77 my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
78 queue_allocator_traits::construct(my_allocator, my_queue_representation);
79
80 __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
81 __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
82 __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
83 __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
84 }
85
86 template <typename InputIterator>
87 concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
concurrent_queue(a)88 concurrent_queue(a)
89 {
90 for (; begin != end; ++begin)
91 push(*begin);
92 }
93
94 concurrent_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ) :
95 concurrent_queue(init.begin(), init.end(), alloc)
96 {}
97
concurrent_queue(const concurrent_queue & src,const allocator_type & a)98 concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
99 concurrent_queue(a)
100 {
101 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
102 }
103
concurrent_queue(const concurrent_queue & src)104 concurrent_queue(const concurrent_queue& src) :
105 concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
106 {
107 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
108 }
109
110 // Move constructors
concurrent_queue(concurrent_queue && src)111 concurrent_queue(concurrent_queue&& src) :
112 concurrent_queue(std::move(src.my_allocator))
113 {
114 internal_swap(src);
115 }
116
concurrent_queue(concurrent_queue && src,const allocator_type & a)117 concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
118 concurrent_queue(a)
119 {
120 // checking that memory allocated by one instance of allocator can be deallocated
121 // with another
122 if (my_allocator == src.my_allocator) {
123 internal_swap(src);
124 } else {
125 // allocators are different => performing per-element move
126 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
127 src.clear();
128 }
129 }
130
131 // Destroy queue
~concurrent_queue()132 ~concurrent_queue() {
133 clear();
134 my_queue_representation->clear(my_allocator);
135 queue_allocator_traits::destroy(my_allocator, my_queue_representation);
136 r1::cache_aligned_deallocate(my_queue_representation);
137 }
138
139 concurrent_queue& operator=( const concurrent_queue& other ) {
140 //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
141 if (my_queue_representation != other.my_queue_representation) {
142 clear();
143 my_allocator = other.my_allocator;
144 my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
145 }
146 return *this;
147 }
148
149 concurrent_queue& operator=( concurrent_queue&& other ) {
150 //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
151 if (my_queue_representation != other.my_queue_representation) {
152 clear();
153 if (my_allocator == other.my_allocator) {
154 internal_swap(other);
155 } else {
156 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
157 other.clear();
158 my_allocator = std::move(other.my_allocator);
159 }
160 }
161 return *this;
162 }
163
164 concurrent_queue& operator=( std::initializer_list<value_type> init ) {
165 assign(init);
166 return *this;
167 }
168
169 template <typename InputIterator>
assign(InputIterator first,InputIterator last)170 void assign( InputIterator first, InputIterator last ) {
171 concurrent_queue src(first, last);
172 clear();
173 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
174 }
175
assign(std::initializer_list<value_type> init)176 void assign( std::initializer_list<value_type> init ) {
177 assign(init.begin(), init.end());
178 }
179
swap(concurrent_queue & other)180 void swap ( concurrent_queue& other ) {
181 //TODO: implement support for std::allocator_traits::propagate_on_container_swap
182 __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
183 internal_swap(other);
184 }
185
186 // Enqueue an item at tail of queue.
push(const T & value)187 void push(const T& value) {
188 internal_push(value);
189 }
190
push(T && value)191 void push(T&& value) {
192 internal_push(std::move(value));
193 }
194
195 template <typename... Args>
emplace(Args &&...args)196 void emplace( Args&&... args ) {
197 internal_push(std::forward<Args>(args)...);
198 }
199
200 // Attempt to dequeue an item from head of queue.
201 /** Does not wait for item to become available.
202 Returns true if successful; false otherwise. */
try_pop(T & result)203 bool try_pop( T& result ) {
204 return internal_try_pop(&result);
205 }
206
207 // Return the number of items in the queue; thread unsafe
unsafe_size()208 size_type unsafe_size() const {
209 std::ptrdiff_t size = my_queue_representation->size();
210 return size < 0 ? 0 : size_type(size);
211 }
212
213 // Equivalent to size()==0.
empty()214 __TBB_nodiscard bool empty() const {
215 return my_queue_representation->empty();
216 }
217
218 // Clear the queue. not thread-safe.
clear()219 void clear() {
220 my_queue_representation->clear(my_allocator);
221 }
222
223 // Return allocator object
get_allocator()224 allocator_type get_allocator() const { return my_allocator; }
225
226 //------------------------------------------------------------------------
227 // The iterators are intended only for debugging. They are slow and not thread safe.
228 //------------------------------------------------------------------------
229
unsafe_begin()230 iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
unsafe_end()231 iterator unsafe_end() { return iterator(); }
unsafe_begin()232 const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_end()233 const_iterator unsafe_end() const { return const_iterator(); }
unsafe_cbegin()234 const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_cend()235 const_iterator unsafe_cend() const { return const_iterator(); }
236
237 private:
internal_swap(concurrent_queue & src)238 void internal_swap(concurrent_queue& src) {
239 using std::swap;
240 swap(my_queue_representation, src.my_queue_representation);
241 }
242
243 template <typename... Args>
internal_push(Args &&...args)244 void internal_push( Args&&... args ) {
245 ticket_type k = my_queue_representation->tail_counter++;
246 my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
247 }
248
internal_try_pop(void * dst)249 bool internal_try_pop( void* dst ) {
250 return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
251 }
252
253 template <typename Container, typename Value, typename A>
254 friend class concurrent_queue_iterator;
255
copy_construct_item(T * location,const void * src)256 static void copy_construct_item(T* location, const void* src) {
257 // TODO: use allocator_traits for copy construction
258 new (location) value_type(*static_cast<const value_type*>(src));
259 // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
260 }
261
move_construct_item(T * location,const void * src)262 static void move_construct_item(T* location, const void* src) {
263 // TODO: use allocator_traits for move construction
264 new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
265 }
266
267 queue_allocator_type my_allocator;
268 queue_representation_type* my_queue_representation;
269
swap(concurrent_queue & lhs,concurrent_queue & rhs)270 friend void swap( concurrent_queue& lhs, concurrent_queue& rhs ) {
271 lhs.swap(rhs);
272 }
273
274 friend bool operator==( const concurrent_queue& lhs, const concurrent_queue& rhs ) {
275 return lhs.unsafe_size() == rhs.unsafe_size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
276 }
277
278 #if !__TBB_CPP20_COMPARISONS_PRESENT
279 friend bool operator!=( const concurrent_queue& lhs, const concurrent_queue& rhs ) {
280 return !(lhs == rhs);
281 }
282 #endif // __TBB_CPP20_COMPARISONS_PRESENT
283 }; // class concurrent_queue
284
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Class template argument deduction for concurrent_queue(first, last[, alloc]):
// the element type is taken from the iterator; the guide is only viable when
// InputIt is an input iterator and A is an allocator.
template <typename InputIt,
          typename A = tbb::cache_aligned_allocator<iterator_value_t<InputIt>>,
          typename = std::enable_if_t<is_input_iterator_v<InputIt>>,
          typename = std::enable_if_t<is_allocator_v<A>>>
concurrent_queue( InputIt, InputIt, A = A() ) -> concurrent_queue<iterator_value_t<InputIt>, A>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
294
// Forward declaration.
// NOTE(review): the bounded queue below refers to r1::concurrent_monitor; confirm whether
// this d2-level declaration is still needed.
class concurrent_monitor;

// The concurrent monitor tags for concurrent_bounded_queue.
// cbq_slots_avail_tag: monitor that blocked pushes wait on and that pops notify.
static constexpr std::size_t cbq_slots_avail_tag = 0;
// cbq_items_avail_tag: monitor that blocked pops wait on and that pushes notify.
static constexpr std::size_t cbq_items_avail_tag = 1;
300 } // namespace d2
301
302
// Library-side (exported) support routines used by concurrent_bounded_queue.
namespace r1 {
class concurrent_monitor;

// Allocates the storage for a bounded queue representation of `queue_rep_size` bytes.
// NOTE(review): the bounded queue places its monitors right behind the representation,
// so this presumably reserves room for them as well - confirm in the library sources.
TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
// Releases storage obtained from allocate_bounded_queue_rep; called with the same size.
TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
// Called from concurrent_bounded_queue::abort() after the abort counter has been incremented.
TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
// Called after a successful push (items-available tag) or pop (slots-available tag)
// with the ticket that was just processed.
TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                            , std::size_t ticket );
// Waits on the monitor selected by `monitor_tag`; `predicate` is the caller-supplied
// "still has to wait" check wrapped in a delegate.
TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                            std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1
314
315
316 namespace d2 {
317 // A high-performance thread-safe blocking concurrent bounded queue.
318 // Supports boundedness and blocking semantics.
319 // Multiple threads may each push and pop concurrently.
// Copy and move construction/assignment are provided; assignment calls clear(), which is not thread-safe.
321 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
322 class concurrent_bounded_queue {
323 using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
324 using queue_representation_type = concurrent_queue_rep<T, Allocator>;
325 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
326 using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
327
328 template <typename FuncType>
internal_wait(r1::concurrent_monitor * monitors,std::size_t monitor_tag,std::ptrdiff_t target,FuncType pred)329 void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
330 d1::delegated_function<FuncType> func(pred);
331 r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
332 }
333 public:
334 using size_type = std::ptrdiff_t;
335 using value_type = T;
336 using reference = T&;
337 using const_reference = const T&;
338 using difference_type = std::ptrdiff_t;
339
340 using allocator_type = Allocator;
341 using pointer = typename allocator_traits_type::pointer;
342 using const_pointer = typename allocator_traits_type::const_pointer;
343
344 using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
345 using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;
346
concurrent_bounded_queue()347 concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}
348
concurrent_bounded_queue(const allocator_type & a)349 explicit concurrent_bounded_queue( const allocator_type& a ) :
350 my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
351 {
352 my_queue_representation = reinterpret_cast<queue_representation_type*>(
353 r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
354 my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
355 queue_allocator_traits::construct(my_allocator, my_queue_representation);
356 my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
357
358 __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
359 __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
360 __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
361 __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
362 }
363
364 template <typename InputIterator>
365 concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
concurrent_bounded_queue(a)366 concurrent_bounded_queue(a)
367 {
368 for (; begin != end; ++begin)
369 push(*begin);
370 }
371
372 concurrent_bounded_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ):
373 concurrent_bounded_queue(init.begin(), init.end(), alloc)
374 {}
375
concurrent_bounded_queue(const concurrent_bounded_queue & src,const allocator_type & a)376 concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
377 concurrent_bounded_queue(a)
378 {
379 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
380 }
381
concurrent_bounded_queue(const concurrent_bounded_queue & src)382 concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
383 concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
384 {
385 my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
386 }
387
388 // Move constructors
concurrent_bounded_queue(concurrent_bounded_queue && src)389 concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
390 concurrent_bounded_queue(std::move(src.my_allocator))
391 {
392 internal_swap(src);
393 }
394
concurrent_bounded_queue(concurrent_bounded_queue && src,const allocator_type & a)395 concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
396 concurrent_bounded_queue(a)
397 {
398 // checking that memory allocated by one instance of allocator can be deallocated
399 // with another
400 if (my_allocator == src.my_allocator) {
401 internal_swap(src);
402 } else {
403 // allocators are different => performing per-element move
404 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
405 src.clear();
406 }
407 }
408
409 // Destroy queue
~concurrent_bounded_queue()410 ~concurrent_bounded_queue() {
411 clear();
412 my_queue_representation->clear(my_allocator);
413 queue_allocator_traits::destroy(my_allocator, my_queue_representation);
414 r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
415 sizeof(queue_representation_type));
416 }
417
418 concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
419 //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
420 if (my_queue_representation != other.my_queue_representation) {
421 clear();
422 my_allocator = other.my_allocator;
423 my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
424 }
425 return *this;
426 }
427
428 concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
429 //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
430 if (my_queue_representation != other.my_queue_representation) {
431 clear();
432 if (my_allocator == other.my_allocator) {
433 internal_swap(other);
434 } else {
435 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
436 other.clear();
437 my_allocator = std::move(other.my_allocator);
438 }
439 }
440 return *this;
441 }
442
443 concurrent_bounded_queue& operator=( std::initializer_list<value_type> init ) {
444 assign(init);
445 return *this;
446 }
447
448 template <typename InputIterator>
assign(InputIterator first,InputIterator last)449 void assign( InputIterator first, InputIterator last ) {
450 concurrent_bounded_queue src(first, last);
451 clear();
452 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
453 }
454
assign(std::initializer_list<value_type> init)455 void assign( std::initializer_list<value_type> init ) {
456 assign(init.begin(), init.end());
457 }
458
swap(concurrent_bounded_queue & other)459 void swap ( concurrent_bounded_queue& other ) {
460 //TODO: implement support for std::allocator_traits::propagate_on_container_swap
461 __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
462 internal_swap(other);
463 }
464
465 // Enqueue an item at tail of queue.
push(const T & value)466 void push( const T& value ) {
467 internal_push(value);
468 }
469
push(T && value)470 void push( T&& value ) {
471 internal_push(std::move(value));
472 }
473
474 // Enqueue an item at tail of queue if queue is not already full.
475 // Does not wait for queue to become not full.
476 // Returns true if item is pushed; false if queue was already full.
try_push(const T & value)477 bool try_push( const T& value ) {
478 return internal_push_if_not_full(value);
479 }
480
try_push(T && value)481 bool try_push( T&& value ) {
482 return internal_push_if_not_full(std::move(value));
483 }
484
485 template <typename... Args>
emplace(Args &&...args)486 void emplace( Args&&... args ) {
487 internal_push(std::forward<Args>(args)...);
488 }
489
490 template <typename... Args>
try_emplace(Args &&...args)491 bool try_emplace( Args&&... args ) {
492 return internal_push_if_not_full(std::forward<Args>(args)...);
493 }
494
495 // Attempt to dequeue an item from head of queue.
pop(T & result)496 void pop( T& result ) {
497 internal_pop(&result);
498 }
499
500 /** Does not wait for item to become available.
501 Returns true if successful; false otherwise. */
try_pop(T & result)502 bool try_pop( T& result ) {
503 return internal_pop_if_present(&result);
504 }
505
abort()506 void abort() {
507 internal_abort();
508 }
509
510 // Return the number of items in the queue; thread unsafe
size()511 std::ptrdiff_t size() const {
512 return my_queue_representation->size();
513 }
514
set_capacity(size_type new_capacity)515 void set_capacity( size_type new_capacity ) {
516 std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
517 my_capacity = c;
518 }
519
capacity()520 size_type capacity() const {
521 return my_capacity;
522 }
523
524 // Equivalent to size()==0.
empty()525 __TBB_nodiscard bool empty() const {
526 return my_queue_representation->empty();
527 }
528
529 // Clear the queue. not thread-safe.
clear()530 void clear() {
531 my_queue_representation->clear(my_allocator);
532 }
533
534 // Return allocator object
get_allocator()535 allocator_type get_allocator() const { return my_allocator; }
536
537 //------------------------------------------------------------------------
538 // The iterators are intended only for debugging. They are slow and not thread safe.
539 //------------------------------------------------------------------------
540
unsafe_begin()541 iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
unsafe_end()542 iterator unsafe_end() { return iterator(); }
unsafe_begin()543 const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_end()544 const_iterator unsafe_end() const { return const_iterator(); }
unsafe_cbegin()545 const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_cend()546 const_iterator unsafe_cend() const { return const_iterator(); }
547
548 private:
internal_swap(concurrent_bounded_queue & src)549 void internal_swap( concurrent_bounded_queue& src ) {
550 std::swap(my_queue_representation, src.my_queue_representation);
551 std::swap(my_monitors, src.my_monitors);
552 }
553
554 static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
555
556 template <typename... Args>
internal_push(Args &&...args)557 void internal_push( Args&&... args ) {
558 unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
559 ticket_type ticket = my_queue_representation->tail_counter++;
560 std::ptrdiff_t target = ticket - my_capacity;
561
562 if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
563 auto pred = [&] {
564 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
565 throw_exception(exception_id::user_abort);
566 }
567
568 return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
569 };
570
571 try_call( [&] {
572 internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
573 }).on_exception( [&] {
574 my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
575 });
576
577 }
578 __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
579 my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
580 r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
581 }
582
583 template <typename... Args>
internal_push_if_not_full(Args &&...args)584 bool internal_push_if_not_full( Args&&... args ) {
585 ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
586 do {
587 if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
588 // Queue is full
589 return false;
590 }
591 // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
592 // Another thread claimed the slot, so retry.
593 } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
594
595 my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
596 r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
597 return true;
598 }
599
internal_pop(void * dst)600 void internal_pop( void* dst ) {
601 std::ptrdiff_t target;
602 // This loop is a single pop operation; abort_counter should not be re-read inside
603 unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
604
605 do {
606 target = my_queue_representation->head_counter++;
607 if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
608 auto pred = [&] {
609 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
610 throw_exception(exception_id::user_abort);
611 }
612
613 return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
614 };
615
616 try_call( [&] {
617 internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
618 }).on_exception( [&] {
619 my_queue_representation->head_counter--;
620 });
621 }
622 __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
623 } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
624
625 r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
626 }
627
internal_pop_if_present(void * dst)628 bool internal_pop_if_present( void* dst ) {
629 bool present{};
630 ticket_type ticket{};
631 std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);
632
633 if (present) {
634 r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
635 }
636 return present;
637 }
638
internal_abort()639 void internal_abort() {
640 ++my_abort_counter;
641 r1::abort_bounded_queue_monitors(my_monitors);
642 }
643
copy_construct_item(T * location,const void * src)644 static void copy_construct_item(T* location, const void* src) {
645 // TODO: use allocator_traits for copy construction
646 new (location) value_type(*static_cast<const value_type*>(src));
647 }
648
move_construct_item(T * location,const void * src)649 static void move_construct_item(T* location, const void* src) {
650 // TODO: use allocator_traits for move construction
651 new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
652 }
653
654 template <typename Container, typename Value, typename A>
655 friend class concurrent_queue_iterator;
656
657 queue_allocator_type my_allocator;
658 std::ptrdiff_t my_capacity;
659 std::atomic<unsigned> my_abort_counter;
660 queue_representation_type* my_queue_representation;
661
662 r1::concurrent_monitor* my_monitors;
663
swap(concurrent_bounded_queue & lhs,concurrent_bounded_queue & rhs)664 friend void swap( concurrent_bounded_queue& lhs, concurrent_bounded_queue& rhs ) {
665 lhs.swap(rhs);
666 }
667
668 friend bool operator==( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
669 return lhs.size() == rhs.size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
670 }
671
672 #if !__TBB_CPP20_COMPARISONS_PRESENT
673 friend bool operator!=( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
674 return !(lhs == rhs);
675 }
676 #endif // __TBB_CPP20_COMPARISONS_PRESENT
677 }; // class concurrent_bounded_queue
678
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators.
// Constrained in the same way as the concurrent_queue guide in this file: it only takes
// part in deduction when It is an input iterator and Alloc is an allocator.
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_bounded_queue( It, It, Alloc = Alloc() )
-> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
686
687 } //namespace d2
688 } // namespace detail
689
// Public names: make the queue containers available directly in namespace tbb.
inline namespace v1 {

using detail::d2::concurrent_queue;
using detail::d2::concurrent_bounded_queue;
// Declared in detail::r1; presumably the exception types reported by the queues
// (the bounded queue raises exception_id::user_abort on abort()) - confirm in _exception.h.
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
698 } // namespace tbb
699
700 #endif // __TBB_concurrent_queue_H
701