1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
20 #include "detail/_namespace_injection.h"
21 #include "detail/_concurrent_queue_base.h"
22 #include "detail/_allocator_traits.h"
23 #include "detail/_exception.h"
24 #include "detail/_containers_helpers.h"
25 #include "cache_aligned_allocator.h"
26 
27 namespace tbb {
28 namespace detail {
29 namespace d2 {
30 
// Shared non-blocking pop implementation used by both queue flavors.
// On success the popped item is placed into *dst; returns a {success, ticket} pair.
template <typename QueueRep, typename Allocator>
std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc) {
33     ticket_type ticket{};
34     do {
        // head_counter must be read before tail_counter; the acquire load below establishes
        // the necessary happens-before ordering on head_counter.
36         ticket = queue.head_counter.load(std::memory_order_acquire);
37         do {
            if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) {
39                 // Queue is empty
40                 return { false, ticket };
41             }
            // The queue had an item with this ticket when we looked; attempt to claim it.
            // If the CAS fails, another thread claimed the ticket first, so retry with the updated value.
44         } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
45     } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
46     return { true, ticket };
47 }
48 
49 // A high-performance thread-safe non-blocking concurrent queue.
50 // Multiple threads may each push and pop concurrently.
// Copy/move construction and assignment are supported, but these operations are not thread-safe.
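// A minimal usage sketch (illustrative only; process() below is a hypothetical user function):
//
//     tbb::concurrent_queue<int> queue;
//     queue.push(1);                  // may be called from many threads concurrently
//     int value;
//     while (queue.try_pop(value))    // non-blocking; returns false once the queue is empty
//         process(value);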
52 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
53 class concurrent_queue {
54     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
55     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
56     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
57     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
58 public:
59     using size_type = std::size_t;
60     using value_type = T;
61     using reference = T&;
62     using const_reference = const T&;
63     using difference_type = std::ptrdiff_t;
64 
65     using allocator_type = Allocator;
66     using pointer = typename allocator_traits_type::pointer;
67     using const_pointer = typename allocator_traits_type::const_pointer;
68 
69     using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
70     using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;
71 
72     concurrent_queue() : concurrent_queue(allocator_type()) {}
73 
74     explicit concurrent_queue(const allocator_type& a) :
75         my_allocator(a), my_queue_representation(nullptr)
76     {
77         my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
78         queue_allocator_traits::construct(my_allocator, my_queue_representation);
79 
80         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
81         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
82         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
83         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
84     }
85 
86     template <typename InputIterator>
87     concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
88         concurrent_queue(a)
89     {
90         for (; begin != end; ++begin)
91             push(*begin);
92     }
93 
94     concurrent_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ) :
95         concurrent_queue(init.begin(), init.end(), alloc)
96     {}
97 
98     concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
99         concurrent_queue(a)
100     {
101         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
102     }
103 
104     concurrent_queue(const concurrent_queue& src) :
105         concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
106     {
107         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
108     }
109 
110     // Move constructors
111     concurrent_queue(concurrent_queue&& src) :
112         concurrent_queue(std::move(src.my_allocator))
113     {
114         internal_swap(src);
115     }
116 
117     concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
118         concurrent_queue(a)
119     {
        // Memory allocated through one allocator instance may only be deallocated through an
        // instance that compares equal to it, so swapping representations is valid only when
        // the allocators are equal.
122         if (my_allocator == src.my_allocator) {
123             internal_swap(src);
124         } else {
125             // allocators are different => performing per-element move
126             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
127             src.clear();
128         }
129     }
130 
131     // Destroy queue
132     ~concurrent_queue() {
133         clear();
134         my_queue_representation->clear(my_allocator);
135         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
136         r1::cache_aligned_deallocate(my_queue_representation);
137     }
138 
139     concurrent_queue& operator=( const concurrent_queue& other ) {
140         //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
141         if (my_queue_representation != other.my_queue_representation) {
142             clear();
143             my_allocator = other.my_allocator;
144             my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
145         }
146         return *this;
147     }
148 
149     concurrent_queue& operator=( concurrent_queue&& other ) {
150         //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
151         if (my_queue_representation != other.my_queue_representation) {
152             clear();
153             if (my_allocator == other.my_allocator) {
154                 internal_swap(other);
155             } else {
156                 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
157                 other.clear();
158                 my_allocator = std::move(other.my_allocator);
159             }
160         }
161         return *this;
162     }
163 
164     concurrent_queue& operator=( std::initializer_list<value_type> init ) {
165         assign(init);
166         return *this;
167     }
168 
169     template <typename InputIterator>
170     void assign( InputIterator first, InputIterator last ) {
171         concurrent_queue src(first, last);
172         clear();
173         my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
174     }
175 
176     void assign( std::initializer_list<value_type> init ) {
177         assign(init.begin(), init.end());
178     }
179 
180     void swap ( concurrent_queue& other ) {
181         //TODO: implement support for std::allocator_traits::propagate_on_container_swap
182         __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
183         internal_swap(other);
184     }
185 
186     // Enqueue an item at tail of queue.
187     void push(const T& value) {
188         internal_push(value);
189     }
190 
191     void push(T&& value) {
192         internal_push(std::move(value));
193     }
194 
195     template <typename... Args>
196     void emplace( Args&&... args ) {
197         internal_push(std::forward<Args>(args)...);
198     }
199 
200     // Attempt to dequeue an item from head of queue.
201     /** Does not wait for item to become available.
202         Returns true if successful; false otherwise. */
203     bool try_pop( T& result ) {
204         return internal_try_pop(&result);
205     }
206 
207     // Return the number of items in the queue; thread unsafe
208     size_type unsafe_size() const {
209         std::ptrdiff_t size = my_queue_representation->size();
210         return size < 0 ? 0 :  size_type(size);
211     }
212 
213     // Equivalent to size()==0.
214     __TBB_nodiscard bool empty() const {
215         return my_queue_representation->empty();
216     }
217 
    // Clear the queue; not thread-safe.
219     void clear() {
220         my_queue_representation->clear(my_allocator);
221     }
222 
223     // Return allocator object
224     allocator_type get_allocator() const { return my_allocator; }
225 
226     //------------------------------------------------------------------------
227     // The iterators are intended only for debugging.  They are slow and not thread safe.
228     //------------------------------------------------------------------------
229 
230     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
231     iterator unsafe_end() { return iterator(); }
232     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
233     const_iterator unsafe_end() const { return const_iterator(); }
234     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
235     const_iterator unsafe_cend() const { return const_iterator(); }
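    // For example, given an instance `queue` (debug-only traversal, assuming no other thread
    // touches the queue meanwhile; inspect() is a hypothetical user function):
    //     for (auto it = queue.unsafe_begin(); it != queue.unsafe_end(); ++it)
    //         inspect(*it);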
236 
237 private:
238     void internal_swap(concurrent_queue& src) {
239         using std::swap;
240         swap(my_queue_representation, src.my_queue_representation);
241     }
242 
243     template <typename... Args>
244     void internal_push( Args&&... args ) {
245         ticket_type k = my_queue_representation->tail_counter++;
246         my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
247     }
248 
249     bool internal_try_pop( void* dst ) {
250         return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
251     }
252 
253     template <typename Container, typename Value, typename A>
254     friend class concurrent_queue_iterator;
255 
256     static void copy_construct_item(T* location, const void* src) {
257         // TODO: use allocator_traits for copy construction
258         new (location) value_type(*static_cast<const value_type*>(src));
259         // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
260     }
261 
262     static void move_construct_item(T* location, const void* src) {
263         // TODO: use allocator_traits for move construction
264         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
265     }
266 
267     queue_allocator_type my_allocator;
268     queue_representation_type* my_queue_representation;
269 
270     friend void swap( concurrent_queue& lhs, concurrent_queue& rhs ) {
271         lhs.swap(rhs);
272     }
273 
274     friend bool operator==( const concurrent_queue& lhs, const concurrent_queue& rhs ) {
275         return lhs.unsafe_size() == rhs.unsafe_size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
276     }
277 
278 #if !__TBB_CPP20_COMPARISONS_PRESENT
279     friend bool operator!=( const concurrent_queue& lhs,  const concurrent_queue& rhs ) {
280         return !(lhs == rhs);
281     }
282 #endif // __TBB_CPP20_COMPARISONS_PRESENT
283 }; // class concurrent_queue
284 
285 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
286 // Deduction guide for the constructor from two iterators
287 template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
288           typename = std::enable_if_t<is_input_iterator_v<It>>,
289           typename = std::enable_if_t<is_allocator_v<Alloc>>>
290 concurrent_queue( It, It, Alloc = Alloc() )
291 -> concurrent_queue<iterator_value_t<It>, Alloc>;
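
// For example (illustrative), the element type is deduced from the iterators:
//     std::vector<int> src{1, 2, 3};
//     tbb::concurrent_queue queue(src.begin(), src.end());   // concurrent_queue<int>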
292 
293 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
294 
295 class concurrent_monitor;
296 
297 // The concurrent monitor tags for concurrent_bounded_queue.
298 static constexpr std::size_t cbq_slots_avail_tag = 0;
299 static constexpr std::size_t cbq_items_avail_tag = 1;
300 } // namespace d2
301 
302 
303 namespace r1 {
304     class concurrent_monitor;
305 
306     TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
307     TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
308     TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
309     TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
310                                                             , std::size_t ticket );
311     TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
312                                                             std::ptrdiff_t target, d1::delegate_base& predicate );
313 } // namespace r1
314 
315 
316 namespace d2 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports a user-settable capacity bound and blocking push/pop semantics.
// Multiple threads may each push and pop concurrently.
// Copy/move construction and assignment are supported, but these operations are not thread-safe.
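// A minimal usage sketch (illustrative only):
//
//     tbb::concurrent_bounded_queue<int> queue;
//     queue.set_capacity(16);          // pushes beyond this bound block until space frees up
//     queue.push(1);                   // blocks while the queue is full
//     int value;
//     queue.pop(value);                // blocks while the queue is empty
//     if (!queue.try_push(2)) { /* queue was full; nothing was enqueued */ }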
321 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
322 class concurrent_bounded_queue {
323     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
324     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
325     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
326     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
327 
328     template <typename FuncType>
329     void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
330         d1::delegated_function<FuncType> func(pred);
331         r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
332     }
333 public:
334     using size_type = std::ptrdiff_t;
335     using value_type = T;
336     using reference = T&;
337     using const_reference = const T&;
338     using difference_type = std::ptrdiff_t;
339 
340     using allocator_type = Allocator;
341     using pointer = typename allocator_traits_type::pointer;
342     using const_pointer = typename allocator_traits_type::const_pointer;
343 
344     using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator>;
346 
347     concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}
348 
349     explicit concurrent_bounded_queue( const allocator_type& a ) :
350         my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
351     {
352         my_queue_representation = reinterpret_cast<queue_representation_type*>(
353             r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
354         my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
355         queue_allocator_traits::construct(my_allocator, my_queue_representation);
356         my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
357 
358         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
359         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
360         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
361         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
362     }
363 
364     template <typename InputIterator>
365     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
366         concurrent_bounded_queue(a)
367     {
368         for (; begin != end; ++begin)
369             push(*begin);
370     }
371 
372     concurrent_bounded_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ):
373         concurrent_bounded_queue(init.begin(), init.end(), alloc)
374     {}
375 
376     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
377         concurrent_bounded_queue(a)
378     {
379         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
380     }
381 
382     concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
383         concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
384     {
385         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
386     }
387 
388     // Move constructors
389     concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
390         concurrent_bounded_queue(std::move(src.my_allocator))
391     {
392         internal_swap(src);
393     }
394 
395     concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
396         concurrent_bounded_queue(a)
397     {
        // Memory allocated through one allocator instance may only be deallocated through an
        // instance that compares equal to it, so swapping representations is valid only when
        // the allocators are equal.
400         if (my_allocator == src.my_allocator) {
401             internal_swap(src);
402         } else {
403             // allocators are different => performing per-element move
404             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
405             src.clear();
406         }
407     }
408 
409     // Destroy queue
410     ~concurrent_bounded_queue() {
411         clear();
412         my_queue_representation->clear(my_allocator);
413         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
414         r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
415                                          sizeof(queue_representation_type));
416     }
417 
418     concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
419         //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
420         if (my_queue_representation != other.my_queue_representation) {
421             clear();
422             my_allocator = other.my_allocator;
423             my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
424         }
425         return *this;
426     }
427 
428     concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
429         //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
430         if (my_queue_representation != other.my_queue_representation) {
431             clear();
432             if (my_allocator == other.my_allocator) {
433                 internal_swap(other);
434             } else {
435                 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
436                 other.clear();
437                 my_allocator = std::move(other.my_allocator);
438             }
439         }
440         return *this;
441     }
442 
443     concurrent_bounded_queue& operator=( std::initializer_list<value_type> init ) {
444         assign(init);
445         return *this;
446     }
447 
448     template <typename InputIterator>
449     void assign( InputIterator first, InputIterator last ) {
450         concurrent_bounded_queue src(first, last);
451         clear();
452         my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
453     }
454 
455     void assign( std::initializer_list<value_type> init ) {
456         assign(init.begin(), init.end());
457     }
458 
459     void swap ( concurrent_bounded_queue& other ) {
460         //TODO: implement support for std::allocator_traits::propagate_on_container_swap
461         __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
462         internal_swap(other);
463     }
464 
    // Enqueue an item at the tail of the queue; blocks while the queue is full.
466     void push( const T& value ) {
467         internal_push(value);
468     }
469 
470     void push( T&& value ) {
471         internal_push(std::move(value));
472     }
473 
    // Enqueue an item at the tail of the queue if the queue is not already full.
    // Does not wait for space to become available.
    // Returns true if the item was pushed; false if the queue was already full.
477     bool try_push( const T& value ) {
478         return internal_push_if_not_full(value);
479     }
480 
481     bool try_push( T&& value ) {
482         return internal_push_if_not_full(std::move(value));
483     }
484 
485     template <typename... Args>
486     void emplace( Args&&... args ) {
487         internal_push(std::forward<Args>(args)...);
488     }
489 
490     template <typename... Args>
491     bool try_emplace( Args&&... args ) {
492         return internal_push_if_not_full(std::forward<Args>(args)...);
493     }
494 
    // Dequeue an item from the head of the queue; blocks until an item becomes
    // available or the wait is aborted via abort().
496     void pop( T& result ) {
497         internal_pop(&result);
498     }
499 
    // Attempt to dequeue an item from the head of the queue.
    /** Does not wait for item to become available.
501         Returns true if successful; false otherwise. */
502     bool try_pop( T& result ) {
503         return internal_pop_if_present(&result);
504     }
505 
506     void abort() {
507         internal_abort();
508     }
509 
510     // Return the number of items in the queue; thread unsafe
511     std::ptrdiff_t size() const {
512         return my_queue_representation->size();
513     }
514 
515     void set_capacity( size_type new_capacity ) {
516         std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
517         my_capacity = c;
518     }
519 
520     size_type capacity() const {
521         return my_capacity;
522     }
523 
524     // Equivalent to size()==0.
525     __TBB_nodiscard bool empty() const {
526         return my_queue_representation->empty();
527     }
528 
    // Clear the queue; not thread-safe.
530     void clear() {
531         my_queue_representation->clear(my_allocator);
532     }
533 
534     // Return allocator object
535     allocator_type get_allocator() const { return my_allocator; }
536 
537     //------------------------------------------------------------------------
538     // The iterators are intended only for debugging.  They are slow and not thread safe.
539     //------------------------------------------------------------------------
540 
541     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
542     iterator unsafe_end() { return iterator(); }
543     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
544     const_iterator unsafe_end() const { return const_iterator(); }
545     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
546     const_iterator unsafe_cend() const { return const_iterator(); }
547 
548 private:
549     void internal_swap( concurrent_bounded_queue& src ) {
550         std::swap(my_queue_representation, src.my_queue_representation);
551         std::swap(my_monitors, src.my_monitors);
552     }
553 
    // Sentinel capacity meaning "unbounded". Computed from the unsigned all-ones pattern so the
    // result is a large positive value; with the signed size_type, ~size_type(0) / 2 would be 0.
    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~std::size_t(0) / 2);
555 
556     template <typename... Args>
557     void internal_push( Args&&... args ) {
558         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
559         ticket_type ticket = my_queue_representation->tail_counter++;
560         std::ptrdiff_t target = ticket - my_capacity;
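        // There is room for this push only once head_counter exceeds `target`,
        // i.e. once the pop holding ticket `target` has been issued.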
561 
562         if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
563             auto pred = [&] {
564                 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
565                     throw_exception(exception_id::user_abort);
566                 }
567 
568                 return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
569             };
570 
571             try_call( [&] {
572                 internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
573             }).on_exception( [&] {
574                 my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
575             });
576 
577         }
578         __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
579         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
580         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
581     }
582 
583     template <typename... Args>
584     bool internal_push_if_not_full( Args&&... args ) {
585         ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
586         do {
587             if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
588                 // Queue is full
589                 return false;
590             }
            // The queue had an empty slot for this ticket when we looked; attempt to claim it.
            // If the CAS fails, another thread claimed the slot first, so retry with the updated ticket.
593         } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
594 
595         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
596         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
597         return true;
598     }
599 
600     void internal_pop( void* dst ) {
601         std::ptrdiff_t target;
        // The entire loop below is a single pop operation; the abort counter is sampled once
        // and must not be re-read inside the loop.
603         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
604 
605         do {
606             target = my_queue_representation->head_counter++;
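            // `target` is the ticket this pop claims; an item is available only once
            // tail_counter exceeds `target`, i.e. once the matching push has been issued.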
607             if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
608                 auto pred = [&] {
609                     if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
611                     }
612 
613                     return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
614                 };
615 
616                 try_call( [&] {
617                     internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
618                 }).on_exception( [&] {
619                     my_queue_representation->head_counter--;
620                 });
621             }
622             __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
623         } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
624 
625         r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
626     }
627 
628     bool internal_pop_if_present( void* dst ) {
629         bool present{};
630         ticket_type ticket{};
631         std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);
632 
633         if (present) {
634             r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
635         }
636         return present;
637     }
638 
639     void internal_abort() {
640         ++my_abort_counter;
641         r1::abort_bounded_queue_monitors(my_monitors);
642     }
643 
644     static void copy_construct_item(T* location, const void* src) {
645         // TODO: use allocator_traits for copy construction
646         new (location) value_type(*static_cast<const value_type*>(src));
647     }
648 
649     static void move_construct_item(T* location, const void* src) {
650         // TODO: use allocator_traits for move construction
651         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
652     }
653 
654     template <typename Container, typename Value, typename A>
655     friend class concurrent_queue_iterator;
656 
657     queue_allocator_type my_allocator;
658     std::ptrdiff_t my_capacity;
659     std::atomic<unsigned> my_abort_counter;
660     queue_representation_type* my_queue_representation;
661 
662     r1::concurrent_monitor* my_monitors;
663 
664     friend void swap( concurrent_bounded_queue& lhs, concurrent_bounded_queue& rhs ) {
665         lhs.swap(rhs);
666     }
667 
668     friend bool operator==( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
669         return lhs.size() == rhs.size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
670     }
671 
672 #if !__TBB_CPP20_COMPARISONS_PRESENT
673     friend bool operator!=( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
674         return !(lhs == rhs);
675     }
676 #endif // __TBB_CPP20_COMPARISONS_PRESENT
677 }; // class concurrent_bounded_queue
678 
679 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
680 // Deduction guide for the constructor from two iterators
681 template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
682 concurrent_bounded_queue( It, It, Alloc = Alloc() )
683 -> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;
684 
685 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
686 
687 } //namespace d2
} // namespace detail
689 
690 inline namespace v1 {
691 
692 using detail::d2::concurrent_queue;
693 using detail::d2::concurrent_bounded_queue;
694 using detail::r1::user_abort;
695 using detail::r1::bad_last_alloc;
696 
697 } // inline namespace v1
698 } // namespace tbb
699 
700 #endif // __TBB_concurrent_queue_H
701