xref: /oneTBB/include/oneapi/tbb/parallel_invoke.h (revision c4a799df)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_parallel_invoke_H
18 #define __TBB_parallel_invoke_H
19 
20 #include "detail/_config.h"
21 #include "detail/_namespace_injection.h"
22 #include "detail/_exception.h"
23 #include "detail/_task.h"
24 #include "detail/_template_helpers.h"
25 #include "detail/_small_object_pool.h"
26 
27 #include "task_group.h"
28 
29 #include <tuple>
30 #include <atomic>
31 #include <utility>
32 
33 namespace tbb {
34 namespace detail {
35 namespace d1 {
36 
37 //! Simple task object, executing user method
38 template<typename Function, typename WaitObject>
39 struct function_invoker : public task {
function_invokerfunction_invoker40     function_invoker(const Function& function, WaitObject& wait_ctx) :
41         my_function(function),
42         parent_wait_ctx(wait_ctx)
43     {}
44 
executefunction_invoker45     task* execute(execution_data& ed) override {
46         my_function();
47         parent_wait_ctx.release(ed);
48         call_itt_task_notify(destroy, this);
49         return nullptr;
50     }
51 
cancelfunction_invoker52     task* cancel(execution_data& ed) override {
53         parent_wait_ctx.release(ed);
54         return nullptr;
55     }
56 
57     const Function& my_function;
58     WaitObject& parent_wait_ctx;
59 }; // struct function_invoker
60 
61 //! Task object for managing subroots in trinary task trees.
62 // Endowed with additional synchronization logic (compatible with wait object interfaces) to support
63 // continuation passing execution. This task spawns 2 function_invoker tasks with first and second functors
64 // and then executes first functor by itself. But only the last executed functor must destruct and deallocate
65 // the subroot task.
66 template<typename F1, typename F2, typename F3>
67 struct invoke_subroot_task : public task {
68     wait_context& root_wait_ctx;
69     std::atomic<unsigned> ref_count{0};
70     bool child_spawned = false;
71 
72     const F1& self_invoked_functor;
73     function_invoker<F2, invoke_subroot_task<F1, F2, F3>> f2_invoker;
74     function_invoker<F3, invoke_subroot_task<F1, F2, F3>> f3_invoker;
75 
76     task_group_context& my_execution_context;
77     small_object_allocator my_allocator;
78 
invoke_subroot_taskinvoke_subroot_task79     invoke_subroot_task(const F1& f1, const F2& f2, const F3& f3, wait_context& wait_ctx, task_group_context& context,
80                  small_object_allocator& alloc) :
81         root_wait_ctx(wait_ctx),
82         self_invoked_functor(f1),
83         f2_invoker(f2, *this),
84         f3_invoker(f3, *this),
85         my_execution_context(context),
86         my_allocator(alloc)
87     {
88         root_wait_ctx.reserve();
89     }
90 
finalizeinvoke_subroot_task91     void finalize(const execution_data& ed) {
92         root_wait_ctx.release();
93 
94         my_allocator.delete_object(this, ed);
95     }
96 
releaseinvoke_subroot_task97     void release(const execution_data& ed) {
98         __TBB_ASSERT(ref_count > 0, nullptr);
99         call_itt_task_notify(releasing, this);
100         if( --ref_count == 0 ) {
101             call_itt_task_notify(acquired, this);
102             finalize(ed);
103         }
104     }
105 
executeinvoke_subroot_task106     task* execute(execution_data& ed) override {
107         ref_count.fetch_add(3, std::memory_order_relaxed);
108         spawn(f3_invoker, my_execution_context);
109         spawn(f2_invoker, my_execution_context);
110         self_invoked_functor();
111 
112         release(ed);
113         return nullptr;
114     }
115 
cancelinvoke_subroot_task116     task* cancel(execution_data& ed) override {
117         if( ref_count > 0 ) { // detect children spawn
118             release(ed);
119         } else {
120             finalize(ed);
121         }
122         return nullptr;
123     }
124 }; // struct subroot_task
125 
126 class invoke_root_task {
127 public:
invoke_root_task(wait_context & wc)128     invoke_root_task(wait_context& wc) : my_wait_context(wc) {}
release(const execution_data &)129     void release(const execution_data&) {
130         my_wait_context.release();
131     }
132 private:
133     wait_context& my_wait_context;
134 };
135 
136 template<typename F1>
invoke_recursive_separation(wait_context & root_wait_ctx,task_group_context & context,const F1 & f1)137 void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context, const F1& f1) {
138     root_wait_ctx.reserve(1);
139     invoke_root_task root(root_wait_ctx);
140     function_invoker<F1, invoke_root_task> invoker1(f1, root);
141 
142     execute_and_wait(invoker1, context, root_wait_ctx, context);
143 }
144 
145 template<typename F1, typename F2>
invoke_recursive_separation(wait_context & root_wait_ctx,task_group_context & context,const F1 & f1,const F2 & f2)146 void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context, const F1& f1, const F2& f2) {
147     root_wait_ctx.reserve(2);
148     invoke_root_task root(root_wait_ctx);
149     function_invoker<F1, invoke_root_task> invoker1(f1, root);
150     function_invoker<F2, invoke_root_task> invoker2(f2, root);
151 
152     spawn(invoker1, context);
153     execute_and_wait(invoker2, context, root_wait_ctx, context);
154 }
155 
156 template<typename F1, typename F2, typename F3>
invoke_recursive_separation(wait_context & root_wait_ctx,task_group_context & context,const F1 & f1,const F2 & f2,const F3 & f3)157 void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context, const F1& f1, const F2& f2, const F3& f3) {
158     root_wait_ctx.reserve(3);
159     invoke_root_task root(root_wait_ctx);
160     function_invoker<F1, invoke_root_task> invoker1(f1, root);
161     function_invoker<F2, invoke_root_task> invoker2(f2, root);
162     function_invoker<F3, invoke_root_task> invoker3(f3, root);
163 
164     //TODO: implement sub root for two tasks (measure performance)
165     spawn(invoker1, context);
166     spawn(invoker2, context);
167     execute_and_wait(invoker3, context, root_wait_ctx, context);
168 }
169 
170 template<typename F1, typename F2, typename F3, typename... Fs>
invoke_recursive_separation(wait_context & root_wait_ctx,task_group_context & context,const F1 & f1,const F2 & f2,const F3 & f3,const Fs &...fs)171 void invoke_recursive_separation(wait_context& root_wait_ctx, task_group_context& context,
172                                  const F1& f1, const F2& f2, const F3& f3, const Fs&... fs) {
173     small_object_allocator alloc{};
174     auto sub_root = alloc.new_object<invoke_subroot_task<F1, F2, F3>>(f1, f2, f3, root_wait_ctx, context, alloc);
175     spawn(*sub_root, context);
176 
177     invoke_recursive_separation(root_wait_ctx, context, fs...);
178 }
179 
180 template<typename... Fs>
parallel_invoke_impl(task_group_context & context,const Fs &...fs)181 void parallel_invoke_impl(task_group_context& context, const Fs&... fs) {
182     static_assert(sizeof...(Fs) >= 2, "Parallel invoke may be called with at least two callable");
183     wait_context root_wait_ctx{0};
184 
185     invoke_recursive_separation(root_wait_ctx, context, fs...);
186 }
187 
188 template<typename F1, typename... Fs>
parallel_invoke_impl(const F1 & f1,const Fs &...fs)189 void parallel_invoke_impl(const F1& f1, const Fs&... fs) {
190     static_assert(sizeof...(Fs) >= 1, "Parallel invoke may be called with at least two callable");
191     task_group_context context(PARALLEL_INVOKE);
192     wait_context root_wait_ctx{0};
193 
194     invoke_recursive_separation(root_wait_ctx, context, fs..., f1);
195 }
196 
//! Moves the last argument of parallel_invoke to the front before calling parallel_invoke_impl.
/** The trailing argument may be a task_group_context. Putting it first lets overload resolution
    on parallel_invoke_impl distinguish the "with context" form from the "callables only" form.
    - Collected: a std::tuple of the argument types already walked past.
    - Pending: the argument types still to be examined.
    The primary template is declared but never defined, so an empty argument list does not compile. */
