149e08aacStbbdev /*
2*c21e688aSSergey Zheltov Copyright (c) 2005-2022 Intel Corporation
349e08aacStbbdev
449e08aacStbbdev Licensed under the Apache License, Version 2.0 (the "License");
549e08aacStbbdev you may not use this file except in compliance with the License.
649e08aacStbbdev You may obtain a copy of the License at
749e08aacStbbdev
849e08aacStbbdev http://www.apache.org/licenses/LICENSE-2.0
949e08aacStbbdev
1049e08aacStbbdev Unless required by applicable law or agreed to in writing, software
1149e08aacStbbdev distributed under the License is distributed on an "AS IS" BASIS,
1249e08aacStbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1349e08aacStbbdev See the License for the specific language governing permissions and
1449e08aacStbbdev limitations under the License.
1549e08aacStbbdev */
1649e08aacStbbdev
1749e08aacStbbdev #ifndef __TBB_concurrent_priority_queue_H
1849e08aacStbbdev #define __TBB_concurrent_priority_queue_H
1949e08aacStbbdev
2049e08aacStbbdev #include "detail/_namespace_injection.h"
2149e08aacStbbdev #include "detail/_aggregator.h"
2249e08aacStbbdev #include "detail/_template_helpers.h"
2349e08aacStbbdev #include "detail/_allocator_traits.h"
2449e08aacStbbdev #include "detail/_range_common.h"
2549e08aacStbbdev #include "detail/_exception.h"
2649e08aacStbbdev #include "detail/_utils.h"
27d86ed7fbStbbdev #include "detail/_containers_helpers.h"
2849e08aacStbbdev #include "cache_aligned_allocator.h"
2949e08aacStbbdev #include <vector>
3049e08aacStbbdev #include <iterator>
3149e08aacStbbdev #include <functional>
3249e08aacStbbdev #include <utility>
3349e08aacStbbdev #include <initializer_list>
3449e08aacStbbdev #include <type_traits>
3549e08aacStbbdev
3649e08aacStbbdev namespace tbb {
3749e08aacStbbdev namespace detail {
3849e08aacStbbdev namespace d1 {
3949e08aacStbbdev
4049e08aacStbbdev template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
4149e08aacStbbdev class concurrent_priority_queue {
4249e08aacStbbdev public:
4349e08aacStbbdev using value_type = T;
4449e08aacStbbdev using reference = T&;
4549e08aacStbbdev using const_reference = const T&;
4649e08aacStbbdev
4749e08aacStbbdev using size_type = std::size_t;
4849e08aacStbbdev using difference_type = std::ptrdiff_t;
4949e08aacStbbdev
5049e08aacStbbdev using allocator_type = Allocator;
5149e08aacStbbdev
concurrent_priority_queue()5249e08aacStbbdev concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}
5349e08aacStbbdev
concurrent_priority_queue(const allocator_type & alloc)5449e08aacStbbdev explicit concurrent_priority_queue( const allocator_type& alloc )
5549e08aacStbbdev : mark(0), my_size(0), my_compare(), data(alloc)
5649e08aacStbbdev {
5749e08aacStbbdev my_aggregator.initialize_handler(functor{this});
5849e08aacStbbdev }
5949e08aacStbbdev
6049e08aacStbbdev explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
6149e08aacStbbdev : mark(0), my_size(0), my_compare(compare), data(alloc)
6249e08aacStbbdev {
6349e08aacStbbdev my_aggregator.initialize_handler(functor{this});
6449e08aacStbbdev }
6549e08aacStbbdev
6649e08aacStbbdev explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
6749e08aacStbbdev : mark(0), my_size(0), my_compare(), data(alloc)
6849e08aacStbbdev {
6949e08aacStbbdev data.reserve(init_capacity);
7049e08aacStbbdev my_aggregator.initialize_handler(functor{this});
7149e08aacStbbdev }
7249e08aacStbbdev
7349e08aacStbbdev explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
7449e08aacStbbdev : mark(0), my_size(0), my_compare(compare), data(alloc)
7549e08aacStbbdev {
7649e08aacStbbdev data.reserve(init_capacity);
7749e08aacStbbdev my_aggregator.initialize_handler(functor{this});
7849e08aacStbbdev }
7949e08aacStbbdev
8049e08aacStbbdev template <typename InputIterator>
8149e08aacStbbdev concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
8249e08aacStbbdev : mark(0), my_compare(compare), data(begin, end, alloc)
8349e08aacStbbdev {
8449e08aacStbbdev my_aggregator.initialize_handler(functor{this});
8549e08aacStbbdev heapify();
8649e08aacStbbdev my_size.store(data.size(), std::memory_order_relaxed);
8749e08aacStbbdev }
8849e08aacStbbdev
8949e08aacStbbdev template <typename InputIterator>
9049e08aacStbbdev concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
concurrent_priority_queue(begin,end,Compare (),alloc)9149e08aacStbbdev : concurrent_priority_queue(begin, end, Compare(), alloc) {}
9249e08aacStbbdev
9349e08aacStbbdev concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
9449e08aacStbbdev : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}
9549e08aacStbbdev
9649e08aacStbbdev concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
concurrent_priority_queue(init,Compare (),alloc)9749e08aacStbbdev : concurrent_priority_queue(init, Compare(), alloc) {}
9849e08aacStbbdev
concurrent_priority_queue(const concurrent_priority_queue & other)9949e08aacStbbdev concurrent_priority_queue( const concurrent_priority_queue& other )
10049e08aacStbbdev : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
10149e08aacStbbdev data(other.data)
10249e08aacStbbdev {
10349e08aacStbbdev my_aggregator.initialize_handler(functor{this});
10449e08aacStbbdev }
10549e08aacStbbdev
concurrent_priority_queue(const concurrent_priority_queue & other,const allocator_type & alloc)10649e08aacStbbdev concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
10749e08aacStbbdev : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
10849e08aacStbbdev data(other.data, alloc)
10949e08aacStbbdev {
11049e08aacStbbdev my_aggregator.initialize_handler(functor{this});
11149e08aacStbbdev }
11249e08aacStbbdev
concurrent_priority_queue(concurrent_priority_queue && other)11349e08aacStbbdev concurrent_priority_queue( concurrent_priority_queue&& other )
11449e08aacStbbdev : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
11549e08aacStbbdev data(std::move(other.data))
11649e08aacStbbdev {
11749e08aacStbbdev my_aggregator.initialize_handler(functor{this});
11849e08aacStbbdev }
11949e08aacStbbdev
concurrent_priority_queue(concurrent_priority_queue && other,const allocator_type & alloc)12049e08aacStbbdev concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
12149e08aacStbbdev : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
12249e08aacStbbdev data(std::move(other.data), alloc)
12349e08aacStbbdev {
12449e08aacStbbdev my_aggregator.initialize_handler(functor{this});
12549e08aacStbbdev }
12649e08aacStbbdev
12749e08aacStbbdev concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
12849e08aacStbbdev if (this != &other) {
12949e08aacStbbdev data = other.data;
13049e08aacStbbdev mark = other.mark;
13149e08aacStbbdev my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
13249e08aacStbbdev }
13349e08aacStbbdev return *this;
13449e08aacStbbdev }
13549e08aacStbbdev
13649e08aacStbbdev concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
13749e08aacStbbdev if (this != &other) {
13849e08aacStbbdev // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
13949e08aacStbbdev data = std::move(other.data);
14049e08aacStbbdev mark = other.mark;
14149e08aacStbbdev my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
14249e08aacStbbdev }
14349e08aacStbbdev return *this;
14449e08aacStbbdev }
14549e08aacStbbdev
14649e08aacStbbdev concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
14749e08aacStbbdev assign(init.begin(), init.end());
14849e08aacStbbdev return *this;
14949e08aacStbbdev }
15049e08aacStbbdev
15149e08aacStbbdev template <typename InputIterator>
assign(InputIterator begin,InputIterator end)15249e08aacStbbdev void assign( InputIterator begin, InputIterator end ) {
15349e08aacStbbdev data.assign(begin, end);
15449e08aacStbbdev mark = 0;
15549e08aacStbbdev my_size.store(data.size(), std::memory_order_relaxed);
15649e08aacStbbdev heapify();
15749e08aacStbbdev }
15849e08aacStbbdev
assign(std::initializer_list<value_type> init)15949e08aacStbbdev void assign( std::initializer_list<value_type> init ) {
16049e08aacStbbdev assign(init.begin(), init.end());
16149e08aacStbbdev }
16249e08aacStbbdev
16349e08aacStbbdev /* Returned value may not reflect results of pending operations.
16449e08aacStbbdev This operation reads shared data and will trigger a race condition. */
empty()165b15aabb3Stbbdev __TBB_nodiscard bool empty() const { return size() == 0; }
16649e08aacStbbdev
16749e08aacStbbdev // Returns the current number of elements contained in the queue
16849e08aacStbbdev /* Returned value may not reflect results of pending operations.
16949e08aacStbbdev This operation reads shared data and will trigger a race condition. */
size()17049e08aacStbbdev size_type size() const { return my_size.load(std::memory_order_relaxed); }
17149e08aacStbbdev
17249e08aacStbbdev /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
push(const value_type & value)17349e08aacStbbdev void push( const value_type& value ) {
17449e08aacStbbdev cpq_operation op_data(value, PUSH_OP);
17549e08aacStbbdev my_aggregator.execute(&op_data);
17649e08aacStbbdev if (op_data.status == FAILED)
17749e08aacStbbdev throw_exception(exception_id::bad_alloc);
17849e08aacStbbdev }
17949e08aacStbbdev
18049e08aacStbbdev /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
push(value_type && value)18149e08aacStbbdev void push( value_type&& value ) {
18249e08aacStbbdev cpq_operation op_data(value, PUSH_RVALUE_OP);
18349e08aacStbbdev my_aggregator.execute(&op_data);
18449e08aacStbbdev if (op_data.status == FAILED)
18549e08aacStbbdev throw_exception(exception_id::bad_alloc);
18649e08aacStbbdev }
18749e08aacStbbdev
18849e08aacStbbdev /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
18949e08aacStbbdev template <typename... Args>
emplace(Args &&...args)19049e08aacStbbdev void emplace( Args&&... args ) {
19149e08aacStbbdev // TODO: support uses allocator construction in this place
19249e08aacStbbdev push(value_type(std::forward<Args>(args)...));
19349e08aacStbbdev }
19449e08aacStbbdev
19549e08aacStbbdev // Gets a reference to and removes highest priority element
19649e08aacStbbdev /* If a highest priority element was found, sets elem and returns true,
19749e08aacStbbdev otherwise returns false.
19849e08aacStbbdev This operation can be safely used concurrently with other push, try_pop or emplace operations. */
try_pop(value_type & value)19949e08aacStbbdev bool try_pop( value_type& value ) {
20049e08aacStbbdev cpq_operation op_data(value, POP_OP);
20149e08aacStbbdev my_aggregator.execute(&op_data);
20249e08aacStbbdev return op_data.status == SUCCEEDED;
20349e08aacStbbdev }
20449e08aacStbbdev
20549e08aacStbbdev // This operation affects the whole container => it is not thread-safe
clear()20649e08aacStbbdev void clear() {
20749e08aacStbbdev data.clear();
20849e08aacStbbdev mark = 0;
20949e08aacStbbdev my_size.store(0, std::memory_order_relaxed);
21049e08aacStbbdev }
21149e08aacStbbdev
21249e08aacStbbdev // This operation affects the whole container => it is not thread-safe
swap(concurrent_priority_queue & other)21349e08aacStbbdev void swap( concurrent_priority_queue& other ) {
21449e08aacStbbdev if (this != &other) {
21549e08aacStbbdev using std::swap;
21649e08aacStbbdev swap(data, other.data);
21749e08aacStbbdev swap(mark, other.mark);
21849e08aacStbbdev
21949e08aacStbbdev size_type sz = my_size.load(std::memory_order_relaxed);
22049e08aacStbbdev my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
22149e08aacStbbdev other.my_size.store(sz, std::memory_order_relaxed);
22249e08aacStbbdev }
22349e08aacStbbdev }
22449e08aacStbbdev
get_allocator()22549e08aacStbbdev allocator_type get_allocator() const { return data.get_allocator(); }
22649e08aacStbbdev private:
22749e08aacStbbdev enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
22849e08aacStbbdev enum operation_status {WAIT = 0, SUCCEEDED, FAILED};
22949e08aacStbbdev
23049e08aacStbbdev class cpq_operation : public aggregated_operation<cpq_operation> {
23149e08aacStbbdev public:
23249e08aacStbbdev operation_type type;
23349e08aacStbbdev union {
23449e08aacStbbdev value_type* elem;
23549e08aacStbbdev size_type sz;
23649e08aacStbbdev };
cpq_operation(const value_type & value,operation_type t)23749e08aacStbbdev cpq_operation( const value_type& value, operation_type t )
23849e08aacStbbdev : type(t), elem(const_cast<value_type*>(&value)) {}
23949e08aacStbbdev }; // class cpq_operation
24049e08aacStbbdev
24149e08aacStbbdev class functor {
24249e08aacStbbdev concurrent_priority_queue* my_cpq;
24349e08aacStbbdev public:
functor()24449e08aacStbbdev functor() : my_cpq(nullptr) {}
functor(concurrent_priority_queue * cpq)24549e08aacStbbdev functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}
24649e08aacStbbdev
operator()24749e08aacStbbdev void operator()(cpq_operation* op_list) {
24849e08aacStbbdev __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
24949e08aacStbbdev my_cpq->handle_operations(op_list);
25049e08aacStbbdev }
25149e08aacStbbdev }; // class functor
25249e08aacStbbdev
handle_operations(cpq_operation * op_list)25349e08aacStbbdev void handle_operations( cpq_operation* op_list ) {
25449e08aacStbbdev call_itt_notify(acquired, this);
25549e08aacStbbdev cpq_operation* tmp, *pop_list = nullptr;
25657f524caSIlya Isaev __TBB_ASSERT(mark == data.size(), nullptr);
25749e08aacStbbdev
25849e08aacStbbdev // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
25949e08aacStbbdev while(op_list) {
26049e08aacStbbdev // ITT note: &(op_list->status) tag is used to cover accesses to op_list
26149e08aacStbbdev // node. This thread is going to handle the operation, and so will acquire it
26249e08aacStbbdev // and perform the associated operation w/o triggering a race condition; the
26349e08aacStbbdev // thread that created the operation is waiting on the status field, so when
26449e08aacStbbdev // this thread is done with the operation, it will perform a
26549e08aacStbbdev // store_with_release to give control back to the waiting thread in
26649e08aacStbbdev // aggregator::insert_operation.
26749e08aacStbbdev // TODO: enable
26849e08aacStbbdev call_itt_notify(acquired, &(op_list->status));
26957f524caSIlya Isaev __TBB_ASSERT(op_list->type != INVALID_OP, nullptr);
27049e08aacStbbdev
27149e08aacStbbdev tmp = op_list;
27249e08aacStbbdev op_list = op_list->next.load(std::memory_order_relaxed);
27349e08aacStbbdev if (tmp->type == POP_OP) {
27449e08aacStbbdev if (mark < data.size() &&
27549e08aacStbbdev my_compare(data[0], data.back()))
27649e08aacStbbdev {
27749e08aacStbbdev // there are newly pushed elems and the last one is higher than top
27849e08aacStbbdev *(tmp->elem) = std::move(data.back());
27949e08aacStbbdev my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
28049e08aacStbbdev tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
28149e08aacStbbdev
28249e08aacStbbdev data.pop_back();
28357f524caSIlya Isaev __TBB_ASSERT(mark <= data.size(), nullptr);
28449e08aacStbbdev } else { // no convenient item to pop; postpone
28549e08aacStbbdev tmp->next.store(pop_list, std::memory_order_relaxed);
28649e08aacStbbdev pop_list = tmp;
28749e08aacStbbdev }
28849e08aacStbbdev } else { // PUSH_OP or PUSH_RVALUE_OP
28949e08aacStbbdev __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
29049e08aacStbbdev #if TBB_USE_EXCEPTIONS
29149e08aacStbbdev try
29249e08aacStbbdev #endif
29349e08aacStbbdev {
29449e08aacStbbdev if (tmp->type == PUSH_OP) {
29549e08aacStbbdev push_back_helper(*(tmp->elem));
29649e08aacStbbdev } else {
29749e08aacStbbdev data.push_back(std::move(*(tmp->elem)));
29849e08aacStbbdev }
29949e08aacStbbdev my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
30049e08aacStbbdev tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
30149e08aacStbbdev }
30249e08aacStbbdev #if TBB_USE_EXCEPTIONS
30349e08aacStbbdev catch(...) {
30449e08aacStbbdev tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
30549e08aacStbbdev }
30649e08aacStbbdev #endif
30749e08aacStbbdev }
30849e08aacStbbdev }
30949e08aacStbbdev
31049e08aacStbbdev // Second pass processes pop operations
31149e08aacStbbdev while(pop_list) {
31249e08aacStbbdev tmp = pop_list;
31349e08aacStbbdev pop_list = pop_list->next.load(std::memory_order_relaxed);
31457f524caSIlya Isaev __TBB_ASSERT(tmp->type == POP_OP, nullptr);
31549e08aacStbbdev if (data.empty()) {
31649e08aacStbbdev tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
31749e08aacStbbdev } else {
31857f524caSIlya Isaev __TBB_ASSERT(mark <= data.size(), nullptr);
31949e08aacStbbdev if (mark < data.size() &&
32049e08aacStbbdev my_compare(data[0], data.back()))
32149e08aacStbbdev {
32249e08aacStbbdev // there are newly pushed elems and the last one is higher than top
32349e08aacStbbdev *(tmp->elem) = std::move(data.back());
32449e08aacStbbdev my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
32549e08aacStbbdev tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
32649e08aacStbbdev data.pop_back();
32749e08aacStbbdev } else { // extract top and push last element down heap
32849e08aacStbbdev *(tmp->elem) = std::move(data[0]);
32949e08aacStbbdev my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
33049e08aacStbbdev tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
33149e08aacStbbdev reheap();
33249e08aacStbbdev }
33349e08aacStbbdev }
33449e08aacStbbdev }
33549e08aacStbbdev
33649e08aacStbbdev // heapify any leftover pushed elements before doing the next
33749e08aacStbbdev // batch of operations
33849e08aacStbbdev if (mark < data.size()) heapify();
33957f524caSIlya Isaev __TBB_ASSERT(mark == data.size(), nullptr);
34049e08aacStbbdev call_itt_notify(releasing, this);
34149e08aacStbbdev }
34249e08aacStbbdev
34349e08aacStbbdev // Merge unsorted elements into heap
heapify()34449e08aacStbbdev void heapify() {
34549e08aacStbbdev if (!mark && data.size() > 0) mark = 1;
34649e08aacStbbdev for (; mark < data.size(); ++mark) {
34749e08aacStbbdev // for each unheapified element under size
34849e08aacStbbdev size_type cur_pos = mark;
34949e08aacStbbdev value_type to_place = std::move(data[mark]);
35049e08aacStbbdev do { // push to_place up the heap
35149e08aacStbbdev size_type parent = (cur_pos - 1) >> 1;
35249e08aacStbbdev if (!my_compare(data[parent], to_place))
35349e08aacStbbdev break;
35449e08aacStbbdev data[cur_pos] = std::move(data[parent]);
35549e08aacStbbdev cur_pos = parent;
35649e08aacStbbdev } while(cur_pos);
35749e08aacStbbdev data[cur_pos] = std::move(to_place);
35849e08aacStbbdev }
35949e08aacStbbdev }
36049e08aacStbbdev
36149e08aacStbbdev // Re-heapify after an extraction
36249e08aacStbbdev // Re-heapify by pushing last element down the heap from the root.
reheap()36349e08aacStbbdev void reheap() {
36449e08aacStbbdev size_type cur_pos = 0, child = 1;
36549e08aacStbbdev
36649e08aacStbbdev while(child < mark) {
36749e08aacStbbdev size_type target = child;
36849e08aacStbbdev if (child + 1 < mark && my_compare(data[child], data[child + 1]))
36949e08aacStbbdev ++target;
37049e08aacStbbdev // target now has the higher priority child
37149e08aacStbbdev if (my_compare(data[target], data.back()))
37249e08aacStbbdev break;
37349e08aacStbbdev data[cur_pos] = std::move(data[target]);
37449e08aacStbbdev cur_pos = target;
37549e08aacStbbdev child = (cur_pos << 1) + 1;
37649e08aacStbbdev }
37749e08aacStbbdev if (cur_pos != data.size() - 1)
37849e08aacStbbdev data[cur_pos] = std::move(data.back());
37949e08aacStbbdev data.pop_back();
38049e08aacStbbdev if (mark > data.size()) mark = data.size();
38149e08aacStbbdev }
38249e08aacStbbdev
push_back_helper(const T & value)38349e08aacStbbdev void push_back_helper( const T& value ) {
38449e08aacStbbdev push_back_helper_impl(value, std::is_copy_constructible<T>{});
38549e08aacStbbdev }
38649e08aacStbbdev
push_back_helper_impl(const T & value,std::true_type)38749e08aacStbbdev void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
38849e08aacStbbdev data.push_back(value);
38949e08aacStbbdev }
39049e08aacStbbdev
push_back_helper_impl(const T &,std::false_type)39149e08aacStbbdev void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
39249e08aacStbbdev __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
39349e08aacStbbdev }
39449e08aacStbbdev
39549e08aacStbbdev using aggregator_type = aggregator<functor, cpq_operation>;
39649e08aacStbbdev
39749e08aacStbbdev aggregator_type my_aggregator;
39849e08aacStbbdev // Padding added to avoid false sharing
39949e08aacStbbdev char padding1[max_nfs_size - sizeof(aggregator_type)];
40049e08aacStbbdev // The point at which unsorted elements begin
40149e08aacStbbdev size_type mark;
40249e08aacStbbdev std::atomic<size_type> my_size;
40349e08aacStbbdev Compare my_compare;
40449e08aacStbbdev
40549e08aacStbbdev // Padding added to avoid false sharing
40649e08aacStbbdev char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
40749e08aacStbbdev //! Storage for the heap of elements in queue, plus unheapified elements
40849e08aacStbbdev /** data has the following structure:
40949e08aacStbbdev
41049e08aacStbbdev binary unheapified
41149e08aacStbbdev heap elements
41249e08aacStbbdev ____|_______|____
41349e08aacStbbdev | | |
41449e08aacStbbdev v v v
41549e08aacStbbdev [_|...|_|_|...|_| |...| ]
41649e08aacStbbdev 0 ^ ^ ^
41749e08aacStbbdev | | |__capacity
41849e08aacStbbdev | |__my_size
41949e08aacStbbdev |__mark
42049e08aacStbbdev
42149e08aacStbbdev Thus, data stores the binary heap starting at position 0 through
42249e08aacStbbdev mark-1 (it may be empty). Then there are 0 or more elements
42349e08aacStbbdev that have not yet been inserted into the heap, in positions
42449e08aacStbbdev mark through my_size-1. */
42549e08aacStbbdev
42649e08aacStbbdev using vector_type = std::vector<value_type, allocator_type>;
42749e08aacStbbdev vector_type data;
42849e08aacStbbdev
429b15aabb3Stbbdev friend bool operator==( const concurrent_priority_queue& lhs,
430b15aabb3Stbbdev const concurrent_priority_queue& rhs )
431b15aabb3Stbbdev {
432b15aabb3Stbbdev return lhs.data == rhs.data;
433b15aabb3Stbbdev }
434b15aabb3Stbbdev
435b15aabb3Stbbdev #if !__TBB_CPP20_COMPARISONS_PRESENT
436b15aabb3Stbbdev friend bool operator!=( const concurrent_priority_queue& lhs,
437b15aabb3Stbbdev const concurrent_priority_queue& rhs )
438b15aabb3Stbbdev {
439b15aabb3Stbbdev return !(lhs == rhs);
440b15aabb3Stbbdev }
441b15aabb3Stbbdev #endif
44249e08aacStbbdev }; // class concurrent_priority_queue
44349e08aacStbbdev
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Iterator range, optionally followed by a comparator and an allocator.
template <typename InputIt,
          typename Comparator = std::less<iterator_value_t<InputIt>>,
          typename AllocatorT = tbb::cache_aligned_allocator<iterator_value_t<InputIt>>,
          typename = std::enable_if_t<is_input_iterator_v<InputIt>>,
          typename = std::enable_if_t<is_allocator_v<AllocatorT>>,
          typename = std::enable_if_t<!is_allocator_v<Comparator>>>
