1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB__flow_graph_body_impl_H 18 #define __TBB__flow_graph_body_impl_H 19 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 22 #endif 23 24 // included in namespace tbb::detail::d1 (in flow_graph.h) 25 26 typedef std::uint64_t tag_value; 27 28 29 // TODO revamp: find out if there is already helper for has_policy. 30 template<typename ... Policies> struct Policy {}; 31 32 template<typename ... Policies> struct has_policy; 33 34 template<typename ExpectedPolicy, typename FirstPolicy, typename ...Policies> 35 struct has_policy<ExpectedPolicy, FirstPolicy, Policies...> : 36 std::integral_constant<bool, has_policy<ExpectedPolicy, FirstPolicy>::value || 37 has_policy<ExpectedPolicy, Policies...>::value> {}; 38 39 template<typename ExpectedPolicy, typename SinglePolicy> 40 struct has_policy<ExpectedPolicy, SinglePolicy> : 41 std::integral_constant<bool, std::is_same<ExpectedPolicy, SinglePolicy>::value> {}; 42 43 template<typename ExpectedPolicy, typename ...Policies> 44 struct has_policy<ExpectedPolicy, Policy<Policies...> > : has_policy<ExpectedPolicy, Policies...> {}; 45 46 namespace graph_policy_namespace { 47 48 struct rejecting { }; 49 struct reserving { }; 50 struct queueing { }; 51 struct lightweight { }; 52 53 // K == type of field used for key-matching. Each tag-matching port will be provided 54 // functor that, given an object accepted by the port, will return the 55 /// field of type K being used for matching. 56 template<typename K, typename KHash=tbb_hash_compare<typename std::decay<K>::type > > 57 __TBB_requires(tbb::detail::hash_compare<KHash, K>) 58 struct key_matching { 59 typedef K key_type; 60 typedef typename std::decay<K>::type base_key_type; 61 typedef KHash hash_compare_type; 62 }; 63 64 // old tag_matching join's new specifier 65 typedef key_matching<tag_value> tag_matching; 66 67 // Aliases for Policy combinations 68 typedef Policy<queueing, lightweight> queueing_lightweight; 69 typedef Policy<rejecting, lightweight> rejecting_lightweight; 70 71 } // namespace graph_policy_namespace 72 73 // -------------- function_body containers ---------------------- 74 75 //! A functor that takes no input and generates a value of type Output 76 template< typename Output > 77 class input_body : no_assign { 78 public: 79 virtual ~input_body() {} 80 virtual Output operator()(flow_control& fc) = 0; 81 virtual input_body* clone() = 0; 82 }; 83 84 //! The leaf for input_body 85 template< typename Output, typename Body> 86 class input_body_leaf : public input_body<Output> { 87 public: 88 input_body_leaf( const Body &_body ) : body(_body) { } 89 Output operator()(flow_control& fc) override { return body(fc); } 90 input_body_leaf* clone() override { 91 return new input_body_leaf< Output, Body >(body); 92 } 93 Body get_body() { return body; } 94 private: 95 Body body; 96 }; 97 98 //! A functor that takes an Input and generates an Output 99 template< typename Input, typename Output > 100 class function_body : no_assign { 101 public: 102 virtual ~function_body() {} 103 virtual Output operator()(const Input &input) = 0; 104 virtual function_body* clone() = 0; 105 }; 106 107 //! the leaf for function_body 108 template <typename Input, typename Output, typename B> 109 class function_body_leaf : public function_body< Input, Output > { 110 public: 111 function_body_leaf( const B &_body ) : body(_body) { } 112 Output operator()(const Input &i) override { return tbb::detail::invoke(body,i); } 113 B get_body() { return body; } 114 function_body_leaf* clone() override { 115 return new function_body_leaf< Input, Output, B >(body); 116 } 117 private: 118 B body; 119 }; 120 121 //! the leaf for function_body specialized for Input and output of continue_msg 122 template <typename B> 123 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > { 124 public: 125 function_body_leaf( const B &_body ) : body(_body) { } 126 continue_msg operator()( const continue_msg &i ) override { 127 body(i); 128 return i; 129 } 130 B get_body() { return body; } 131 function_body_leaf* clone() override { 132 return new function_body_leaf< continue_msg, continue_msg, B >(body); 133 } 134 private: 135 B body; 136 }; 137 138 //! the leaf for function_body specialized for Output of continue_msg 139 template <typename Input, typename B> 140 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > { 141 public: 142 function_body_leaf( const B &_body ) : body(_body) { } 143 continue_msg operator()(const Input &i) override { 144 body(i); 145 return continue_msg(); 146 } 147 B get_body() { return body; } 148 function_body_leaf* clone() override { 149 return new function_body_leaf< Input, continue_msg, B >(body); 150 } 151 private: 152 B body; 153 }; 154 155 //! the leaf for function_body specialized for Input of continue_msg 156 template <typename Output, typename B> 157 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > { 158 public: 159 function_body_leaf( const B &_body ) : body(_body) { } 160 Output operator()(const continue_msg &i) override { 161 return body(i); 162 } 163 B get_body() { return body; } 164 function_body_leaf* clone() override { 165 return new function_body_leaf< continue_msg, Output, B >(body); 166 } 167 private: 168 B body; 169 }; 170 171 //! function_body that takes an Input and a set of output ports 172 template<typename Input, typename OutputSet> 173 class multifunction_body : no_assign { 174 public: 175 virtual ~multifunction_body () {} 176 virtual void operator()(const Input &/* input*/, OutputSet &/*oset*/) = 0; 177 virtual multifunction_body* clone() = 0; 178 virtual void* get_body_ptr() = 0; 179 }; 180 181 //! leaf for multifunction. OutputSet can be a std::tuple or a vector. 182 template<typename Input, typename OutputSet, typename B > 183 class multifunction_body_leaf : public multifunction_body<Input, OutputSet> { 184 public: 185 multifunction_body_leaf(const B &_body) : body(_body) { } 186 void operator()(const Input &input, OutputSet &oset) override { 187 tbb::detail::invoke(body, input, oset); // body may explicitly put() to one or more of oset. 188 } 189 void* get_body_ptr() override { return &body; } 190 multifunction_body_leaf* clone() override { 191 return new multifunction_body_leaf<Input, OutputSet,B>(body); 192 } 193 194 private: 195 B body; 196 }; 197 198 // ------ function bodies for hash_buffers and key-matching joins. 199 200 template<typename Input, typename Output> 201 class type_to_key_function_body : no_assign { 202 public: 203 virtual ~type_to_key_function_body() {} 204 virtual Output operator()(const Input &input) = 0; // returns an Output 205 virtual type_to_key_function_body* clone() = 0; 206 }; 207 208 // specialization for ref output 209 template<typename Input, typename Output> 210 class type_to_key_function_body<Input,Output&> : no_assign { 211 public: 212 virtual ~type_to_key_function_body() {} 213 virtual const Output & operator()(const Input &input) = 0; // returns a const Output& 214 virtual type_to_key_function_body* clone() = 0; 215 }; 216 217 template <typename Input, typename Output, typename B> 218 class type_to_key_function_body_leaf : public type_to_key_function_body<Input, Output> { 219 public: 220 type_to_key_function_body_leaf( const B &_body ) : body(_body) { } 221 Output operator()(const Input &i) override { return tbb::detail::invoke(body, i); } 222 type_to_key_function_body_leaf* clone() override { 223 return new type_to_key_function_body_leaf< Input, Output, B>(body); 224 } 225 private: 226 B body; 227 }; 228 229 template <typename Input, typename Output, typename B> 230 class type_to_key_function_body_leaf<Input,Output&,B> : public type_to_key_function_body< Input, Output&> { 231 public: 232 type_to_key_function_body_leaf( const B &_body ) : body(_body) { } 233 const Output& operator()(const Input &i) override { 234 return tbb::detail::invoke(body, i); 235 } 236 type_to_key_function_body_leaf* clone() override { 237 return new type_to_key_function_body_leaf< Input, Output&, B>(body); 238 } 239 private: 240 B body; 241 }; 242 243 // --------------------------- end of function_body containers ------------------------ 244 245 // --------------------------- node task bodies --------------------------------------- 246 247 //! A task that calls a node's forward_task function 248 template< typename NodeType > 249 class forward_task_bypass : public graph_task { 250 NodeType &my_node; 251 public: 252 forward_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n 253 , node_priority_t node_priority = no_priority 254 ) : graph_task(g, allocator, node_priority), 255 my_node(n) {} 256 257 task* execute(execution_data& ed) override { 258 graph_task* next_task = my_node.forward_task(); 259 if (SUCCESSFULLY_ENQUEUED == next_task) 260 next_task = nullptr; 261 else if (next_task) 262 next_task = prioritize_task(my_node.graph_reference(), *next_task); 263 finalize<forward_task_bypass>(ed); 264 return next_task; 265 } 266 267 task* cancel(execution_data& ed) override { 268 finalize<forward_task_bypass>(ed); 269 return nullptr; 270 } 271 }; 272 273 //! A task that calls a node's apply_body_bypass function, passing in an input of type Input 274 // return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr 275 template< typename NodeType, typename Input > 276 class apply_body_task_bypass : public graph_task { 277 NodeType &my_node; 278 Input my_input; 279 public: 280 281 apply_body_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n, const Input &i 282 , node_priority_t node_priority = no_priority 283 ) : graph_task(g, allocator, node_priority), 284 my_node(n), my_input(i) {} 285 286 task* execute(execution_data& ed) override { 287 graph_task* next_task = my_node.apply_body_bypass( my_input ); 288 if (SUCCESSFULLY_ENQUEUED == next_task) 289 next_task = nullptr; 290 else if (next_task) 291 next_task = prioritize_task(my_node.graph_reference(), *next_task); 292 finalize<apply_body_task_bypass>(ed); 293 return next_task; 294 } 295 296 task* cancel(execution_data& ed) override { 297 finalize<apply_body_task_bypass>(ed); 298 return nullptr; 299 } 300 }; 301 302 //! A task that calls a node's apply_body_bypass function with no input 303 template< typename NodeType > 304 class input_node_task_bypass : public graph_task { 305 NodeType &my_node; 306 public: 307 input_node_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n ) 308 : graph_task(g, allocator), my_node(n) {} 309 310 task* execute(execution_data& ed) override { 311 graph_task* next_task = my_node.apply_body_bypass( ); 312 if (SUCCESSFULLY_ENQUEUED == next_task) 313 next_task = nullptr; 314 else if (next_task) 315 next_task = prioritize_task(my_node.graph_reference(), *next_task); 316 finalize<input_node_task_bypass>(ed); 317 return next_task; 318 } 319 320 task* cancel(execution_data& ed) override { 321 finalize<input_node_task_bypass>(ed); 322 return nullptr; 323 } 324 }; 325 326 // ------------------------ end of node task bodies ----------------------------------- 327 328 template<typename T, typename DecrementType, typename DummyType = void> 329 class threshold_regulator; 330 331 template<typename T, typename DecrementType> 332 class threshold_regulator<T, DecrementType, 333 typename std::enable_if<std::is_integral<DecrementType>::value>::type> 334 : public receiver<DecrementType>, no_copy 335 { 336 T* my_node; 337 protected: 338 339 graph_task* try_put_task( const DecrementType& value ) override { 340 graph_task* result = my_node->decrement_counter( value ); 341 if( !result ) 342 result = SUCCESSFULLY_ENQUEUED; 343 return result; 344 } 345 346 graph& graph_reference() const override { 347 return my_node->my_graph; 348 } 349 350 template<typename U, typename V> friend class limiter_node; 351 void reset_receiver( reset_flags ) {} 352 353 public: 354 threshold_regulator(T* owner) : my_node(owner) { 355 // Do not work with the passed pointer here as it may not be fully initialized yet 356 } 357 }; 358 359 template<typename T> 360 class threshold_regulator<T, continue_msg, void> : public continue_receiver, no_copy { 361 362 T *my_node; 363 364 graph_task* execute() override { 365 return my_node->decrement_counter( 1 ); 366 } 367 368 protected: 369 370 graph& graph_reference() const override { 371 return my_node->my_graph; 372 } 373 374 public: 375 376 typedef continue_msg input_type; 377 typedef continue_msg output_type; 378 threshold_regulator(T* owner) 379 : continue_receiver( /*number_of_predecessors=*/0, no_priority ), my_node(owner) 380 { 381 // Do not work with the passed pointer here as it may not be fully initialized yet 382 } 383 }; 384 385 #endif // __TBB__flow_graph_body_impl_H 386