/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}
    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }

    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses allocator construction in this place
        push(value_type(std::forward<Args>(args)...));
    }

    // Gets a reference to and removes highest priority element
    /* If a highest priority element was found, sets elem and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }
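    /* Usage note (illustrative sketch only, not part of the class interface; the variable
       names below are assumptions for the example): push(), emplace() and try_pop() may be
       called concurrently from multiple threads without external locking, e.g.

           tbb::concurrent_priority_queue<int> queue;   // std::less<int>: largest value has highest priority
           queue.push(3);
           queue.emplace(7);
           int top;
           if (queue.try_pop(top)) {
               // top == 7: try_pop removes the highest priority element and reports success
           }

       clear() and swap() below affect the whole container and are not thread-safe with
       respect to the operations above. */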
    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor
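    /* Descriptive note (added commentary, summarizing the handler below): every public
       push/try_pop/emplace wraps its argument in a cpq_operation and submits it to
       my_aggregator. One thread at a time becomes the handler and drains the whole batch in
       handle_operations(), while each submitting thread waits on its operation's status
       field; the handler publishes the result with a release store once the operation has
       been applied. */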
    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
        while(op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), NULL);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass processes pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), NULL);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
        call_itt_notify(releasing, this);
    }

    // Merge unsorted elements into heap
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }
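    /* Worked illustration (added commentary; the values are assumed for the example): with
       Compare = std::less and data = [9, 5, 7 | 8], where '|' marks the start of the
       unheapified tail (mark == 3), heapify() sifts the trailing 8 up past 5, leaving
       data = [9, 8, 7, 5] and mark == 4, i.e. the whole vector again satisfies the heap
       property checked by my_compare. */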
    // Re-heapify after an extraction
    // Re-heapify by pushing last element down the heap from the root.
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap  elements
        ____|_______|____
       |       |       |
       v       v       v
       [_|...|_|_|...|_| |...| ]
        0       ^       ^       ^
                |       |       |__capacity
                |       |__my_size
                |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty). Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */
    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    template <typename Type, typename Comp, typename Alloc>
    friend bool operator==( const concurrent_priority_queue<Type, Comp, Alloc>&,
                            const concurrent_priority_queue<Type, Comp, Alloc>& );
}; // class concurrent_priority_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename T, typename... Args>
using priority_queue_type = concurrent_priority_queue<T,
    std::conditional_t<(sizeof...(Args) > 0) && !is_allocator_v<pack_element_t<0, Args...>>,
                       pack_element_t<0, Args...>, std::less<T>>,
    std::conditional_t<(sizeof...(Args) > 0) && is_allocator_v<pack_element_t<sizeof...(Args) - 1, Args...>>,
                       pack_element_t<sizeof...(Args) - 1, Args...>,
                       cache_aligned_allocator<T>>>;

template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename... Args>
concurrent_priority_queue( InputIterator, InputIterator, Args... )
-> priority_queue_type<T, Args...>;

template <typename T, typename CompareOrAllocator>
concurrent_priority_queue( std::initializer_list<T> init_list, CompareOrAllocator )
-> priority_queue_type<T, CompareOrAllocator>;
#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
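// Illustrative sketch (added commentary, not part of the header; the vector 'v' is assumed
// for the example): when the deduction guides above are available, the comparator is taken
// from the first trailing constructor argument when it is not an allocator, and the allocator
// from the last trailing argument when it is an allocator, defaulting to std::less<T> and
// cache_aligned_allocator<T> otherwise:
//
//     std::vector<int> v{3, 1, 4};
//     tbb::concurrent_priority_queue pq1(v.begin(), v.end());
//         // concurrent_priority_queue<int, std::less<int>, cache_aligned_allocator<int>>
//     tbb::concurrent_priority_queue pq2(v.begin(), v.end(), std::greater<int>{});
//         // concurrent_priority_queue<int, std::greater<int>, cache_aligned_allocator<int>>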
template <typename T, typename Compare, typename Allocator>
bool operator==( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return lhs.data == rhs.data;
}

template <typename T, typename Compare, typename Allocator>
bool operator!=( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return !(lhs == rhs);
}

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail

inline namespace v1 {
using detail::d1::concurrent_priority_queue;
} // inline namespace v1

} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H