149e08aacStbbdev /*
2155acce7SJhaShweta1     Copyright (c) 2005-2023 Intel Corporation
349e08aacStbbdev 
449e08aacStbbdev     Licensed under the Apache License, Version 2.0 (the "License");
549e08aacStbbdev     you may not use this file except in compliance with the License.
649e08aacStbbdev     You may obtain a copy of the License at
749e08aacStbbdev 
849e08aacStbbdev         http://www.apache.org/licenses/LICENSE-2.0
949e08aacStbbdev 
1049e08aacStbbdev     Unless required by applicable law or agreed to in writing, software
1149e08aacStbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1249e08aacStbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1349e08aacStbbdev     See the License for the specific language governing permissions and
1449e08aacStbbdev     limitations under the License.
1549e08aacStbbdev */
1649e08aacStbbdev 
1749e08aacStbbdev #ifndef __TBB_concurrent_queue_H
1849e08aacStbbdev #define __TBB_concurrent_queue_H
1949e08aacStbbdev 
2049e08aacStbbdev #include "detail/_namespace_injection.h"
2149e08aacStbbdev #include "detail/_concurrent_queue_base.h"
2249e08aacStbbdev #include "detail/_allocator_traits.h"
2349e08aacStbbdev #include "detail/_exception.h"
24d86ed7fbStbbdev #include "detail/_containers_helpers.h"
2549e08aacStbbdev #include "cache_aligned_allocator.h"
2649e08aacStbbdev 
2749e08aacStbbdev namespace tbb {
2849e08aacStbbdev namespace detail {
29fbc48b39Svlserov namespace d2 {
3049e08aacStbbdev 
31eac94650SAlex template <typename QueueRep, typename Allocator>
internal_try_pop_impl(void * dst,QueueRep & queue,Allocator & alloc)32eac94650SAlex std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
33eac94650SAlex     ticket_type ticket{};
34eac94650SAlex     do {
35eac94650SAlex         // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
36eac94650SAlex         ticket = queue.head_counter.load(std::memory_order_acquire);
37eac94650SAlex         do {
38eac94650SAlex             if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
39eac94650SAlex                 // Queue is empty
40eac94650SAlex                 return { false, ticket };
41eac94650SAlex             }
42eac94650SAlex             // Queue had item with ticket k when we looked.  Attempt to get that item.
43eac94650SAlex             // Another thread snatched the item, retry.
44eac94650SAlex         } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
45eac94650SAlex     } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
46eac94650SAlex     return { true, ticket };
47eac94650SAlex }
48eac94650SAlex 
4949e08aacStbbdev // A high-performance thread-safe non-blocking concurrent queue.
5049e08aacStbbdev // Multiple threads may each push and pop concurrently.
5149e08aacStbbdev // Assignment construction is not allowed.
5249e08aacStbbdev template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
5349e08aacStbbdev class concurrent_queue {
5449e08aacStbbdev     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
5549e08aacStbbdev     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
5649e08aacStbbdev     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
5749e08aacStbbdev     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
5849e08aacStbbdev public:
5949e08aacStbbdev     using size_type = std::size_t;
6049e08aacStbbdev     using value_type = T;
6149e08aacStbbdev     using reference = T&;
6249e08aacStbbdev     using const_reference = const T&;
6349e08aacStbbdev     using difference_type = std::ptrdiff_t;
6449e08aacStbbdev 
6549e08aacStbbdev     using allocator_type = Allocator;
6649e08aacStbbdev     using pointer = typename allocator_traits_type::pointer;
6749e08aacStbbdev     using const_pointer = typename allocator_traits_type::const_pointer;
6849e08aacStbbdev 
6949e08aacStbbdev     using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
7049e08aacStbbdev     using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;
7149e08aacStbbdev 
concurrent_queue()7249e08aacStbbdev     concurrent_queue() : concurrent_queue(allocator_type()) {}
7349e08aacStbbdev 
concurrent_queue(const allocator_type & a)7449e08aacStbbdev     explicit concurrent_queue(const allocator_type& a) :
7549e08aacStbbdev         my_allocator(a), my_queue_representation(nullptr)
7649e08aacStbbdev     {
7749e08aacStbbdev         my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
78fbc48b39Svlserov         queue_allocator_traits::construct(my_allocator, my_queue_representation);
7949e08aacStbbdev 
8049e08aacStbbdev         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
8149e08aacStbbdev         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
8249e08aacStbbdev         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
8349e08aacStbbdev         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
8449e08aacStbbdev     }
8549e08aacStbbdev 
8649e08aacStbbdev     template <typename InputIterator>
8749e08aacStbbdev     concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
concurrent_queue(a)8849e08aacStbbdev         concurrent_queue(a)
8949e08aacStbbdev     {
9049e08aacStbbdev         for (; begin != end; ++begin)
9149e08aacStbbdev             push(*begin);
9249e08aacStbbdev     }
9349e08aacStbbdev 
94155acce7SJhaShweta1     concurrent_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ) :
95155acce7SJhaShweta1         concurrent_queue(init.begin(), init.end(), alloc)
96155acce7SJhaShweta1     {}
97155acce7SJhaShweta1 
concurrent_queue(const concurrent_queue & src,const allocator_type & a)9849e08aacStbbdev     concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
9949e08aacStbbdev         concurrent_queue(a)
10049e08aacStbbdev     {
101fbc48b39Svlserov         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
10249e08aacStbbdev     }
10349e08aacStbbdev 
concurrent_queue(const concurrent_queue & src)10449e08aacStbbdev     concurrent_queue(const concurrent_queue& src) :
10549e08aacStbbdev         concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
10649e08aacStbbdev     {
107fbc48b39Svlserov         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
10849e08aacStbbdev     }
10949e08aacStbbdev 
11049e08aacStbbdev     // Move constructors
concurrent_queue(concurrent_queue && src)11149e08aacStbbdev     concurrent_queue(concurrent_queue&& src) :
11249e08aacStbbdev         concurrent_queue(std::move(src.my_allocator))
11349e08aacStbbdev     {
11449e08aacStbbdev         internal_swap(src);
11549e08aacStbbdev     }
11649e08aacStbbdev 
concurrent_queue(concurrent_queue && src,const allocator_type & a)11749e08aacStbbdev     concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
11849e08aacStbbdev         concurrent_queue(a)
11949e08aacStbbdev     {
12049e08aacStbbdev         // checking that memory allocated by one instance of allocator can be deallocated
12149e08aacStbbdev         // with another
12249e08aacStbbdev         if (my_allocator == src.my_allocator) {
12349e08aacStbbdev             internal_swap(src);
12449e08aacStbbdev         } else {
12549e08aacStbbdev             // allocators are different => performing per-element move
126fbc48b39Svlserov             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
12749e08aacStbbdev             src.clear();
12849e08aacStbbdev         }
12949e08aacStbbdev     }
13049e08aacStbbdev 
13149e08aacStbbdev     // Destroy queue
~concurrent_queue()13249e08aacStbbdev     ~concurrent_queue() {
13349e08aacStbbdev         clear();
134fbc48b39Svlserov         my_queue_representation->clear(my_allocator);
13549e08aacStbbdev         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
13649e08aacStbbdev         r1::cache_aligned_deallocate(my_queue_representation);
13749e08aacStbbdev     }
13849e08aacStbbdev 
139155acce7SJhaShweta1     concurrent_queue& operator=( const concurrent_queue& other ) {
140155acce7SJhaShweta1         //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
141155acce7SJhaShweta1         if (my_queue_representation != other.my_queue_representation) {
142155acce7SJhaShweta1             clear();
143155acce7SJhaShweta1             my_allocator = other.my_allocator;
144155acce7SJhaShweta1             my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
145155acce7SJhaShweta1         }
146155acce7SJhaShweta1         return *this;
147155acce7SJhaShweta1     }
148155acce7SJhaShweta1 
149155acce7SJhaShweta1     concurrent_queue& operator=( concurrent_queue&& other ) {
150155acce7SJhaShweta1         //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
151155acce7SJhaShweta1         if (my_queue_representation != other.my_queue_representation) {
152155acce7SJhaShweta1             clear();
153155acce7SJhaShweta1             if (my_allocator == other.my_allocator) {
154155acce7SJhaShweta1                 internal_swap(other);
155155acce7SJhaShweta1             } else {
156155acce7SJhaShweta1                 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
157155acce7SJhaShweta1                 other.clear();
158155acce7SJhaShweta1                 my_allocator = std::move(other.my_allocator);
159155acce7SJhaShweta1             }
160155acce7SJhaShweta1         }
161155acce7SJhaShweta1         return *this;
162155acce7SJhaShweta1     }
163155acce7SJhaShweta1 
164155acce7SJhaShweta1     concurrent_queue& operator=( std::initializer_list<value_type> init ) {
165155acce7SJhaShweta1         assign(init);
166155acce7SJhaShweta1         return *this;
167155acce7SJhaShweta1     }
168155acce7SJhaShweta1 
169155acce7SJhaShweta1     template <typename InputIterator>
assign(InputIterator first,InputIterator last)170155acce7SJhaShweta1     void assign( InputIterator first, InputIterator last ) {
171155acce7SJhaShweta1         concurrent_queue src(first, last);
172155acce7SJhaShweta1         clear();
173155acce7SJhaShweta1         my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
174155acce7SJhaShweta1     }
175155acce7SJhaShweta1 
assign(std::initializer_list<value_type> init)176155acce7SJhaShweta1     void assign( std::initializer_list<value_type> init ) {
177155acce7SJhaShweta1         assign(init.begin(), init.end());
178155acce7SJhaShweta1     }
179155acce7SJhaShweta1 
swap(concurrent_queue & other)180155acce7SJhaShweta1     void swap ( concurrent_queue& other ) {
181155acce7SJhaShweta1         //TODO: implement support for std::allocator_traits::propagate_on_container_swap
182155acce7SJhaShweta1         __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
183155acce7SJhaShweta1         internal_swap(other);
184155acce7SJhaShweta1     }
185155acce7SJhaShweta1 
18649e08aacStbbdev     // Enqueue an item at tail of queue.
push(const T & value)18749e08aacStbbdev     void push(const T& value) {
18849e08aacStbbdev         internal_push(value);
18949e08aacStbbdev     }
19049e08aacStbbdev 
push(T && value)19149e08aacStbbdev     void push(T&& value) {
19249e08aacStbbdev         internal_push(std::move(value));
19349e08aacStbbdev     }
19449e08aacStbbdev 
19549e08aacStbbdev     template <typename... Args>
emplace(Args &&...args)19649e08aacStbbdev     void emplace( Args&&... args ) {
19749e08aacStbbdev         internal_push(std::forward<Args>(args)...);
19849e08aacStbbdev     }
19949e08aacStbbdev 
20049e08aacStbbdev     // Attempt to dequeue an item from head of queue.
20149e08aacStbbdev     /** Does not wait for item to become available.
20249e08aacStbbdev         Returns true if successful; false otherwise. */
try_pop(T & result)20349e08aacStbbdev     bool try_pop( T& result ) {
20449e08aacStbbdev         return internal_try_pop(&result);
20549e08aacStbbdev     }
20649e08aacStbbdev 
20749e08aacStbbdev     // Return the number of items in the queue; thread unsafe
unsafe_size()20849e08aacStbbdev     size_type unsafe_size() const {
20949e08aacStbbdev         std::ptrdiff_t size = my_queue_representation->size();
21049e08aacStbbdev         return size < 0 ? 0 :  size_type(size);
21149e08aacStbbdev     }
21249e08aacStbbdev 
21349e08aacStbbdev     // Equivalent to size()==0.
empty()214b15aabb3Stbbdev     __TBB_nodiscard bool empty() const {
21549e08aacStbbdev         return my_queue_representation->empty();
21649e08aacStbbdev     }
21749e08aacStbbdev 
21849e08aacStbbdev     // Clear the queue. not thread-safe.
clear()21949e08aacStbbdev     void clear() {
2208155aaebSkboyarinov         my_queue_representation->clear(my_allocator);
22149e08aacStbbdev     }
22249e08aacStbbdev 
22349e08aacStbbdev     // Return allocator object
get_allocator()22449e08aacStbbdev     allocator_type get_allocator() const { return my_allocator; }
22549e08aacStbbdev 
22649e08aacStbbdev     //------------------------------------------------------------------------
22749e08aacStbbdev     // The iterators are intended only for debugging.  They are slow and not thread safe.
22849e08aacStbbdev     //------------------------------------------------------------------------
22949e08aacStbbdev 
unsafe_begin()23049e08aacStbbdev     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
unsafe_end()23149e08aacStbbdev     iterator unsafe_end() { return iterator(); }
unsafe_begin()23249e08aacStbbdev     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_end()23349e08aacStbbdev     const_iterator unsafe_end() const { return const_iterator(); }
unsafe_cbegin()23449e08aacStbbdev     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_cend()23549e08aacStbbdev     const_iterator unsafe_cend() const { return const_iterator(); }
23649e08aacStbbdev 
23749e08aacStbbdev private:
internal_swap(concurrent_queue & src)23849e08aacStbbdev     void internal_swap(concurrent_queue& src) {
23949e08aacStbbdev         using std::swap;
24049e08aacStbbdev         swap(my_queue_representation, src.my_queue_representation);
24149e08aacStbbdev     }
24249e08aacStbbdev 
24349e08aacStbbdev     template <typename... Args>
internal_push(Args &&...args)24449e08aacStbbdev     void internal_push( Args&&... args ) {
24549e08aacStbbdev         ticket_type k = my_queue_representation->tail_counter++;
246fbc48b39Svlserov         my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
24749e08aacStbbdev     }
24849e08aacStbbdev 
internal_try_pop(void * dst)24949e08aacStbbdev     bool internal_try_pop( void* dst ) {
250eac94650SAlex         return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
25149e08aacStbbdev     }
25249e08aacStbbdev 
25349e08aacStbbdev     template <typename Container, typename Value, typename A>
25449e08aacStbbdev     friend class concurrent_queue_iterator;
25549e08aacStbbdev 
copy_construct_item(T * location,const void * src)25649e08aacStbbdev     static void copy_construct_item(T* location, const void* src) {
25749e08aacStbbdev         // TODO: use allocator_traits for copy construction
25849e08aacStbbdev         new (location) value_type(*static_cast<const value_type*>(src));
25949e08aacStbbdev         // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
26049e08aacStbbdev     }
26149e08aacStbbdev 
move_construct_item(T * location,const void * src)26249e08aacStbbdev     static void move_construct_item(T* location, const void* src) {
26349e08aacStbbdev         // TODO: use allocator_traits for move construction
26449e08aacStbbdev         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
26549e08aacStbbdev     }
26649e08aacStbbdev 
26749e08aacStbbdev     queue_allocator_type my_allocator;
26849e08aacStbbdev     queue_representation_type* my_queue_representation;
269155acce7SJhaShweta1 
swap(concurrent_queue & lhs,concurrent_queue & rhs)270155acce7SJhaShweta1     friend void swap( concurrent_queue& lhs, concurrent_queue& rhs ) {
271155acce7SJhaShweta1         lhs.swap(rhs);
272155acce7SJhaShweta1     }
273155acce7SJhaShweta1 
274155acce7SJhaShweta1     friend bool operator==( const concurrent_queue& lhs, const concurrent_queue& rhs ) {
275155acce7SJhaShweta1         return lhs.unsafe_size() == rhs.unsafe_size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
276155acce7SJhaShweta1     }
277155acce7SJhaShweta1 
278155acce7SJhaShweta1 #if !__TBB_CPP20_COMPARISONS_PRESENT
279155acce7SJhaShweta1     friend bool operator!=( const concurrent_queue& lhs,  const concurrent_queue& rhs ) {
280155acce7SJhaShweta1         return !(lhs == rhs);
281155acce7SJhaShweta1     }
282155acce7SJhaShweta1 #endif // __TBB_CPP20_COMPARISONS_PRESENT
28349e08aacStbbdev }; // class concurrent_queue
28449e08aacStbbdev 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the iterator-pair constructor (optionally followed by an allocator):
// the element type is the iterator's value type. The guide takes part in deduction only
// when the first argument type is an input iterator and the last one is an allocator.
template <typename InputIt,
          typename QueueAlloc = tbb::cache_aligned_allocator<iterator_value_t<InputIt>>,
          typename = std::enable_if_t<is_input_iterator_v<InputIt>>,
          typename = std::enable_if_t<is_allocator_v<QueueAlloc>>>