concurrent_priority_queue( InputIt, InputIt, Comparator = Comparator(), AllocatorT = AllocatorT() )
-> concurrent_priority_queue<iterator_value_t<InputIt>, Comparator, AllocatorT>;

// Iterator range followed directly by an allocator; the comparator is std::less.
template <typename InputIt, typename AllocatorT,
          typename = std::enable_if_t<is_input_iterator_v<InputIt>>,
          typename = std::enable_if_t<is_allocator_v<AllocatorT>>>
concurrent_priority_queue( InputIt, InputIt, AllocatorT )
-> concurrent_priority_queue<iterator_value_t<InputIt>, std::less<iterator_value_t<InputIt>>, AllocatorT>;

// Initializer list, optionally followed by a comparator and an allocator.
template <typename Value,
          typename Comparator = std::less<Value>,
          typename AllocatorT = tbb::cache_aligned_allocator<Value>,
          typename = std::enable_if_t<is_allocator_v<AllocatorT>>,
          typename = std::enable_if_t<!is_allocator_v<Comparator>>>
concurrent_priority_queue( std::initializer_list<Value>, Comparator = Comparator(), AllocatorT = AllocatorT() )
-> concurrent_priority_queue<Value, Comparator, AllocatorT>;

// Initializer list followed directly by an allocator; the comparator is std::less.
template <typename Value, typename AllocatorT,
          typename = std::enable_if_t<is_allocator_v<AllocatorT>>>
concurrent_priority_queue( std::initializer_list<Value>, AllocatorT )
-> concurrent_priority_queue<Value, std::less<Value>, AllocatorT>;

#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
47449e08aacStbbdev
47549e08aacStbbdev template <typename T, typename Compare, typename Allocator>
swap(concurrent_priority_queue<T,Compare,Allocator> & lhs,concurrent_priority_queue<T,Compare,Allocator> & rhs)47649e08aacStbbdev void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
47749e08aacStbbdev concurrent_priority_queue<T, Compare, Allocator>& rhs )
47849e08aacStbbdev {
47949e08aacStbbdev lhs.swap(rhs);
48049e08aacStbbdev }
48149e08aacStbbdev
48249e08aacStbbdev } // namespace d1
48349e08aacStbbdev } // namespace detail
48449e08aacStbbdev inline namespace v1 {
48549e08aacStbbdev using detail::d1::concurrent_priority_queue;
48649e08aacStbbdev
48749e08aacStbbdev } // inline namespace v1
48849e08aacStbbdev } // namespace tbb
48949e08aacStbbdev
49049e08aacStbbdev #endif // __TBB_concurrent_priority_queue_H
491