/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "detail/_containers_helpers.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }
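
    /* Illustrative sketch (assumed usage, not part of the interface): assign() and
       assignment from an initializer_list replace the whole contents and therefore
       must not run concurrently with other operations on the same queue. The names
       below are hypothetical.

           tbb::concurrent_priority_queue<int> q;
           q = {3, 1, 4, 1, 5};                // rebuilds the heap from five elements
           std::vector<int> v{9, 2, 6};
           q.assign(v.begin(), v.end());       // replaces the previous contents
    */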

    // Returns true if the queue contains no elements
    /* The returned value may not reflect the results of pending operations.
       This operation reads shared data and can race with concurrent modifying operations. */
    __TBB_nodiscard bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* The returned value may not reflect the results of pending operations.
       This operation reads shared data and can race with concurrent modifying operations. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses-allocator construction here
        push(value_type(std::forward<Args>(args)...));
    }

    // Removes the highest priority element and assigns it to value
    /* If a highest priority element was found, assigns it to value and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }
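
    /* Illustrative usage sketch (not part of the interface): push, emplace and
       try_pop may be called concurrently from multiple threads without external
       locking; the thread roles below are hypothetical.

           tbb::concurrent_priority_queue<int> queue;

           // producer thread
           queue.push(42);
           queue.emplace(7);

           // consumer thread
           int top;
           if (queue.try_pop(top)) {
               // top holds the highest priority element that was present
           }
    */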

    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }
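
    /* Illustrative sketch (assumed usage, not part of the interface): clear() and
       swap() touch the whole container, so callers are expected to serialize them
       against all concurrent accesses, e.g. with a mutex of their own:

           std::mutex m;                            // user-provided, hypothetical
           {
               std::lock_guard<std::mutex> lock(m);
               queue.clear();                       // no concurrent push/try_pop here
           }
    */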

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor

    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), nullptr);

        // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
        while(op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, nullptr);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), nullptr);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass processes pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, nullptr);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), nullptr);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), nullptr);
        call_itt_notify(releasing, this);
    }
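
    /* Illustrative summary (an interpretation of the code above, not a specification):
       for a pending batch with several pushes and one pop, the pushes are appended to
       the back of data in amortized constant time; the pop is served directly from
       data.back() if an element pushed in the same batch already outranks the current
       top, and is otherwise postponed to the second pass, which pops from the heap
       root via reheap(). Any elements still unheapified afterwards are merged by
       heapify() before the handler returns. */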

    // Merge unsorted elements into heap
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }
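
    /* Worked example (illustrative, assumes Compare = std::less<int>): with
       data == [50, 30, 40, 60] and mark == 3, heapify() sifts the unheapified 60
       up past 30 and 50, giving data == [60, 50, 40, 30] and mark == 4, so the
       highest priority element is again at index 0. */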

    // Re-heapify after an extraction: push the last element down the heap from the root.
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }
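
    /* Worked example (illustrative, assumes Compare = std::less<int>): after the
       root of [60, 50, 40, 30] has been moved out to a caller, reheap() sifts the
       trailing 30 down from the root; 50 is the higher priority child and moves up,
       leaving data == [50, 30, 40], which is again a valid heap. */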

    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
       |       |       |
       v       v       v
       [_|...|_|_|...|_| |...| ]
        0       ^       ^       ^
                |       |       |__capacity
                |       |__my_size
                |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty). Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */
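
    /* Concrete illustration (hypothetical numbers): with mark == 3, my_size == 5 and
       capacity == 8, data[0..2] form the binary heap, data[3..4] hold elements pushed
       since the last heapify()/reheap(), and slots 5..7 are unused capacity. */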

    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    friend bool operator==( const concurrent_priority_queue& lhs,
                            const concurrent_priority_queue& rhs )
    {
        return lhs.data == rhs.data;
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    friend bool operator!=( const concurrent_priority_queue& lhs,
                            const concurrent_priority_queue& rhs )
    {
        return !(lhs == rhs);
    }
#endif
}; // class concurrent_priority_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename It,
          typename Comp = std::less<iterator_value_t<It>>,
          typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( It, It, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<iterator_value_t<It>, Comp, Alloc>;

template <typename It, typename Alloc,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( It, It, Alloc )
-> concurrent_priority_queue<iterator_value_t<It>, std::less<iterator_value_t<It>>, Alloc>;

template <typename T,
          typename Comp = std::less<T>,
          typename Alloc = tbb::cache_aligned_allocator<T>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( std::initializer_list<T>, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<T, Comp, Alloc>;

template <typename T, typename Alloc,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( std::initializer_list<T>, Alloc )
-> concurrent_priority_queue<T, std::less<T>, Alloc>;

#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
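
/* Illustrative deduction sketch (hypothetical objects): with the guides above,

       std::vector<int> v{3, 1, 2};
       tbb::concurrent_priority_queue cpq1(v.begin(), v.end());
       // deduces concurrent_priority_queue<int, std::less<int>, tbb::cache_aligned_allocator<int>>

       tbb::concurrent_priority_queue cpq2({5, 4, 6}, std::greater<int>{});
       // deduces concurrent_priority_queue<int, std::greater<int>, tbb::cache_aligned_allocator<int>>
*/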

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_priority_queue;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H