1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_body_impl_H
18 #define __TBB__flow_graph_body_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 // included in namespace tbb::detail::d1 (in flow_graph.h)
25 
//! Unsigned 64-bit integer type; it is the key type of the tag_matching policy.
using tag_value = std::uint64_t;
27 
28 
// TODO revamp: find out if there is already helper for has_policy.

//! Bundles any number of policy types into one type.
template<typename ... Policies> struct Policy {};

//! Compile-time membership test.
//  has_policy<Expected, Ps...>::value is true when Expected is found among Ps.
//  A Policy<...> that appears in Ps is not compared itself; its own arguments are
//  searched instead.
template<typename ... Policies> struct has_policy;

// Base case: nothing to search.  Reached when an empty Policy<> is unpacked; without
// this specialization such a query would name an undefined type instead of yielding false.
template<typename ExpectedPolicy>
struct has_policy<ExpectedPolicy> : std::integral_constant<bool, false> {};

// Two or more candidates: test the first one, then the remaining ones.
template<typename ExpectedPolicy, typename FirstPolicy, typename ...Policies>
struct has_policy<ExpectedPolicy, FirstPolicy, Policies...> :
    std::integral_constant<bool, has_policy<ExpectedPolicy, FirstPolicy>::value ||
                                 has_policy<ExpectedPolicy, Policies...>::value> {};

// Exactly one candidate that is not a Policy<...>: plain type comparison.
template<typename ExpectedPolicy, typename SinglePolicy>
struct has_policy<ExpectedPolicy, SinglePolicy> :
    std::integral_constant<bool, std::is_same<ExpectedPolicy, SinglePolicy>::value> {};

