1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB__flow_graph_node_impl_H 18 #define __TBB__flow_graph_node_impl_H 19 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 22 #endif 23 24 #include "_flow_graph_item_buffer_impl.h" 25 26 template< typename T, typename A > 27 class function_input_queue : public item_buffer<T,A> { 28 public: 29 bool empty() const { 30 return this->buffer_empty(); 31 } 32 33 const T& front() const { 34 return this->item_buffer<T, A>::front(); 35 } 36 37 void pop() { 38 this->destroy_front(); 39 } 40 41 bool push( T& t ) { 42 return this->push_back( t ); 43 } 44 }; 45 46 //! Input and scheduling for a function node that takes a type Input as input 47 // The only up-ref is apply_body_impl, which should implement the function 48 // call and any handling of the result. 49 template< typename Input, typename Policy, typename A, typename ImplType > 50 class function_input_base : public receiver<Input>, no_assign { 51 enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency 52 }; 53 typedef function_input_base<Input, Policy, A, ImplType> class_type; 54 55 public: 56 57 //! The input type of this receiver 58 typedef Input input_type; 59 typedef typename receiver<input_type>::predecessor_type predecessor_type; 60 typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type; 61 typedef function_input_queue<input_type, A> input_queue_type; 62 typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type; 63 static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, ""); 64 65 //! Constructor for function_input_base 66 function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority ) 67 : my_graph_ref(g), my_max_concurrency(max_concurrency) 68 , my_concurrency(0), my_priority(a_priority) 69 , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL) 70 , my_predecessors(this) 71 , forwarder_busy(false) 72 { 73 my_aggregator.initialize_handler(handler_type(this)); 74 } 75 76 //! Copy constructor 77 function_input_base( const function_input_base& src ) 78 : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority) {} 79 80 //! Destructor 81 // The queue is allocated by the constructor for {multi}function_node. 82 // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead. 83 // This would be an interface-breaking change. 84 virtual ~function_input_base() { 85 if ( my_queue ) delete my_queue; 86 } 87 88 graph_task* try_put_task( const input_type& t) override { 89 return try_put_task_impl(t, has_policy<lightweight, Policy>()); 90 } 91 92 //! Adds src to the list of cached predecessors. 93 bool register_predecessor( predecessor_type &src ) override { 94 operation_type op_data(reg_pred); 95 op_data.r = &src; 96 my_aggregator.execute(&op_data); 97 return true; 98 } 99 100 //! Removes src from the list of cached predecessors. 101 bool remove_predecessor( predecessor_type &src ) override { 102 operation_type op_data(rem_pred); 103 op_data.r = &src; 104 my_aggregator.execute(&op_data); 105 return true; 106 } 107 108 protected: 109 110 void reset_function_input_base( reset_flags f) { 111 my_concurrency = 0; 112 if(my_queue) { 113 my_queue->reset(); 114 } 115 reset_receiver(f); 116 forwarder_busy = false; 117 } 118 119 graph& my_graph_ref; 120 const size_t my_max_concurrency; 121 size_t my_concurrency; 122 node_priority_t my_priority; 123 input_queue_type *my_queue; 124 predecessor_cache<input_type, null_mutex > my_predecessors; 125 126 void reset_receiver( reset_flags f) { 127 if( f & rf_clear_edges) my_predecessors.clear(); 128 else 129 my_predecessors.reset(); 130 __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed"); 131 } 132 133 graph& graph_reference() const override { 134 return my_graph_ref; 135 } 136 137 graph_task* try_get_postponed_task(const input_type& i) { 138 operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item 139 my_aggregator.execute(&op_data); 140 return op_data.bypass_t; 141 } 142 143 private: 144 145 friend class apply_body_task_bypass< class_type, input_type >; 146 friend class forward_task_bypass< class_type >; 147 148 class operation_type : public aggregated_operation< operation_type > { 149 public: 150 char type; 151 union { 152 input_type *elem; 153 predecessor_type *r; 154 }; 155 graph_task* bypass_t; 156 operation_type(const input_type& e, op_type t) : 157 type(char(t)), elem(const_cast<input_type*>(&e)) {} 158 operation_type(op_type t) : type(char(t)), r(NULL) {} 159 }; 160 161 bool forwarder_busy; 162 typedef aggregating_functor<class_type, operation_type> handler_type; 163 friend class aggregating_functor<class_type, operation_type>; 164 aggregator< handler_type, operation_type > my_aggregator; 165 166 graph_task* perform_queued_requests() { 167 graph_task* new_task = NULL; 168 if(my_queue) { 169 if(!my_queue->empty()) { 170 ++my_concurrency; 171 new_task = create_body_task(my_queue->front()); 172 173 my_queue->pop(); 174 } 175 } 176 else { 177 input_type i; 178 if(my_predecessors.get_item(i)) { 179 ++my_concurrency; 180 new_task = create_body_task(i); 181 } 182 } 183 return new_task; 184 } 185 void handle_operations(operation_type *op_list) { 186 operation_type* tmp; 187 while (op_list) { 188 tmp = op_list; 189 op_list = op_list->next; 190 switch (tmp->type) { 191 case reg_pred: 192 my_predecessors.add(*(tmp->r)); 193 tmp->status.store(SUCCEEDED, std::memory_order_release); 194 if (!forwarder_busy) { 195 forwarder_busy = true; 196 spawn_forward_task(); 197 } 198 break; 199 case rem_pred: 200 my_predecessors.remove(*(tmp->r)); 201 tmp->status.store(SUCCEEDED, std::memory_order_release); 202 break; 203 case app_body_bypass: { 204 tmp->bypass_t = NULL; 205 __TBB_ASSERT(my_max_concurrency != 0, NULL); 206 --my_concurrency; 207 if(my_concurrency<my_max_concurrency) 208 tmp->bypass_t = perform_queued_requests(); 209 tmp->status.store(SUCCEEDED, std::memory_order_release); 210 } 211 break; 212 case tryput_bypass: internal_try_put_task(tmp); break; 213 case try_fwd: internal_forward(tmp); break; 214 case occupy_concurrency: 215 if (my_concurrency < my_max_concurrency) { 216 ++my_concurrency; 217 tmp->status.store(SUCCEEDED, std::memory_order_release); 218 } else { 219 tmp->status.store(FAILED, std::memory_order_release); 220 } 221 break; 222 } 223 } 224 } 225 226 //! Put to the node, but return the task instead of enqueueing it 227 void internal_try_put_task(operation_type *op) { 228 __TBB_ASSERT(my_max_concurrency != 0, NULL); 229 if (my_concurrency < my_max_concurrency) { 230 ++my_concurrency; 231 graph_task * new_task = create_body_task(*(op->elem)); 232 op->bypass_t = new_task; 233 op->status.store(SUCCEEDED, std::memory_order_release); 234 } else if ( my_queue && my_queue->push(*(op->elem)) ) { 235 op->bypass_t = SUCCESSFULLY_ENQUEUED; 236 op->status.store(SUCCEEDED, std::memory_order_release); 237 } else { 238 op->bypass_t = NULL; 239 op->status.store(FAILED, std::memory_order_release); 240 } 241 } 242 243 //! Creates tasks for postponed messages if available and if concurrency allows 244 void internal_forward(operation_type *op) { 245 op->bypass_t = NULL; 246 if (my_concurrency < my_max_concurrency) 247 op->bypass_t = perform_queued_requests(); 248 if(op->bypass_t) 249 op->status.store(SUCCEEDED, std::memory_order_release); 250 else { 251 forwarder_busy = false; 252 op->status.store(FAILED, std::memory_order_release); 253 } 254 } 255 256 graph_task* internal_try_put_bypass( const input_type& t ) { 257 operation_type op_data(t, tryput_bypass); 258 my_aggregator.execute(&op_data); 259 if( op_data.status == SUCCEEDED ) { 260 return op_data.bypass_t; 261 } 262 return NULL; 263 } 264 265 graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) { 266 if( my_max_concurrency == 0 ) { 267 return apply_body_bypass(t); 268 } else { 269 operation_type check_op(t, occupy_concurrency); 270 my_aggregator.execute(&check_op); 271 if( check_op.status == SUCCEEDED ) { 272 return apply_body_bypass(t); 273 } 274 return internal_try_put_bypass(t); 275 } 276 } 277 278 graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) { 279 if( my_max_concurrency == 0 ) { 280 return create_body_task(t); 281 } else { 282 return internal_try_put_bypass(t); 283 } 284 } 285 286 //! Applies the body to the provided input 287 // then decides if more work is available 288 graph_task* apply_body_bypass( const input_type &i ) { 289 return static_cast<ImplType *>(this)->apply_body_impl_bypass(i); 290 } 291 292 //! allocates a task to apply a body 293 graph_task* create_body_task( const input_type &input ) { 294 if (!is_graph_active(my_graph_ref)) { 295 return nullptr; 296 } 297 // TODO revamp: extract helper for common graph task allocation part 298 small_object_allocator allocator{}; 299 typedef apply_body_task_bypass<class_type, input_type> task_type; 300 graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority ); 301 graph_reference().reserve_wait(); 302 return t; 303 } 304 305 //! This is executed by an enqueued task, the "forwarder" 306 graph_task* forward_task() { 307 operation_type op_data(try_fwd); 308 graph_task* rval = NULL; 309 do { 310 op_data.status = WAIT; 311 my_aggregator.execute(&op_data); 312 if(op_data.status == SUCCEEDED) { 313 graph_task* ttask = op_data.bypass_t; 314 __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL ); 315 rval = combine_tasks(my_graph_ref, rval, ttask); 316 } 317 } while (op_data.status == SUCCEEDED); 318 return rval; 319 } 320 321 inline graph_task* create_forward_task() { 322 if (!is_graph_active(my_graph_ref)) { 323 return nullptr; 324 } 325 small_object_allocator allocator{}; 326 typedef forward_task_bypass<class_type> task_type; 327 graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority ); 328 graph_reference().reserve_wait(); 329 return t; 330 } 331 332 //! Spawns a task that calls forward() 333 inline void spawn_forward_task() { 334 graph_task* tp = create_forward_task(); 335 if(tp) { 336 spawn_in_graph_arena(graph_reference(), *tp); 337 } 338 } 339 340 node_priority_t priority() const override { return my_priority; } 341 }; // function_input_base 342 343 //! Implements methods for a function node that takes a type Input as input and sends 344 // a type Output to its successors. 345 template< typename Input, typename Output, typename Policy, typename A> 346 class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > { 347 public: 348 typedef Input input_type; 349 typedef Output output_type; 350 typedef function_body<input_type, output_type> function_body_type; 351 typedef function_input<Input, Output, Policy,A> my_class; 352 typedef function_input_base<Input, Policy, A, my_class> base_type; 353 typedef function_input_queue<input_type, A> input_queue_type; 354 355 // constructor 356 template<typename Body> 357 function_input( 358 graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority ) 359 : base_type(g, max_concurrency, a_priority) 360 , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 361 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) { 362 } 363 364 //! Copy constructor 365 function_input( const function_input& src ) : 366 base_type(src), 367 my_body( src.my_init_body->clone() ), 368 my_init_body(src.my_init_body->clone() ) { 369 } 370 #if __INTEL_COMPILER <= 2021 371 // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited 372 // class while the parent class has the virtual keyword for the destrocutor. 373 virtual 374 #endif 375 ~function_input() { 376 delete my_body; 377 delete my_init_body; 378 } 379 380 template< typename Body > 381 Body copy_function_object() { 382 function_body_type &body_ref = *this->my_body; 383 return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 384 } 385 386 output_type apply_body_impl( const input_type& i) { 387 // There is an extra copied needed to capture the 388 // body execution without the try_put 389 fgt_begin_body( my_body ); 390 output_type v = (*my_body)(i); 391 fgt_end_body( my_body ); 392 return v; 393 } 394 395 //TODO: consider moving into the base class 396 graph_task* apply_body_impl_bypass( const input_type &i) { 397 output_type v = apply_body_impl(i); 398 graph_task* postponed_task = NULL; 399 if( base_type::my_max_concurrency != 0 ) { 400 postponed_task = base_type::try_get_postponed_task(i); 401 __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL ); 402 } 403 if( postponed_task ) { 404 // make the task available for other workers since we do not know successors' 405 // execution policy 406 spawn_in_graph_arena(base_type::graph_reference(), *postponed_task); 407 } 408 graph_task* successor_task = successors().try_put_task(v); 409 #if _MSC_VER && !__INTEL_COMPILER 410 #pragma warning (push) 411 #pragma warning (disable: 4127) /* suppress conditional expression is constant */ 412 #endif 413 if(has_policy<lightweight, Policy>::value) { 414 #if _MSC_VER && !__INTEL_COMPILER 415 #pragma warning (pop) 416 #endif 417 if(!successor_task) { 418 // Return confirmative status since current 419 // node's body has been executed anyway 420 successor_task = SUCCESSFULLY_ENQUEUED; 421 } 422 } 423 return successor_task; 424 } 425 426 protected: 427 428 void reset_function_input(reset_flags f) { 429 base_type::reset_function_input_base(f); 430 if(f & rf_reset_bodies) { 431 function_body_type *tmp = my_init_body->clone(); 432 delete my_body; 433 my_body = tmp; 434 } 435 } 436 437 function_body_type *my_body; 438 function_body_type *my_init_body; 439 virtual broadcast_cache<output_type > &successors() = 0; 440 441 }; // function_input 442 443 444 // helper templates to clear the successor edges of the output ports of an multifunction_node 445 template<int N> struct clear_element { 446 template<typename P> static void clear_this(P &p) { 447 (void)std::get<N-1>(p).successors().clear(); 448 clear_element<N-1>::clear_this(p); 449 } 450 #if TBB_USE_ASSERT 451 template<typename P> static bool this_empty(P &p) { 452 if(std::get<N-1>(p).successors().empty()) 453 return clear_element<N-1>::this_empty(p); 454 return false; 455 } 456 #endif 457 }; 458 459 template<> struct clear_element<1> { 460 template<typename P> static void clear_this(P &p) { 461 (void)std::get<0>(p).successors().clear(); 462 } 463 #if TBB_USE_ASSERT 464 template<typename P> static bool this_empty(P &p) { 465 return std::get<0>(p).successors().empty(); 466 } 467 #endif 468 }; 469 470 template <typename OutputTuple> 471 struct init_output_ports { 472 template <typename... Args> 473 static OutputTuple call(graph& g, const std::tuple<Args...>&) { 474 return OutputTuple(Args(g)...); 475 } 476 }; // struct init_output_ports 477 478 //! Implements methods for a function node that takes a type Input as input 479 // and has a tuple of output ports specified. 480 template< typename Input, typename OutputPortSet, typename Policy, typename A> 481 class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > { 482 public: 483 static const int N = std::tuple_size<OutputPortSet>::value; 484 typedef Input input_type; 485 typedef OutputPortSet output_ports_type; 486 typedef multifunction_body<input_type, output_ports_type> multifunction_body_type; 487 typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class; 488 typedef function_input_base<Input, Policy, A, my_class> base_type; 489 typedef function_input_queue<input_type, A> input_queue_type; 490 491 // constructor 492 template<typename Body> 493 multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) 494 : base_type(g, max_concurrency, a_priority) 495 , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) 496 , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) 497 , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){ 498 } 499 500 //! Copy constructor 501 multifunction_input( const multifunction_input& src ) : 502 base_type(src), 503 my_body( src.my_init_body->clone() ), 504 my_init_body(src.my_init_body->clone() ), 505 my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) { 506 } 507 508 ~multifunction_input() { 509 delete my_body; 510 delete my_init_body; 511 } 512 513 template< typename Body > 514 Body copy_function_object() { 515 multifunction_body_type &body_ref = *this->my_body; 516 return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr()); 517 } 518 519 // for multifunction nodes we do not have a single successor as such. So we just tell 520 // the task we were successful. 521 //TODO: consider moving common parts with implementation in function_input into separate function 522 graph_task* apply_body_impl_bypass( const input_type &i ) { 523 fgt_begin_body( my_body ); 524 (*my_body)(i, my_output_ports); 525 fgt_end_body( my_body ); 526 graph_task* ttask = NULL; 527 if(base_type::my_max_concurrency != 0) { 528 ttask = base_type::try_get_postponed_task(i); 529 } 530 return ttask ? ttask : SUCCESSFULLY_ENQUEUED; 531 } 532 533 output_ports_type &output_ports(){ return my_output_ports; } 534 535 protected: 536 537 void reset(reset_flags f) { 538 base_type::reset_function_input_base(f); 539 if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports); 540 if(f & rf_reset_bodies) { 541 multifunction_body_type* tmp = my_init_body->clone(); 542 delete my_body; 543 my_body = tmp; 544 } 545 __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed"); 546 } 547 548 multifunction_body_type *my_body; 549 multifunction_body_type *my_init_body; 550 output_ports_type my_output_ports; 551 552 }; // multifunction_input 553 554 // template to refer to an output port of a multifunction_node 555 template<size_t N, typename MOP> 556 typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) { 557 return std::get<N>(op.output_ports()); 558 } 559 560 inline void check_task_and_spawn(graph& g, graph_task* t) { 561 if (t && t != SUCCESSFULLY_ENQUEUED) { 562 spawn_in_graph_arena(g, *t); 563 } 564 } 565 566 // helper structs for split_node 567 template<int N> 568 struct emit_element { 569 template<typename T, typename P> 570 static graph_task* emit_this(graph& g, const T &t, P &p) { 571 // TODO: consider to collect all the tasks in task_list and spawn them all at once 572 graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t)); 573 check_task_and_spawn(g, last_task); 574 return emit_element<N-1>::emit_this(g,t,p); 575 } 576 }; 577 578 template<> 579 struct emit_element<1> { 580 template<typename T, typename P> 581 static graph_task* emit_this(graph& g, const T &t, P &p) { 582 graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t)); 583 check_task_and_spawn(g, last_task); 584 return SUCCESSFULLY_ENQUEUED; 585 } 586 }; 587 588 //! Implements methods for an executable node that takes continue_msg as input 589 template< typename Output, typename Policy> 590 class continue_input : public continue_receiver { 591 public: 592 593 //! The input type of this receiver 594 typedef continue_msg input_type; 595 596 //! The output type of this receiver 597 typedef Output output_type; 598 typedef function_body<input_type, output_type> function_body_type; 599 typedef continue_input<output_type, Policy> class_type; 600 601 template< typename Body > 602 continue_input( graph &g, Body& body, node_priority_t a_priority ) 603 : continue_receiver(/*number_of_predecessors=*/0, a_priority) 604 , my_graph_ref(g) 605 , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 606 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) 607 { } 608 609 template< typename Body > 610 continue_input( graph &g, int number_of_predecessors, 611 Body& body, node_priority_t a_priority ) 612 : continue_receiver( number_of_predecessors, a_priority ) 613 , my_graph_ref(g) 614 , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 615 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) 616 { } 617 618 continue_input( const continue_input& src ) : continue_receiver(src), 619 my_graph_ref(src.my_graph_ref), 620 my_body( src.my_init_body->clone() ), 621 my_init_body( src.my_init_body->clone() ) {} 622 623 ~continue_input() { 624 delete my_body; 625 delete my_init_body; 626 } 627 628 template< typename Body > 629 Body copy_function_object() { 630 function_body_type &body_ref = *my_body; 631 return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 632 } 633 634 void reset_receiver( reset_flags f) override { 635 continue_receiver::reset_receiver(f); 636 if(f & rf_reset_bodies) { 637 function_body_type *tmp = my_init_body->clone(); 638 delete my_body; 639 my_body = tmp; 640 } 641 } 642 643 protected: 644 645 graph& my_graph_ref; 646 function_body_type *my_body; 647 function_body_type *my_init_body; 648 649 virtual broadcast_cache<output_type > &successors() = 0; 650 651 friend class apply_body_task_bypass< class_type, continue_msg >; 652 653 //! Applies the body to the provided input 654 graph_task* apply_body_bypass( input_type ) { 655 // There is an extra copied needed to capture the 656 // body execution without the try_put 657 fgt_begin_body( my_body ); 658 output_type v = (*my_body)( continue_msg() ); 659 fgt_end_body( my_body ); 660 return successors().try_put_task( v ); 661 } 662 663 graph_task* execute() override { 664 if(!is_graph_active(my_graph_ref)) { 665 return NULL; 666 } 667 #if _MSC_VER && !__INTEL_COMPILER 668 #pragma warning (push) 669 #pragma warning (disable: 4127) /* suppress conditional expression is constant */ 670 #endif 671 if(has_policy<lightweight, Policy>::value) { 672 #if _MSC_VER && !__INTEL_COMPILER 673 #pragma warning (pop) 674 #endif 675 return apply_body_bypass( continue_msg() ); 676 } 677 else { 678 small_object_allocator allocator{}; 679 typedef apply_body_task_bypass<class_type, continue_msg> task_type; 680 graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority ); 681 graph_reference().reserve_wait(); 682 return t; 683 } 684 } 685 686 graph& graph_reference() const override { 687 return my_graph_ref; 688 } 689 }; // continue_input 690 691 //! Implements methods for both executable and function nodes that puts Output to its successors 692 template< typename Output > 693 class function_output : public sender<Output> { 694 public: 695 696 template<int N> friend struct clear_element; 697 typedef Output output_type; 698 typedef typename sender<output_type>::successor_type successor_type; 699 typedef broadcast_cache<output_type> broadcast_cache_type; 700 701 function_output(graph& g) : my_successors(this), my_graph_ref(g) {} 702 function_output(const function_output& other) = delete; 703 704 //! Adds a new successor to this node 705 bool register_successor( successor_type &r ) override { 706 successors().register_successor( r ); 707 return true; 708 } 709 710 //! Removes a successor from this node 711 bool remove_successor( successor_type &r ) override { 712 successors().remove_successor( r ); 713 return true; 714 } 715 716 broadcast_cache_type &successors() { return my_successors; } 717 718 graph& graph_reference() const { return my_graph_ref; } 719 protected: 720 broadcast_cache_type my_successors; 721 graph& my_graph_ref; 722 }; // function_output 723 724 template< typename Output > 725 class multifunction_output : public function_output<Output> { 726 public: 727 typedef Output output_type; 728 typedef function_output<output_type> base_type; 729 using base_type::my_successors; 730 731 multifunction_output(graph& g) : base_type(g) {} 732 multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {} 733 734 bool try_put(const output_type &i) { 735 graph_task *res = try_put_task(i); 736 if( !res ) return false; 737 if( res != SUCCESSFULLY_ENQUEUED ) { 738 // wrapping in task_arena::execute() is not needed since the method is called from 739 // inside task::execute() 740 spawn_in_graph_arena(graph_reference(), *res); 741 } 742 return true; 743 } 744 745 using base_type::graph_reference; 746 747 protected: 748 749 graph_task* try_put_task(const output_type &i) { 750 return my_successors.try_put_task(i); 751 } 752 753 template <int N> friend struct emit_element; 754 755 }; // multifunction_output 756 757 //composite_node 758 template<typename CompositeType> 759 void add_nodes_impl(CompositeType*, bool) {} 760 761 template< typename CompositeType, typename NodeType1, typename... NodeTypes > 762 void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) { 763 void *addr = const_cast<NodeType1 *>(&n1); 764 765 fgt_alias_port(c_node, addr, visible); 766 add_nodes_impl(c_node, visible, n...); 767 } 768 769 #endif // __TBB__flow_graph_node_impl_H 770