xref: /oneTBB/include/oneapi/tbb/parallel_scan.h (revision cf95d2cc)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_parallel_scan_H
18 #define __TBB_parallel_scan_H
19 
20 #include <functional>
21 
22 #include "detail/_config.h"
23 #include "detail/_namespace_injection.h"
24 #include "detail/_exception.h"
25 #include "detail/_task.h"
26 
27 #include "profiling.h"
28 #include "partitioner.h"
29 #include "blocked_range.h"
30 #include "task_group.h"
31 
32 namespace tbb {
33 namespace detail {
34 namespace d1 {
35 
//! Tag type passed to the body to signal the initial (pre-)scan pass.
/** @ingroup algorithms */
struct pre_scan_tag {
    //! Always false: a pre-scan only accumulates, it does not produce final values.
    static bool is_final_scan() { return false; }
    //! Allows the tag to be tested directly in a boolean context.
    operator bool() { return is_final_scan(); }
};
42 
//! Tag type passed to the body to signal the final scan pass.
/** @ingroup algorithms */
struct final_scan_tag {
    //! Always true: the final scan produces the definitive prefix values.
    static bool is_final_scan() { return true; }
    //! Allows the tag to be tested directly in a boolean context.
    operator bool() { return is_final_scan(); }
};
49 
//! Forward declaration; the full definition appears below.
template<typename Range, typename Body>
struct sum_node;
52 
53 #if __TBB_CPP20_CONCEPTS_PRESENT
54 } // namespace d1
55 namespace d0 {
56 
//! Constraints on the Body of the imperative form of parallel_scan:
//! it must be splittable and support invocation with both scan tags,
//! reverse_join with another body, and assign from another body.
template <typename Body, typename Range>
concept parallel_scan_body = splittable<Body> &&
                             requires( Body& body, const Range& range, Body& other ) {
                                 body(range, tbb::detail::d1::pre_scan_tag{});
                                 body(range, tbb::detail::d1::final_scan_tag{});
                                 body.reverse_join(other);
                                 body.assign(other);
                             };
65 
//! Constraints on the scan functor of the functional form of parallel_scan:
//! callable as func(range, value, bool) yielding something convertible to Value.
//! (The bool stands in for the pre-/final-scan tags, which are convertible to bool.)
template <typename Function, typename Range, typename Value>
concept parallel_scan_function = requires( const std::remove_reference_t<Function>& func,
                                           const Range& range, const Value& value ) {
    { func(range, value, true) } -> std::convertible_to<Value>;
};
71 
//! Constraints on the combine (reverse-join) functor of the functional form of
//! parallel_scan: callable as combine(lhs, rhs) yielding something convertible to Value.
template <typename Combine, typename Value>
concept parallel_scan_combine = requires( const std::remove_reference_t<Combine>& combine,
                                          const Value& lhs, const Value& rhs ) {
    { combine(lhs, rhs) } -> std::convertible_to<Value>;
};
77 
78 } // namespace d0
79 namespace d1 {
80 #endif // __TBB_CPP20_CONCEPTS_PRESENT
81 
//! Performs final scan for a leaf
/** Task owning a split-off copy of the user's body plus the subrange it is
    responsible for. @ingroup algorithms */
template<typename Range, typename Body>
struct final_sum : public task {
private:
    using sum_node_type = sum_node<Range, Body>;
    Body m_body;                    // scan state accumulated for this task's subrange
    aligned_space<Range> m_range;   // raw storage; the Range is placement-constructed in finish_construction()
    //! Where to put result of last subrange, or nullptr if not last subrange.
    Body* m_stuff_last;

    wait_context& m_wait_context;      // released when this task completes with no parent node
    sum_node_type* m_parent = nullptr; // second-pass tree node waiting on this task, if any
public:
    small_object_allocator m_allocator; // allocator this object came from; used for deletion
    //! Create a task whose body is split off the given user body.
    final_sum( Body& body, wait_context& w_o, small_object_allocator& alloc ) :
        m_body(body, split()), m_wait_context(w_o), m_allocator(alloc) {
        poison_pointer(m_stuff_last); // assigned later by finish_construction()
    }

    //! Create a task whose body is split off another final_sum's body; shares its wait_context.
    final_sum( final_sum& sum, small_object_allocator& alloc ) :
        m_body(sum.m_body, split()), m_wait_context(sum.m_wait_context), m_allocator(alloc) {
        poison_pointer(m_stuff_last);
    }

    ~final_sum() {
        // NOTE(review): assumes finish_construction() has placement-constructed the
        // Range before destruction — confirm this holds on every destruction path.
        m_range.begin()->~Range();
    }
    //! Attach this task to the second-pass tree: record parent, subrange, and result slot.
    void finish_construction( sum_node_type* parent, const Range& range, Body* stuff_last ) {
        __TBB_ASSERT( m_parent == nullptr, nullptr );
        m_parent = parent;
        new( m_range.begin() ) Range(range);
        m_stuff_last = stuff_last;
    }
private:
    //! Drop this task's reference on its parent node.
    /** Returns the parent when this was its last outstanding reference so the caller
        can return it as the next task to execute; otherwise returns nullptr. With no
        parent, releases the wait_context instead. */
    sum_node_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    //! Release the parent and destroy this task; returns the next task to run, if any.
    sum_node_type* finalize(const execution_data& ed){
        sum_node_type* next_task = release_parent();
        m_allocator.delete_object<final_sum>(this, ed);
        return next_task;
    }

public:
    //! Run the final scan over the stored subrange, then self-destruct.
    task* execute(execution_data& ed) override {
        m_body( *m_range.begin(), final_scan_tag() );
        if( m_stuff_last )
            m_stuff_last->assign(m_body); // publish the overall result to the caller's body

        return finalize(ed);
    }
    //! Cancellation path: skip the scan, still unwind the bookkeeping and destroy the task.
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    //! Apply the body to range r with the given scan tag (pre- or final-scan).
    template<typename Tag>
    void operator()( const Range& r, Tag tag ) {
        m_body( r, tag );
    }
    //! Merge the state of a, which covers an earlier subrange, into this body.
    void reverse_join( final_sum& a ) {
        m_body.reverse_join(a.m_body);
    }
    //! Merge the state of a plain Body (e.g. the user's original) into this body.
    void reverse_join( Body& body ) {
        m_body.reverse_join(body);
    }
    //! Copy this task's body state out into the user's body.
    void assign_to( Body& body ) {
        body.assign(m_body);
    }
    //! Destroy this task via its own allocator without executing it.
    void self_destroy(const execution_data& ed) {
        m_allocator.delete_object<final_sum>(this, ed);
    }
};
164 
//! Split work to be done in the scan.
/** Node of the binary tree built during the first pass; during the second pass it
    launches the final-scan work for its subtree. @ingroup algorithms */
template<typename Range, typename Body>
struct sum_node : public task {
private:
    using final_sum_type = final_sum<Range,Body>;
public:
    // The three fields below are poisoned at construction and filled in by
    // prepare_for_execution() at the start of the second pass.
    final_sum_type *m_incoming;  // sum from earlier subranges to fold into m_left_sum, or nullptr
    final_sum_type *m_body;      // final-scan task assigned to this node
    Body *m_stuff_last;          // where to store the overall result, or nullptr
private:
    final_sum_type *m_left_sum;  // sum produced by the left half during the first pass
    sum_node *m_left;            // left child node, or nullptr if the left half was a leaf
    sum_node *m_right;           // right child node, or nullptr if the right half was a leaf
    bool m_left_is_final;        // left half already processed with final_scan_tag; no rescan needed
    Range m_range;
    wait_context& m_wait_context;
    sum_node* m_parent;
    small_object_allocator m_allocator;
public:
    std::atomic<unsigned int> ref_count{0}; // outstanding child tasks during the second pass
    sum_node( const Range range, bool left_is_final_, sum_node* parent, wait_context& w_o, small_object_allocator& alloc ) :
        m_stuff_last(nullptr),
        m_left_sum(nullptr),
        m_left(nullptr),
        m_right(nullptr),
        m_left_is_final(left_is_final_),
        m_range(range),
        m_wait_context(w_o),
        m_parent(parent),
        m_allocator(alloc)
    {
        if( m_parent )
            m_parent->ref_count.fetch_add(1);
        // Poison fields that will be set by second pass.
        poison_pointer(m_body);
        poison_pointer(m_incoming);
    }

    ~sum_node() {
        if (m_parent)
            m_parent->ref_count.fetch_sub(1);
    }
private:
    //! Drop this node's reference on its parent; see final_sum::release_parent() for the protocol.
    sum_node* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    //! Prepare one half of this node for the final pass.
    /** If a child node exists, hand it the (body, incoming, stuff_last) triple for its
        own second pass; otherwise that half was a leaf, so bind the final_sum task to it
        directly. Returns the task to run for that half. */
    task* create_child( const Range& range, final_sum_type& body, sum_node* child, final_sum_type* incoming, Body* stuff_last ) {
        if( child ) {
            __TBB_ASSERT( is_poisoned(child->m_body) && is_poisoned(child->m_incoming), nullptr );
            child->prepare_for_execution(body, incoming, stuff_last);
            return child;
        } else {
            body.finish_construction(this, range, stuff_last);
            return &body;
        }
    }

    //! Release the parent and destroy this node; returns the next task to run, if any.
    sum_node* finalize(const execution_data& ed) {
        sum_node* next_task = release_parent();
        m_allocator.delete_object<sum_node>(this, ed);
        return next_task;
    }

public:
    //! Second-pass setup performed by this node's parent (or by run() for the root).
    void prepare_for_execution(final_sum_type& body, final_sum_type* incoming, Body *stuff_last) {
        this->m_body = &body;
        this->m_incoming = incoming;
        this->m_stuff_last = stuff_last;
    }
    //! Executed twice: first (m_body set) to launch the children's final pass,
    //! then (m_body cleared, after the children completed) to clean up.
    task* execute(execution_data& ed) override {
        if( m_body ) {
            if( m_incoming )
                m_left_sum->reverse_join( *m_incoming ); // fold earlier subranges into the left sum
            // Right half continues from m_left_sum; left half reruns only if it was not final yet.
            task* right_child = this->create_child(Range(m_range,split()), *m_left_sum, m_right, m_left_sum, m_stuff_last);
            task* left_child = m_left_is_final ? nullptr : this->create_child(m_range, *m_body, m_left, m_incoming, nullptr);
            ref_count = (left_child != nullptr) + (right_child != nullptr);
            m_body = nullptr; // marks children as launched for the second invocation
            if( left_child ) {
                spawn(*right_child, *ed.context);
                return left_child; // task bypass: run the left half directly
            } else {
                return right_child;
            }
        } else {
            return finalize(ed);
        }
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    //! Destroy this node via its own allocator without executing it.
    void self_destroy(const execution_data& ed) {
        m_allocator.delete_object<sum_node>(this, ed);
    }
    template<typename range,typename body,typename partitioner>
    friend struct start_scan;

    template<typename range,typename body>
    friend struct finish_scan;
};
275 
//! Combine partial results
/** Continuation that runs after both halves of a split finished their first pass.
    It decides whether the subtree needs a second (final-scan) pass and publishes
    the sum_node if so. @ingroup algorithms */
template<typename Range, typename Body>
struct finish_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    final_sum_type** const m_sum_slot;  // caller's slot for this subtree's sum, or nullptr
    sum_node_type*& m_return_slot;      // where to publish m_result when a second pass is needed
    small_object_allocator m_allocator;
public:
    std::atomic<final_sum_type*> m_right_zombie; // final_sum created for a stolen right child, if any
    sum_node_type& m_result;                     // the sum_node built for this split
    std::atomic<unsigned int> ref_count{2};      // the two child start_scan tasks of the split
    finish_scan*  m_parent;
    wait_context& m_wait_context;
    //! Runs once both children of the split completed their first pass.
    task* execute(execution_data& ed) override {
        __TBB_ASSERT( m_result.ref_count.load() == static_cast<unsigned int>((m_result.m_left!=nullptr)+(m_result.m_right!=nullptr)), nullptr );
        if( m_result.m_left )
            m_result.m_left_is_final = false; // left subtree must be rescanned in the second pass
        // Acquire pairs with the release store in start_scan::execute's steal path.
        final_sum_type* right_zombie = m_right_zombie.load(std::memory_order_acquire);
        if( right_zombie && m_sum_slot )
            (*m_sum_slot)->reverse_join(*m_result.m_left_sum); // fold the left sum into the subtree total
        __TBB_ASSERT( !m_return_slot, nullptr );
        if( right_zombie || m_result.m_right ) {
            m_return_slot = &m_result; // second pass required: hand the node up
        } else {
            m_result.self_destroy(ed); // no second pass needed for this subtree
        }
        if( right_zombie && !m_sum_slot && !m_result.m_right ) {
            right_zombie->self_destroy(ed); // nobody else references the zombie
            m_right_zombie.store(nullptr, std::memory_order_relaxed);
        }
        return finalize(ed);
    }
    task* cancel(execution_data& ed) override {
        return finalize(ed);
    }
    finish_scan(sum_node_type*& return_slot, final_sum_type** sum, sum_node_type& result_, finish_scan* parent, wait_context& w_o, small_object_allocator& alloc) :
        m_sum_slot(sum),
        m_return_slot(return_slot),
        m_allocator(alloc),
        m_right_zombie(nullptr),
        m_result(result_),
        m_parent(parent),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }
private:
    //! Drop this task's reference on its parent; see final_sum::release_parent() for the protocol.
    finish_scan* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }
    //! Release the parent and destroy this task; returns the next task to run, if any.
    finish_scan* finalize(const execution_data& ed) {
        finish_scan* next_task = release_parent();
        m_allocator.delete_object<finish_scan>(this, ed);
        return next_task;
    }
};
345 
//! Initial task to split the work
/** First-pass task: recursively splits the range, chaining finish_scan
    continuations that build the sum_node tree used by the optional second pass.
    @ingroup algorithms */
template<typename Range, typename Body, typename Partitioner>
struct start_scan : public task {
private:
    using sum_node_type = sum_node<Range,Body>;
    using final_sum_type = final_sum<Range,Body>;
    using finish_pass1_type = finish_scan<Range,Body>;
    std::reference_wrapper<sum_node_type*> m_return_slot; // where to publish a node that needs a second pass
    Range m_range;
    std::reference_wrapper<final_sum_type> m_body;        // body used to scan this subrange in pass 1
    typename Partitioner::partition_type m_partition;
    /** Non-null if caller is requesting total. */
    final_sum_type** m_sum_slot;
    bool m_is_final;        // true while this subrange may still be scanned with final_scan_tag
    bool m_is_right_child;  // true for a task split off to handle the right half

    finish_pass1_type*  m_parent;
    small_object_allocator m_allocator;
    wait_context& m_wait_context;

    //! Drop this task's reference on its parent; see final_sum::release_parent() for the protocol.
    finish_pass1_type* release_parent() {
        call_itt_task_notify(releasing, m_parent);
        if (m_parent) {
            auto parent = m_parent;
            m_parent = nullptr;
            if (parent->ref_count.fetch_sub(1) == 1) {
                return parent;
            }
        }
        else
            m_wait_context.release();
        return nullptr;
    }

    //! Release the parent and destroy this task; returns the next task to run, if any.
    finish_pass1_type* finalize( const execution_data& ed ) {
        finish_pass1_type* next_task = release_parent();
        m_allocator.delete_object<start_scan>(this, ed);
        return next_task;
    }

public:
    task* execute( execution_data& ) override; // defined out of line below
    task* cancel( execution_data& ed ) override {
        return finalize(ed);
    }
    //! Splitting constructor: this task becomes the right child of parent's split.
    start_scan( sum_node_type*& return_slot, start_scan& parent, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(parent.m_range,split()),
        m_body(parent.m_body),
        m_partition(parent.m_partition,split()),
        m_sum_slot(parent.m_sum_slot),
        m_is_final(parent.m_is_final),
        m_is_right_child(true),
        m_parent(parent.m_parent),
        m_allocator(alloc),
        m_wait_context(parent.m_wait_context)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
        parent.m_is_right_child = false;
    }

    //! Root constructor for the whole scan.
    start_scan( sum_node_type*& return_slot, const Range& range, final_sum_type& body, const Partitioner& partitioner, wait_context& w_o, small_object_allocator& alloc ) :
        m_return_slot(return_slot),
        m_range(range),
        m_body(body),
        m_partition(partitioner),
        m_sum_slot(nullptr),
        m_is_final(true),
        m_is_right_child(false),
        m_parent(nullptr),
        m_allocator(alloc),
        m_wait_context(w_o)
    {
        __TBB_ASSERT( !m_return_slot, nullptr );
    }

    //! Entry point: run the whole scan over range with the given body and partitioner.
    static void run( const Range& range, Body& body, const Partitioner& partitioner ) {
        if( !range.empty() ) {
            task_group_context context(PARALLEL_SCAN);

            using start_pass1_type = start_scan<Range,Body,Partitioner>;
            sum_node_type* root = nullptr;
            wait_context w_ctx{1};
            small_object_allocator alloc{};

            // Working body for pass 1: split off the user's body, then merge the
            // user's initial state back in.
            auto& temp_body = *alloc.new_object<final_sum_type>(body, w_ctx, alloc);
            temp_body.reverse_join(body);

            auto& pass1 = *alloc.new_object<start_pass1_type>(/*m_return_slot=*/root, range, temp_body, partitioner, w_ctx, alloc);

            execute_and_wait(pass1, context, w_ctx, context);
            if( root ) {
                // Pass 1 split the work: run the second (final-scan) pass over the tree.
                root->prepare_for_execution(temp_body, nullptr, &body);
                w_ctx.reserve(); // re-arm the wait_context for the second wait
                execute_and_wait(*root, context, w_ctx, context);
            } else {
                // Whole range was scanned in one go; just publish the result.
                temp_body.assign_to(body);
                // Place the Range so temp_body's destructor has something valid to destroy.
                temp_body.finish_construction(nullptr, range, nullptr);
                alloc.delete_object<final_sum_type>(&temp_body);
            }
        }
    }
};
450 
//! First-pass body: either scan this subrange as a leaf, or split it further.
template<typename Range, typename Body, typename Partitioner>
task* start_scan<Range,Body,Partitioner>::execute( execution_data& ed ) {
    // Inspecting m_parent->result.left_sum would ordinarily be a race condition.
    // But we inspect it only if we are not a stolen task, in which case we
    // know that task assigning to m_parent->result.left_sum has completed.
    __TBB_ASSERT(!m_is_right_child || m_parent, "right child is never an orphan");
    bool treat_as_stolen = m_is_right_child && (is_stolen(ed) || &m_body.get()!=m_parent->m_result.m_left_sum);
    if( treat_as_stolen ) {
        // Invocation is for right child that has been really stolen or needs to be virtually stolen
        small_object_allocator alloc{};
        // Give the stolen child its own split-off body ("zombie"); its result must be
        // merged later, so this subrange can no longer be final-scanned.
        final_sum_type* right_zombie = alloc.new_object<final_sum_type>(m_body, alloc);
        m_parent->m_right_zombie.store(right_zombie, std::memory_order_release);
        m_body = *right_zombie;
        m_is_final = false;
    }
    task* next_task = nullptr;
    if( (m_is_right_child && !treat_as_stolen) || !m_range.is_divisible() || m_partition.should_execute_range(ed) ) {
        // Leaf: scan the subrange serially — final scan if still allowed, otherwise a
        // pre-scan (and only when someone wants the sum).
        if( m_is_final )
            m_body(m_range, final_scan_tag());
        else if( m_sum_slot )
            m_body(m_range, pre_scan_tag());
        if( m_sum_slot )
            *m_sum_slot = &m_body.get(); // publish this subrange's sum to the caller
        __TBB_ASSERT( !m_return_slot, nullptr );

        next_task = finalize(ed);
    } else {
        // Split: build the sum_node for this level, chain a finish_scan continuation,
        // spawn the right half, and continue with the left half in this task.
        small_object_allocator alloc{};
        auto result = alloc.new_object<sum_node_type>(m_range,/*m_left_is_final=*/m_is_final, m_parent? &m_parent->m_result: nullptr, m_wait_context, alloc);

        auto new_parent = alloc.new_object<finish_pass1_type>(m_return_slot, m_sum_slot, *result, m_parent, m_wait_context, alloc);
        m_parent = new_parent;

        // Split off right child
        auto& right_child = *alloc.new_object<start_scan>(/*m_return_slot=*/result->m_right, *this, alloc);

        spawn(right_child, *ed.context);

        // This task keeps the left half; its sum feeds the node's left_sum slot.
        m_sum_slot = &result->m_left_sum;
        m_return_slot = result->m_left;

        __TBB_ASSERT( !m_return_slot, nullptr );
        next_task = this;
    }
    return next_task;
}
497 
498 template<typename Range, typename Value, typename Scan, typename ReverseJoin>
499 class lambda_scan_body {
500     Value               m_sum_slot;
501     const Value&        identity_element;
502     const Scan&         m_scan;
503     const ReverseJoin&  m_reverse_join;
504 public:
505     void operator=(const lambda_scan_body&) = delete;
506     lambda_scan_body(const lambda_scan_body&) = default;
507 
508     lambda_scan_body( const Value& identity, const Scan& scan, const ReverseJoin& rev_join )
509         : m_sum_slot(identity)
510         , identity_element(identity)
511         , m_scan(scan)
512         , m_reverse_join(rev_join) {}
513 
514     lambda_scan_body( lambda_scan_body& b, split )
515         : m_sum_slot(b.identity_element)
516         , identity_element(b.identity_element)
517         , m_scan(b.m_scan)
518         , m_reverse_join(b.m_reverse_join) {}
519 
520     template<typename Tag>
521     void operator()( const Range& r, Tag tag ) {
522         m_sum_slot = m_scan(r, m_sum_slot, tag);
523     }
524 
525     void reverse_join( lambda_scan_body& a ) {
526         m_sum_slot = m_reverse_join(a.m_sum_slot, m_sum_slot);
527     }
528 
529     void assign( lambda_scan_body& b ) {
530         m_sum_slot = b.m_sum_slot;
531     }
532 
533     Value result() const {
534         return m_sum_slot;
535     }
536 };
537 
538 // Requirements on Range concept are documented in blocked_range.h
539 
540 /** \page parallel_scan_body_req Requirements on parallel_scan body
541     Class \c Body implementing the concept of parallel_scan body must define:
542     - \code Body::Body( Body&, split ); \endcode    Splitting constructor.
543                                                     Split \c b so that \c this and \c b can accumulate separately
544     - \code Body::~Body(); \endcode                 Destructor
545     - \code void Body::operator()( const Range& r, pre_scan_tag ); \endcode
546                                                     Preprocess iterations for range \c r
547     - \code void Body::operator()( const Range& r, final_scan_tag ); \endcode
548                                                     Do final processing for iterations of range \c r
549     - \code void Body::reverse_join( Body& a ); \endcode
550                                                     Merge preprocessing state of \c a into \c this, where \c a was
551                                                     created earlier from \c b by b's splitting constructor
552 **/
553 
554 /** \name parallel_scan
555     See also requirements on \ref range_req "Range" and \ref parallel_scan_body_req "parallel_scan Body". **/
556 //@{
557 
558 //! Parallel prefix with default partitioner
559 /** @ingroup algorithms **/
560 template<typename Range, typename Body>
561     __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
562 void parallel_scan( const Range& range, Body& body ) {
563     start_scan<Range, Body, auto_partitioner>::run(range,body,__TBB_DEFAULT_PARTITIONER());
564 }
565 
566 //! Parallel prefix with simple_partitioner
567 /** @ingroup algorithms **/
568 template<typename Range, typename Body>
569     __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
570 void parallel_scan( const Range& range, Body& body, const simple_partitioner& partitioner ) {
571     start_scan<Range, Body, simple_partitioner>::run(range, body, partitioner);
572 }
573 
574 //! Parallel prefix with auto_partitioner
575 /** @ingroup algorithms **/
576 template<typename Range, typename Body>
577     __TBB_requires(tbb_range<Range> && parallel_scan_body<Body, Range>)
578 void parallel_scan( const Range& range, Body& body, const auto_partitioner& partitioner ) {
579     start_scan<Range,Body,auto_partitioner>::run(range, body, partitioner);
580 }
581 
582 //! Parallel prefix with default partitioner
583 /** @ingroup algorithms **/
584 template<typename Range, typename Value, typename Scan, typename ReverseJoin>
585     __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
586                    parallel_scan_combine<ReverseJoin, Value>)
587 Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join ) {
588     lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
589     parallel_scan(range, body, __TBB_DEFAULT_PARTITIONER());
590     return body.result();
591 }
592 
593 //! Parallel prefix with simple_partitioner
594 /** @ingroup algorithms **/
595 template<typename Range, typename Value, typename Scan, typename ReverseJoin>
596     __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
597                    parallel_scan_combine<ReverseJoin, Value>)
598 Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
599                      const simple_partitioner& partitioner ) {
600     lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
601     parallel_scan(range, body, partitioner);
602     return body.result();
603 }
604 
605 //! Parallel prefix with auto_partitioner
606 /** @ingroup algorithms **/
607 template<typename Range, typename Value, typename Scan, typename ReverseJoin>
608     __TBB_requires(tbb_range<Range> && parallel_scan_function<Scan, Range, Value> &&
609                    parallel_scan_combine<ReverseJoin, Value>)
610 Value parallel_scan( const Range& range, const Value& identity, const Scan& scan, const ReverseJoin& reverse_join,
611                      const auto_partitioner& partitioner ) {
612     lambda_scan_body<Range, Value, Scan, ReverseJoin> body(identity, scan, reverse_join);
613     parallel_scan(range, body, partitioner);
614     return body.result();
615 }
616 
617 } // namespace d1
618 } // namespace detail
619 
620 inline namespace v1 {
621     using detail::d1::parallel_scan;
622     using detail::d1::pre_scan_tag;
623     using detail::d1::final_scan_tag;
624 } // namespace v1
625 
626 } // namespace tbb
627 
628 #endif /* __TBB_parallel_scan_H */
629 
630