/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_parallel_scan_H
#define __TBB_parallel_scan_H

#include <functional>

#include "detail/_config.h"
#include "detail/_namespace_injection.h"
#include "detail/_exception.h"
#include "detail/_task.h"

#include "profiling.h"
#include "partitioner.h"
#include "blocked_range.h"
#include "task_group.h"

namespace tbb {
namespace detail {
namespace d1 {

//! Used to indicate that the initial scan is being performed.
/** @ingroup algorithms */
struct pre_scan_tag {
    static bool is_final_scan() { return false; }
    operator bool() { return is_final_scan(); }
};

//! Used to indicate that the final scan is being performed.
/** @ingroup algorithms */
struct final_scan_tag {
    static bool is_final_scan() { return true; }
    operator bool() { return is_final_scan(); }
};
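// Note: a Body's operator() receives one of the tags above and can branch on it
// at compile time. A minimal illustrative sketch (Body, Range, and the loop body
// are placeholders for this comment only, not part of this header):
//
//     template<typename Tag>
//     void Body::operator()( const Range& r, Tag ) {
//         for( auto i = r.begin(); i != r.end(); ++i ) {
//             // ... fold element i into the running state ...
//             if( Tag::is_final_scan() ) {
//                 // ... additionally emit the prefix result for i ...
//             }
//         }
//     }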
template<typename Range, typename Body>
struct sum_node;

#if __TBB_CPP20_CONCEPTS_PRESENT
} // namespace d1
namespace d0 {

template <typename Body, typename Range>
concept parallel_scan_body = splittable<Body> &&
                             requires( Body& body, const Range& range, Body& other ) {
                                 body(range, tbb::detail::d1::pre_scan_tag{});
                                 body(range, tbb::detail::d1::final_scan_tag{});
                                 body.reverse_join(other);
                                 body.assign(other);
                             };

template <typename Function, typename Range, typename Value>
concept parallel_scan_function = requires( const std::remove_reference_t<Function>& func,
                                           const Range& range, const Value& value ) {
    { func(range, value, true) } -> std::convertible_to<Value>;
};

template <typename Combine, typename Value>
concept parallel_scan_combine = requires( const std::remove_reference_t<Combine>& combine,
                                          const Value& lhs, const Value& rhs ) {
    { combine(lhs, rhs) } -> std::convertible_to<Value>;
};

} // namespace d0
namespace d1 {
#endif // __TBB_CPP20_CONCEPTS_PRESENT

//! Performs final scan for a leaf
/** @ingroup algorithms */
template<typename Range, typename Body>
struct final_sum : public task {
private:
    using sum_node_type = sum_node<Range, Body>;
    Body m_body;
    aligned_space<Range> m_range;
    //! Where to put result of last subrange, or nullptr if not last subrange.
    Body* m_stuff_last;

    wait_context& m_wait_context;
    sum_node_type* m_parent = nullptr;
public:
    small_object_allocator m_allocator;
    final_sum( Body& body, wait_context& w_o, small_object_allocator& alloc ) :
        m_body(body, split()), m_wait_context(w_o), m_allocator(alloc) {
        poison_pointer(m_stuff_last);
    }

    final_sum( final_sum& sum, small_object_allocator& alloc ) :
        m_body(sum.m_body, split()), m_wait_context(sum.m_wait_context), m_allocator(alloc) {
        poison_pointer(m_stuff_last);
    }

    ~final_sum() {
        m_range.begin()->~Range();
    }
    void finish_construction( sum_node_type* parent, const Range& range, Body* stuff_last ) {
        __TBB_ASSERT( m_parent == nullptr, nullptr );
        m_parent = parent;
        new( m_range.begin() ) Range(range);
        m_stuff_last = stuff_last;
    }
private:
    sum_node_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    sum_node_type* finalize(const execution_data& ed) {
        sum_node_type* next_task = release_parent();
        m_allocator.delete_object<final_sum>(this, ed);
        return next_task;
    }

public:
    task* execute(execution_data& ed) override {
        m_body( *m_range.begin(), final_scan_tag() );
        if( m_stuff_last )
            m_stuff_last->assign(m_body);

        return finalize(ed);
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    template<typename Tag>
    void operator()( const Range& r, Tag tag ) {
        m_body( r, tag );
    }
    void reverse_join( final_sum& a ) {
        m_body.reverse_join(a.m_body);
    }
    void reverse_join( Body& body ) {
        m_body.reverse_join(body);
    }
    void assign_to( Body& body ) {
        body.assign(m_body);
    }
    void self_destroy(const execution_data& ed) {
        m_allocator.delete_object<final_sum>(this, ed);
    }
};

//! Split work to be done in the scan.
/** @ingroup algorithms */
template<typename Range, typename Body>
struct sum_node : public task {
private:
    using final_sum_type = final_sum<Range,Body>;
public:
    final_sum_type *m_incoming;
    final_sum_type *m_body;
    Body *m_stuff_last;
private:
    final_sum_type *m_left_sum;
    sum_node *m_left;
    sum_node *m_right;
    bool m_left_is_final;
    Range m_range;
    wait_context& m_wait_context;
    sum_node* m_parent;
    small_object_allocator m_allocator;
public:
    std::atomic<unsigned int> ref_count{0};
    sum_node( const Range range, bool left_is_final_, sum_node* parent, wait_context& w_o, small_object_allocator& alloc ) :
        m_stuff_last(nullptr),
        m_left_sum(nullptr),
        m_left(nullptr),
        m_right(nullptr),
        m_left_is_final(left_is_final_),
        m_range(range),
        m_wait_context(w_o),
        m_parent(parent),
        m_allocator(alloc)
    {
        if( m_parent )
            m_parent->ref_count.fetch_add(1);
        // Poison fields that will be set by second pass.
        poison_pointer(m_body);
        poison_pointer(m_incoming);
    }

    ~sum_node() {
        if (m_parent)
            m_parent->ref_count.fetch_sub(1);
    }
private:
    sum_node* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    task* create_child( const Range& range, final_sum_type& body, sum_node* child, final_sum_type* incoming, Body* stuff_last ) {
        if( child ) {
            __TBB_ASSERT( is_poisoned(child->m_body) && is_poisoned(child->m_incoming), nullptr );
            child->prepare_for_execution(body, incoming, stuff_last);
            return child;
        } else {
            body.finish_construction(this, range, stuff_last);
            return &body;
        }
    }

    sum_node* finalize(const execution_data& ed) {
        sum_node* next_task = release_parent();
        m_allocator.delete_object<sum_node>(this, ed);
        return next_task;
    }

public:
    void prepare_for_execution(final_sum_type& body, final_sum_type* incoming, Body *stuff_last) {
        this->m_body = &body;
        this->m_incoming = incoming;
        this->m_stuff_last = stuff_last;
    }
    task* execute(execution_data& ed) override {
        if( m_body ) {
            if( m_incoming )
                m_left_sum->reverse_join( *m_incoming );
            task* right_child = this->create_child(Range(m_range, split()), *m_left_sum, m_right, m_left_sum, m_stuff_last);
            task* left_child = m_left_is_final ? nullptr : this->create_child(m_range, *m_body, m_left, m_incoming, nullptr);
            ref_count = (left_child != nullptr) + (right_child != nullptr);
            m_body = nullptr;
            if( left_child ) {
                spawn(*right_child, *ed.context);
                return left_child;
            } else {
                return right_child;
            }
        } else {
            return finalize(ed);
        }
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    void self_destroy(const execution_data& ed) {
        m_allocator.delete_object<sum_node>(this, ed);
    }
    template<typename range, typename body, typename partitioner>
    friend struct start_scan;

    template<typename range, typename body>
    friend struct finish_scan;
};
//! Combine partial results
/** @ingroup algorithms */
template<typename Range, typename Body>
struct finish_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    final_sum_type** const m_sum_slot;
    sum_node_type*& m_return_slot;
    small_object_allocator m_allocator;
public:
    std::atomic<final_sum_type*> m_right_zombie;
    sum_node_type& m_result;
    std::atomic<unsigned int> ref_count{2};
    finish_scan* m_parent;
    wait_context& m_wait_context;
    task* execute(execution_data& ed) override {
        __TBB_ASSERT( m_result.ref_count.load() == static_cast<unsigned int>((m_result.m_left != nullptr) + (m_result.m_right != nullptr)), nullptr );
        if( m_result.m_left )
            m_result.m_left_is_final = false;
        final_sum_type* right_zombie = m_right_zombie.load(std::memory_order_acquire);
        if( right_zombie && m_sum_slot )
            (*m_sum_slot)->reverse_join(*m_result.m_left_sum);
        __TBB_ASSERT( !m_return_slot, nullptr );
        if( right_zombie || m_result.m_right ) {
            m_return_slot = &m_result;
        } else {
            m_result.self_destroy(ed);
        }
        if( right_zombie && !m_sum_slot && !m_result.m_right ) {
            right_zombie->self_destroy(ed);
            m_right_zombie.store(nullptr, std::memory_order_relaxed);
        }
        return finalize(ed);
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    finish_scan(sum_node_type*& return_slot, final_sum_type** sum, sum_node_type& result_, finish_scan* parent, wait_context& w_o, small_object_allocator& alloc) :
        m_sum_slot(sum),
        m_return_slot(return_slot),
        m_allocator(alloc),
        m_right_zombie(nullptr),
        m_result(result_),
        m_parent(parent),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }
private:
    finish_scan* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    finish_scan* finalize(const execution_data& ed) {
        finish_scan* next_task = release_parent();
        m_allocator.delete_object<finish_scan>(this, ed);
        return next_task;
    }
};

//! Initial task to split the work
/** @ingroup algorithms */
template<typename Range, typename Body, typename Partitioner>
struct start_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    using finish_pass1_type = finish_scan<Range,Body>;
    std::reference_wrapper<sum_node_type*> m_return_slot;
    Range m_range;
    std::reference_wrapper<final_sum_type> m_body;
    typename Partitioner::partition_type m_partition;
    /** Non-null if caller is requesting total. */
    final_sum_type** m_sum_slot;
    bool m_is_final;
    bool m_is_right_child;

    finish_pass1_type* m_parent;
    small_object_allocator m_allocator;
    wait_context& m_wait_context;

    finish_pass1_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }

    finish_pass1_type* finalize( const execution_data& ed ) {
        finish_pass1_type* next_task = release_parent();
        m_allocator.delete_object<start_scan>(this, ed);
        return next_task;
    }

public:
    task* execute( execution_data& ) override;
    task* cancel( execution_data& ed ) override {
        return finalize(ed);
    }
    start_scan( sum_node_type*& return_slot, start_scan& parent, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(parent.m_range, split()),
        m_body(parent.m_body),
        m_partition(parent.m_partition, split()),
        m_sum_slot(parent.m_sum_slot),
        m_is_final(parent.m_is_final),
        m_is_right_child(true),
        m_parent(parent.m_parent),
        m_allocator(alloc),
        m_wait_context(parent.m_wait_context)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
        parent.m_is_right_child = false;
    }

    start_scan( sum_node_type*& return_slot, const Range& range, final_sum_type& body, const Partitioner& partitioner, wait_context& w_o, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(range),
        m_body(body),
        m_partition(partitioner),
        m_sum_slot(nullptr),
        m_is_final(true),
        m_is_right_child(false),
        m_parent(nullptr),
        m_allocator(alloc),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }

    static void run( const Range& range, Body& body, const Partitioner& partitioner ) {
        if( !range.empty() ) {
            task_group_context context(PARALLEL_SCAN);

            using start_pass1_type = start_scan<Range,Body,Partitioner>;
            sum_node_type* root = nullptr;
            wait_context w_ctx{1};
            small_object_allocator alloc{};

            auto& temp_body = *alloc.new_object<final_sum_type>(body, w_ctx, alloc);
            temp_body.reverse_join(body);

            auto& pass1 = *alloc.new_object<start_pass1_type>(/*m_return_slot=*/root, range, temp_body, partitioner, w_ctx, alloc);

            execute_and_wait(pass1, context, w_ctx, context);
            if( root ) {
                root->prepare_for_execution(temp_body, nullptr, &body);
                w_ctx.reserve();
                execute_and_wait(*root, context, w_ctx, context);
            } else {
                temp_body.assign_to(body);
                temp_body.finish_construction(nullptr, range, nullptr);
                alloc.delete_object<final_sum_type>(&temp_body);
            }
        }
    }
};
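// How the two passes fit together (a summary derived from run() above): the first
// pass, rooted at a start_scan task, recursively splits the range; a leaf either
// runs the final scan directly (while it is still the leftmost pending subrange)
// or only pre-scans a partial sum. If the range was split at all, run() receives
// a non-null root and launches a second pass in which sum_node/final_sum tasks
// reverse_join the partial sums and complete the final scan over the remaining
// subranges. If the range was never split, the single final_sum already holds
// the result and is assigned back to the user's body.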
template<typename Range, typename Body, typename Partitioner>
task* start_scan<Range,Body,Partitioner>::execute( execution_data& ed ) {
    // Inspecting m_parent->result.left_sum would ordinarily be a race condition.
    // But we inspect it only if we are not a stolen task, in which case we
    // know that the task assigning to m_parent->result.left_sum has completed.
    __TBB_ASSERT(!m_is_right_child || m_parent, "right child is never an orphan");
    bool treat_as_stolen = m_is_right_child && (is_stolen(ed) || &m_body.get() != m_parent->m_result.m_left_sum);
    if( treat_as_stolen ) {
        // Invocation is for a right child that has been really stolen or needs to be virtually stolen
        small_object_allocator alloc{};
        final_sum_type* right_zombie = alloc.new_object<final_sum_type>(m_body, alloc);
        m_parent->m_right_zombie.store(right_zombie, std::memory_order_release);
        m_body = *right_zombie;
        m_is_final = false;
    }
    task* next_task = nullptr;
    if( (m_is_right_child && !treat_as_stolen) || !m_range.is_divisible() || m_partition.should_execute_range(ed) ) {
        if( m_is_final )
            m_body(m_range, final_scan_tag());
        else if( m_sum_slot )
            m_body(m_range, pre_scan_tag());
        if( m_sum_slot )
            *m_sum_slot = &m_body.get();
        __TBB_ASSERT( !m_return_slot, nullptr );

        next_task = finalize(ed);
    } else {
        small_object_allocator alloc{};
        auto result = alloc.new_object<sum_node_type>(m_range, /*m_left_is_final=*/m_is_final, m_parent ? &m_parent->m_result : nullptr, m_wait_context, alloc);

        auto new_parent = alloc.new_object<finish_pass1_type>(m_return_slot, m_sum_slot, *result, m_parent, m_wait_context, alloc);
        m_parent = new_parent;

        // Split off right child
        auto& right_child = *alloc.new_object<start_scan>(/*m_return_slot=*/result->m_right, *this, alloc);

        spawn(right_child, *ed.context);

        m_sum_slot = &result->m_left_sum;
        m_return_slot = result->m_left;

        __TBB_ASSERT( !m_return_slot, nullptr );
        next_task = this;
    }
    return next_task;
}

template<typename Range, typename Value, typename Scan, typename ReverseJoin>
class lambda_scan_body {
    Value m_sum_slot;
    const Value& identity_element;
    const Scan& m_scan;
    const ReverseJoin& m_reverse_join;
public:
    void operator=(const lambda_scan_body&) = delete;
    lambda_scan_body(const lambda_scan_body&) = default;

    lambda_scan_body( const Value& identity, const Scan& scan, const ReverseJoin& rev_join )
        : m_sum_slot(identity)
        , identity_element(identity)
        , m_scan(scan)
        , m_reverse_join(rev_join) {}

    lambda_scan_body( lambda_scan_body& b, split )
        : m_sum_slot(b.identity_element)
        , identity_element(b.identity_element)
        , m_scan(b.m_scan)
        , m_reverse_join(b.m_reverse_join) {}

    template<typename Tag>
    void operator()( const Range& r, Tag tag ) {
        m_sum_slot = m_scan(r, m_sum_slot, tag);
    }

    void reverse_join( lambda_scan_body& a ) {
        m_sum_slot = m_reverse_join(a.m_sum_slot, m_sum_slot);
    }

    void assign( lambda_scan_body& b ) {
        m_sum_slot = b.m_sum_slot;
    }

    Value result() const {
        return m_sum_slot;
    }
};

// Requirements on Range concept are documented in blocked_range.h

/** \page parallel_scan_body_req Requirements on parallel_scan body
    Class \c Body implementing the concept of parallel_scan body must define:
    - \code Body::Body( Body&, split ); \endcode Splitting constructor.
      Split \c b so that \c this and \c b can accumulate separately.
    - \code Body::~Body(); \endcode Destructor.
    - \code void Body::operator()( const Range& r, pre_scan_tag ); \endcode
      Preprocess iterations for range \c r.
    - \code void Body::operator()( const Range& r, final_scan_tag ); \endcode
      Do final processing for iterations of range \c r.
    - \code void Body::reverse_join( Body& a ); \endcode
      Merge preprocessing state of \c a into \c this, where \c a was
      created earlier from \c b by \c b's splitting constructor.
    - \code void Body::assign( Body& b ); \endcode
      Assign state of \c b to \c this.
**/
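/** \page parallel_scan_body_example Example of a parallel_scan body
    A minimal sketch of a body satisfying the requirements above; it computes the
    running sum of \c x into \c y. The names \c Body, \c T, \c x, and \c y are
    placeholders for this illustration only and are not part of the library.
    \code
    template<typename T>
    class Body {
        T m_sum;
        T* const y;
        const T* const x;
    public:
        Body( T y_[], const T x_[] ) : m_sum(0), y(y_), x(x_) {}
        T get_sum() const { return m_sum; }

        template<typename Tag>
        void operator()( const blocked_range<int>& r, Tag ) {
            T temp = m_sum;
            for( int i = r.begin(); i < r.end(); ++i ) {
                temp = temp + x[i];
                if( Tag::is_final_scan() )
                    y[i] = temp;    // prefix results are written only on the final pass
            }
            m_sum = temp;
        }
        Body( Body& b, split ) : m_sum(0), y(b.y), x(b.x) {}
        void reverse_join( Body& a ) { m_sum = a.m_sum + m_sum; }
        void assign( Body& b ) { m_sum = b.m_sum; }
    };
    \endcode
    With such a body, calling \c parallel_scan(blocked_range<int>(0,n),body) leaves
    the prefix sums in \c y and the grand total in \c body.get_sum().
**/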
/** \name parallel_scan
    See also requirements on \ref range_req "Range" and \ref parallel_scan_body_req "parallel_scan Body". **/
//@{

//! Parallel prefix with default partitioner
/** @ingroup algorithms **/
template<typename Range, typename Body>
    __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
void parallel_scan( const Range& range, Body& body ) {
    start_scan<Range, Body, auto_partitioner>::run(range, body, __TBB_DEFAULT_PARTITIONER());
}

//! Parallel prefix with simple_partitioner
/** @ingroup algorithms **/
template<typename Range, typename Body>
    __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
void parallel_scan( const Range& range, Body& body, const simple_partitioner& partitioner ) {
    start_scan<Range, Body, simple_partitioner>::run(range, body, partitioner);
}

//! Parallel prefix with auto_partitioner
/** @ingroup algorithms **/
template<typename Range, typename Body>
    __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
void parallel_scan( const Range& range, Body& body, const auto_partitioner& partitioner ) {
    start_scan<Range, Body, auto_partitioner>::run(range, body, partitioner);
}

//! Parallel prefix with default partitioner
/** @ingroup algorithms **/
template<typename Range, typename Value, typename Scan, typename ReverseJoin>
    __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
                   parallel_scan_combine<ReverseJoin, Value>)
Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join ) {
    lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
    parallel_scan(range, body, __TBB_DEFAULT_PARTITIONER());
    return body.result();
}

//! Parallel prefix with simple_partitioner
/** @ingroup algorithms **/
template<typename Range, typename Value, typename Scan, typename ReverseJoin>
    __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
                   parallel_scan_combine<ReverseJoin, Value>)
Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
                     const simple_partitioner& partitioner ) {
    lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
    parallel_scan(range, body, partitioner);
    return body.result();
}

//! Parallel prefix with auto_partitioner
/** @ingroup algorithms **/
template<typename Range, typename Value, typename Scan, typename ReverseJoin>
    __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
                   parallel_scan_combine<ReverseJoin, Value>)
Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
                     const auto_partitioner& partitioner ) {
    lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
    parallel_scan(range, body, partitioner);
    return body.result();
}
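// An illustrative use of the functional form (a sketch only; the buffers x and y
// and the length n are assumptions of this example, not part of the header). The
// scan functor's third parameter works because the pass tags convert to bool:
//
//     float running_sum( const float* x, float* y, int n ) {
//         return parallel_scan( blocked_range<int>(0, n), 0.0f,
//             [=]( const blocked_range<int>& r, float sum, bool is_final_scan ) {
//                 float temp = sum;
//                 for( int i = r.begin(); i < r.end(); ++i ) {
//                     temp += x[i];
//                     if( is_final_scan )
//                         y[i] = temp;
//                 }
//                 return temp;
//             },
//             []( float left, float right ) { return left + right; } );
//     }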
//@}

} // namespace d1
} // namespace detail

inline namespace v1 {
using detail::d1::parallel_scan;
using detail::d1::pre_scan_tag;
using detail::d1::final_scan_tag;
} // namespace v1

} // namespace tbb

#endif /* __TBB_parallel_scan_H */