149e08aacStbbdev /*
2155acce7SJhaShweta1 Copyright (c) 2005-2023 Intel Corporation
349e08aacStbbdev
449e08aacStbbdev Licensed under the Apache License, Version 2.0 (the "License");
549e08aacStbbdev you may not use this file except in compliance with the License.
649e08aacStbbdev You may obtain a copy of the License at
749e08aacStbbdev
849e08aacStbbdev http://www.apache.org/licenses/LICENSE-2.0
949e08aacStbbdev
1049e08aacStbbdev Unless required by applicable law or agreed to in writing, software
1149e08aacStbbdev distributed under the License is distributed on an "AS IS" BASIS,
1249e08aacStbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1349e08aacStbbdev See the License for the specific language governing permissions and
1449e08aacStbbdev limitations under the License.
1549e08aacStbbdev */
1649e08aacStbbdev
1749e08aacStbbdev #ifndef __TBB_concurrent_queue_H
1849e08aacStbbdev #define __TBB_concurrent_queue_H
1949e08aacStbbdev
2049e08aacStbbdev #include "detail/_namespace_injection.h"
2149e08aacStbbdev #include "detail/_concurrent_queue_base.h"
2249e08aacStbbdev #include "detail/_allocator_traits.h"
2349e08aacStbbdev #include "detail/_exception.h"
24d86ed7fbStbbdev #include "detail/_containers_helpers.h"
2549e08aacStbbdev #include "cache_aligned_allocator.h"
2649e08aacStbbdev
2749e08aacStbbdev namespace tbb {
2849e08aacStbbdev namespace detail {
29fbc48b39Svlserov namespace d2 {
3049e08aacStbbdev
31eac94650SAlex template <typename QueueRep, typename Allocator>
internal_try_pop_impl(void * dst,QueueRep & queue,Allocator & alloc)32eac94650SAlex std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
33eac94650SAlex ticket_type ticket{};
34eac94650SAlex do {
35eac94650SAlex // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
36eac94650SAlex ticket = queue.head_counter.load(std::memory_order_acquire);
37eac94650SAlex do {
38eac94650SAlex if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
39eac94650SAlex // Queue is empty
40eac94650SAlex return { false, ticket };
41eac94650SAlex }
42eac94650SAlex // Queue had item with ticket k when we looked. Attempt to get that item.
43eac94650SAlex // Another thread snatched the item, retry.
44eac94650SAlex } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
45eac94650SAlex } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
46eac94650SAlex return { true, ticket };
47eac94650SAlex }
48eac94650SAlex
4949e08aacStbbdev // A high-performance thread-safe non-blocking concurrent queue.
5049e08aacStbbdev // Multiple threads may each push and pop concurrently.
5149e08aacStbbdev // Assignment construction is not allowed.
5249e08aacStbbdev template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
5349e08aacStbbdev class concurrent_queue {
5449e08aacStbbdev using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
5549e08aacStbbdev using queue_representation_type = concurrent_queue_rep<T, Allocator>;
5649e08aacStbbdev using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
5749e08aacStbbdev using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
5849e08aacStbbdev public:
5949e08aacStbbdev using size_type = std::size_t;
6049e08aacStbbdev using value_type = T;
6149e08aacStbbdev using reference = T&;
6249e08aacStbbdev using const_reference = const T&;
6349e08aacStbbdev using difference_type = std::ptrdiff_t;
6449e08aacStbbdev
6549e08aacStbbdev using allocator_type = Allocator;
6649e08aacStbbdev using pointer = typename allocator_traits_type::pointer;
6749e08aacStbbdev using const_pointer = typename allocator_traits_type::const_pointer;
6849e08aacStbbdev
6949e08aacStbbdev using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
7049e08aacStbbdev using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;
7149e08aacStbbdev
concurrent_queue()7249e08aacStbbdev concurrent_queue() : concurrent_queue(allocator_type()) {}
7349e08aacStbbdev
concurrent_queue(const allocator_type & a)7449e08aacStbbdev explicit concurrent_queue(const allocator_type& a) :
7549e08aacStbbdev my_allocator(a), my_queue_representation(nullptr)
7649e08aacStbbdev {
7749e08aacStbbdev my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
78fbc48b39Svlserov queue_allocator_traits::construct(my_allocator, my_queue_representation);
7949e08aacStbbdev
8049e08aacStbbdev __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
8149e08aacStbbdev __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
8249e08aacStbbdev __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
8349e08aacStbbdev __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
8449e08aacStbbdev }
8549e08aacStbbdev
8649e08aacStbbdev template <typename InputIterator>
8749e08aacStbbdev concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
concurrent_queue(a)8849e08aacStbbdev concurrent_queue(a)
8949e08aacStbbdev {
9049e08aacStbbdev for (; begin != end; ++begin)
9149e08aacStbbdev push(*begin);
9249e08aacStbbdev }
9349e08aacStbbdev
94155acce7SJhaShweta1 concurrent_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ) :
95155acce7SJhaShweta1 concurrent_queue(init.begin(), init.end(), alloc)
96155acce7SJhaShweta1 {}
97155acce7SJhaShweta1
concurrent_queue(const concurrent_queue & src,const allocator_type & a)9849e08aacStbbdev concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
9949e08aacStbbdev concurrent_queue(a)
10049e08aacStbbdev {
101fbc48b39Svlserov my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
10249e08aacStbbdev }
10349e08aacStbbdev
concurrent_queue(const concurrent_queue & src)10449e08aacStbbdev concurrent_queue(const concurrent_queue& src) :
10549e08aacStbbdev concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
10649e08aacStbbdev {
107fbc48b39Svlserov my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
10849e08aacStbbdev }
10949e08aacStbbdev
11049e08aacStbbdev // Move constructors
concurrent_queue(concurrent_queue && src)11149e08aacStbbdev concurrent_queue(concurrent_queue&& src) :
11249e08aacStbbdev concurrent_queue(std::move(src.my_allocator))
11349e08aacStbbdev {
11449e08aacStbbdev internal_swap(src);
11549e08aacStbbdev }
11649e08aacStbbdev
concurrent_queue(concurrent_queue && src,const allocator_type & a)11749e08aacStbbdev concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
11849e08aacStbbdev concurrent_queue(a)
11949e08aacStbbdev {
12049e08aacStbbdev // checking that memory allocated by one instance of allocator can be deallocated
12149e08aacStbbdev // with another
12249e08aacStbbdev if (my_allocator == src.my_allocator) {
12349e08aacStbbdev internal_swap(src);
12449e08aacStbbdev } else {
12549e08aacStbbdev // allocators are different => performing per-element move
126fbc48b39Svlserov my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
12749e08aacStbbdev src.clear();
12849e08aacStbbdev }
12949e08aacStbbdev }
13049e08aacStbbdev
13149e08aacStbbdev // Destroy queue
~concurrent_queue()13249e08aacStbbdev ~concurrent_queue() {
13349e08aacStbbdev clear();
134fbc48b39Svlserov my_queue_representation->clear(my_allocator);
13549e08aacStbbdev queue_allocator_traits::destroy(my_allocator, my_queue_representation);
13649e08aacStbbdev r1::cache_aligned_deallocate(my_queue_representation);
13749e08aacStbbdev }
13849e08aacStbbdev
139155acce7SJhaShweta1 concurrent_queue& operator=( const concurrent_queue& other ) {
140155acce7SJhaShweta1 //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
141155acce7SJhaShweta1 if (my_queue_representation != other.my_queue_representation) {
142155acce7SJhaShweta1 clear();
143155acce7SJhaShweta1 my_allocator = other.my_allocator;
144155acce7SJhaShweta1 my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
145155acce7SJhaShweta1 }
146155acce7SJhaShweta1 return *this;
147155acce7SJhaShweta1 }
148155acce7SJhaShweta1
149155acce7SJhaShweta1 concurrent_queue& operator=( concurrent_queue&& other ) {
150155acce7SJhaShweta1 //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
151155acce7SJhaShweta1 if (my_queue_representation != other.my_queue_representation) {
152155acce7SJhaShweta1 clear();
153155acce7SJhaShweta1 if (my_allocator == other.my_allocator) {
154155acce7SJhaShweta1 internal_swap(other);
155155acce7SJhaShweta1 } else {
156155acce7SJhaShweta1 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
157155acce7SJhaShweta1 other.clear();
158155acce7SJhaShweta1 my_allocator = std::move(other.my_allocator);
159155acce7SJhaShweta1 }
160155acce7SJhaShweta1 }
161155acce7SJhaShweta1 return *this;
162155acce7SJhaShweta1 }
163155acce7SJhaShweta1
164155acce7SJhaShweta1 concurrent_queue& operator=( std::initializer_list<value_type> init ) {
165155acce7SJhaShweta1 assign(init);
166155acce7SJhaShweta1 return *this;
167155acce7SJhaShweta1 }
168155acce7SJhaShweta1
169155acce7SJhaShweta1 template <typename InputIterator>
assign(InputIterator first,InputIterator last)170155acce7SJhaShweta1 void assign( InputIterator first, InputIterator last ) {
171155acce7SJhaShweta1 concurrent_queue src(first, last);
172155acce7SJhaShweta1 clear();
173155acce7SJhaShweta1 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
174155acce7SJhaShweta1 }
175155acce7SJhaShweta1
assign(std::initializer_list<value_type> init)176155acce7SJhaShweta1 void assign( std::initializer_list<value_type> init ) {
177155acce7SJhaShweta1 assign(init.begin(), init.end());
178155acce7SJhaShweta1 }
179155acce7SJhaShweta1
swap(concurrent_queue & other)180155acce7SJhaShweta1 void swap ( concurrent_queue& other ) {
181155acce7SJhaShweta1 //TODO: implement support for std::allocator_traits::propagate_on_container_swap
182155acce7SJhaShweta1 __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
183155acce7SJhaShweta1 internal_swap(other);
184155acce7SJhaShweta1 }
185155acce7SJhaShweta1
18649e08aacStbbdev // Enqueue an item at tail of queue.
push(const T & value)18749e08aacStbbdev void push(const T& value) {
18849e08aacStbbdev internal_push(value);
18949e08aacStbbdev }
19049e08aacStbbdev
push(T && value)19149e08aacStbbdev void push(T&& value) {
19249e08aacStbbdev internal_push(std::move(value));
19349e08aacStbbdev }
19449e08aacStbbdev
19549e08aacStbbdev template <typename... Args>
emplace(Args &&...args)19649e08aacStbbdev void emplace( Args&&... args ) {
19749e08aacStbbdev internal_push(std::forward<Args>(args)...);
19849e08aacStbbdev }
19949e08aacStbbdev
20049e08aacStbbdev // Attempt to dequeue an item from head of queue.
20149e08aacStbbdev /** Does not wait for item to become available.
20249e08aacStbbdev Returns true if successful; false otherwise. */
try_pop(T & result)20349e08aacStbbdev bool try_pop( T& result ) {
20449e08aacStbbdev return internal_try_pop(&result);
20549e08aacStbbdev }
20649e08aacStbbdev
20749e08aacStbbdev // Return the number of items in the queue; thread unsafe
unsafe_size()20849e08aacStbbdev size_type unsafe_size() const {
20949e08aacStbbdev std::ptrdiff_t size = my_queue_representation->size();
21049e08aacStbbdev return size < 0 ? 0 : size_type(size);
21149e08aacStbbdev }
21249e08aacStbbdev
21349e08aacStbbdev // Equivalent to size()==0.
empty()214b15aabb3Stbbdev __TBB_nodiscard bool empty() const {
21549e08aacStbbdev return my_queue_representation->empty();
21649e08aacStbbdev }
21749e08aacStbbdev
21849e08aacStbbdev // Clear the queue. not thread-safe.
clear()21949e08aacStbbdev void clear() {
2208155aaebSkboyarinov my_queue_representation->clear(my_allocator);
22149e08aacStbbdev }
22249e08aacStbbdev
22349e08aacStbbdev // Return allocator object
get_allocator()22449e08aacStbbdev allocator_type get_allocator() const { return my_allocator; }
22549e08aacStbbdev
22649e08aacStbbdev //------------------------------------------------------------------------
22749e08aacStbbdev // The iterators are intended only for debugging. They are slow and not thread safe.
22849e08aacStbbdev //------------------------------------------------------------------------
22949e08aacStbbdev
unsafe_begin()23049e08aacStbbdev iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
unsafe_end()23149e08aacStbbdev iterator unsafe_end() { return iterator(); }
unsafe_begin()23249e08aacStbbdev const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_end()23349e08aacStbbdev const_iterator unsafe_end() const { return const_iterator(); }
unsafe_cbegin()23449e08aacStbbdev const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_cend()23549e08aacStbbdev const_iterator unsafe_cend() const { return const_iterator(); }
23649e08aacStbbdev
23749e08aacStbbdev private:
internal_swap(concurrent_queue & src)23849e08aacStbbdev void internal_swap(concurrent_queue& src) {
23949e08aacStbbdev using std::swap;
24049e08aacStbbdev swap(my_queue_representation, src.my_queue_representation);
24149e08aacStbbdev }
24249e08aacStbbdev
24349e08aacStbbdev template <typename... Args>
internal_push(Args &&...args)24449e08aacStbbdev void internal_push( Args&&... args ) {
24549e08aacStbbdev ticket_type k = my_queue_representation->tail_counter++;
246fbc48b39Svlserov my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
24749e08aacStbbdev }
24849e08aacStbbdev
internal_try_pop(void * dst)24949e08aacStbbdev bool internal_try_pop( void* dst ) {
250eac94650SAlex return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
25149e08aacStbbdev }
25249e08aacStbbdev
25349e08aacStbbdev template <typename Container, typename Value, typename A>
25449e08aacStbbdev friend class concurrent_queue_iterator;
25549e08aacStbbdev
copy_construct_item(T * location,const void * src)25649e08aacStbbdev static void copy_construct_item(T* location, const void* src) {
25749e08aacStbbdev // TODO: use allocator_traits for copy construction
25849e08aacStbbdev new (location) value_type(*static_cast<const value_type*>(src));
25949e08aacStbbdev // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
26049e08aacStbbdev }
26149e08aacStbbdev
move_construct_item(T * location,const void * src)26249e08aacStbbdev static void move_construct_item(T* location, const void* src) {
26349e08aacStbbdev // TODO: use allocator_traits for move construction
26449e08aacStbbdev new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
26549e08aacStbbdev }
26649e08aacStbbdev
26749e08aacStbbdev queue_allocator_type my_allocator;
26849e08aacStbbdev queue_representation_type* my_queue_representation;
269155acce7SJhaShweta1
swap(concurrent_queue & lhs,concurrent_queue & rhs)270155acce7SJhaShweta1 friend void swap( concurrent_queue& lhs, concurrent_queue& rhs ) {
271155acce7SJhaShweta1 lhs.swap(rhs);
272155acce7SJhaShweta1 }
273155acce7SJhaShweta1
274155acce7SJhaShweta1 friend bool operator==( const concurrent_queue& lhs, const concurrent_queue& rhs ) {
275155acce7SJhaShweta1 return lhs.unsafe_size() == rhs.unsafe_size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
276155acce7SJhaShweta1 }
277155acce7SJhaShweta1
278155acce7SJhaShweta1 #if !__TBB_CPP20_COMPARISONS_PRESENT
279155acce7SJhaShweta1 friend bool operator!=( const concurrent_queue& lhs, const concurrent_queue& rhs ) {
280155acce7SJhaShweta1 return !(lhs == rhs);
281155acce7SJhaShweta1 }
282155acce7SJhaShweta1 #endif // __TBB_CPP20_COMPARISONS_PRESENT
28349e08aacStbbdev }; // class concurrent_queue
28449e08aacStbbdev
28549e08aacStbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
28649e08aacStbbdev // Deduction guide for the constructor from two iterators
287d86ed7fbStbbdev template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
288d86ed7fbStbbdev typename = std::enable_if_t<is_input_iterator_v<It>>,
289d86ed7fbStbbdev typename = std::enable_if_t<is_allocator_v<Alloc>>>
290d86ed7fbStbbdev concurrent_queue( It, It, Alloc = Alloc() )
291d86ed7fbStbbdev -> concurrent_queue<iterator_value_t<It>, Alloc>;
292d86ed7fbStbbdev
29349e08aacStbbdev #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
29449e08aacStbbdev
29549e08aacStbbdev class concurrent_monitor;
29649e08aacStbbdev
29749e08aacStbbdev // The concurrent monitor tags for concurrent_bounded_queue.
29849e08aacStbbdev static constexpr std::size_t cbq_slots_avail_tag = 0;
29949e08aacStbbdev static constexpr std::size_t cbq_items_avail_tag = 1;
300fbc48b39Svlserov } // namespace d2
30149e08aacStbbdev
30249e08aacStbbdev
30349e08aacStbbdev namespace r1 {
30449e08aacStbbdev class concurrent_monitor;
30549e08aacStbbdev
3068827ea7dSLong Nguyen TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
3078827ea7dSLong Nguyen TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
3088827ea7dSLong Nguyen TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
3098827ea7dSLong Nguyen TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
31049e08aacStbbdev , std::size_t ticket );
3118827ea7dSLong Nguyen TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
31249e08aacStbbdev std::ptrdiff_t target, d1::delegate_base& predicate );
31349e08aacStbbdev } // namespace r1
31449e08aacStbbdev
31549e08aacStbbdev
316fbc48b39Svlserov namespace d2 {
31749e08aacStbbdev // A high-performance thread-safe blocking concurrent bounded queue.
31849e08aacStbbdev // Supports boundedness and blocking semantics.
31949e08aacStbbdev // Multiple threads may each push and pop concurrently.
32049e08aacStbbdev // Assignment construction is not allowed.
32149e08aacStbbdev template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
32249e08aacStbbdev class concurrent_bounded_queue {
32349e08aacStbbdev using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
32449e08aacStbbdev using queue_representation_type = concurrent_queue_rep<T, Allocator>;
32549e08aacStbbdev using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
32649e08aacStbbdev using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
32749e08aacStbbdev
32849e08aacStbbdev template <typename FuncType>
internal_wait(r1::concurrent_monitor * monitors,std::size_t monitor_tag,std::ptrdiff_t target,FuncType pred)32949e08aacStbbdev void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
330fbc48b39Svlserov d1::delegated_function<FuncType> func(pred);
33149e08aacStbbdev r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
33249e08aacStbbdev }
33349e08aacStbbdev public:
33449e08aacStbbdev using size_type = std::ptrdiff_t;
33549e08aacStbbdev using value_type = T;
33649e08aacStbbdev using reference = T&;
33749e08aacStbbdev using const_reference = const T&;
33849e08aacStbbdev using difference_type = std::ptrdiff_t;
33949e08aacStbbdev
34049e08aacStbbdev using allocator_type = Allocator;
34149e08aacStbbdev using pointer = typename allocator_traits_type::pointer;
34249e08aacStbbdev using const_pointer = typename allocator_traits_type::const_pointer;
34349e08aacStbbdev
34449e08aacStbbdev using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
34549e08aacStbbdev using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;
34649e08aacStbbdev
concurrent_bounded_queue()34749e08aacStbbdev concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}
34849e08aacStbbdev
concurrent_bounded_queue(const allocator_type & a)34949e08aacStbbdev explicit concurrent_bounded_queue( const allocator_type& a ) :
35049e08aacStbbdev my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
35149e08aacStbbdev {
35249e08aacStbbdev my_queue_representation = reinterpret_cast<queue_representation_type*>(
35349e08aacStbbdev r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
35449e08aacStbbdev my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
355fbc48b39Svlserov queue_allocator_traits::construct(my_allocator, my_queue_representation);
35649e08aacStbbdev my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
35749e08aacStbbdev
35849e08aacStbbdev __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
35949e08aacStbbdev __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
36049e08aacStbbdev __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
36149e08aacStbbdev __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
36249e08aacStbbdev }
36349e08aacStbbdev
36449e08aacStbbdev template <typename InputIterator>
36549e08aacStbbdev concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
concurrent_bounded_queue(a)36649e08aacStbbdev concurrent_bounded_queue(a)
36749e08aacStbbdev {
36849e08aacStbbdev for (; begin != end; ++begin)
36949e08aacStbbdev push(*begin);
37049e08aacStbbdev }
37149e08aacStbbdev
372155acce7SJhaShweta1 concurrent_bounded_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ):
373155acce7SJhaShweta1 concurrent_bounded_queue(init.begin(), init.end(), alloc)
374155acce7SJhaShweta1 {}
375155acce7SJhaShweta1
concurrent_bounded_queue(const concurrent_bounded_queue & src,const allocator_type & a)37649e08aacStbbdev concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
37749e08aacStbbdev concurrent_bounded_queue(a)
37849e08aacStbbdev {
379fbc48b39Svlserov my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
38049e08aacStbbdev }
38149e08aacStbbdev
concurrent_bounded_queue(const concurrent_bounded_queue & src)38249e08aacStbbdev concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
38349e08aacStbbdev concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
38449e08aacStbbdev {
385fbc48b39Svlserov my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
38649e08aacStbbdev }
38749e08aacStbbdev
38849e08aacStbbdev // Move constructors
concurrent_bounded_queue(concurrent_bounded_queue && src)38949e08aacStbbdev concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
39049e08aacStbbdev concurrent_bounded_queue(std::move(src.my_allocator))
39149e08aacStbbdev {
39249e08aacStbbdev internal_swap(src);
39349e08aacStbbdev }
39449e08aacStbbdev
concurrent_bounded_queue(concurrent_bounded_queue && src,const allocator_type & a)39549e08aacStbbdev concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
39649e08aacStbbdev concurrent_bounded_queue(a)
39749e08aacStbbdev {
39849e08aacStbbdev // checking that memory allocated by one instance of allocator can be deallocated
39949e08aacStbbdev // with another
40049e08aacStbbdev if (my_allocator == src.my_allocator) {
40149e08aacStbbdev internal_swap(src);
40249e08aacStbbdev } else {
40349e08aacStbbdev // allocators are different => performing per-element move
404fbc48b39Svlserov my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
40549e08aacStbbdev src.clear();
40649e08aacStbbdev }
40749e08aacStbbdev }
40849e08aacStbbdev
40949e08aacStbbdev // Destroy queue
~concurrent_bounded_queue()41049e08aacStbbdev ~concurrent_bounded_queue() {
41149e08aacStbbdev clear();
412fbc48b39Svlserov my_queue_representation->clear(my_allocator);
41349e08aacStbbdev queue_allocator_traits::destroy(my_allocator, my_queue_representation);
41449e08aacStbbdev r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
41549e08aacStbbdev sizeof(queue_representation_type));
41649e08aacStbbdev }
41749e08aacStbbdev
418155acce7SJhaShweta1 concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
419155acce7SJhaShweta1 //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
420155acce7SJhaShweta1 if (my_queue_representation != other.my_queue_representation) {
421155acce7SJhaShweta1 clear();
422155acce7SJhaShweta1 my_allocator = other.my_allocator;
423155acce7SJhaShweta1 my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
424155acce7SJhaShweta1 }
425155acce7SJhaShweta1 return *this;
426155acce7SJhaShweta1 }
427155acce7SJhaShweta1
428155acce7SJhaShweta1 concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
429155acce7SJhaShweta1 //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
430155acce7SJhaShweta1 if (my_queue_representation != other.my_queue_representation) {
431155acce7SJhaShweta1 clear();
432155acce7SJhaShweta1 if (my_allocator == other.my_allocator) {
433155acce7SJhaShweta1 internal_swap(other);
434155acce7SJhaShweta1 } else {
435155acce7SJhaShweta1 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
436155acce7SJhaShweta1 other.clear();
437155acce7SJhaShweta1 my_allocator = std::move(other.my_allocator);
438155acce7SJhaShweta1 }
439155acce7SJhaShweta1 }
440155acce7SJhaShweta1 return *this;
441155acce7SJhaShweta1 }
442155acce7SJhaShweta1
443155acce7SJhaShweta1 concurrent_bounded_queue& operator=( std::initializer_list<value_type> init ) {
444155acce7SJhaShweta1 assign(init);
445155acce7SJhaShweta1 return *this;
446155acce7SJhaShweta1 }
447155acce7SJhaShweta1
448155acce7SJhaShweta1 template <typename InputIterator>
assign(InputIterator first,InputIterator last)449155acce7SJhaShweta1 void assign( InputIterator first, InputIterator last ) {
450155acce7SJhaShweta1 concurrent_bounded_queue src(first, last);
451155acce7SJhaShweta1 clear();
452155acce7SJhaShweta1 my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
453155acce7SJhaShweta1 }
454155acce7SJhaShweta1
assign(std::initializer_list<value_type> init)455155acce7SJhaShweta1 void assign( std::initializer_list<value_type> init ) {
456155acce7SJhaShweta1 assign(init.begin(), init.end());
457155acce7SJhaShweta1 }
458155acce7SJhaShweta1
swap(concurrent_bounded_queue & other)459155acce7SJhaShweta1 void swap ( concurrent_bounded_queue& other ) {
460155acce7SJhaShweta1 //TODO: implement support for std::allocator_traits::propagate_on_container_swap
461155acce7SJhaShweta1 __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
462155acce7SJhaShweta1 internal_swap(other);
463155acce7SJhaShweta1 }
464155acce7SJhaShweta1
46549e08aacStbbdev // Enqueue an item at tail of queue.
push(const T & value)46649e08aacStbbdev void push( const T& value ) {
46749e08aacStbbdev internal_push(value);
46849e08aacStbbdev }
46949e08aacStbbdev
push(T && value)47049e08aacStbbdev void push( T&& value ) {
47149e08aacStbbdev internal_push(std::move(value));
47249e08aacStbbdev }
47349e08aacStbbdev
47449e08aacStbbdev // Enqueue an item at tail of queue if queue is not already full.
47549e08aacStbbdev // Does not wait for queue to become not full.
47649e08aacStbbdev // Returns true if item is pushed; false if queue was already full.
try_push(const T & value)47749e08aacStbbdev bool try_push( const T& value ) {
47849e08aacStbbdev return internal_push_if_not_full(value);
47949e08aacStbbdev }
48049e08aacStbbdev
try_push(T && value)48149e08aacStbbdev bool try_push( T&& value ) {
48249e08aacStbbdev return internal_push_if_not_full(std::move(value));
48349e08aacStbbdev }
48449e08aacStbbdev
48549e08aacStbbdev template <typename... Args>
emplace(Args &&...args)48649e08aacStbbdev void emplace( Args&&... args ) {
48749e08aacStbbdev internal_push(std::forward<Args>(args)...);
48849e08aacStbbdev }
48949e08aacStbbdev
49049e08aacStbbdev template <typename... Args>
try_emplace(Args &&...args)49149e08aacStbbdev bool try_emplace( Args&&... args ) {
49249e08aacStbbdev return internal_push_if_not_full(std::forward<Args>(args)...);
49349e08aacStbbdev }
49449e08aacStbbdev
49549e08aacStbbdev // Attempt to dequeue an item from head of queue.
pop(T & result)49657f8bb1dSkboyarinov void pop( T& result ) {
49757f8bb1dSkboyarinov internal_pop(&result);
49849e08aacStbbdev }
49949e08aacStbbdev
50001fa1414SJakub Kopal /** Does not wait for item to become available.
50101fa1414SJakub Kopal Returns true if successful; false otherwise. */
try_pop(T & result)50249e08aacStbbdev bool try_pop( T& result ) {
50349e08aacStbbdev return internal_pop_if_present(&result);
50449e08aacStbbdev }
50549e08aacStbbdev
abort()50649e08aacStbbdev void abort() {
50749e08aacStbbdev internal_abort();
50849e08aacStbbdev }
50949e08aacStbbdev
51049e08aacStbbdev // Return the number of items in the queue; thread unsafe
size()51149e08aacStbbdev std::ptrdiff_t size() const {
51249e08aacStbbdev return my_queue_representation->size();
51349e08aacStbbdev }
51449e08aacStbbdev
set_capacity(size_type new_capacity)51549e08aacStbbdev void set_capacity( size_type new_capacity ) {
51649e08aacStbbdev std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
51749e08aacStbbdev my_capacity = c;
51849e08aacStbbdev }
51949e08aacStbbdev
capacity()52049e08aacStbbdev size_type capacity() const {
52149e08aacStbbdev return my_capacity;
52249e08aacStbbdev }
52349e08aacStbbdev
52449e08aacStbbdev // Equivalent to size()==0.
empty()525b15aabb3Stbbdev __TBB_nodiscard bool empty() const {
52649e08aacStbbdev return my_queue_representation->empty();
52749e08aacStbbdev }
52849e08aacStbbdev
52949e08aacStbbdev // Clear the queue. not thread-safe.
clear()53049e08aacStbbdev void clear() {
5318155aaebSkboyarinov my_queue_representation->clear(my_allocator);
53249e08aacStbbdev }
53349e08aacStbbdev
53449e08aacStbbdev // Return allocator object
get_allocator()53549e08aacStbbdev allocator_type get_allocator() const { return my_allocator; }
53649e08aacStbbdev
53749e08aacStbbdev //------------------------------------------------------------------------
53849e08aacStbbdev // The iterators are intended only for debugging. They are slow and not thread safe.
53949e08aacStbbdev //------------------------------------------------------------------------
54049e08aacStbbdev
unsafe_begin()54149e08aacStbbdev iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
unsafe_end()54249e08aacStbbdev iterator unsafe_end() { return iterator(); }
unsafe_begin()54349e08aacStbbdev const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_end()54449e08aacStbbdev const_iterator unsafe_end() const { return const_iterator(); }
unsafe_cbegin()54549e08aacStbbdev const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
unsafe_cend()54649e08aacStbbdev const_iterator unsafe_cend() const { return const_iterator(); }
54749e08aacStbbdev
54849e08aacStbbdev private:
internal_swap(concurrent_bounded_queue & src)54949e08aacStbbdev void internal_swap( concurrent_bounded_queue& src ) {
55049e08aacStbbdev std::swap(my_queue_representation, src.my_queue_representation);
55149e08aacStbbdev std::swap(my_monitors, src.my_monitors);
55249e08aacStbbdev }
55349e08aacStbbdev
55449e08aacStbbdev static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
55549e08aacStbbdev
55649e08aacStbbdev template <typename... Args>
internal_push(Args &&...args)55749e08aacStbbdev void internal_push( Args&&... args ) {
55849e08aacStbbdev unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
55949e08aacStbbdev ticket_type ticket = my_queue_representation->tail_counter++;
56049e08aacStbbdev std::ptrdiff_t target = ticket - my_capacity;
56149e08aacStbbdev
56249e08aacStbbdev if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
56349e08aacStbbdev auto pred = [&] {
56449e08aacStbbdev if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
56549e08aacStbbdev throw_exception(exception_id::user_abort);
56649e08aacStbbdev }
56749e08aacStbbdev
56849e08aacStbbdev return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
56949e08aacStbbdev };
57049e08aacStbbdev
57149e08aacStbbdev try_call( [&] {
57249e08aacStbbdev internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
57349e08aacStbbdev }).on_exception( [&] {
574fbc48b39Svlserov my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
57549e08aacStbbdev });
57649e08aacStbbdev
57749e08aacStbbdev }
57849e08aacStbbdev __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
579fbc48b39Svlserov my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
58049e08aacStbbdev r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
58149e08aacStbbdev }
58249e08aacStbbdev
58349e08aacStbbdev template <typename... Args>
internal_push_if_not_full(Args &&...args)58449e08aacStbbdev bool internal_push_if_not_full( Args&&... args ) {
58549e08aacStbbdev ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
58649e08aacStbbdev do {
58749e08aacStbbdev if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
58849e08aacStbbdev // Queue is full
58949e08aacStbbdev return false;
59049e08aacStbbdev }
59149e08aacStbbdev // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
59249e08aacStbbdev // Another thread claimed the slot, so retry.
59349e08aacStbbdev } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
59449e08aacStbbdev
595fbc48b39Svlserov my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
59649e08aacStbbdev r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
59749e08aacStbbdev return true;
59849e08aacStbbdev }
59949e08aacStbbdev
internal_pop(void * dst)60057f8bb1dSkboyarinov void internal_pop( void* dst ) {
60149e08aacStbbdev std::ptrdiff_t target;
60249e08aacStbbdev // This loop is a single pop operation; abort_counter should not be re-read inside
60349e08aacStbbdev unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
60449e08aacStbbdev
60549e08aacStbbdev do {
60649e08aacStbbdev target = my_queue_representation->head_counter++;
60749e08aacStbbdev if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
60849e08aacStbbdev auto pred = [&] {
60949e08aacStbbdev if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
61049e08aacStbbdev throw_exception(exception_id::user_abort);
61149e08aacStbbdev }
61249e08aacStbbdev
61349e08aacStbbdev return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
61449e08aacStbbdev };
61549e08aacStbbdev
61649e08aacStbbdev try_call( [&] {
61749e08aacStbbdev internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
61849e08aacStbbdev }).on_exception( [&] {
61949e08aacStbbdev my_queue_representation->head_counter--;
62049e08aacStbbdev });
62149e08aacStbbdev }
62249e08aacStbbdev __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
623fbc48b39Svlserov } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
62449e08aacStbbdev
62549e08aacStbbdev r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
62649e08aacStbbdev }
62749e08aacStbbdev
internal_pop_if_present(void * dst)62849e08aacStbbdev bool internal_pop_if_present( void* dst ) {
629eac94650SAlex bool present{};
630eac94650SAlex ticket_type ticket{};
631eac94650SAlex std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);
63249e08aacStbbdev
633eac94650SAlex if (present) {
63449e08aacStbbdev r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
635eac94650SAlex }
636eac94650SAlex return present;
63749e08aacStbbdev }
63849e08aacStbbdev
internal_abort()63949e08aacStbbdev void internal_abort() {
64049e08aacStbbdev ++my_abort_counter;
64149e08aacStbbdev r1::abort_bounded_queue_monitors(my_monitors);
64249e08aacStbbdev }
64349e08aacStbbdev
copy_construct_item(T * location,const void * src)64449e08aacStbbdev static void copy_construct_item(T* location, const void* src) {
64549e08aacStbbdev // TODO: use allocator_traits for copy construction
64649e08aacStbbdev new (location) value_type(*static_cast<const value_type*>(src));
64749e08aacStbbdev }
64849e08aacStbbdev
move_construct_item(T * location,const void * src)64949e08aacStbbdev static void move_construct_item(T* location, const void* src) {
65049e08aacStbbdev // TODO: use allocator_traits for move construction
65149e08aacStbbdev new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
65249e08aacStbbdev }
65349e08aacStbbdev
65449e08aacStbbdev template <typename Container, typename Value, typename A>
65549e08aacStbbdev friend class concurrent_queue_iterator;
65649e08aacStbbdev
65749e08aacStbbdev queue_allocator_type my_allocator;
65849e08aacStbbdev std::ptrdiff_t my_capacity;
65949e08aacStbbdev std::atomic<unsigned> my_abort_counter;
66049e08aacStbbdev queue_representation_type* my_queue_representation;
66149e08aacStbbdev
66249e08aacStbbdev r1::concurrent_monitor* my_monitors;
663155acce7SJhaShweta1
swap(concurrent_bounded_queue & lhs,concurrent_bounded_queue & rhs)664155acce7SJhaShweta1 friend void swap( concurrent_bounded_queue& lhs, concurrent_bounded_queue& rhs ) {
665155acce7SJhaShweta1 lhs.swap(rhs);
666155acce7SJhaShweta1 }
667155acce7SJhaShweta1
668155acce7SJhaShweta1 friend bool operator==( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
669155acce7SJhaShweta1 return lhs.size() == rhs.size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
670155acce7SJhaShweta1 }
671155acce7SJhaShweta1
672155acce7SJhaShweta1 #if !__TBB_CPP20_COMPARISONS_PRESENT
673155acce7SJhaShweta1 friend bool operator!=( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
674155acce7SJhaShweta1 return !(lhs == rhs);
675155acce7SJhaShweta1 }
676155acce7SJhaShweta1 #endif // __TBB_CPP20_COMPARISONS_PRESENT
67749e08aacStbbdev }; // class concurrent_bounded_queue
67849e08aacStbbdev
67949e08aacStbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
68049e08aacStbbdev // Deduction guide for the constructor from two iterators
681d86ed7fbStbbdev template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
682d86ed7fbStbbdev concurrent_bounded_queue( It, It, Alloc = Alloc() )
683d86ed7fbStbbdev -> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;
684d86ed7fbStbbdev
68549e08aacStbbdev #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
68649e08aacStbbdev
687fbc48b39Svlserov } //namespace d2
688*c4a799dfSJhaShweta1 } // namespace detail
68949e08aacStbbdev
69049e08aacStbbdev inline namespace v1 {
69149e08aacStbbdev
692fbc48b39Svlserov using detail::d2::concurrent_queue;
693fbc48b39Svlserov using detail::d2::concurrent_bounded_queue;
69449e08aacStbbdev using detail::r1::user_abort;
69549e08aacStbbdev using detail::r1::bad_last_alloc;
69649e08aacStbbdev
69749e08aacStbbdev } // inline namespace v1
69849e08aacStbbdev } // namespace tbb
69949e08aacStbbdev
70049e08aacStbbdev #endif // __TBB_concurrent_queue_H
701