// The only candidate is a Policy<...>: search the policies it bundles.
template<typename ExpectedPolicy, typename ...Policies>
struct has_policy<ExpectedPolicy, Policy<Policies...> > : has_policy<ExpectedPolicy, Policies...> {};
45 
46 namespace graph_policy_namespace {
47 
48     struct rejecting { };
49     struct reserving { };
50     struct queueing  { };
51     struct lightweight  { };
52 
53     // K == type of field used for key-matching.  Each tag-matching port will be provided
54     // functor that, given an object accepted by the port, will return the
55     /// field of type K being used for matching.
56     template<typename K, typename KHash=tbb_hash_compare<typename std::decay<K>::type > >
57         __TBB_requires(tbb::detail::hash_compare<KHash, K>)
58     struct key_matching {
59         typedef K key_type;
60         typedef typename std::decay<K>::type base_key_type;
61         typedef KHash hash_compare_type;
62     };
63 
64     // old tag_matching join's new specifier
65     typedef key_matching<tag_value> tag_matching;
66 
67     // Aliases for Policy combinations
68     typedef Policy<queueing, lightweight> queueing_lightweight;
69     typedef Policy<rejecting, lightweight> rejecting_lightweight;
70 
71 } // namespace graph_policy_namespace
72 
73 // -------------- function_body containers ----------------------
74 
75 //! A functor that takes no input and generates a value of type Output
76 template< typename Output >
77 class input_body : no_assign {
78 public:
79     virtual ~input_body() {}
80     virtual Output operator()(flow_control& fc) = 0;
81     virtual input_body* clone() = 0;
82 };
83 
84 //! The leaf for input_body
85 template< typename Output, typename Body>
86 class input_body_leaf : public input_body<Output> {
87 public:
88     input_body_leaf( const Body &_body ) : body(_body) { }
89     Output operator()(flow_control& fc) override { return body(fc); }
90     input_body_leaf* clone() override {
91         return new input_body_leaf< Output, Body >(body);
92     }
93     Body get_body() { return body; }
94 private:
95     Body body;
96 };
97 
98 //! A functor that takes an Input and generates an Output
99 template< typename Input, typename Output >
100 class function_body : no_assign {
101 public:
102     virtual ~function_body() {}
103     virtual Output operator()(const Input &input) = 0;
104     virtual function_body* clone() = 0;
105 };
106 
107 //! the leaf for function_body
108 template <typename Input, typename Output, typename B>
109 class function_body_leaf : public function_body< Input, Output > {
110 public:
111     function_body_leaf( const B &_body ) : body(_body) { }
112     Output operator()(const Input &i) override { return tbb::detail::invoke(body,i); }
113     B get_body() { return body; }
114     function_body_leaf* clone() override {
115         return new function_body_leaf< Input, Output, B >(body);
116     }
117 private:
118     B body;
119 };
120 
121 //! the leaf for function_body specialized for Input and output of continue_msg
122 template <typename B>
123 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
124 public:
125     function_body_leaf( const B &_body ) : body(_body) { }
126     continue_msg operator()( const continue_msg &i ) override {
127         body(i);
128         return i;
129     }
130     B get_body() { return body; }
131     function_body_leaf* clone() override {
132         return new function_body_leaf< continue_msg, continue_msg, B >(body);
133     }
134 private:
135     B body;
136 };
137 
138 //! the leaf for function_body specialized for Output of continue_msg
139 template <typename Input, typename B>
140 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
141 public:
142     function_body_leaf( const B &_body ) : body(_body) { }
143     continue_msg operator()(const Input &i) override {
144         body(i);
145         return continue_msg();
146     }
147     B get_body() { return body; }
148     function_body_leaf* clone() override {
149         return new function_body_leaf< Input, continue_msg, B >(body);
150     }
151 private:
152     B body;
153 };
154 
155 //! the leaf for function_body specialized for Input of continue_msg
156 template <typename Output, typename B>
157 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
158 public:
159     function_body_leaf( const B &_body ) : body(_body) { }
160     Output operator()(const continue_msg &i) override {
161         return body(i);
162     }
163     B get_body() { return body; }
164     function_body_leaf* clone() override {
165         return new function_body_leaf< continue_msg, Output, B >(body);
166     }
167 private:
168     B body;
169 };
170 
171 //! function_body that takes an Input and a set of output ports
172 template<typename Input, typename OutputSet>
173 class multifunction_body : no_assign {
174 public:
175     virtual ~multifunction_body () {}
176     virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0;
177     virtual multifunction_body* clone() = 0;
178     virtual void* get_body_ptr() = 0;
179 };
180 
181 //! leaf for multifunction.  OutputSet can be a std::tuple or a vector.
182 template<typename Input, typename OutputSet, typename B >
183 class multifunction_body_leaf : public multifunction_body<Input, OutputSet> {
184 public:
185     multifunction_body_leaf(const B &_body) : body(_body) { }
186     void operator()(const Input &input, OutputSet &oset) override {
187         tbb::detail::invoke(body, input, oset); // body may explicitly put() to one or more of oset.
188     }
189     void* get_body_ptr() override { return &body; }
190     multifunction_body_leaf* clone() override {
191         return new multifunction_body_leaf<Input, OutputSet,B>(body);
192     }
193 
194 private:
195     B body;
196 };
197 
198 // ------ function bodies for hash_buffers and key-matching joins.
199 
200 template<typename Input, typename Output>
201 class type_to_key_function_body : no_assign {
202     public:
203         virtual ~type_to_key_function_body() {}
204         virtual Output operator()(const Input &input) = 0;  // returns an Output
205         virtual type_to_key_function_body* clone() = 0;
206 };
207 
208 // specialization for ref output
209 template<typename Input, typename Output>
210 class type_to_key_function_body<Input,Output&> : no_assign {
211     public:
212         virtual ~type_to_key_function_body() {}
213         virtual const Output & operator()(const Input &input) = 0;  // returns a const Output&
214         virtual type_to_key_function_body* clone() = 0;
215 };
216 
217 template <typename Input, typename Output, typename B>
218 class type_to_key_function_body_leaf : public type_to_key_function_body<Input, Output> {
219 public:
220     type_to_key_function_body_leaf( const B &_body ) : body(_body) { }
221     Output operator()(const Input &i) override { return tbb::detail::invoke(body, i); }
222     type_to_key_function_body_leaf* clone() override {
223         return new type_to_key_function_body_leaf< Input, Output, B>(body);
224     }
225 private:
226     B body;
227 };
228 
229 template <typename Input, typename Output, typename B>
230 class type_to_key_function_body_leaf<Input,Output&,B> : public type_to_key_function_body< Input, Output&> {
231 public:
232     type_to_key_function_body_leaf( const B &_body ) : body(_body) { }
233     const Output& operator()(const Input &i) override {
234         return tbb::detail::invoke(body, i);
235     }
236     type_to_key_function_body_leaf* clone() override {
237         return new type_to_key_function_body_leaf< Input, Output&, B>(body);
238     }
239 private:
240     B body;
241 };
242 
243 // --------------------------- end of function_body containers ------------------------
244 
245 // --------------------------- node task bodies ---------------------------------------
246 
247 //! A task that calls a node's forward_task function
248 template< typename NodeType >
249 class forward_task_bypass : public graph_task {
250     NodeType &my_node;
251 public:
252     forward_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n
253                          , node_priority_t node_priority = no_priority
254     ) : graph_task(g, allocator, node_priority),
255     my_node(n) {}
256 
257     task* execute(execution_data& ed) override {
258         graph_task* next_task = my_node.forward_task();
259         if (SUCCESSFULLY_ENQUEUED == next_task)
260             next_task = nullptr;
261         else if (next_task)
262             next_task = prioritize_task(my_node.graph_reference(), *next_task);
263         finalize<forward_task_bypass>(ed);
264         return next_task;
265     }
266 
267     task* cancel(execution_data& ed) override {
268         finalize<forward_task_bypass>(ed);
269         return nullptr;
270     }
271 };
272 
273 //! A task that calls a node's apply_body_bypass function, passing in an input of type Input
274 //  return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
275 template< typename NodeType, typename Input >
276 class apply_body_task_bypass : public graph_task {
277     NodeType &my_node;
278     Input my_input;
279 public:
280 
281     apply_body_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n, const Input &i
282                             , node_priority_t node_priority = no_priority
283     ) : graph_task(g, allocator, node_priority),
284         my_node(n), my_input(i) {}
285 
286     task* execute(execution_data& ed) override {
287         graph_task* next_task = my_node.apply_body_bypass( my_input );
288         if (SUCCESSFULLY_ENQUEUED == next_task)
289             next_task = nullptr;
290         else if (next_task)
291             next_task = prioritize_task(my_node.graph_reference(), *next_task);
292         finalize<apply_body_task_bypass>(ed);
293         return next_task;
294     }
295 
296     task* cancel(execution_data& ed) override {
297         finalize<apply_body_task_bypass>(ed);
298         return nullptr;
299     }
300 };
301 
302 //! A task that calls a node's apply_body_bypass function with no input
303 template< typename NodeType >
304 class input_node_task_bypass : public graph_task {
305     NodeType &my_node;
306 public:
307     input_node_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n )
308         : graph_task(g, allocator), my_node(n) {}
309 
310     task* execute(execution_data& ed) override {
311         graph_task* next_task = my_node.apply_body_bypass( );
312         if (SUCCESSFULLY_ENQUEUED == next_task)
313             next_task = nullptr;
314         else if (next_task)
315             next_task = prioritize_task(my_node.graph_reference(), *next_task);
316         finalize<input_node_task_bypass>(ed);
317         return next_task;
318     }
319 
320     task* cancel(execution_data& ed) override {
321         finalize<input_node_task_bypass>(ed);
322         return nullptr;
323     }
324 };
325 
326 // ------------------------ end of node task bodies -----------------------------------
327 
//! Primary template, declared without a definition.
//  The defaulted third parameter gives specializations a slot for a std::enable_if condition.
template<typename T, typename DecrementType, typename DummyType = void> class threshold_regulator;
330 
331 template<typename T, typename DecrementType>
332 class threshold_regulator<T, DecrementType,
333                   typename std::enable_if<std::is_integral<DecrementType>::value>::type>
334     : public receiver<DecrementType>, no_copy
335 {
336     T* my_node;
337 protected:
338 
339     graph_task* try_put_task( const DecrementType& value ) override {
340         graph_task* result = my_node->decrement_counter( value );
341         if( !result )
342             result = SUCCESSFULLY_ENQUEUED;
343         return result;
344     }
345 
346     graph& graph_reference() const override {
347         return my_node->my_graph;
348     }
349 
350     template<typename U, typename V> friend class limiter_node;
351     void reset_receiver( reset_flags ) {}
352 
353 public:
354     threshold_regulator(T* owner) : my_node(owner) {
355         // Do not work with the passed pointer here as it may not be fully initialized yet
356     }
357 };
358 
359 template<typename T>
360 class threshold_regulator<T, continue_msg, void> : public continue_receiver, no_copy {
361 
362     T *my_node;
363 
364     graph_task* execute() override {
365         return my_node->decrement_counter( 1 );
366     }
367 
368 protected:
369 
370     graph& graph_reference() const override {
371         return my_node->my_graph;
372     }
373 
374 public:
375 
376     typedef continue_msg input_type;
377     typedef continue_msg output_type;
378     threshold_regulator(T* owner)
379         : continue_receiver( /*number_of_predecessors=*/0, no_priority ), my_node(owner)
380     {
381         // Do not work with the passed pointer here as it may not be fully initialized yet
382     }
383 };
384 
385 #endif // __TBB__flow_graph_body_impl_H
386