1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB__flow_graph_node_impl_H 18 #define __TBB__flow_graph_node_impl_H 19 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 22 #endif 23 24 #include "_flow_graph_item_buffer_impl.h" 25 26 template< typename T, typename A > 27 class function_input_queue : public item_buffer<T,A> { 28 public: 29 bool empty() const { 30 return this->buffer_empty(); 31 } 32 33 const T& front() const { 34 return this->item_buffer<T, A>::front(); 35 } 36 37 void pop() { 38 this->destroy_front(); 39 } 40 41 bool push( T& t ) { 42 return this->push_back( t ); 43 } 44 }; 45 46 //! Input and scheduling for a function node that takes a type Input as input 47 // The only up-ref is apply_body_impl, which should implement the function 48 // call and any handling of the result. 49 template< typename Input, typename Policy, typename A, typename ImplType > 50 class function_input_base : public receiver<Input>, no_assign { 51 enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency 52 }; 53 typedef function_input_base<Input, Policy, A, ImplType> class_type; 54 55 public: 56 57 //! The input type of this receiver 58 typedef Input input_type; 59 typedef typename receiver<input_type>::predecessor_type predecessor_type; 60 typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type; 61 typedef function_input_queue<input_type, A> input_queue_type; 62 typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type; 63 static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, ""); 64 65 //! Constructor for function_input_base 66 function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority ) 67 : my_graph_ref(g), my_max_concurrency(max_concurrency) 68 , my_concurrency(0), my_priority(a_priority) 69 , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL) 70 , my_predecessors(this) 71 , forwarder_busy(false) 72 { 73 my_aggregator.initialize_handler(handler_type(this)); 74 } 75 76 //! Copy constructor 77 function_input_base( const function_input_base& src ) 78 : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority) {} 79 80 //! Destructor 81 // The queue is allocated by the constructor for {multi}function_node. 82 // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead. 83 // This would be an interface-breaking change. 84 virtual ~function_input_base() { 85 delete my_queue; 86 my_queue = nullptr; 87 } 88 89 graph_task* try_put_task( const input_type& t) override { 90 return try_put_task_impl(t, has_policy<lightweight, Policy>()); 91 } 92 93 //! Adds src to the list of cached predecessors. 94 bool register_predecessor( predecessor_type &src ) override { 95 operation_type op_data(reg_pred); 96 op_data.r = &src; 97 my_aggregator.execute(&op_data); 98 return true; 99 } 100 101 //! Removes src from the list of cached predecessors. 102 bool remove_predecessor( predecessor_type &src ) override { 103 operation_type op_data(rem_pred); 104 op_data.r = &src; 105 my_aggregator.execute(&op_data); 106 return true; 107 } 108 109 protected: 110 111 void reset_function_input_base( reset_flags f) { 112 my_concurrency = 0; 113 if(my_queue) { 114 my_queue->reset(); 115 } 116 reset_receiver(f); 117 forwarder_busy = false; 118 } 119 120 graph& my_graph_ref; 121 const size_t my_max_concurrency; 122 size_t my_concurrency; 123 node_priority_t my_priority; 124 input_queue_type *my_queue; 125 predecessor_cache<input_type, null_mutex > my_predecessors; 126 127 void reset_receiver( reset_flags f) { 128 if( f & rf_clear_edges) my_predecessors.clear(); 129 else 130 my_predecessors.reset(); 131 __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed"); 132 } 133 134 graph& graph_reference() const override { 135 return my_graph_ref; 136 } 137 138 graph_task* try_get_postponed_task(const input_type& i) { 139 operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item 140 my_aggregator.execute(&op_data); 141 return op_data.bypass_t; 142 } 143 144 private: 145 146 friend class apply_body_task_bypass< class_type, input_type >; 147 friend class forward_task_bypass< class_type >; 148 149 class operation_type : public aggregated_operation< operation_type > { 150 public: 151 char type; 152 union { 153 input_type *elem; 154 predecessor_type *r; 155 }; 156 graph_task* bypass_t; 157 operation_type(const input_type& e, op_type t) : 158 type(char(t)), elem(const_cast<input_type*>(&e)) {} 159 operation_type(op_type t) : type(char(t)), r(NULL) {} 160 }; 161 162 bool forwarder_busy; 163 typedef aggregating_functor<class_type, operation_type> handler_type; 164 friend class aggregating_functor<class_type, operation_type>; 165 aggregator< handler_type, operation_type > my_aggregator; 166 167 graph_task* perform_queued_requests() { 168 graph_task* new_task = NULL; 169 if(my_queue) { 170 if(!my_queue->empty()) { 171 ++my_concurrency; 172 new_task = create_body_task(my_queue->front()); 173 174 my_queue->pop(); 175 } 176 } 177 else { 178 input_type i; 179 if(my_predecessors.get_item(i)) { 180 ++my_concurrency; 181 new_task = create_body_task(i); 182 } 183 } 184 return new_task; 185 } 186 void handle_operations(operation_type *op_list) { 187 operation_type* tmp; 188 while (op_list) { 189 tmp = op_list; 190 op_list = op_list->next; 191 switch (tmp->type) { 192 case reg_pred: 193 my_predecessors.add(*(tmp->r)); 194 tmp->status.store(SUCCEEDED, std::memory_order_release); 195 if (!forwarder_busy) { 196 forwarder_busy = true; 197 spawn_forward_task(); 198 } 199 break; 200 case rem_pred: 201 my_predecessors.remove(*(tmp->r)); 202 tmp->status.store(SUCCEEDED, std::memory_order_release); 203 break; 204 case app_body_bypass: { 205 tmp->bypass_t = NULL; 206 __TBB_ASSERT(my_max_concurrency != 0, NULL); 207 --my_concurrency; 208 if(my_concurrency<my_max_concurrency) 209 tmp->bypass_t = perform_queued_requests(); 210 tmp->status.store(SUCCEEDED, std::memory_order_release); 211 } 212 break; 213 case tryput_bypass: internal_try_put_task(tmp); break; 214 case try_fwd: internal_forward(tmp); break; 215 case occupy_concurrency: 216 if (my_concurrency < my_max_concurrency) { 217 ++my_concurrency; 218 tmp->status.store(SUCCEEDED, std::memory_order_release); 219 } else { 220 tmp->status.store(FAILED, std::memory_order_release); 221 } 222 break; 223 } 224 } 225 } 226 227 //! Put to the node, but return the task instead of enqueueing it 228 void internal_try_put_task(operation_type *op) { 229 __TBB_ASSERT(my_max_concurrency != 0, NULL); 230 if (my_concurrency < my_max_concurrency) { 231 ++my_concurrency; 232 graph_task * new_task = create_body_task(*(op->elem)); 233 op->bypass_t = new_task; 234 op->status.store(SUCCEEDED, std::memory_order_release); 235 } else if ( my_queue && my_queue->push(*(op->elem)) ) { 236 op->bypass_t = SUCCESSFULLY_ENQUEUED; 237 op->status.store(SUCCEEDED, std::memory_order_release); 238 } else { 239 op->bypass_t = NULL; 240 op->status.store(FAILED, std::memory_order_release); 241 } 242 } 243 244 //! Creates tasks for postponed messages if available and if concurrency allows 245 void internal_forward(operation_type *op) { 246 op->bypass_t = NULL; 247 if (my_concurrency < my_max_concurrency) 248 op->bypass_t = perform_queued_requests(); 249 if(op->bypass_t) 250 op->status.store(SUCCEEDED, std::memory_order_release); 251 else { 252 forwarder_busy = false; 253 op->status.store(FAILED, std::memory_order_release); 254 } 255 } 256 257 graph_task* internal_try_put_bypass( const input_type& t ) { 258 operation_type op_data(t, tryput_bypass); 259 my_aggregator.execute(&op_data); 260 if( op_data.status == SUCCEEDED ) { 261 return op_data.bypass_t; 262 } 263 return NULL; 264 } 265 266 graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) { 267 if( my_max_concurrency == 0 ) { 268 return apply_body_bypass(t); 269 } else { 270 operation_type check_op(t, occupy_concurrency); 271 my_aggregator.execute(&check_op); 272 if( check_op.status == SUCCEEDED ) { 273 return apply_body_bypass(t); 274 } 275 return internal_try_put_bypass(t); 276 } 277 } 278 279 graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) { 280 if( my_max_concurrency == 0 ) { 281 return create_body_task(t); 282 } else { 283 return internal_try_put_bypass(t); 284 } 285 } 286 287 //! Applies the body to the provided input 288 // then decides if more work is available 289 graph_task* apply_body_bypass( const input_type &i ) { 290 return static_cast<ImplType *>(this)->apply_body_impl_bypass(i); 291 } 292 293 //! allocates a task to apply a body 294 graph_task* create_body_task( const input_type &input ) { 295 if (!is_graph_active(my_graph_ref)) { 296 return nullptr; 297 } 298 // TODO revamp: extract helper for common graph task allocation part 299 small_object_allocator allocator{}; 300 typedef apply_body_task_bypass<class_type, input_type> task_type; 301 graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority ); 302 graph_reference().reserve_wait(); 303 return t; 304 } 305 306 //! This is executed by an enqueued task, the "forwarder" 307 graph_task* forward_task() { 308 operation_type op_data(try_fwd); 309 graph_task* rval = NULL; 310 do { 311 op_data.status = WAIT; 312 my_aggregator.execute(&op_data); 313 if(op_data.status == SUCCEEDED) { 314 graph_task* ttask = op_data.bypass_t; 315 __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL ); 316 rval = combine_tasks(my_graph_ref, rval, ttask); 317 } 318 } while (op_data.status == SUCCEEDED); 319 return rval; 320 } 321 322 inline graph_task* create_forward_task() { 323 if (!is_graph_active(my_graph_ref)) { 324 return nullptr; 325 } 326 small_object_allocator allocator{}; 327 typedef forward_task_bypass<class_type> task_type; 328 graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority ); 329 graph_reference().reserve_wait(); 330 return t; 331 } 332 333 //! Spawns a task that calls forward() 334 inline void spawn_forward_task() { 335 graph_task* tp = create_forward_task(); 336 if(tp) { 337 spawn_in_graph_arena(graph_reference(), *tp); 338 } 339 } 340 341 node_priority_t priority() const override { return my_priority; } 342 }; // function_input_base 343 344 //! Implements methods for a function node that takes a type Input as input and sends 345 // a type Output to its successors. 346 template< typename Input, typename Output, typename Policy, typename A> 347 class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > { 348 public: 349 typedef Input input_type; 350 typedef Output output_type; 351 typedef function_body<input_type, output_type> function_body_type; 352 typedef function_input<Input, Output, Policy,A> my_class; 353 typedef function_input_base<Input, Policy, A, my_class> base_type; 354 typedef function_input_queue<input_type, A> input_queue_type; 355 356 // constructor 357 template<typename Body> 358 function_input( 359 graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority ) 360 : base_type(g, max_concurrency, a_priority) 361 , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 362 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) { 363 } 364 365 //! Copy constructor 366 function_input( const function_input& src ) : 367 base_type(src), 368 my_body( src.my_init_body->clone() ), 369 my_init_body(src.my_init_body->clone() ) { 370 } 371 #if __INTEL_COMPILER <= 2021 372 // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited 373 // class while the parent class has the virtual keyword for the destrocutor. 374 virtual 375 #endif 376 ~function_input() { 377 delete my_body; 378 delete my_init_body; 379 } 380 381 template< typename Body > 382 Body copy_function_object() { 383 function_body_type &body_ref = *this->my_body; 384 return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 385 } 386 387 output_type apply_body_impl( const input_type& i) { 388 // There is an extra copied needed to capture the 389 // body execution without the try_put 390 fgt_begin_body( my_body ); 391 output_type v = (*my_body)(i); 392 fgt_end_body( my_body ); 393 return v; 394 } 395 396 //TODO: consider moving into the base class 397 graph_task* apply_body_impl_bypass( const input_type &i) { 398 output_type v = apply_body_impl(i); 399 graph_task* postponed_task = NULL; 400 if( base_type::my_max_concurrency != 0 ) { 401 postponed_task = base_type::try_get_postponed_task(i); 402 __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL ); 403 } 404 if( postponed_task ) { 405 // make the task available for other workers since we do not know successors' 406 // execution policy 407 spawn_in_graph_arena(base_type::graph_reference(), *postponed_task); 408 } 409 graph_task* successor_task = successors().try_put_task(v); 410 #if _MSC_VER && !__INTEL_COMPILER 411 #pragma warning (push) 412 #pragma warning (disable: 4127) /* suppress conditional expression is constant */ 413 #endif 414 if(has_policy<lightweight, Policy>::value) { 415 #if _MSC_VER && !__INTEL_COMPILER 416 #pragma warning (pop) 417 #endif 418 if(!successor_task) { 419 // Return confirmative status since current 420 // node's body has been executed anyway 421 successor_task = SUCCESSFULLY_ENQUEUED; 422 } 423 } 424 return successor_task; 425 } 426 427 protected: 428 429 void reset_function_input(reset_flags f) { 430 base_type::reset_function_input_base(f); 431 if(f & rf_reset_bodies) { 432 function_body_type *tmp = my_init_body->clone(); 433 delete my_body; 434 my_body = tmp; 435 } 436 } 437 438 function_body_type *my_body; 439 function_body_type *my_init_body; 440 virtual broadcast_cache<output_type > &successors() = 0; 441 442 }; // function_input 443 444 445 // helper templates to clear the successor edges of the output ports of an multifunction_node 446 template<int N> struct clear_element { 447 template<typename P> static void clear_this(P &p) { 448 (void)std::get<N-1>(p).successors().clear(); 449 clear_element<N-1>::clear_this(p); 450 } 451 #if TBB_USE_ASSERT 452 template<typename P> static bool this_empty(P &p) { 453 if(std::get<N-1>(p).successors().empty()) 454 return clear_element<N-1>::this_empty(p); 455 return false; 456 } 457 #endif 458 }; 459 460 template<> struct clear_element<1> { 461 template<typename P> static void clear_this(P &p) { 462 (void)std::get<0>(p).successors().clear(); 463 } 464 #if TBB_USE_ASSERT 465 template<typename P> static bool this_empty(P &p) { 466 return std::get<0>(p).successors().empty(); 467 } 468 #endif 469 }; 470 471 template <typename OutputTuple> 472 struct init_output_ports { 473 template <typename... Args> 474 static OutputTuple call(graph& g, const std::tuple<Args...>&) { 475 return OutputTuple(Args(g)...); 476 } 477 }; // struct init_output_ports 478 479 //! Implements methods for a function node that takes a type Input as input 480 // and has a tuple of output ports specified. 481 template< typename Input, typename OutputPortSet, typename Policy, typename A> 482 class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > { 483 public: 484 static const int N = std::tuple_size<OutputPortSet>::value; 485 typedef Input input_type; 486 typedef OutputPortSet output_ports_type; 487 typedef multifunction_body<input_type, output_ports_type> multifunction_body_type; 488 typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class; 489 typedef function_input_base<Input, Policy, A, my_class> base_type; 490 typedef function_input_queue<input_type, A> input_queue_type; 491 492 // constructor 493 template<typename Body> 494 multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) 495 : base_type(g, max_concurrency, a_priority) 496 , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) 497 , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) 498 , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){ 499 } 500 501 //! Copy constructor 502 multifunction_input( const multifunction_input& src ) : 503 base_type(src), 504 my_body( src.my_init_body->clone() ), 505 my_init_body(src.my_init_body->clone() ), 506 my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) { 507 } 508 509 ~multifunction_input() { 510 delete my_body; 511 delete my_init_body; 512 } 513 514 template< typename Body > 515 Body copy_function_object() { 516 multifunction_body_type &body_ref = *this->my_body; 517 return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr()); 518 } 519 520 // for multifunction nodes we do not have a single successor as such. So we just tell 521 // the task we were successful. 522 //TODO: consider moving common parts with implementation in function_input into separate function 523 graph_task* apply_body_impl_bypass( const input_type &i ) { 524 fgt_begin_body( my_body ); 525 (*my_body)(i, my_output_ports); 526 fgt_end_body( my_body ); 527 graph_task* ttask = NULL; 528 if(base_type::my_max_concurrency != 0) { 529 ttask = base_type::try_get_postponed_task(i); 530 } 531 return ttask ? ttask : SUCCESSFULLY_ENQUEUED; 532 } 533 534 output_ports_type &output_ports(){ return my_output_ports; } 535 536 protected: 537 538 void reset(reset_flags f) { 539 base_type::reset_function_input_base(f); 540 if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports); 541 if(f & rf_reset_bodies) { 542 multifunction_body_type* tmp = my_init_body->clone(); 543 delete my_body; 544 my_body = tmp; 545 } 546 __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed"); 547 } 548 549 multifunction_body_type *my_body; 550 multifunction_body_type *my_init_body; 551 output_ports_type my_output_ports; 552 553 }; // multifunction_input 554 555 // template to refer to an output port of a multifunction_node 556 template<size_t N, typename MOP> 557 typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) { 558 return std::get<N>(op.output_ports()); 559 } 560 561 inline void check_task_and_spawn(graph& g, graph_task* t) { 562 if (t && t != SUCCESSFULLY_ENQUEUED) { 563 spawn_in_graph_arena(g, *t); 564 } 565 } 566 567 // helper structs for split_node 568 template<int N> 569 struct emit_element { 570 template<typename T, typename P> 571 static graph_task* emit_this(graph& g, const T &t, P &p) { 572 // TODO: consider to collect all the tasks in task_list and spawn them all at once 573 graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t)); 574 check_task_and_spawn(g, last_task); 575 return emit_element<N-1>::emit_this(g,t,p); 576 } 577 }; 578 579 template<> 580 struct emit_element<1> { 581 template<typename T, typename P> 582 static graph_task* emit_this(graph& g, const T &t, P &p) { 583 graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t)); 584 check_task_and_spawn(g, last_task); 585 return SUCCESSFULLY_ENQUEUED; 586 } 587 }; 588 589 //! Implements methods for an executable node that takes continue_msg as input 590 template< typename Output, typename Policy> 591 class continue_input : public continue_receiver { 592 public: 593 594 //! The input type of this receiver 595 typedef continue_msg input_type; 596 597 //! The output type of this receiver 598 typedef Output output_type; 599 typedef function_body<input_type, output_type> function_body_type; 600 typedef continue_input<output_type, Policy> class_type; 601 602 template< typename Body > 603 continue_input( graph &g, Body& body, node_priority_t a_priority ) 604 : continue_receiver(/*number_of_predecessors=*/0, a_priority) 605 , my_graph_ref(g) 606 , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 607 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) 608 { } 609 610 template< typename Body > 611 continue_input( graph &g, int number_of_predecessors, 612 Body& body, node_priority_t a_priority ) 613 : continue_receiver( number_of_predecessors, a_priority ) 614 , my_graph_ref(g) 615 , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 616 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) 617 { } 618 619 continue_input( const continue_input& src ) : continue_receiver(src), 620 my_graph_ref(src.my_graph_ref), 621 my_body( src.my_init_body->clone() ), 622 my_init_body( src.my_init_body->clone() ) {} 623 624 ~continue_input() { 625 delete my_body; 626 delete my_init_body; 627 } 628 629 template< typename Body > 630 Body copy_function_object() { 631 function_body_type &body_ref = *my_body; 632 return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 633 } 634 635 void reset_receiver( reset_flags f) override { 636 continue_receiver::reset_receiver(f); 637 if(f & rf_reset_bodies) { 638 function_body_type *tmp = my_init_body->clone(); 639 delete my_body; 640 my_body = tmp; 641 } 642 } 643 644 protected: 645 646 graph& my_graph_ref; 647 function_body_type *my_body; 648 function_body_type *my_init_body; 649 650 virtual broadcast_cache<output_type > &successors() = 0; 651 652 friend class apply_body_task_bypass< class_type, continue_msg >; 653 654 //! Applies the body to the provided input 655 graph_task* apply_body_bypass( input_type ) { 656 // There is an extra copied needed to capture the 657 // body execution without the try_put 658 fgt_begin_body( my_body ); 659 output_type v = (*my_body)( continue_msg() ); 660 fgt_end_body( my_body ); 661 return successors().try_put_task( v ); 662 } 663 664 graph_task* execute() override { 665 if(!is_graph_active(my_graph_ref)) { 666 return NULL; 667 } 668 #if _MSC_VER && !__INTEL_COMPILER 669 #pragma warning (push) 670 #pragma warning (disable: 4127) /* suppress conditional expression is constant */ 671 #endif 672 if(has_policy<lightweight, Policy>::value) { 673 #if _MSC_VER && !__INTEL_COMPILER 674 #pragma warning (pop) 675 #endif 676 return apply_body_bypass( continue_msg() ); 677 } 678 else { 679 small_object_allocator allocator{}; 680 typedef apply_body_task_bypass<class_type, continue_msg> task_type; 681 graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority ); 682 graph_reference().reserve_wait(); 683 return t; 684 } 685 } 686 687 graph& graph_reference() const override { 688 return my_graph_ref; 689 } 690 }; // continue_input 691 692 //! Implements methods for both executable and function nodes that puts Output to its successors 693 template< typename Output > 694 class function_output : public sender<Output> { 695 public: 696 697 template<int N> friend struct clear_element; 698 typedef Output output_type; 699 typedef typename sender<output_type>::successor_type successor_type; 700 typedef broadcast_cache<output_type> broadcast_cache_type; 701 702 function_output(graph& g) : my_successors(this), my_graph_ref(g) {} 703 function_output(const function_output& other) = delete; 704 705 //! Adds a new successor to this node 706 bool register_successor( successor_type &r ) override { 707 successors().register_successor( r ); 708 return true; 709 } 710 711 //! Removes a successor from this node 712 bool remove_successor( successor_type &r ) override { 713 successors().remove_successor( r ); 714 return true; 715 } 716 717 broadcast_cache_type &successors() { return my_successors; } 718 719 graph& graph_reference() const { return my_graph_ref; } 720 protected: 721 broadcast_cache_type my_successors; 722 graph& my_graph_ref; 723 }; // function_output 724 725 template< typename Output > 726 class multifunction_output : public function_output<Output> { 727 public: 728 typedef Output output_type; 729 typedef function_output<output_type> base_type; 730 using base_type::my_successors; 731 732 multifunction_output(graph& g) : base_type(g) {} 733 multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {} 734 735 bool try_put(const output_type &i) { 736 graph_task *res = try_put_task(i); 737 if( !res ) return false; 738 if( res != SUCCESSFULLY_ENQUEUED ) { 739 // wrapping in task_arena::execute() is not needed since the method is called from 740 // inside task::execute() 741 spawn_in_graph_arena(graph_reference(), *res); 742 } 743 return true; 744 } 745 746 using base_type::graph_reference; 747 748 protected: 749 750 graph_task* try_put_task(const output_type &i) { 751 return my_successors.try_put_task(i); 752 } 753 754 template <int N> friend struct emit_element; 755 756 }; // multifunction_output 757 758 //composite_node 759 template<typename CompositeType> 760 void add_nodes_impl(CompositeType*, bool) {} 761 762 template< typename CompositeType, typename NodeType1, typename... NodeTypes > 763 void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) { 764 void *addr = const_cast<NodeType1 *>(&n1); 765 766 fgt_alias_port(c_node, addr, visible); 767 add_nodes_impl(c_node, visible, n...); 768 } 769 770 #endif // __TBB__flow_graph_node_impl_H 771