template <typename Collected, typename... Pending>
struct invoke_helper;

//! Recursive step: shift the next pending type into the tuple of collected types.
template <typename... Done, typename Next, typename... Rest>
struct invoke_helper<std::tuple<Done...>, Next, Rest...> : invoke_helper<std::tuple<Done..., Next>, Rest...> {};

//! Terminal step: exactly one type is left; call the implementation with it in front.
/** This specialization has no trailing pack, so it is more specialized than the recursive step
    and is selected once a single pending type remains. Done and Last come from parallel_invoke's
    deduced types, so Done&& / Last&& collapse to the original value categories. */
template <typename... Done, typename Last/*task_group_context or callable*/>
struct invoke_helper<std::tuple<Done...>, Last> {
    void operator()(Done&&... head, Last&& last) {
        parallel_invoke_impl(std::forward<Last>(last), std::forward<Done>(head)...);
    }
};
210 
211 //! Parallel execution of several function objects
212 // We need to pass parameter pack through forwarding reference,
213 // since this pack may contain task_group_context that must be passed via lvalue non-const reference
214 template<typename... Fs>
215 void parallel_invoke(Fs&&... fs) {
216     invoke_helper<std::tuple<>, Fs...>()(std::forward<Fs>(fs)...);
217 }
218 
219 } // namespace d1
220 } // namespace detail
221 
222 inline namespace v1 {
223 using detail::d1::parallel_invoke;
224 } // namespace v1
225 
226 } // namespace tbb
227 #endif /* __TBB_parallel_invoke_H */
228