/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

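//! Concurrent priority queue
/** Elements are prioritized with Compare (std::less<T> by default, so the element that
    compares greatest has the highest priority). push(), try_pop() and emplace() may be
    called concurrently with each other; operations that affect the whole container,
    such as clear() and swap(), are not thread-safe. */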
template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}
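    // Construction sketch (illustrative, hypothetical names); range and initializer-list
    // constructors heapify their input:
    //     tbb::concurrent_priority_queue<int> max_q({5, 1, 3});                    // with std::less, 5 is popped first
    //     tbb::concurrent_priority_queue<int, std::greater<int>> min_q({5, 1, 3}); // with std::greater, 1 is popped first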

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }

    // Returns true if the queue contains no elements, false otherwise
    /* The returned value may not reflect the results of pending operations and may be
       stale if the queue is being modified concurrently. */
    bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* The returned value may not reflect the results of pending operations and may be
       stale if the queue is being modified concurrently. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses-allocator construction here
        push(value_type(std::forward<Args>(args)...));
    }

    // Removes the highest priority element and moves it into value
    /* If the queue is not empty, moves the highest priority element into value and
       returns true; otherwise leaves value unchanged and returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }
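    // Usage sketch (illustrative, hypothetical names):
    //     tbb::concurrent_priority_queue<int> q;
    //     q.push(2);
    //     q.emplace(7);
    //     int top;
    //     while (q.try_pop(top)) {
    //         // with the default std::less comparator, 7 is popped before 2
    //     }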

    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor

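    // Invoked via the aggregator to process a batch of queued operations on behalf of the
    // submitting threads: each push(), try_pop() or emplace() call packages its request
    // into a cpq_operation, and my_aggregator.execute() returns once the operation's
    // status has been set to SUCCEEDED or FAILED.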
    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass: process all pushes and any pops that can be completed in amortized
        // constant time (a push may still trigger a vector reallocation).
        while(op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elements and the last one has higher priority than the current top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), NULL);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass processes pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), NULL);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elements and the last one has higher priority than the current top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
        call_itt_notify(releasing, this);
    }

    // Merge unsorted elements into heap
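    // Each element in positions [mark, data.size()) is sifted up: it is repeatedly
    // compared with its parent at (pos - 1) / 2 and moved toward the root until the
    // heap property is restored.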
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }

    // Re-heapify after an extraction: push the last element down the heap from the root
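    // The hole left at the root is filled by promoting the higher-priority child at each
    // level; promotion stops when the heapified region ends or the last element has higher
    // priority than that child, after which the last element is moved into the hole and
    // popped from the end of the vector.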
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

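    // Tag dispatch on std::is_copy_constructible keeps handle_operations() compilable for
    // move-only value types (the copying branch is never valid for them); reaching it for
    // such a type is a usage error reported by the assertion below.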
    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue::push(const value_type&) for a move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in the queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
        |       |       |
        v       v       v
        [_|...|_|_|...|_| |...| ]
         0       ^       ^       ^
                 |       |       |__capacity
                 |       |__my_size
                 |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty).  Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */

    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    template <typename Type, typename Comp, typename Alloc>
    friend bool operator==( const concurrent_priority_queue<Type, Comp, Alloc>&,
                            const concurrent_priority_queue<Type, Comp, Alloc>& );
}; // class concurrent_priority_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename T, typename... Args>
using priority_queue_type = concurrent_priority_queue<T,
                                                      std::conditional_t<(sizeof...(Args) > 0) && !is_allocator_v<pack_element_t<0, Args...>>,
                                                                         pack_element_t<0, Args...>, std::less<T>>,
                                                      std::conditional_t<(sizeof...(Args) > 0) && is_allocator_v<pack_element_t<sizeof...(Args) - 1, Args...>>,
                                                                         pack_element_t<sizeof...(Args) - 1, Args...>,
                                                                         cache_aligned_allocator<T>>>;

template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename... Args>
concurrent_priority_queue( InputIterator, InputIterator, Args... )
-> priority_queue_type<T, Args...>;

template <typename T, typename CompareOrAllocator>
concurrent_priority_queue( std::initializer_list<T> init_list, CompareOrAllocator )
-> priority_queue_type<T, CompareOrAllocator>;
#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
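// Deduction sketch (illustrative, hypothetical names; assumes the guides above are available):
//     std::vector<int> v{3, 1, 2};
//     tbb::concurrent_priority_queue cpq(v.begin(), v.end(), std::greater<int>{});
//     // deduces concurrent_priority_queue<int, std::greater<int>, cache_aligned_allocator<int>>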

template <typename T, typename Compare, typename Allocator>
bool operator==( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return lhs.data == rhs.data;
}
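// Note: like the other whole-container operations, this comparison is not thread-safe;
// it compares the underlying storage of both queues element-wise.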

template <typename T, typename Compare, typename Allocator>
bool operator!=( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return !(lhs == rhs);
}

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_priority_queue;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H