/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_node_impl_H
#define __TBB__flow_graph_node_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "_flow_graph_item_buffer_impl.h"

template< typename T, typename A >
class function_input_queue : public item_buffer<T,A> {
public:
    bool empty() const {
        return this->buffer_empty();
    }

    const T& front() const {
        return this->item_buffer<T, A>::front();
    }

    void pop() {
        this->destroy_front();
    }

    bool push( T& t ) {
        return this->push_back( t );
    }
};

//! Input and scheduling for a function node that takes a type Input as input
//  The only up-ref is apply_body_impl, which should implement the function
//  call and any handling of the result.
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, no_assign {
    enum op_type { reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:

    //! The input type of this receiver
    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
    static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

    //! Constructor for function_input_base
    function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
        : my_graph_ref(g), my_max_concurrency(max_concurrency)
        , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
        , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr)
        , my_predecessors(this)
        , forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
    }

    //! Copy constructor
    function_input_base( const function_input_base& src )
        : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}

    //! Destructor
    // The queue is allocated by the constructor for {multi}function_node.
    // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
    // This would be an interface-breaking change.
    virtual ~function_input_base() {
        delete my_queue;
        my_queue = nullptr;
    }

    graph_task* try_put_task( const input_type& t ) override {
        if ( my_is_no_throw )
            return try_put_task_impl(t, has_policy<lightweight, Policy>());
        else
            return try_put_task_impl(t, std::false_type());
    }
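    // Illustrative sketch (user-level view; not part of this header): whether
    // my_queue exists above is what distinguishes a queueing node, which
    // buffers inputs that exceed its concurrency, from a rejecting one, whose
    // try_put reports failure while all concurrency slots are busy.
    //
    //     tbb::flow::graph g;
    //     tbb::flow::function_node<int, int, tbb::flow::rejecting>
    //         n(g, /*concurrency=*/1, [](int x) { return x + 1; });
    //     n.try_put(1);                  // occupies the single slot
    //     bool accepted = n.try_put(2);  // may fail while the node is busy
    //     g.wait_for_all();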
    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

protected:

    void reset_function_input_base( reset_flags f ) {
        my_concurrency = 0;
        if (my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;
    size_t my_concurrency;
    node_priority_t my_priority;
    const bool my_is_no_throw;
    input_queue_type *my_queue;
    predecessor_cache<input_type, null_mutex > my_predecessors;

    void reset_receiver( reset_flags f ) {
        if (f & rf_clear_edges)
            my_predecessors.clear();
        else
            my_predecessors.reset();
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }

    graph_task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:

    friend class apply_body_task_bypass< class_type, input_type >;
    friend class forward_task_bypass< class_type >;

    class operation_type : public aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;
            predecessor_type *r;
        };
        graph_task* bypass_t;
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr) {}
        operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {}
    };

    bool forwarder_busy;
    typedef aggregating_functor<class_type, operation_type> handler_type;
    friend class aggregating_functor<class_type, operation_type>;
    aggregator< handler_type, operation_type > my_aggregator;

    graph_task* perform_queued_requests() {
        graph_task* new_task = nullptr;
        if (my_queue) {
            if (!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front());
                my_queue->pop();
            }
        }
        else {
            input_type i;
            if (my_predecessors.get_item(i)) {
                ++my_concurrency;
                new_task = create_body_task(i);
            }
        }
        return new_task;
    }
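    // Illustrative sketch (not part of this header): the predecessor cache and
    // the pull in perform_queued_requests above are what a user observes when
    // an upstream buffer feeds a rejecting serial node; refused items stay
    // buffered and are pulled as concurrency slots free up.
    //
    //     tbb::flow::graph g;
    //     tbb::flow::buffer_node<int> buf(g);
    //     tbb::flow::function_node<int, int, tbb::flow::rejecting>
    //         worker(g, 1, [](int x) { return 2 * x; });
    //     tbb::flow::make_edge(buf, worker);
    //     for (int i = 0; i < 10; ++i) buf.try_put(i); // excess items wait in buf
    //     g.wait_for_all();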
    void handle_operations(operation_type *op_list) {
        operation_type* tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_pred:
                my_predecessors.add(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                if (!forwarder_busy) {
                    forwarder_busy = true;
                    spawn_forward_task();
                }
                break;
            case rem_pred:
                my_predecessors.remove(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                break;
            case app_body_bypass: {
                tmp->bypass_t = nullptr;
                __TBB_ASSERT(my_max_concurrency != 0, nullptr);
                --my_concurrency;
                if (my_concurrency < my_max_concurrency)
                    tmp->bypass_t = perform_queued_requests();
                tmp->status.store(SUCCEEDED, std::memory_order_release);
            }
                break;
            case tryput_bypass: internal_try_put_task(tmp); break;
            case try_fwd: internal_forward(tmp); break;
            case occupy_concurrency:
                if (my_concurrency < my_max_concurrency) {
                    ++my_concurrency;
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                } else {
                    tmp->status.store(FAILED, std::memory_order_release);
                }
                break;
            }
        }
    }

    //! Put to the node, but return the task instead of enqueueing it
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, nullptr);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            graph_task* new_task = create_body_task(*(op->elem));
            op->bypass_t = new_task;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else if ( my_queue && my_queue->push(*(op->elem)) ) {
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            op->bypass_t = nullptr;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Creates tasks for postponed messages if available and if concurrency allows
    void internal_forward(operation_type *op) {
        op->bypass_t = nullptr;
        if (my_concurrency < my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if (op->bypass_t)
            op->status.store(SUCCEEDED, std::memory_order_release);
        else {
            forwarder_busy = false;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    graph_task* internal_try_put_bypass( const input_type& t ) {
        operation_type op_data(t, tryput_bypass);
        my_aggregator.execute(&op_data);
        if ( op_data.status == SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return nullptr;
    }

    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) {
        if ( my_max_concurrency == 0 ) {
            return apply_body_bypass(t);
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if ( check_op.status == SUCCEEDED ) {
                return apply_body_bypass(t);
            }
            return internal_try_put_bypass(t);
        }
    }

    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) {
        if ( my_max_concurrency == 0 ) {
            return create_body_task(t);
        } else {
            return internal_try_put_bypass(t);
        }
    }

    //! Applies the body to the provided input
    //  then decides if more work is available
    graph_task* apply_body_bypass( const input_type &i ) {
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
    }

    //! allocates a task to apply a body
    graph_task* create_body_task( const input_type &input ) {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        // TODO revamp: extract helper for common graph task allocation part
        small_object_allocator allocator{};
        typedef apply_body_task_bypass<class_type, input_type> task_type;
        graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority );
        graph_reference().reserve_wait();
        return t;
    }
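    // Illustrative sketch (not part of this header): the std::true_type
    // overload of try_put_task_impl above is selected for lightweight nodes
    // with noexcept bodies, which lets the body run on the caller's thread
    // instead of in a spawned task when a concurrency slot can be occupied.
    //
    //     tbb::flow::graph g;
    //     tbb::flow::function_node<int, int, tbb::flow::lightweight>
    //         cheap(g, tbb::flow::unlimited,
    //               [](int x) noexcept { return x + 1; });
    //     cheap.try_put(42);  // body may execute right here
    //     g.wait_for_all();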
    //! This is executed by an enqueued task, the "forwarder"
    graph_task* forward_task() {
        operation_type op_data(try_fwd);
        graph_task* rval = nullptr;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if (op_data.status == SUCCEEDED) {
                graph_task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr );
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    inline graph_task* create_forward_task() {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        small_object_allocator allocator{};
        typedef forward_task_bypass<class_type> task_type;
        graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! Spawns a task that calls forward()
    inline void spawn_forward_task() {
        graph_task* tp = create_forward_task();
        if (tp) {
            spawn_in_graph_arena(graph_reference(), *tp);
        }
    }

    node_priority_t priority() const override { return my_priority; }
}; // function_input_base
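// Illustrative sketch (not part of this header): the priority stored above is
// supplied through the public node constructors and biases task selection in
// the graph's arena, so higher-priority node bodies tend to run first.
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int>
//         urgent(g, tbb::flow::unlimited,
//                [](int x) { return x; },
//                tbb::flow::node_priority_t(1)); // 0 means no_priority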
//! Implements methods for a function node that takes a type Input as input and sends
//  a type Output to its successors.
template< typename Input, typename Output, typename Policy, typename A >
class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef function_input<Input, Output, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    template<typename Body>
    function_input(
        graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
        : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type())))
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
    }

    //! Copy constructor
    function_input( const function_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body( src.my_init_body->clone() ) {
    }
#if __INTEL_COMPILER <= 2021
    // Suppress a superfluous diagnostic about the absence of the virtual keyword on the
    // destructor of a derived class when the parent class declares its destructor virtual.
    virtual
#endif
    ~function_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *this->my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    output_type apply_body_impl( const input_type& i ) {
        // An extra copy is needed to capture the
        // body execution separately from the try_put
        fgt_begin_body( my_body );
        output_type v = tbb::detail::invoke(*my_body, i);
        fgt_end_body( my_body );
        return v;
    }

    // TODO: consider moving into the base class
    graph_task* apply_body_impl_bypass( const input_type &i ) {
        output_type v = apply_body_impl(i);
        graph_task* postponed_task = nullptr;
        if ( base_type::my_max_concurrency != 0 ) {
            postponed_task = base_type::try_get_postponed_task(i);
            __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr );
        }
        if ( postponed_task ) {
            // make the task available for other workers since we do not know successors'
            // execution policy
            spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
        }
        graph_task* successor_task = successors().try_put_task(v);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if (has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            if (!successor_task) {
                // Return a confirmative status since the current
                // node's body has been executed anyway
                successor_task = SUCCESSFULLY_ENQUEUED;
            }
        }
        return successor_task;
    }

protected:

    void reset_function_input(reset_flags f) {
        base_type::reset_function_input_base(f);
        if (f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

    function_body_type *my_body;
    function_body_type *my_init_body;
    virtual broadcast_cache<output_type > &successors() = 0;

}; // function_input


// helper templates to clear the successor edges of the output ports of a multifunction_node
template<int N> struct clear_element {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<N-1>(p).successors().clear();
        clear_element<N-1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        if (std::get<N-1>(p).successors().empty())
            return clear_element<N-1>::this_empty(p);
        return false;
    }
#endif
};

template<> struct clear_element<1> {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<0>(p).successors().clear();
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        return std::get<0>(p).successors().empty();
    }
#endif
};

template <typename OutputTuple>
struct init_output_ports {
    template <typename... Args>
    static OutputTuple call(graph& g, const std::tuple<Args...>&) {
        return OutputTuple(Args(g)...);
    }
}; // struct init_output_ports
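// Illustrative sketch (not part of this header): copy_function_object above
// backs the public tbb::flow::copy_body helper, which returns a copy of a
// node's body object, e.g. to inspect state accumulated by a stateful functor.
//
//     struct Counter {
//         int count = 0;
//         int operator()(int x) { ++count; return x; }
//     };
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> n(g, tbb::flow::serial, Counter{});
//     n.try_put(1);
//     g.wait_for_all();
//     Counter c = tbb::flow::copy_body<Counter>(n); // c.count reflects the node's body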
//! Implements methods for a function node that takes a type Input as input
//  and has a tuple of output ports specified.
template< typename Input, typename OutputPortSet, typename Policy, typename A >
class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
public:
    static const int N = std::tuple_size<OutputPortSet>::value;
    typedef Input input_type;
    typedef OutputPortSet output_ports_type;
    typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
    typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    template<typename Body>
    multifunction_input( graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
        : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
        , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
        , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
        , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)) {
    }

    //! Copy constructor
    multifunction_input( const multifunction_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body( src.my_init_body->clone() ),
        my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
    }

    ~multifunction_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        multifunction_body_type &body_ref = *this->my_body;
        return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
    }

    // For multifunction nodes there is no single successor as such, so we simply tell
    // the task we were successful.
    // TODO: consider moving the parts shared with the function_input implementation into a separate function
    graph_task* apply_body_impl_bypass( const input_type &i ) {
        fgt_begin_body( my_body );
        (*my_body)(i, my_output_ports);
        fgt_end_body( my_body );
        graph_task* ttask = nullptr;
        if (base_type::my_max_concurrency != 0) {
            ttask = base_type::try_get_postponed_task(i);
        }
        return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
    }

    output_ports_type &output_ports() { return my_output_ports; }

protected:

    void reset(reset_flags f) {
        base_type::reset_function_input_base(f);
        if (f & rf_clear_edges) clear_element<N>::clear_this(my_output_ports);
        if (f & rf_reset_bodies) {
            multifunction_body_type* tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
    }

    multifunction_body_type *my_body;
    multifunction_body_type *my_init_body;
    output_ports_type my_output_ports;

}; // multifunction_input
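// Illustrative sketch (not part of this header): a multifunction_node body,
// which multifunction_input dispatches to above, receives the input and the
// tuple of output ports and may put to any subset of them.
//
//     using mf_t = tbb::flow::multifunction_node<int, std::tuple<int, int>>;
//     tbb::flow::graph g;
//     mf_t splitter(g, tbb::flow::unlimited,
//         [](const int& v, mf_t::output_ports_type& ports) {
//             if (v % 2 == 0) std::get<0>(ports).try_put(v); // evens to port 0
//             else            std::get<1>(ports).try_put(v); // odds to port 1
//         });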
// template to refer to an output port of a multifunction_node
template<size_t N, typename MOP>
typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
    return std::get<N>(op.output_ports());
}

inline void check_task_and_spawn(graph& g, graph_task* t) {
    if (t && t != SUCCESSFULLY_ENQUEUED) {
        spawn_in_graph_arena(g, *t);
    }
}

// helper structs for split_node
template<int N>
struct emit_element {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        // TODO: consider collecting all the tasks in a task_list and spawning them all at once
        graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
        check_task_and_spawn(g, last_task);
        return emit_element<N-1>::emit_this(g, t, p);
    }
};

template<>
struct emit_element<1> {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
        check_task_and_spawn(g, last_task);
        return SUCCESSFULLY_ENQUEUED;
    }
};
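// Illustrative sketch (not part of this header): emit_element backs the public
// split_node, which forwards each element of an incoming tuple to the
// corresponding output port; output_port<N> above is the public accessor.
//
//     tbb::flow::graph g;
//     tbb::flow::split_node<std::tuple<int, float>> s(g);
//     tbb::flow::queue_node<int> qi(g);
//     tbb::flow::queue_node<float> qf(g);
//     tbb::flow::make_edge(tbb::flow::output_port<0>(s), qi);
//     tbb::flow::make_edge(tbb::flow::output_port<1>(s), qf);
//     s.try_put(std::make_tuple(1, 2.0f));
//     g.wait_for_all();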
//! Implements methods for an executable node that takes continue_msg as input
template< typename Output, typename Policy >
class continue_input : public continue_receiver {
public:

    //! The input type of this receiver
    typedef continue_msg input_type;

    //! The output type of this receiver
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef continue_input<output_type, Policy> class_type;

    template< typename Body >
    continue_input( graph &g, Body& body, node_priority_t a_priority )
        : continue_receiver(/*number_of_predecessors=*/0, a_priority)
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    template< typename Body >
    continue_input( graph &g, int number_of_predecessors,
                    Body& body, node_priority_t a_priority )
        : continue_receiver( number_of_predecessors, a_priority )
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    continue_input( const continue_input& src ) : continue_receiver(src),
        my_graph_ref(src.my_graph_ref),
        my_body( src.my_init_body->clone() ),
        my_init_body( src.my_init_body->clone() ) {}

    ~continue_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    void reset_receiver( reset_flags f ) override {
        continue_receiver::reset_receiver(f);
        if (f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

protected:

    graph& my_graph_ref;
    function_body_type *my_body;
    function_body_type *my_init_body;

    virtual broadcast_cache<output_type > &successors() = 0;

    friend class apply_body_task_bypass< class_type, continue_msg >;

    //! Applies the body to the provided input
    graph_task* apply_body_bypass( input_type ) {
        // An extra copy is needed to capture the
        // body execution separately from the try_put
        fgt_begin_body( my_body );
        output_type v = (*my_body)( continue_msg() );
        fgt_end_body( my_body );
        return successors().try_put_task( v );
    }

    graph_task* execute() override {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if (has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            return apply_body_bypass( continue_msg() );
        }
        else {
            small_object_allocator allocator{};
            typedef apply_body_task_bypass<class_type, continue_msg> task_type;
            graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
            graph_reference().reserve_wait();
            return t;
        }
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }
}; // continue_input
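// Illustrative sketch (not part of this header): continue_input underlies the
// public continue_node, which runs its body once it has received a
// continue_msg from each of its predecessors.
//
//     tbb::flow::graph g;
//     tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
//     tbb::flow::continue_node<tbb::flow::continue_msg> step(g,
//         [](const tbb::flow::continue_msg&) {
//             /* do work */
//             return tbb::flow::continue_msg();
//         });
//     tbb::flow::make_edge(start, step);
//     start.try_put(tbb::flow::continue_msg());
//     g.wait_for_all();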
//! Implements methods for both executable and function nodes that put Output to their successors
template< typename Output >
class function_output : public sender<Output> {
public:

    template<int N> friend struct clear_element;
    typedef Output output_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef broadcast_cache<output_type> broadcast_cache_type;

    function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
    function_output(const function_output& other) = delete;

    //! Adds a new successor to this node
    bool register_successor( successor_type &r ) override {
        successors().register_successor( r );
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        successors().remove_successor( r );
        return true;
    }

    broadcast_cache_type &successors() { return my_successors; }

    graph& graph_reference() const { return my_graph_ref; }
protected:
    broadcast_cache_type my_successors;
    graph& my_graph_ref;
}; // function_output

template< typename Output >
class multifunction_output : public function_output<Output> {
public:
    typedef Output output_type;
    typedef function_output<output_type> base_type;
    using base_type::my_successors;

    multifunction_output(graph& g) : base_type(g) {}
    multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}

    bool try_put(const output_type &i) {
        graph_task *res = try_put_task(i);
        if ( !res ) return false;
        if ( res != SUCCESSFULLY_ENQUEUED ) {
            // wrapping in task_arena::execute() is not needed since the method is called from
            // inside task::execute()
            spawn_in_graph_arena(graph_reference(), *res);
        }
        return true;
    }

    using base_type::graph_reference;

protected:

    graph_task* try_put_task(const output_type &i) {
        return my_successors.try_put_task(i);
    }

    template <int N> friend struct emit_element;

}; // multifunction_output

// composite_node
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}

template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    void *addr = const_cast<NodeType1 *>(&n1);

    fgt_alias_port(c_node, addr, visible);
    add_nodes_impl(c_node, visible, n...);
}

#endif // __TBB__flow_graph_node_impl_H