/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

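// concurrent_priority_queue is an unbounded priority queue whose push(), try_pop()
// and emplace() operations may be invoked concurrently from multiple threads.
// As with std::priority_queue, Compare defines a max-heap: try_pop() removes the
// element with the highest priority under Compare.
//
// Illustrative usage sketch (not part of this header; assumes it is included
// through the public <tbb/concurrent_priority_queue.h> entry point):
//
//     tbb::concurrent_priority_queue<int> q;
//     q.push(42);               // safe concurrently with other push/try_pop/emplace
//     int top;
//     if (q.try_pop(top)) {     // returns false if the queue appeared empty
//         // top now holds the highest-priority element
//     }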
template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }

    // Returns true if the queue contains no elements, false otherwise
    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and can race with pending concurrent modifications. */
    bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and can race with pending concurrent modifications. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses-allocator construction here
        push(value_type(std::forward<Args>(args)...));
    }

    // Removes the highest priority element and moves it into value
    /* If a highest priority element was found, assigns it to value and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }

    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

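    // Record describing one queued operation. It travels through the aggregator's
    // operation list; elem points at the caller's value so the handling thread can
    // read from it (push) or write into it (pop).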
    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

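    // Handler object installed into the aggregator; forwards the batched operation
    // list to handle_operations() of the owning queue.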
    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor

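    // Runs on the single thread that currently owns the aggregator and drains the
    // whole batch of pending operations: a first pass serves pushes and "cheap" pops,
    // a second pass serves the postponed pops, and any leftover pushed elements are
    // merged into the heap before control is released.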
    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass: process all pushes and any pops that can be served in amortized
        // constant time (a push may still trigger a reallocation of the underlying vector).
        while(op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elements and the last one has higher priority than the top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), NULL);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass: process the postponed pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), NULL);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elements and the last one has higher priority than the top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
        call_itt_notify(releasing, this);
    }

    // Merge unsorted elements into the heap
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }

    // Re-heapify after an extraction: push the last element down the heap from the root
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

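    // Aggregator through which every queue operation is funneled; one thread at a time
    // becomes the handler and executes the batched operations (see handle_operations()).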
    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
        |       |       |
        v       v       v
        [_|...|_|_|...|_| |...| ]
         0       ^       ^       ^
                 |       |       |__capacity
                 |       |__my_size
                 |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty).  Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */

    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    template <typename Type, typename Comp, typename Alloc>
    friend bool operator==( const concurrent_priority_queue<Type, Comp, Alloc>&,
                            const concurrent_priority_queue<Type, Comp, Alloc>& );
}; // class concurrent_priority_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename T, typename... Args>
using priority_queue_type = concurrent_priority_queue<T,
                                                      std::conditional_t<(sizeof...(Args) > 0) && !is_allocator_v<pack_element_t<0, Args...>>,
                                                                         pack_element_t<0, Args...>, std::less<T>>,
                                                      std::conditional_t<(sizeof...(Args) > 0) && is_allocator_v<pack_element_t<sizeof...(Args) - 1, Args...>>,
                                                                         pack_element_t<sizeof...(Args) - 1, Args...>,
                                                                         cache_aligned_allocator<T>>>;

template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename... Args>
concurrent_priority_queue( InputIterator, InputIterator, Args... )
-> priority_queue_type<T, Args...>;

template <typename T, typename CompareOrAllocator>
concurrent_priority_queue( std::initializer_list<T> init_list, CompareOrAllocator )
-> priority_queue_type<T, CompareOrAllocator>;
#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
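// Illustrative deduction sketch (not part of this header): given the guides above,
//     std::vector<int> v{3, 1, 4};
//     tbb::concurrent_priority_queue q1(v.begin(), v.end());             // deduces <int, std::less<int>, cache_aligned_allocator<int>>
//     tbb::concurrent_priority_queue q2({5, 9, 2}, std::greater<int>{}); // deduces std::greater<int> as the comparator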

template <typename T, typename Compare, typename Allocator>
bool operator==( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return lhs.data == rhs.data;
}

template <typename T, typename Compare, typename Allocator>
bool operator!=( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return !(lhs == rhs);
}

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_priority_queue;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H