concurrent_queue( InputIt, InputIt, QueueAlloc = QueueAlloc() )
    -> concurrent_queue<iterator_value_t<InputIt>, QueueAlloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
29449e08aacStbbdev 
class concurrent_monitor;

// The concurrent monitor tags for concurrent_bounded_queue.
// They are passed as the `monitor_tag` argument of the r1 bounded-queue monitor functions;
// judging by the names, tag 0 is for "slots available" and tag 1 for "items available".
static constexpr std::size_t cbq_slots_avail_tag{0};
static constexpr std::size_t cbq_items_avail_tag{1};
300fbc48b39Svlserov } // namespace d2
30149e08aacStbbdev 
30249e08aacStbbdev 
30349e08aacStbbdev namespace r1 {
30449e08aacStbbdev     class concurrent_monitor;
30549e08aacStbbdev 
3068827ea7dSLong Nguyen     TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
3078827ea7dSLong Nguyen     TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
3088827ea7dSLong Nguyen     TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
3098827ea7dSLong Nguyen     TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
31049e08aacStbbdev                                                             , std::size_t ticket );
3118827ea7dSLong Nguyen     TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
31249e08aacStbbdev                                                             std::ptrdiff_t target, d1::delegate_base& predicate );
31349e08aacStbbdev } // namespace r1
31449e08aacStbbdev 
31549e08aacStbbdev 
316fbc48b39Svlserov namespace d2 {
31749e08aacStbbdev // A high-performance thread-safe blocking concurrent bounded queue.
31849e08aacStbbdev // Supports boundedness and blocking semantics.
31949e08aacStbbdev // Multiple threads may each push and pop concurrently.
// Copy and move assignment are supported, but they call clear() and are not thread-safe.
32149e08aacStbbdev template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
32249e08aacStbbdev class concurrent_bounded_queue {
32349e08aacStbbdev     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
32449e08aacStbbdev     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
32549e08aacStbbdev     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
32649e08aacStbbdev     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
32749e08aacStbbdev 
32849e08aacStbbdev     template <typename FuncType>
internal_wait(r1::concurrent_monitor * monitors,std::size_t monitor_tag,std::ptrdiff_t target,FuncType pred)32949e08aacStbbdev     void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
330fbc48b39Svlserov         d1::delegated_function<FuncType> func(pred);
33149e08aacStbbdev         r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
33249e08aacStbbdev     }
33349e08aacStbbdev public:
33449e08aacStbbdev     using size_type = std::ptrdiff_t;
33549e08aacStbbdev     using value_type = T;
33649e08aacStbbdev     using reference = T&;
33749e08aacStbbdev     using const_reference = const T&;
33849e08aacStbbdev     using difference_type = std::ptrdiff_t;
33949e08aacStbbdev 
34049e08aacStbbdev     using allocator_type = Allocator;
34149e08aacStbbdev     using pointer = typename allocator_traits_type::pointer;
34249e08aacStbbdev     using const_pointer = typename allocator_traits_type::const_pointer;
34349e08aacStbbdev 
34449e08aacStbbdev     using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
34549e08aacStbbdev     using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;
34649e08aacStbbdev 
concurrent_bounded_queue()34749e08aacStbbdev     concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}
34849e08aacStbbdev 
concurrent_bounded_queue(const allocator_type & a)34949e08aacStbbdev     explicit concurrent_bounded_queue( const allocator_type& a ) :
35049e08aacStbbdev         my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
35149e08aacStbbdev     {
35249e08aacStbbdev         my_queue_representation = reinterpret_cast<queue_representation_type*>(
35349e08aacStbbdev             r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
35449e08aacStbbdev         my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
355fbc48b39Svlserov         queue_allocator_traits::construct(my_allocator, my_queue_representation);
35649e08aacStbbdev         my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
35749e08aacStbbdev 
35849e08aacStbbdev         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
35949e08aacStbbdev         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
36049e08aacStbbdev         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
36149e08aacStbbdev         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
36249e08aacStbbdev     }
36349e08aacStbbdev 
36449e08aacStbbdev     template <typename InputIterator>
36549e08aacStbbdev     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
concurrent_bounded_queue(a)36649e08aacStbbdev         concurrent_bounded_queue(a)
36749e08aacStbbdev     {
36849e08aacStbbdev         for (; begin != end; ++begin)
36949e08aacStbbdev             push(*begin);
37049e08aacStbbdev     }
37149e08aacStbbdev 
372155acce7SJhaShweta1     concurrent_bounded_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ):
373155acce7SJhaShweta1         concurrent_bounded_queue(init.begin(), init.end(), alloc)
374155acce7SJhaShweta1     {}
375155acce7SJhaShweta1 
concurrent_bounded_queue(const concurrent_bounded_queue & src,const allocator_type & a)37649e08aacStbbdev     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
37749e08aacStbbdev         concurrent_bounded_queue(a)
37849e08aacStbbdev     {
379fbc48b39Svlserov         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
38049e08aacStbbdev     }
38149e08aacStbbdev 
concurrent_bounded_queue(const concurrent_bounded_queue & src)38249e08aacStbbdev     concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
38349e08aacStbbdev         concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
38449e08aacStbbdev     {
385fbc48b39Svlserov         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
38649e08aacStbbdev     }
38749e08aacStbbdev 
38849e08aacStbbdev     // Move constructors
concurrent_bounded_queue(concurrent_bounded_queue && src)38949e08aacStbbdev     concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
39049e08aacStbbdev         concurrent_bounded_queue(std::move(src.my_allocator))
39149e08aacStbbdev     {
39249e08aacStbbdev         internal_swap(src);
39349e08aacStbbdev     }
39449e08aacStbbdev 
concurrent_bounded_queue(concurrent_bounded_queue && src,const allocator_type & a)39549e08aacStbbdev     concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
39649e08aacStbbdev         concurrent_bounded_queue(a)
39749e08aacStbbdev     {
39849e08aacStbbdev         // checking that memory allocated by one instance of allocator can be deallocated
39949e08aacStbbdev         // with another
40049e08aacStbbdev         if (my_allocator == src.my_allocator) {
40149e08aacStbbdev             internal_swap(src);
40249e08aacStbbdev         } else {
40349e08aacStbbdev             // allocators are different => performing per-element move
404fbc48b39Svlserov             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
40549e08aacStbbdev             src.clear();
40649e08aacStbbdev         }
40749e08aacStbbdev     }
40849e08aacStbbdev 
40949e08aacStbbdev     // Destroy queue
~concurrent_bounded_queue()41049e08aacStbbdev     ~concurrent_bounded_queue() {
41149e08aacStbbdev         clear();
412fbc48b39Svlserov         my_queue_representation->clear(my_allocator);
41349e08aacStbbdev         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
41449e08aacStbbdev         r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
41549e08aacStbbdev                                          sizeof(queue_representation_type));
41649e08aacStbbdev     }
41749e08aacStbbdev 
418155acce7SJhaShweta1     concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
419155acce7SJhaShweta1         //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
420155acce7SJhaShweta1         if (my_queue_representation != other.my_queue_representation) {
421155acce7SJhaShweta1             clear();
422155acce7SJhaShweta1             my_allocator = other.my_allocator;
423155acce7SJhaShweta1             my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
424155acce7SJhaShweta1         }
425155acce7SJhaShweta1         return *this;
426155acce7SJhaShweta1     }
427155acce7SJhaShweta1 
428155acce7SJhaShweta1     concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
429155acce7SJhaShweta1         //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
430155acce7SJhaShweta1         if (my_queue_representation != other.my_queue_representation) {
431155acce7SJhaShweta1             clear();
432155acce7SJhaShweta1             if (my_allocator == other.my_allocator) {
433155acce7SJhaShweta1                 internal_swap(other);
434155acce7SJhaShweta1             } else {
435155acce7SJhaShweta1                 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
436155acce7SJhaShweta1                 other.clear();
437155acce7SJhaShweta1                 my_allocator = std::move(other.my_allocator);
438155acce7SJhaShweta1             }
439155acce7SJhaShweta1         }
440155acce7SJhaShweta1         return *this;
441155acce7SJhaShweta1     }
442155acce7SJhaShweta1 
443155acce7SJhaShweta1     concurrent_bounded_queue& operator=( std::initializer_list<value_type> init ) {
444155acce7SJhaShweta1         assign(init);
445155acce7SJhaShweta1         return *this;
446155acce7SJhaShweta1     }
447155acce7SJhaShweta1 
448155acce7SJhaShweta1     template <typename InputIterator>
assign(InputIterator first,InputIterator last)449155acce7SJhaShweta1     void assign( InputIterator first, InputIterator last ) {
450155acce7SJhaShweta1         concurrent_bounded_queue src(first, last);
451155acce7SJhaShweta1         clear();
452155acce7SJhaShweta1         my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
453155acce7SJhaShweta1     }
454155acce7SJhaShweta1 
assign(std::initializer_list<value_type> init)455155acce7SJhaShweta1     void assign( std::initializer_list<value_type> init ) {
456155acce7SJhaShweta1         assign(init.begin(), init.end());
457155acce7SJhaShweta1     }
458155acce7SJhaShweta1 
swap(concurrent_bounded_queue & other)459155acce7SJhaShweta1     void swap ( concurrent_bounded_queue& other ) {
460155acce7SJhaShweta1         //TODO: implement support for std::allocator_traits::propagate_on_container_swap
461155acce7SJhaShweta1         __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
462155acce7SJhaShweta1         internal_swap(other);
463155acce7SJhaShweta1     }
464155acce7SJhaShweta1 
46549e08aacStbbdev     // Enqueue an item at tail of queue.
push(const T & value)46649e08aacStbbdev     void push( const T& value ) {
46749e08aacStbbdev         internal_push(value);
46849e08aacStbbdev     }
46949e08aacStbbdev 
push(T && value)47049e08aacStbbdev     void push( T&& value ) {
47149e08aacStbbdev         internal_push(std::move(value));
47249e08aacStbbdev     }
47349e08aacStbbdev 
47449e08aacStbbdev     // Enqueue an item at tail of queue if queue is not already full.
47549e08aacStbbdev     // Does not wait for queue to become not full.
47649e08aacStbbdev     // Returns true if item is pushed; false if queue was already full.
try_push(const T & value)47749e08aacStbbdev     bool try_push( const T& value ) {
47849e08aacStbbdev         return internal_push_if_not_full(value);
47949e08aacStbbdev     }
48049e08aacStbbdev 
try_push(T && value)48149e08aacStbbdev     bool try_push( T&& value ) {
48249e08aacStbbdev         return internal_push_if_not_full(std::move(value));
48349e08aacStbbdev     }
48449e08aacStbbdev 
48549e08aacStbbdev     template <typename... Args>
emplace(Args &&...args)48649e08aacStbbdev     void emplace( Args&&... args ) {
48749e08aacStbbdev         internal_push(std::forward<Args>(args)...);
48849e08aacStbbdev     }
48949e08aacStbbdev 
49049e08aacStbbdev     template <typename... Args>
try_emplace(Args &&...args)49149e08aacStbbdev     bool try_emplace( Args&&... args ) {
49249e08aacStbbdev         return internal_push_if_not_full(std::forward<Args>(args)...);
49349e08aacStbbdev     }
49449e08aacStbbdev 
49549e08aacStbbdev     // Attempt to dequeue an item from head of queue.
pop(T & result)49657f8bb1dSkboyarinov     void pop( T& result ) {
49757f8bb1dSkboyarinov         internal_pop(&result);
49849e08aacStbbdev     }
49949e08aacStbbdev 
50001fa1414SJakub Kopal     /** Does not wait for item to become available.
50101fa1414SJakub Kopal         Returns true if successful; false otherwise. */
try_pop(T & result)50249e08aacStbbdev     bool try_pop( T& result ) {
50349e08aacStbbdev         return internal_pop_if_present(&result);
50449e08aacStbbdev     }
50549e08aacStbbdev 
abort()50649e08aacStbbdev     void abort() {
50749e08aacStbbdev         internal_abort();
50849e08aacStbbdev     }
50949e08aacStbbdev 
51049e08aacStbbdev     // Return the number of items in the queue; thread unsafe
size()51149e08aacStbbdev     std::ptrdiff_t size() const {
51249e08aacStbbdev         return my_queue_representation->size();
51349e08aacStbbdev     }
51449e08aacStbbdev 
set_capacity(size_type new_capacity)51549e08aacStbbdev     void set_capacity( size_type new_capacity ) {
51649e08aacStbbdev         std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
51749e08aacStbbdev         my_capacity = c;
51849e08aacStbbdev     }
51949e08aacStbbdev 
capacity()52049e08aacStbbdev     size_type capacity() const {
52149e08aacStbbdev         return my_capacity;
52249e08aacStbbdev     }
52349e08aacStbbdev 
52449e08aacStbbdev     // Equivalent to size()==0.
empty()525b15aabb3Stbbdev     __TBB_nodiscard bool empty() const {
52649e08aacStbbdev         return my_queue_representation->empty();
52749e08aacStbbdev     }
52849e08aacStbbdev 
52949e08aacStbbdev     // Clear the queue. not thread-safe.
clear()53049e08aacStbbdev     void clear() {
5318155aaebSkboyarinov         my_queue_representation->clear(my_allocator);
53249e08aacStbbdev     }
53349e08aacStbbdev 
53449e08aacStbbdev     // Return allocator object
get_allocator()53549e08aacStbbdev     allocator_type get_allocator() const { return my_allocator; }
53649e08aacStbbdev 
53749e08aacStbbdev     //------------------------------------------------------------------------
53849e08aacStbbdev     // The iterators are intended only for debugging.  They are slow and not thread safe.
53949e08aacStbbdev     //------------------------------------------------------------------------
54049e08aacStbbdev 
unsafe_begin()54149e08aacStbbdev     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
unsafe_end()54249e08aacStbbdev     iterator unsafe_end() { return iterator(); }
unsafe_begin()54349e08aacStbbdev     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_end()54449e08aacStbbdev     const_iterator unsafe_end() const { return const_iterator(); }
unsafe_cbegin()54549e08aacStbbdev     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_cend()54649e08aacStbbdev     const_iterator unsafe_cend() const { return const_iterator(); }
54749e08aacStbbdev 
54849e08aacStbbdev private:
internal_swap(concurrent_bounded_queue & src)54949e08aacStbbdev     void internal_swap( concurrent_bounded_queue& src ) {
55049e08aacStbbdev         std::swap(my_queue_representation, src.my_queue_representation);
55149e08aacStbbdev         std::swap(my_monitors, src.my_monitors);
55249e08aacStbbdev     }
55349e08aacStbbdev 
55449e08aacStbbdev     static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
55549e08aacStbbdev 
55649e08aacStbbdev     template <typename... Args>
internal_push(Args &&...args)55749e08aacStbbdev     void internal_push( Args&&... args ) {
55849e08aacStbbdev         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
55949e08aacStbbdev         ticket_type ticket = my_queue_representation->tail_counter++;
56049e08aacStbbdev         std::ptrdiff_t target = ticket - my_capacity;
56149e08aacStbbdev 
56249e08aacStbbdev         if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
56349e08aacStbbdev             auto pred = [&] {
56449e08aacStbbdev                 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
56549e08aacStbbdev                     throw_exception(exception_id::user_abort);
56649e08aacStbbdev                 }
56749e08aacStbbdev 
56849e08aacStbbdev                 return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
56949e08aacStbbdev             };
57049e08aacStbbdev 
57149e08aacStbbdev             try_call( [&] {
57249e08aacStbbdev                 internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
57349e08aacStbbdev             }).on_exception( [&] {
574fbc48b39Svlserov                 my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
57549e08aacStbbdev             });
57649e08aacStbbdev 
57749e08aacStbbdev         }
57849e08aacStbbdev         __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
579fbc48b39Svlserov         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
58049e08aacStbbdev         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
58149e08aacStbbdev     }
58249e08aacStbbdev 
58349e08aacStbbdev     template <typename... Args>
internal_push_if_not_full(Args &&...args)58449e08aacStbbdev     bool internal_push_if_not_full( Args&&... args ) {
58549e08aacStbbdev         ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
58649e08aacStbbdev         do {
58749e08aacStbbdev             if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
58849e08aacStbbdev                 // Queue is full
58949e08aacStbbdev                 return false;
59049e08aacStbbdev             }
59149e08aacStbbdev             // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
59249e08aacStbbdev             // Another thread claimed the slot, so retry.
59349e08aacStbbdev         } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
59449e08aacStbbdev 
595fbc48b39Svlserov         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
59649e08aacStbbdev         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
59749e08aacStbbdev         return true;
59849e08aacStbbdev     }
59949e08aacStbbdev 
internal_pop(void * dst)60057f8bb1dSkboyarinov     void internal_pop( void* dst ) {
60149e08aacStbbdev         std::ptrdiff_t target;
60249e08aacStbbdev         // This loop is a single pop operation; abort_counter should not be re-read inside
60349e08aacStbbdev         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
60449e08aacStbbdev 
60549e08aacStbbdev         do {
60649e08aacStbbdev             target = my_queue_representation->head_counter++;
60749e08aacStbbdev             if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
60849e08aacStbbdev                 auto pred = [&] {
60949e08aacStbbdev                     if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
61049e08aacStbbdev                             throw_exception(exception_id::user_abort);
61149e08aacStbbdev                     }
61249e08aacStbbdev 
61349e08aacStbbdev                     return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
61449e08aacStbbdev                 };
61549e08aacStbbdev 
61649e08aacStbbdev                 try_call( [&] {
61749e08aacStbbdev                     internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
61849e08aacStbbdev                 }).on_exception( [&] {
61949e08aacStbbdev                     my_queue_representation->head_counter--;
62049e08aacStbbdev                 });
62149e08aacStbbdev             }
62249e08aacStbbdev             __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
623fbc48b39Svlserov         } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
62449e08aacStbbdev 
62549e08aacStbbdev         r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
62649e08aacStbbdev     }
62749e08aacStbbdev 
internal_pop_if_present(void * dst)62849e08aacStbbdev     bool internal_pop_if_present( void* dst ) {
629eac94650SAlex         bool present{};
630eac94650SAlex         ticket_type ticket{};
631eac94650SAlex         std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);
63249e08aacStbbdev 
633eac94650SAlex         if (present) {
63449e08aacStbbdev             r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
635eac94650SAlex         }
636eac94650SAlex         return present;
63749e08aacStbbdev     }
63849e08aacStbbdev 
internal_abort()63949e08aacStbbdev     void internal_abort() {
64049e08aacStbbdev         ++my_abort_counter;
64149e08aacStbbdev         r1::abort_bounded_queue_monitors(my_monitors);
64249e08aacStbbdev     }
64349e08aacStbbdev 
copy_construct_item(T * location,const void * src)64449e08aacStbbdev     static void copy_construct_item(T* location, const void* src) {
64549e08aacStbbdev         // TODO: use allocator_traits for copy construction
64649e08aacStbbdev         new (location) value_type(*static_cast<const value_type*>(src));
64749e08aacStbbdev     }
64849e08aacStbbdev 
move_construct_item(T * location,const void * src)64949e08aacStbbdev     static void move_construct_item(T* location, const void* src) {
65049e08aacStbbdev         // TODO: use allocator_traits for move construction
65149e08aacStbbdev         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
65249e08aacStbbdev     }
65349e08aacStbbdev 
65449e08aacStbbdev     template <typename Container, typename Value, typename A>
65549e08aacStbbdev     friend class concurrent_queue_iterator;
65649e08aacStbbdev 
65749e08aacStbbdev     queue_allocator_type my_allocator;
65849e08aacStbbdev     std::ptrdiff_t my_capacity;
65949e08aacStbbdev     std::atomic<unsigned> my_abort_counter;
66049e08aacStbbdev     queue_representation_type* my_queue_representation;
66149e08aacStbbdev 
66249e08aacStbbdev     r1::concurrent_monitor* my_monitors;
663155acce7SJhaShweta1 
swap(concurrent_bounded_queue & lhs,concurrent_bounded_queue & rhs)664155acce7SJhaShweta1     friend void swap( concurrent_bounded_queue& lhs, concurrent_bounded_queue& rhs ) {
665155acce7SJhaShweta1         lhs.swap(rhs);
666155acce7SJhaShweta1     }
667155acce7SJhaShweta1 
668155acce7SJhaShweta1     friend bool operator==( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
669155acce7SJhaShweta1         return lhs.size() == rhs.size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
670155acce7SJhaShweta1     }
671155acce7SJhaShweta1 
672155acce7SJhaShweta1 #if !__TBB_CPP20_COMPARISONS_PRESENT
673155acce7SJhaShweta1     friend bool operator!=( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
674155acce7SJhaShweta1         return !(lhs == rhs);
675155acce7SJhaShweta1     }
676155acce7SJhaShweta1 #endif // __TBB_CPP20_COMPARISONS_PRESENT
67749e08aacStbbdev }; // class concurrent_bounded_queue
67849e08aacStbbdev 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for construction from an iterator pair, optionally followed by an allocator:
// the element type is iterator_value_t of the iterator, and the allocator defaults to
// tbb::cache_aligned_allocator of that element type.
template <typename InputIterator, typename Allocator = tbb::cache_aligned_allocator<iterator_value_t<InputIterator>>>
concurrent_bounded_queue( InputIterator, InputIterator, Allocator = Allocator() )
-> concurrent_bounded_queue<iterator_value_t<InputIterator>, Allocator>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
68649e08aacStbbdev 
687fbc48b39Svlserov } //namespace d2
688*c4a799dfSJhaShweta1 } // namespace detail
68949e08aacStbbdev 
69049e08aacStbbdev inline namespace v1 {
69149e08aacStbbdev 
692fbc48b39Svlserov using detail::d2::concurrent_queue;
693fbc48b39Svlserov using detail::d2::concurrent_bounded_queue;
69449e08aacStbbdev using detail::r1::user_abort;
69549e08aacStbbdev using detail::r1::bad_last_alloc;
69649e08aacStbbdev 
69749e08aacStbbdev } // inline namespace v1
69849e08aacStbbdev } // namespace tbb
69949e08aacStbbdev 
70049e08aacStbbdev #endif // __TBB_concurrent_queue_H
701