/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "detail/_containers_helpers.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}
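
    // Illustrative usage sketch (not part of this header): elements are ordered by
    // Compare, so with the default std::less<T> the largest element has the highest
    // priority. push(), try_pop() and emplace() may be called concurrently from
    // multiple threads.
    //
    //     tbb::concurrent_priority_queue<int> queue;
    //     queue.push(3);
    //     queue.push(7);
    //     int top;
    //     if (queue.try_pop(top)) { /* top == 7 under std::less<int> */ }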

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }

    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses allocator construction in this place
        push(value_type(std::forward<Args>(args)...));
    }

    // Gets a reference to and removes highest priority element
    /* If a highest priority element was found, sets elem and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }
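
    // Typical consumer pattern (illustrative sketch): try_pop() is the only way to
    // observe and remove the top element atomically, so drain the queue with a loop
    // rather than checking empty() first, which may be stale under concurrent pushes.
    //
    //     ValueType item;                      // ValueType and process() are placeholders
    //     while (queue.try_pop(item)) {
    //         process(item);
    //     }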

    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor
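
    // Note: handle_operations() is invoked by the aggregator on behalf of a batch of
    // queued operations. Only one thread executes the handler at a time, so the heap
    // state (data, mark) is accessed here without additional locking; each waiting
    // thread is released through the release-store to its operation's status field.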
    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
        while(op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), NULL);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass processes pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), NULL);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
        call_itt_notify(releasing, this);
    }

    // Merge unsorted elements into heap
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }
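
    // Heap invariant maintained by heapify()/reheap() (informal note): for every index i
    // in [1, mark), my_compare(data[(i - 1) / 2], data[i]) is false, i.e. no parent orders
    // before its child, so data[0] is the highest-priority element of the heapified prefix.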
    // Re-heapify after an extraction
    // Re-heapify by pushing last element down the heap from the root.
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
       |       |       |
       v       v       v
       [_|...|_|_|...|_| |...| ]
        0       ^       ^       ^
                |       |       |__capacity
                |       |__my_size
                |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty). Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */

    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    template <typename Type, typename Comp, typename Alloc>
    friend bool operator==( const concurrent_priority_queue<Type, Comp, Alloc>&,
                            const concurrent_priority_queue<Type, Comp, Alloc>& );
}; // class concurrent_priority_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename It,
          typename Comp = std::less<iterator_value_t<It>>,
          typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( It, It, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<iterator_value_t<It>, Comp, Alloc>;

template <typename It, typename Alloc,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( It, It, Alloc )
-> concurrent_priority_queue<iterator_value_t<It>, std::less<iterator_value_t<It>>, Alloc>;

template <typename T,
          typename Comp = std::less<T>,
          typename Alloc = tbb::cache_aligned_allocator<T>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( std::initializer_list<T>, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<T, Comp, Alloc>;

template <typename T, typename Alloc,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( std::initializer_list<T>, Alloc )
-> concurrent_priority_queue<T, std::less<T>, Alloc>;

#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
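
// Illustrative class template argument deduction usage (assumes the guides above are
// available, i.e. __TBB_CPP17_DEDUCTION_GUIDES_PRESENT is defined):
//
//     std::vector<int> v{1, 2, 3};
//     tbb::concurrent_priority_queue cpq(v.begin(), v.end());
//     // deduces concurrent_priority_queue<int, std::less<int>, tbb::cache_aligned_allocator<int>>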

template <typename T, typename Compare, typename Allocator>
bool operator==( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return lhs.data == rhs.data;
}

template <typename T, typename Compare, typename Allocator>
bool operator!=( const concurrent_priority_queue<T, Compare, Allocator>& lhs,
                 const concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    return !(lhs == rhs);
}

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_priority_queue;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H