/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "detail/_containers_helpers.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

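//! Unbounded priority queue permitting concurrent push(), try_pop() and emplace()
/** Elements are ordered by Compare (std::less<T> by default, so try_pop()
    returns the largest element first). All modifying operations are funneled
    through an aggregator, so one thread at a time applies a batch of pending
    operations to the underlying heap.

    A minimal usage sketch (illustrative only, not part of this header):

        tbb::concurrent_priority_queue<int> q;
        q.push(3);
        q.push(7);
        int top;
        if (q.try_pop(top)) {
            // with the default std::less<int>, top == 7
        }
*/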
template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }

    // Returns true if the queue contains no elements
    /* This operation reads shared state; the returned value may not reflect
       results of pending operations and may be stale as soon as it is returned. */
    __TBB_nodiscard bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* This operation reads shared state; the returned value may not reflect
       results of pending operations and may be stale as soon as it is returned. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses-allocator construction here
        push(value_type(std::forward<Args>(args)...));
    }

    // Removes the highest priority element and moves it into value
    /* If a highest priority element was found, sets value and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }

    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

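    // Descriptor of a single pending operation; the requesting thread hands it to
    // the aggregator and waits on the status field until the handling thread
    // releases it as SUCCEEDED or FAILED.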
    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

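    // Handler installed into the aggregator; forwards a batch of pending
    // operations to the owning queue's handle_operations().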
    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor

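    // Applies a batch of queued operations on behalf of all waiting threads.
    // Pushes and cheap pops are served in a first pass; pops that must extract
    // the heap top are postponed to a second pass, after which any remaining
    // unheapified elements are merged back into the heap.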
    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), nullptr);

        // First pass: serve all pushes and those pops that complete in amortized
        // constant time (a push may still reallocate the underlying vector).
        while(op_list) {
            // ITT note: the &(op_list->status) tag is used to cover accesses to the op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, nullptr);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), nullptr);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass processes pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, nullptr);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), nullptr);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), nullptr);
        call_itt_notify(releasing, this);
    }

    // Merge unsorted elements into heap
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }

    // Re-heapify after an extraction: push the last element down the heap from the root
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

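    // Appends a copy of value to data. push(const value_type&) cannot be used with
    // move-only types; the std::false_type overload below reports that misuse.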
    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue::push(const value_type&) for a move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

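    // Aggregator through which all push/try_pop/emplace requests are funneled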
    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
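    // Current number of elements (heapified plus not-yet-heapified)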
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
        |       |       |
        v       v       v
        [_|...|_|_|...|_| |...| ]
         0       ^       ^       ^
                 |       |       |__capacity
                 |       |__my_size
                 |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty).  Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */

    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    friend bool operator==( const concurrent_priority_queue& lhs,
                            const concurrent_priority_queue& rhs )
    {
        return lhs.data == rhs.data;
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    friend bool operator!=( const concurrent_priority_queue& lhs,
                            const concurrent_priority_queue& rhs )
    {
        return !(lhs == rhs);
    }
#endif
}; // class concurrent_priority_queue

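// Deduction guides (available when __TBB_CPP17_DEDUCTION_GUIDES_PRESENT):
// deduce the element type, and optionally the comparator and allocator, from
// an iterator range or an initializer_list. Illustrative sketch only:
//
//     std::vector<int> v{3, 1, 2};
//     tbb::concurrent_priority_queue q(v.begin(), v.end()); // deduces <int>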
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename It,
          typename Comp = std::less<iterator_value_t<It>>,
          typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( It, It, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<iterator_value_t<It>, Comp, Alloc>;

template <typename It, typename Alloc,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( It, It, Alloc )
-> concurrent_priority_queue<iterator_value_t<It>, std::less<iterator_value_t<It>>, Alloc>;

template <typename T,
          typename Comp = std::less<T>,
          typename Alloc = tbb::cache_aligned_allocator<T>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( std::initializer_list<T>, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<T, Comp, Alloc>;

template <typename T, typename Alloc,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( std::initializer_list<T>, Alloc )
-> concurrent_priority_queue<T, std::less<T>, Alloc>;

#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_priority_queue;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H