/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}
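
    // Construction sketch (illustrative comment only; `src` below is a hypothetical container,
    // not part of this header). Range and initializer-list constructors heapify their input:
    //
    //     std::vector<int> src{3, 1, 4, 1, 5};
    //     tbb::concurrent_priority_queue<int> cpq(src.begin(), src.end());      // heapified on construction
    //     tbb::concurrent_priority_queue<int, std::greater<int>> min_cpq{7, 2}; // min-heap via custom Compare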

    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }

    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and may race with concurrent modifying operations. */
    bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and may race with concurrent modifying operations. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }
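
    // Usage sketch (illustrative comment only): size() and empty() are relaxed snapshots and
    // may not account for pushes or pops still in flight, so they cannot be used to predict
    // whether a subsequent try_pop() will succeed.
    //
    //     tbb::concurrent_priority_queue<int> q;
    //     q.push(1);
    //     if (!q.empty()) {
    //         // another thread may still pop the element before this one does
    //     }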

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses-allocator construction here
        push(value_type(std::forward<Args>(args)...));
    }

    // Gets a reference to and removes highest priority element
    /* If a highest priority element was found, assigns it to value and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }

    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor
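
    // How operations are serialized (descriptive comment summarizing the surrounding code):
    // push, try_pop and emplace (which forwards to push) wrap their argument in a cpq_operation
    // and submit it through my_aggregator.execute(). The aggregator chains concurrent requests
    // into a list and hands the whole batch to a single thread, which applies them in
    // handle_operations() below, so the underlying heap is only ever mutated by one thread at a time.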

    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
        while(op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), NULL);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass processes pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), NULL);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
        call_itt_notify(releasing, this);
    }

    // Merge unsorted elements into heap
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }
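
    // Worked sketch of heapify() (comment only, assuming the default std::less compare): with
    // data = [5, 9] and mark == 1, element 9 is sifted up past 5, leaving data = [9, 5] and
    // mark == 2, so data[0] is again the highest-priority element.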

    // Re-heapify after an extraction: push the last element down the heap from the root.
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap  elements
        ____|_______|____
        |       |       |
        v       v       v
        [_|...|_|_|...|_| |...| ]
         0       ^       ^       ^
                 |       |       |__capacity
                 |       |__my_size
                 |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty). Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */

    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    template <typename Type, typename Comp, typename Alloc>
    friend bool operator==( const concurrent_priority_queue<Type, Comp, Alloc>&,
                            const concurrent_priority_queue<Type, Comp, Alloc>& );
}; // class concurrent_priority_queue
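
// Minimal concurrent usage sketch (illustrative comment only; thread management is omitted and
// the producer lambda below is hypothetical). push/try_pop/emplace may run concurrently, while
// clear() and swap() must only be called when no other thread is using the queue.
//
//     tbb::concurrent_priority_queue<int> q;
//     auto producer = [&q](int v) { q.push(v); };   // safe to call from many threads
//     int top;
//     while (q.try_pop(top)) {
//         // consume elements in priority order (highest first with std::less)
//     }
//     q.clear();                                    // not thread-safe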

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename T, typename... Args>
using priority_queue_type = concurrent_priority_queue<T,
    std::conditional_t<(sizeof...(Args) > 0) && !is_allocator_v<pack_element_t<0, Args...>>,
                       pack_element_t<0, Args...>, std::less<T>>,
    std::conditional_t<(sizeof...(Args) > 0) && is_allocator_v<pack_element_t<sizeof...(Args) - 1, Args...>>,
                       pack_element_t<sizeof...(Args) - 1, Args...>,
                       cache_aligned_allocator<T>>>;

template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename... Args>
concurrent_priority_queue( InputIterator, InputIterator, Args... )
-> priority_queue_type<T, Args...>;

template <typename T, typename CompareOrAllocator>
concurrent_priority_queue( std::initializer_list<T> init_list, CompareOrAllocator )
-> priority_queue_type<T, CompareOrAllocator>;
#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

template <typename T, typename Compare, typename Allocator>
bool operator==( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return lhs.data == rhs.data;
}

template <typename T, typename Compare, typename Allocator>
bool operator!=( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return !(lhs == rhs);
}

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_priority_queue;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H