1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB__flow_graph_join_impl_H 18 #define __TBB__flow_graph_join_impl_H 19 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 22 #endif 23 24 // included into namespace tbb::detail::d1 25 26 struct forwarding_base : no_assign { 27 forwarding_base(graph &g) : graph_ref(g) {} 28 virtual ~forwarding_base() {} 29 graph& graph_ref; 30 }; 31 32 struct queueing_forwarding_base : forwarding_base { 33 using forwarding_base::forwarding_base; 34 // decrement_port_count may create a forwarding task. If we cannot handle the task 35 // ourselves, ask decrement_port_count to deal with it. 36 virtual graph_task* decrement_port_count(bool handle_task) = 0; 37 }; 38 39 struct reserving_forwarding_base : forwarding_base { 40 using forwarding_base::forwarding_base; 41 // decrement_port_count may create a forwarding task. If we cannot handle the task 42 // ourselves, ask decrement_port_count to deal with it. 43 virtual graph_task* decrement_port_count() = 0; 44 virtual void increment_port_count() = 0; 45 }; 46 47 // specialization that lets us keep a copy of the current_key for building results. 48 // KeyType can be a reference type. 
49 template<typename KeyType> 50 struct matching_forwarding_base : public forwarding_base { 51 typedef typename std::decay<KeyType>::type current_key_type; 52 matching_forwarding_base(graph &g) : forwarding_base(g) { } 53 virtual graph_task* increment_key_count(current_key_type const & /*t*/) = 0; 54 current_key_type current_key; // so ports can refer to FE's desired items 55 }; 56 57 template< int N > 58 struct join_helper { 59 60 template< typename TupleType, typename PortType > 61 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) { 62 std::get<N-1>( my_input ).set_join_node_pointer(port); 63 join_helper<N-1>::set_join_node_pointer( my_input, port ); 64 } 65 template< typename TupleType > 66 static inline void consume_reservations( TupleType &my_input ) { 67 std::get<N-1>( my_input ).consume(); 68 join_helper<N-1>::consume_reservations( my_input ); 69 } 70 71 template< typename TupleType > 72 static inline void release_my_reservation( TupleType &my_input ) { 73 std::get<N-1>( my_input ).release(); 74 } 75 76 template <typename TupleType> 77 static inline void release_reservations( TupleType &my_input) { 78 join_helper<N-1>::release_reservations(my_input); 79 release_my_reservation(my_input); 80 } 81 82 template< typename InputTuple, typename OutputTuple > 83 static inline bool reserve( InputTuple &my_input, OutputTuple &out) { 84 if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false; 85 if ( !join_helper<N-1>::reserve( my_input, out ) ) { 86 release_my_reservation( my_input ); 87 return false; 88 } 89 return true; 90 } 91 92 template<typename InputTuple, typename OutputTuple> 93 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) { 94 bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail 95 return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning 96 } 97 98 template<typename InputTuple, typename OutputTuple> 99 static 
inline bool get_items(InputTuple &my_input, OutputTuple &out) { 100 return get_my_item(my_input, out); 101 } 102 103 template<typename InputTuple> 104 static inline void reset_my_port(InputTuple &my_input) { 105 join_helper<N-1>::reset_my_port(my_input); 106 std::get<N-1>(my_input).reset_port(); 107 } 108 109 template<typename InputTuple> 110 static inline void reset_ports(InputTuple& my_input) { 111 reset_my_port(my_input); 112 } 113 114 template<typename InputTuple, typename KeyFuncTuple> 115 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) { 116 std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs)); 117 std::get<N-1>(my_key_funcs) = nullptr; 118 join_helper<N-1>::set_key_functors(my_input, my_key_funcs); 119 } 120 121 template< typename KeyFuncTuple> 122 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) { 123 __TBB_ASSERT( 124 std::get<N-1>(other_inputs).get_my_key_func(), 125 "key matching join node should not be instantiated without functors." 
126 ); 127 std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone()); 128 join_helper<N-1>::copy_key_functors(my_inputs, other_inputs); 129 } 130 131 template<typename InputTuple> 132 static inline void reset_inputs(InputTuple &my_input, reset_flags f) { 133 join_helper<N-1>::reset_inputs(my_input, f); 134 std::get<N-1>(my_input).reset_receiver(f); 135 } 136 }; // join_helper<N> 137 138 template< > 139 struct join_helper<1> { 140 141 template< typename TupleType, typename PortType > 142 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) { 143 std::get<0>( my_input ).set_join_node_pointer(port); 144 } 145 146 template< typename TupleType > 147 static inline void consume_reservations( TupleType &my_input ) { 148 std::get<0>( my_input ).consume(); 149 } 150 151 template< typename TupleType > 152 static inline void release_my_reservation( TupleType &my_input ) { 153 std::get<0>( my_input ).release(); 154 } 155 156 template<typename TupleType> 157 static inline void release_reservations( TupleType &my_input) { 158 release_my_reservation(my_input); 159 } 160 161 template< typename InputTuple, typename OutputTuple > 162 static inline bool reserve( InputTuple &my_input, OutputTuple &out) { 163 return std::get<0>( my_input ).reserve( std::get<0>( out ) ); 164 } 165 166 template<typename InputTuple, typename OutputTuple> 167 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) { 168 return std::get<0>(my_input).get_item(std::get<0>(out)); 169 } 170 171 template<typename InputTuple, typename OutputTuple> 172 static inline bool get_items(InputTuple &my_input, OutputTuple &out) { 173 return get_my_item(my_input, out); 174 } 175 176 template<typename InputTuple> 177 static inline void reset_my_port(InputTuple &my_input) { 178 std::get<0>(my_input).reset_port(); 179 } 180 181 template<typename InputTuple> 182 static inline void reset_ports(InputTuple& my_input) { 183 reset_my_port(my_input); 184 
} 185 186 template<typename InputTuple, typename KeyFuncTuple> 187 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) { 188 std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs)); 189 std::get<0>(my_key_funcs) = nullptr; 190 } 191 192 template< typename KeyFuncTuple> 193 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) { 194 __TBB_ASSERT( 195 std::get<0>(other_inputs).get_my_key_func(), 196 "key matching join node should not be instantiated without functors." 197 ); 198 std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone()); 199 } 200 template<typename InputTuple> 201 static inline void reset_inputs(InputTuple &my_input, reset_flags f) { 202 std::get<0>(my_input).reset_receiver(f); 203 } 204 }; // join_helper<1> 205 206 //! The two-phase join port 207 template< typename T > 208 class reserving_port : public receiver<T> { 209 public: 210 typedef T input_type; 211 typedef typename receiver<input_type>::predecessor_type predecessor_type; 212 213 private: 214 // ----------- Aggregator ------------ 215 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res 216 }; 217 typedef reserving_port<T> class_type; 218 219 class reserving_port_operation : public aggregated_operation<reserving_port_operation> { 220 public: 221 char type; 222 union { 223 T *my_arg; 224 predecessor_type *my_pred; 225 }; 226 reserving_port_operation(const T& e, op_type t) : 227 type(char(t)), my_arg(const_cast<T*>(&e)) {} 228 reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)), 229 my_pred(const_cast<predecessor_type *>(&s)) {} 230 reserving_port_operation(op_type t) : type(char(t)) {} 231 }; 232 233 typedef aggregating_functor<class_type, reserving_port_operation> handler_type; 234 friend class aggregating_functor<class_type, reserving_port_operation>; 235 aggregator<handler_type, reserving_port_operation> my_aggregator; 236 237 void 
handle_operations(reserving_port_operation* op_list) { 238 reserving_port_operation *current; 239 bool was_missing_predecessors = false; 240 while(op_list) { 241 current = op_list; 242 op_list = op_list->next; 243 switch(current->type) { 244 case reg_pred: 245 was_missing_predecessors = my_predecessors.empty(); 246 my_predecessors.add(*(current->my_pred)); 247 if ( was_missing_predecessors ) { 248 (void) my_join->decrement_port_count(); // may try to forward 249 } 250 current->status.store( SUCCEEDED, std::memory_order_release); 251 break; 252 case rem_pred: 253 if ( !my_predecessors.empty() ) { 254 my_predecessors.remove(*(current->my_pred)); 255 if ( my_predecessors.empty() ) // was the last predecessor 256 my_join->increment_port_count(); 257 } 258 // TODO: consider returning failure if there were no predecessors to remove 259 current->status.store( SUCCEEDED, std::memory_order_release ); 260 break; 261 case res_item: 262 if ( reserved ) { 263 current->status.store( FAILED, std::memory_order_release); 264 } 265 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) { 266 reserved = true; 267 current->status.store( SUCCEEDED, std::memory_order_release); 268 } else { 269 if ( my_predecessors.empty() ) { 270 my_join->increment_port_count(); 271 } 272 current->status.store( FAILED, std::memory_order_release); 273 } 274 break; 275 case rel_res: 276 reserved = false; 277 my_predecessors.try_release( ); 278 current->status.store( SUCCEEDED, std::memory_order_release); 279 break; 280 case con_res: 281 reserved = false; 282 my_predecessors.try_consume( ); 283 current->status.store( SUCCEEDED, std::memory_order_release); 284 break; 285 } 286 } 287 } 288 289 protected: 290 template< typename R, typename B > friend class run_and_put_task; 291 template<typename X, typename Y> friend class broadcast_cache; 292 template<typename X, typename Y> friend class round_robin_cache; 293 graph_task* try_put_task( const T & ) override { 294 return nullptr; 295 } 296 297 graph& 
graph_reference() const override { 298 return my_join->graph_ref; 299 } 300 301 public: 302 303 //! Constructor 304 reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) { 305 my_aggregator.initialize_handler(handler_type(this)); 306 } 307 308 // copy constructor 309 reserving_port(const reserving_port& /* other */) = delete; 310 311 void set_join_node_pointer(reserving_forwarding_base *join) { 312 my_join = join; 313 } 314 315 //! Add a predecessor 316 bool register_predecessor( predecessor_type &src ) override { 317 reserving_port_operation op_data(src, reg_pred); 318 my_aggregator.execute(&op_data); 319 return op_data.status == SUCCEEDED; 320 } 321 322 //! Remove a predecessor 323 bool remove_predecessor( predecessor_type &src ) override { 324 reserving_port_operation op_data(src, rem_pred); 325 my_aggregator.execute(&op_data); 326 return op_data.status == SUCCEEDED; 327 } 328 329 //! Reserve an item from the port 330 bool reserve( T &v ) { 331 reserving_port_operation op_data(v, res_item); 332 my_aggregator.execute(&op_data); 333 return op_data.status == SUCCEEDED; 334 } 335 336 //! Release the port 337 void release( ) { 338 reserving_port_operation op_data(rel_res); 339 my_aggregator.execute(&op_data); 340 } 341 342 //! Complete use of the port 343 void consume( ) { 344 reserving_port_operation op_data(con_res); 345 my_aggregator.execute(&op_data); 346 } 347 348 void reset_receiver( reset_flags f) { 349 if(f & rf_clear_edges) my_predecessors.clear(); 350 else 351 my_predecessors.reset(); 352 reserved = false; 353 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed"); 354 } 355 356 private: 357 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 358 friend class get_graph_helper; 359 #endif 360 361 reserving_forwarding_base *my_join; 362 reservable_predecessor_cache< T, null_mutex > my_predecessors; 363 bool reserved; 364 }; // reserving_port 365 366 //! 
queueing join_port 367 template<typename T> 368 class queueing_port : public receiver<T>, public item_buffer<T> { 369 public: 370 typedef T input_type; 371 typedef typename receiver<input_type>::predecessor_type predecessor_type; 372 typedef queueing_port<T> class_type; 373 374 // ----------- Aggregator ------------ 375 private: 376 enum op_type { get__item, res_port, try__put_task 377 }; 378 379 class queueing_port_operation : public aggregated_operation<queueing_port_operation> { 380 public: 381 char type; 382 T my_val; 383 T* my_arg; 384 graph_task* bypass_t; 385 // constructor for value parameter 386 queueing_port_operation(const T& e, op_type t) : 387 type(char(t)), my_val(e) 388 , bypass_t(nullptr) 389 {} 390 // constructor for pointer parameter 391 queueing_port_operation(const T* p, op_type t) : 392 type(char(t)), my_arg(const_cast<T*>(p)) 393 , bypass_t(nullptr) 394 {} 395 // constructor with no parameter 396 queueing_port_operation(op_type t) : type(char(t)) 397 , bypass_t(nullptr) 398 {} 399 }; 400 401 typedef aggregating_functor<class_type, queueing_port_operation> handler_type; 402 friend class aggregating_functor<class_type, queueing_port_operation>; 403 aggregator<handler_type, queueing_port_operation> my_aggregator; 404 405 void handle_operations(queueing_port_operation* op_list) { 406 queueing_port_operation *current; 407 bool was_empty; 408 while(op_list) { 409 current = op_list; 410 op_list = op_list->next; 411 switch(current->type) { 412 case try__put_task: { 413 graph_task* rtask = nullptr; 414 was_empty = this->buffer_empty(); 415 this->push_back(current->my_val); 416 if (was_empty) rtask = my_join->decrement_port_count(false); 417 else 418 rtask = SUCCESSFULLY_ENQUEUED; 419 current->bypass_t = rtask; 420 current->status.store( SUCCEEDED, std::memory_order_release); 421 } 422 break; 423 case get__item: 424 if(!this->buffer_empty()) { 425 *(current->my_arg) = this->front(); 426 current->status.store( SUCCEEDED, std::memory_order_release); 427 } 
428 else { 429 current->status.store( FAILED, std::memory_order_release); 430 } 431 break; 432 case res_port: 433 __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset"); 434 this->destroy_front(); 435 if(this->my_item_valid(this->my_head)) { 436 (void)my_join->decrement_port_count(true); 437 } 438 current->status.store( SUCCEEDED, std::memory_order_release); 439 break; 440 } 441 } 442 } 443 // ------------ End Aggregator --------------- 444 445 protected: 446 template< typename R, typename B > friend class run_and_put_task; 447 template<typename X, typename Y> friend class broadcast_cache; 448 template<typename X, typename Y> friend class round_robin_cache; 449 graph_task* try_put_task(const T &v) override { 450 queueing_port_operation op_data(v, try__put_task); 451 my_aggregator.execute(&op_data); 452 __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator"); 453 if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED; 454 return op_data.bypass_t; 455 } 456 457 graph& graph_reference() const override { 458 return my_join->graph_ref; 459 } 460 461 public: 462 463 //! Constructor 464 queueing_port() : item_buffer<T>() { 465 my_join = nullptr; 466 my_aggregator.initialize_handler(handler_type(this)); 467 } 468 469 //! copy constructor 470 queueing_port(const queueing_port& /* other */) = delete; 471 472 //! record parent for tallying available items 473 void set_join_node_pointer(queueing_forwarding_base *join) { 474 my_join = join; 475 } 476 477 bool get_item( T &v ) { 478 queueing_port_operation op_data(&v, get__item); 479 my_aggregator.execute(&op_data); 480 return op_data.status == SUCCEEDED; 481 } 482 483 // reset_port is called when item is accepted by successor, but 484 // is initiated by join_node. 
485 void reset_port() { 486 queueing_port_operation op_data(res_port); 487 my_aggregator.execute(&op_data); 488 return; 489 } 490 491 void reset_receiver(reset_flags) { 492 item_buffer<T>::reset(); 493 } 494 495 private: 496 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 497 friend class get_graph_helper; 498 #endif 499 500 queueing_forwarding_base *my_join; 501 }; // queueing_port 502 503 #include "_flow_graph_tagged_buffer_impl.h" 504 505 template<typename K> 506 struct count_element { 507 K my_key; 508 size_t my_value; 509 }; 510 511 // method to access the key in the counting table 512 // the ref has already been removed from K 513 template< typename K > 514 struct key_to_count_functor { 515 typedef count_element<K> table_item_type; 516 const K& operator()(const table_item_type& v) { return v.my_key; } 517 }; 518 519 // the ports can have only one template parameter. We wrap the types needed in 520 // a traits type 521 template< class TraitsType > 522 class key_matching_port : 523 public receiver<typename TraitsType::T>, 524 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK, 525 typename TraitsType::KHash > { 526 public: 527 typedef TraitsType traits; 528 typedef key_matching_port<traits> class_type; 529 typedef typename TraitsType::T input_type; 530 typedef typename TraitsType::K key_type; 531 typedef typename std::decay<key_type>::type noref_key_type; 532 typedef typename receiver<input_type>::predecessor_type predecessor_type; 533 typedef typename TraitsType::TtoK type_to_key_func_type; 534 typedef typename TraitsType::KHash hash_compare_type; 535 typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type; 536 537 private: 538 // ----------- Aggregator ------------ 539 private: 540 enum op_type { try__put, get__item, res_port 541 }; 542 543 class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> { 544 public: 545 char type; 546 input_type my_val; 
547 input_type *my_arg; 548 // constructor for value parameter 549 key_matching_port_operation(const input_type& e, op_type t) : 550 type(char(t)), my_val(e) {} 551 // constructor for pointer parameter 552 key_matching_port_operation(const input_type* p, op_type t) : 553 type(char(t)), my_arg(const_cast<input_type*>(p)) {} 554 // constructor with no parameter 555 key_matching_port_operation(op_type t) : type(char(t)) {} 556 }; 557 558 typedef aggregating_functor<class_type, key_matching_port_operation> handler_type; 559 friend class aggregating_functor<class_type, key_matching_port_operation>; 560 aggregator<handler_type, key_matching_port_operation> my_aggregator; 561 562 void handle_operations(key_matching_port_operation* op_list) { 563 key_matching_port_operation *current; 564 while(op_list) { 565 current = op_list; 566 op_list = op_list->next; 567 switch(current->type) { 568 case try__put: { 569 bool was_inserted = this->insert_with_key(current->my_val); 570 // return failure if a duplicate insertion occurs 571 current->status.store( was_inserted ? 
SUCCEEDED : FAILED, std::memory_order_release); 572 } 573 break; 574 case get__item: 575 // use current_key from FE for item 576 if(!this->find_with_key(my_join->current_key, *(current->my_arg))) { 577 __TBB_ASSERT(false, "Failed to find item corresponding to current_key."); 578 } 579 current->status.store( SUCCEEDED, std::memory_order_release); 580 break; 581 case res_port: 582 // use current_key from FE for item 583 this->delete_with_key(my_join->current_key); 584 current->status.store( SUCCEEDED, std::memory_order_release); 585 break; 586 } 587 } 588 } 589 // ------------ End Aggregator --------------- 590 protected: 591 template< typename R, typename B > friend class run_and_put_task; 592 template<typename X, typename Y> friend class broadcast_cache; 593 template<typename X, typename Y> friend class round_robin_cache; 594 graph_task* try_put_task(const input_type& v) override { 595 key_matching_port_operation op_data(v, try__put); 596 graph_task* rtask = nullptr; 597 my_aggregator.execute(&op_data); 598 if(op_data.status == SUCCEEDED) { 599 rtask = my_join->increment_key_count((*(this->get_key_func()))(v)); // may spawn 600 // rtask has to reflect the return status of the try_put 601 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED; 602 } 603 return rtask; 604 } 605 606 graph& graph_reference() const override { 607 return my_join->graph_ref; 608 } 609 610 public: 611 612 key_matching_port() : receiver<input_type>(), buffer_type() { 613 my_join = nullptr; 614 my_aggregator.initialize_handler(handler_type(this)); 615 } 616 617 // copy constructor 618 key_matching_port(const key_matching_port& /*other*/) = delete; 619 #if __INTEL_COMPILER <= 2021 620 // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited 621 // class while the parent class has the virtual keyword for the destrocutor. 
622 virtual 623 #endif 624 ~key_matching_port() { } 625 626 void set_join_node_pointer(forwarding_base *join) { 627 my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join); 628 } 629 630 void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); } 631 632 type_to_key_func_type* get_my_key_func() { return this->get_key_func(); } 633 634 bool get_item( input_type &v ) { 635 // aggregator uses current_key from FE for Key 636 key_matching_port_operation op_data(&v, get__item); 637 my_aggregator.execute(&op_data); 638 return op_data.status == SUCCEEDED; 639 } 640 641 // reset_port is called when item is accepted by successor, but 642 // is initiated by join_node. 643 void reset_port() { 644 key_matching_port_operation op_data(res_port); 645 my_aggregator.execute(&op_data); 646 return; 647 } 648 649 void reset_receiver(reset_flags ) { 650 buffer_type::reset(); 651 } 652 653 private: 654 // my_join forwarding base used to count number of inputs that 655 // received key. 656 matching_forwarding_base<key_type> *my_join; 657 }; // key_matching_port 658 659 using namespace graph_policy_namespace; 660 661 template<typename JP, typename InputTuple, typename OutputTuple> 662 class join_node_base; 663 664 //! 
join_node_FE : implements input port policy 665 template<typename JP, typename InputTuple, typename OutputTuple> 666 class join_node_FE; 667 668 template<typename InputTuple, typename OutputTuple> 669 class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base { 670 public: 671 static const int N = std::tuple_size<OutputTuple>::value; 672 typedef OutputTuple output_type; 673 typedef InputTuple input_type; 674 typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding 675 676 join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) { 677 ports_with_no_inputs = N; 678 join_helper<N>::set_join_node_pointer(my_inputs, this); 679 } 680 681 join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) { 682 ports_with_no_inputs = N; 683 join_helper<N>::set_join_node_pointer(my_inputs, this); 684 } 685 686 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; } 687 688 void increment_port_count() override { 689 ++ports_with_no_inputs; 690 } 691 692 // if all input_ports have predecessors, spawn forward to try and consume tuples 693 graph_task* decrement_port_count() override { 694 if(ports_with_no_inputs.fetch_sub(1) == 1) { 695 if(is_graph_active(this->graph_ref)) { 696 small_object_allocator allocator{}; 697 typedef forward_task_bypass<base_node_type> task_type; 698 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node); 699 graph_ref.reserve_wait(); 700 spawn_in_graph_arena(this->graph_ref, *t); 701 } 702 } 703 return nullptr; 704 } 705 706 input_type &input_ports() { return my_inputs; } 707 708 protected: 709 710 void reset( reset_flags f) { 711 // called outside of parallel contexts 712 ports_with_no_inputs = N; 713 join_helper<N>::reset_inputs(my_inputs, f); 714 } 715 716 // all methods on input ports should be called under mutual exclusion from join_node_base. 
717 718 bool tuple_build_may_succeed() { 719 return !ports_with_no_inputs; 720 } 721 722 bool try_to_make_tuple(output_type &out) { 723 if(ports_with_no_inputs) return false; 724 return join_helper<N>::reserve(my_inputs, out); 725 } 726 727 void tuple_accepted() { 728 join_helper<N>::consume_reservations(my_inputs); 729 } 730 void tuple_rejected() { 731 join_helper<N>::release_reservations(my_inputs); 732 } 733 734 input_type my_inputs; 735 base_node_type *my_node; 736 std::atomic<std::size_t> ports_with_no_inputs; 737 }; // join_node_FE<reserving, ... > 738 739 template<typename InputTuple, typename OutputTuple> 740 class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base { 741 public: 742 static const int N = std::tuple_size<OutputTuple>::value; 743 typedef OutputTuple output_type; 744 typedef InputTuple input_type; 745 typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding 746 747 join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) { 748 ports_with_no_items = N; 749 join_helper<N>::set_join_node_pointer(my_inputs, this); 750 } 751 752 join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) { 753 ports_with_no_items = N; 754 join_helper<N>::set_join_node_pointer(my_inputs, this); 755 } 756 757 // needed for forwarding 758 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; } 759 760 void reset_port_count() { 761 ports_with_no_items = N; 762 } 763 764 // if all input_ports have items, spawn forward to try and consume tuples 765 graph_task* decrement_port_count(bool handle_task) override 766 { 767 if(ports_with_no_items.fetch_sub(1) == 1) { 768 if(is_graph_active(this->graph_ref)) { 769 small_object_allocator allocator{}; 770 typedef forward_task_bypass<base_node_type> task_type; 771 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node); 772 
graph_ref.reserve_wait(); 773 if( !handle_task ) 774 return t; 775 spawn_in_graph_arena(this->graph_ref, *t); 776 } 777 } 778 return nullptr; 779 } 780 781 input_type &input_ports() { return my_inputs; } 782 783 protected: 784 785 void reset( reset_flags f) { 786 reset_port_count(); 787 join_helper<N>::reset_inputs(my_inputs, f ); 788 } 789 790 // all methods on input ports should be called under mutual exclusion from join_node_base. 791 792 bool tuple_build_may_succeed() { 793 return !ports_with_no_items; 794 } 795 796 bool try_to_make_tuple(output_type &out) { 797 if(ports_with_no_items) return false; 798 return join_helper<N>::get_items(my_inputs, out); 799 } 800 801 void tuple_accepted() { 802 reset_port_count(); 803 join_helper<N>::reset_ports(my_inputs); 804 } 805 void tuple_rejected() { 806 // nothing to do. 807 } 808 809 input_type my_inputs; 810 base_node_type *my_node; 811 std::atomic<std::size_t> ports_with_no_items; 812 }; // join_node_FE<queueing, ...> 813 814 // key_matching join front-end. 815 template<typename InputTuple, typename OutputTuple, typename K, typename KHash> 816 class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>, 817 // buffer of key value counts 818 public hash_buffer< // typedefed below to key_to_count_buffer_type 819 typename std::decay<K>::type&, // force ref type on K 820 count_element<typename std::decay<K>::type>, 821 type_to_key_function_body< 822 count_element<typename std::decay<K>::type>, 823 typename std::decay<K>::type& >, 824 KHash >, 825 // buffer of output items 826 public item_buffer<OutputTuple> { 827 public: 828 static const int N = std::tuple_size<OutputTuple>::value; 829 typedef OutputTuple output_type; 830 typedef InputTuple input_type; 831 typedef K key_type; 832 typedef typename std::decay<key_type>::type unref_key_type; 833 typedef KHash key_hash_compare; 834 // must use K without ref. 
835 typedef count_element<unref_key_type> count_element_type; 836 // method that lets us refer to the key of this type. 837 typedef key_to_count_functor<unref_key_type> key_to_count_func; 838 typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type; 839 typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type; 840 // this is the type of the special table that keeps track of the number of discrete 841 // elements corresponding to each key that we've seen. 842 typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare > 843 key_to_count_buffer_type; 844 typedef item_buffer<output_type> output_buffer_type; 845 typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding 846 typedef matching_forwarding_base<key_type> forwarding_base_type; 847 848 // ----------- Aggregator ------------ 849 // the aggregator is only needed to serialize the access to the hash table. 
850 // and the output_buffer_type base class 851 private: 852 enum op_type { res_count, inc_count, may_succeed, try_make }; 853 typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type; 854 855 class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> { 856 public: 857 char type; 858 unref_key_type my_val; 859 output_type* my_output; 860 graph_task* bypass_t; 861 // constructor for value parameter 862 key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e), 863 my_output(nullptr), bypass_t(nullptr) {} 864 key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {} 865 // constructor with no parameter 866 key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {} 867 }; 868 869 typedef aggregating_functor<class_type, key_matching_FE_operation> handler_type; 870 friend class aggregating_functor<class_type, key_matching_FE_operation>; 871 aggregator<handler_type, key_matching_FE_operation> my_aggregator; 872 873 // called from aggregator, so serialized 874 // returns a task pointer if the a task would have been enqueued but we asked that 875 // it be returned. Otherwise returns nullptr. 
876 graph_task* fill_output_buffer(unref_key_type &t) { 877 output_type l_out; 878 graph_task* rtask = nullptr; 879 bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref); 880 this->current_key = t; 881 this->delete_with_key(this->current_key); // remove the key 882 if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back 883 this->push_back(l_out); 884 if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item 885 small_object_allocator allocator{}; 886 typedef forward_task_bypass<base_node_type> task_type; 887 rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node); 888 this->graph_ref.reserve_wait(); 889 do_fwd = false; 890 } 891 // retire the input values 892 join_helper<N>::reset_ports(my_inputs); // <== call back 893 } 894 else { 895 __TBB_ASSERT(false, "should have had something to push"); 896 } 897 return rtask; 898 } 899 900 void handle_operations(key_matching_FE_operation* op_list) { 901 key_matching_FE_operation *current; 902 while(op_list) { 903 current = op_list; 904 op_list = op_list->next; 905 switch(current->type) { 906 case res_count: // called from BE 907 { 908 this->destroy_front(); 909 current->status.store( SUCCEEDED, std::memory_order_release); 910 } 911 break; 912 case inc_count: { // called from input ports 913 count_element_type *p = nullptr; 914 unref_key_type &t = current->my_val; 915 if(!(this->find_ref_with_key(t,p))) { 916 count_element_type ev; 917 ev.my_key = t; 918 ev.my_value = 0; 919 this->insert_with_key(ev); 920 bool found = this->find_ref_with_key(t, p); 921 __TBB_ASSERT_EX(found, "should find key after inserting it"); 922 } 923 if(++(p->my_value) == size_t(N)) { 924 current->bypass_t = fill_output_buffer(t); 925 } 926 } 927 current->status.store( SUCCEEDED, std::memory_order_release); 928 break; 929 case may_succeed: // called from BE 930 current->status.store( this->buffer_empty() ? 
FAILED : SUCCEEDED, std::memory_order_release); 931 break; 932 case try_make: // called from BE 933 if(this->buffer_empty()) { 934 current->status.store( FAILED, std::memory_order_release); 935 } 936 else { 937 *(current->my_output) = this->front(); 938 current->status.store( SUCCEEDED, std::memory_order_release); 939 } 940 break; 941 } 942 } 943 } 944 // ------------ End Aggregator --------------- 945 946 public: 947 template<typename FunctionTuple> 948 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) { 949 join_helper<N>::set_join_node_pointer(my_inputs, this); 950 join_helper<N>::set_key_functors(my_inputs, TtoK_funcs); 951 my_aggregator.initialize_handler(handler_type(this)); 952 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func()); 953 this->set_key_func(cfb); 954 } 955 956 join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(), 957 output_buffer_type() { 958 my_node = nullptr; 959 join_helper<N>::set_join_node_pointer(my_inputs, this); 960 join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs)); 961 my_aggregator.initialize_handler(handler_type(this)); 962 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func()); 963 this->set_key_func(cfb); 964 } 965 966 // needed for forwarding 967 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; } 968 969 void reset_port_count() { // called from BE 970 key_matching_FE_operation op_data(res_count); 971 my_aggregator.execute(&op_data); 972 return; 973 } 974 975 // if all input_ports have items, spawn forward to try and consume tuples 976 // return a task if we are asked and did create one. 
977 graph_task *increment_key_count(unref_key_type const & t) override { // called from input_ports 978 key_matching_FE_operation op_data(t, inc_count); 979 my_aggregator.execute(&op_data); 980 return op_data.bypass_t; 981 } 982 983 input_type &input_ports() { return my_inputs; } 984 985 protected: 986 987 void reset( reset_flags f ) { 988 // called outside of parallel contexts 989 join_helper<N>::reset_inputs(my_inputs, f); 990 991 key_to_count_buffer_type::reset(); 992 output_buffer_type::reset(); 993 } 994 995 // all methods on input ports should be called under mutual exclusion from join_node_base. 996 997 bool tuple_build_may_succeed() { // called from back-end 998 key_matching_FE_operation op_data(may_succeed); 999 my_aggregator.execute(&op_data); 1000 return op_data.status == SUCCEEDED; 1001 } 1002 1003 // cannot lock while calling back to input_ports. current_key will only be set 1004 // and reset under the aggregator, so it will remain consistent. 1005 bool try_to_make_tuple(output_type &out) { 1006 key_matching_FE_operation op_data(&out,try_make); 1007 my_aggregator.execute(&op_data); 1008 return op_data.status == SUCCEEDED; 1009 } 1010 1011 void tuple_accepted() { 1012 reset_port_count(); // reset current_key after ports reset. 1013 } 1014 1015 void tuple_rejected() { 1016 // nothing to do. 1017 } 1018 1019 input_type my_inputs; // input ports 1020 base_node_type *my_node; 1021 }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> 1022 1023 //! 
join_node_base 1024 template<typename JP, typename InputTuple, typename OutputTuple> 1025 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>, 1026 public sender<OutputTuple> { 1027 protected: 1028 using graph_node::my_graph; 1029 public: 1030 typedef OutputTuple output_type; 1031 1032 typedef typename sender<output_type>::successor_type successor_type; 1033 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type; 1034 using input_ports_type::tuple_build_may_succeed; 1035 using input_ports_type::try_to_make_tuple; 1036 using input_ports_type::tuple_accepted; 1037 using input_ports_type::tuple_rejected; 1038 1039 private: 1040 // ----------- Aggregator ------------ 1041 enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass 1042 }; 1043 typedef join_node_base<JP,InputTuple,OutputTuple> class_type; 1044 1045 class join_node_base_operation : public aggregated_operation<join_node_base_operation> { 1046 public: 1047 char type; 1048 union { 1049 output_type *my_arg; 1050 successor_type *my_succ; 1051 }; 1052 graph_task* bypass_t; 1053 join_node_base_operation(const output_type& e, op_type t) : type(char(t)), 1054 my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr) {} 1055 join_node_base_operation(const successor_type &s, op_type t) : type(char(t)), 1056 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {} 1057 join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {} 1058 }; 1059 1060 typedef aggregating_functor<class_type, join_node_base_operation> handler_type; 1061 friend class aggregating_functor<class_type, join_node_base_operation>; 1062 bool forwarder_busy; 1063 aggregator<handler_type, join_node_base_operation> my_aggregator; 1064 1065 void handle_operations(join_node_base_operation* op_list) { 1066 join_node_base_operation *current; 1067 while(op_list) { 1068 current = op_list; 1069 op_list = op_list->next; 1070 switch(current->type) { 1071 case reg_succ: { 1072 
my_successors.register_successor(*(current->my_succ)); 1073 if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) { 1074 small_object_allocator allocator{}; 1075 typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type; 1076 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this); 1077 my_graph.reserve_wait(); 1078 spawn_in_graph_arena(my_graph, *t); 1079 forwarder_busy = true; 1080 } 1081 current->status.store( SUCCEEDED, std::memory_order_release); 1082 } 1083 break; 1084 case rem_succ: 1085 my_successors.remove_successor(*(current->my_succ)); 1086 current->status.store( SUCCEEDED, std::memory_order_release); 1087 break; 1088 case try__get: 1089 if(tuple_build_may_succeed()) { 1090 if(try_to_make_tuple(*(current->my_arg))) { 1091 tuple_accepted(); 1092 current->status.store( SUCCEEDED, std::memory_order_release); 1093 } 1094 else current->status.store( FAILED, std::memory_order_release); 1095 } 1096 else current->status.store( FAILED, std::memory_order_release); 1097 break; 1098 case do_fwrd_bypass: { 1099 bool build_succeeded; 1100 graph_task *last_task = nullptr; 1101 output_type out; 1102 // forwarding must be exclusive, because try_to_make_tuple and tuple_accepted 1103 // are separate locked methods in the FE. We could conceivably fetch the front 1104 // of the FE queue, then be swapped out, have someone else consume the FE's 1105 // object, then come back, forward, and then try to remove it from the queue 1106 // again. Without reservation of the FE, the methods accessing it must be locked. 1107 // We could remember the keys of the objects we forwarded, and then remove 1108 // them from the input ports after forwarding is complete? 
1109 if(tuple_build_may_succeed()) { // checks output queue of FE 1110 do { 1111 build_succeeded = try_to_make_tuple(out); // fetch front_end of queue 1112 if(build_succeeded) { 1113 graph_task *new_task = my_successors.try_put_task(out); 1114 last_task = combine_tasks(my_graph, last_task, new_task); 1115 if(new_task) { 1116 tuple_accepted(); 1117 } 1118 else { 1119 tuple_rejected(); 1120 build_succeeded = false; 1121 } 1122 } 1123 } while(build_succeeded); 1124 } 1125 current->bypass_t = last_task; 1126 current->status.store( SUCCEEDED, std::memory_order_release); 1127 forwarder_busy = false; 1128 } 1129 break; 1130 } 1131 } 1132 } 1133 // ---------- end aggregator ----------- 1134 public: 1135 join_node_base(graph &g) 1136 : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this) 1137 { 1138 input_ports_type::set_my_node(this); 1139 my_aggregator.initialize_handler(handler_type(this)); 1140 } 1141 1142 join_node_base(const join_node_base& other) : 1143 graph_node(other.graph_node::my_graph), input_ports_type(other), 1144 sender<OutputTuple>(), forwarder_busy(false), my_successors(this) 1145 { 1146 input_ports_type::set_my_node(this); 1147 my_aggregator.initialize_handler(handler_type(this)); 1148 } 1149 1150 template<typename FunctionTuple> 1151 join_node_base(graph &g, FunctionTuple f) 1152 : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this) 1153 { 1154 input_ports_type::set_my_node(this); 1155 my_aggregator.initialize_handler(handler_type(this)); 1156 } 1157 1158 bool register_successor(successor_type &r) override { 1159 join_node_base_operation op_data(r, reg_succ); 1160 my_aggregator.execute(&op_data); 1161 return op_data.status == SUCCEEDED; 1162 } 1163 1164 bool remove_successor( successor_type &r) override { 1165 join_node_base_operation op_data(r, rem_succ); 1166 my_aggregator.execute(&op_data); 1167 return op_data.status == SUCCEEDED; 1168 } 1169 1170 bool try_get( output_type &v) override { 1171 
join_node_base_operation op_data(v, try__get); 1172 my_aggregator.execute(&op_data); 1173 return op_data.status == SUCCEEDED; 1174 } 1175 1176 protected: 1177 void reset_node(reset_flags f) override { 1178 input_ports_type::reset(f); 1179 if(f & rf_clear_edges) my_successors.clear(); 1180 } 1181 1182 private: 1183 broadcast_cache<output_type, null_rw_mutex> my_successors; 1184 1185 friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >; 1186 graph_task *forward_task() { 1187 join_node_base_operation op_data(do_fwrd_bypass); 1188 my_aggregator.execute(&op_data); 1189 return op_data.bypass_t; 1190 } 1191 1192 }; // join_node_base 1193 1194 // join base class type generator 1195 template<int N, template<class> class PT, typename OutputTuple, typename JP> 1196 struct join_base { 1197 typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type; 1198 }; 1199 1200 template<int N, typename OutputTuple, typename K, typename KHash> 1201 struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > { 1202 typedef key_matching<K, KHash> key_traits_type; 1203 typedef K key_type; 1204 typedef KHash key_hash_compare; 1205 typedef join_node_base< key_traits_type, 1206 // ports type 1207 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type, 1208 OutputTuple > type; 1209 }; 1210 1211 //! unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type 1212 // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port) 1213 // and should match the typename. 
1214 1215 template<int N, template<class> class PT, typename OutputTuple, typename JP> 1216 class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type { 1217 public: 1218 typedef typename wrap_tuple_elements<N, PT, OutputTuple>::type input_ports_type; 1219 typedef OutputTuple output_type; 1220 private: 1221 typedef join_node_base<JP, input_ports_type, output_type > base_type; 1222 public: 1223 unfolded_join_node(graph &g) : base_type(g) {} 1224 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1225 }; 1226 1227 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1228 template <typename K, typename T> 1229 struct key_from_message_body { 1230 K operator()(const T& t) const { 1231 return key_from_message<K>(t); 1232 } 1233 }; 1234 // Adds const to reference type 1235 template <typename K, typename T> 1236 struct key_from_message_body<K&,T> { 1237 const K& operator()(const T& t) const { 1238 return key_from_message<const K&>(t); 1239 } 1240 }; 1241 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1242 // key_matching unfolded_join_node. This must be a separate specialization because the constructors 1243 // differ. 
1244 1245 template<typename OutputTuple, typename K, typename KHash> 1246 class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1247 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1248 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1249 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1250 public: 1251 typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1252 typedef OutputTuple output_type; 1253 private: 1254 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type; 1255 typedef type_to_key_function_body<T0, K> *f0_p; 1256 typedef type_to_key_function_body<T1, K> *f1_p; 1257 typedef std::tuple< f0_p, f1_p > func_initializer_type; 1258 public: 1259 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1260 unfolded_join_node(graph &g) : base_type(g, 1261 func_initializer_type( 1262 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1263 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()) 1264 ) ) { 1265 } 1266 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1267 template<typename Body0, typename Body1> 1268 unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g, 1269 func_initializer_type( 1270 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1271 new type_to_key_function_body_leaf<T1, K, Body1>(body1) 1272 ) ) { 1273 static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers"); 1274 } 1275 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1276 }; 1277 1278 template<typename OutputTuple, typename K, typename KHash> 1279 class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1280 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1281 typedef 
typename std::tuple_element<0, OutputTuple>::type T0; 1282 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1283 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1284 public: 1285 typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1286 typedef OutputTuple output_type; 1287 private: 1288 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type; 1289 typedef type_to_key_function_body<T0, K> *f0_p; 1290 typedef type_to_key_function_body<T1, K> *f1_p; 1291 typedef type_to_key_function_body<T2, K> *f2_p; 1292 typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type; 1293 public: 1294 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1295 unfolded_join_node(graph &g) : base_type(g, 1296 func_initializer_type( 1297 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1298 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1299 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()) 1300 ) ) { 1301 } 1302 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1303 template<typename Body0, typename Body1, typename Body2> 1304 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g, 1305 func_initializer_type( 1306 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1307 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1308 new type_to_key_function_body_leaf<T2, K, Body2>(body2) 1309 ) ) { 1310 static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers"); 1311 } 1312 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1313 }; 1314 1315 template<typename OutputTuple, typename K, typename KHash> 1316 class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1317 
join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1318 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1319 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1320 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1321 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1322 public: 1323 typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1324 typedef OutputTuple output_type; 1325 private: 1326 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type; 1327 typedef type_to_key_function_body<T0, K> *f0_p; 1328 typedef type_to_key_function_body<T1, K> *f1_p; 1329 typedef type_to_key_function_body<T2, K> *f2_p; 1330 typedef type_to_key_function_body<T3, K> *f3_p; 1331 typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type; 1332 public: 1333 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1334 unfolded_join_node(graph &g) : base_type(g, 1335 func_initializer_type( 1336 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1337 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1338 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1339 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()) 1340 ) ) { 1341 } 1342 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1343 template<typename Body0, typename Body1, typename Body2, typename Body3> 1344 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g, 1345 func_initializer_type( 1346 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1347 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1348 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1349 new type_to_key_function_body_leaf<T3, 
K, Body3>(body3) 1350 ) ) { 1351 static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers"); 1352 } 1353 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1354 }; 1355 1356 template<typename OutputTuple, typename K, typename KHash> 1357 class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1358 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1359 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1360 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1361 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1362 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1363 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1364 public: 1365 typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1366 typedef OutputTuple output_type; 1367 private: 1368 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1369 typedef type_to_key_function_body<T0, K> *f0_p; 1370 typedef type_to_key_function_body<T1, K> *f1_p; 1371 typedef type_to_key_function_body<T2, K> *f2_p; 1372 typedef type_to_key_function_body<T3, K> *f3_p; 1373 typedef type_to_key_function_body<T4, K> *f4_p; 1374 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type; 1375 public: 1376 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1377 unfolded_join_node(graph &g) : base_type(g, 1378 func_initializer_type( 1379 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1380 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1381 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1382 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> 
>(key_from_message_body<K,T3>()), 1383 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()) 1384 ) ) { 1385 } 1386 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1387 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4> 1388 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g, 1389 func_initializer_type( 1390 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1391 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1392 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1393 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1394 new type_to_key_function_body_leaf<T4, K, Body4>(body4) 1395 ) ) { 1396 static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers"); 1397 } 1398 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1399 }; 1400 1401 #if __TBB_VARIADIC_MAX >= 6 1402 template<typename OutputTuple, typename K, typename KHash> 1403 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1404 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1405 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1406 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1407 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1408 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1409 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1410 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1411 public: 1412 typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1413 typedef OutputTuple output_type; 1414 private: 1415 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1416 typedef type_to_key_function_body<T0, K> *f0_p; 1417 typedef 
type_to_key_function_body<T1, K> *f1_p; 1418 typedef type_to_key_function_body<T2, K> *f2_p; 1419 typedef type_to_key_function_body<T3, K> *f3_p; 1420 typedef type_to_key_function_body<T4, K> *f4_p; 1421 typedef type_to_key_function_body<T5, K> *f5_p; 1422 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type; 1423 public: 1424 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1425 unfolded_join_node(graph &g) : base_type(g, 1426 func_initializer_type( 1427 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1428 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1429 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1430 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1431 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1432 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()) 1433 ) ) { 1434 } 1435 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1436 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5> 1437 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5) 1438 : base_type(g, func_initializer_type( 1439 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1440 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1441 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1442 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1443 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1444 new type_to_key_function_body_leaf<T5, K, Body5>(body5) 1445 ) ) { 1446 static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers"); 1447 } 1448 unfolded_join_node(const 
unfolded_join_node &other) : base_type(other) {} 1449 }; 1450 #endif 1451 1452 #if __TBB_VARIADIC_MAX >= 7 1453 template<typename OutputTuple, typename K, typename KHash> 1454 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1455 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1456 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1457 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1458 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1459 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1460 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1461 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1462 typedef typename std::tuple_element<6, OutputTuple>::type T6; 1463 public: 1464 typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1465 typedef OutputTuple output_type; 1466 private: 1467 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1468 typedef type_to_key_function_body<T0, K> *f0_p; 1469 typedef type_to_key_function_body<T1, K> *f1_p; 1470 typedef type_to_key_function_body<T2, K> *f2_p; 1471 typedef type_to_key_function_body<T3, K> *f3_p; 1472 typedef type_to_key_function_body<T4, K> *f4_p; 1473 typedef type_to_key_function_body<T5, K> *f5_p; 1474 typedef type_to_key_function_body<T6, K> *f6_p; 1475 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type; 1476 public: 1477 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1478 unfolded_join_node(graph &g) : base_type(g, 1479 func_initializer_type( 1480 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1481 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1482 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> 
>(key_from_message_body<K,T2>()), 1483 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1484 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1485 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()), 1486 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()) 1487 ) ) { 1488 } 1489 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1490 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1491 typename Body5, typename Body6> 1492 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1493 Body5 body5, Body6 body6) : base_type(g, func_initializer_type( 1494 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1495 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1496 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1497 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1498 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1499 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1500 new type_to_key_function_body_leaf<T6, K, Body6>(body6) 1501 ) ) { 1502 static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers"); 1503 } 1504 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1505 }; 1506 #endif 1507 1508 #if __TBB_VARIADIC_MAX >= 8 1509 template<typename OutputTuple, typename K, typename KHash> 1510 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1511 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1512 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1513 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1514 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1515 typedef typename 
std::tuple_element<3, OutputTuple>::type T3; 1516 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1517 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1518 typedef typename std::tuple_element<6, OutputTuple>::type T6; 1519 typedef typename std::tuple_element<7, OutputTuple>::type T7; 1520 public: 1521 typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1522 typedef OutputTuple output_type; 1523 private: 1524 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1525 typedef type_to_key_function_body<T0, K> *f0_p; 1526 typedef type_to_key_function_body<T1, K> *f1_p; 1527 typedef type_to_key_function_body<T2, K> *f2_p; 1528 typedef type_to_key_function_body<T3, K> *f3_p; 1529 typedef type_to_key_function_body<T4, K> *f4_p; 1530 typedef type_to_key_function_body<T5, K> *f5_p; 1531 typedef type_to_key_function_body<T6, K> *f6_p; 1532 typedef type_to_key_function_body<T7, K> *f7_p; 1533 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type; 1534 public: 1535 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1536 unfolded_join_node(graph &g) : base_type(g, 1537 func_initializer_type( 1538 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1539 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1540 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1541 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1542 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1543 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()), 1544 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> 
>(key_from_message_body<K,T6>()), 1545 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()) 1546 ) ) { 1547 } 1548 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1549 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1550 typename Body5, typename Body6, typename Body7> 1551 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1552 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type( 1553 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1554 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1555 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1556 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1557 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1558 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1559 new type_to_key_function_body_leaf<T6, K, Body6>(body6), 1560 new type_to_key_function_body_leaf<T7, K, Body7>(body7) 1561 ) ) { 1562 static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers"); 1563 } 1564 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1565 }; 1566 #endif 1567 1568 #if __TBB_VARIADIC_MAX >= 9 1569 template<typename OutputTuple, typename K, typename KHash> 1570 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1571 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1572 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1573 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1574 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1575 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1576 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1577 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1578 typedef typename std::tuple_element<6, OutputTuple>::type 
T6; 1579 typedef typename std::tuple_element<7, OutputTuple>::type T7; 1580 typedef typename std::tuple_element<8, OutputTuple>::type T8; 1581 public: 1582 typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1583 typedef OutputTuple output_type; 1584 private: 1585 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1586 typedef type_to_key_function_body<T0, K> *f0_p; 1587 typedef type_to_key_function_body<T1, K> *f1_p; 1588 typedef type_to_key_function_body<T2, K> *f2_p; 1589 typedef type_to_key_function_body<T3, K> *f3_p; 1590 typedef type_to_key_function_body<T4, K> *f4_p; 1591 typedef type_to_key_function_body<T5, K> *f5_p; 1592 typedef type_to_key_function_body<T6, K> *f6_p; 1593 typedef type_to_key_function_body<T7, K> *f7_p; 1594 typedef type_to_key_function_body<T8, K> *f8_p; 1595 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type; 1596 public: 1597 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1598 unfolded_join_node(graph &g) : base_type(g, 1599 func_initializer_type( 1600 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1601 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1602 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1603 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1604 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1605 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()), 1606 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()), 1607 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> 
>(key_from_message_body<K,T7>()), 1608 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()) 1609 ) ) { 1610 } 1611 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1612 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1613 typename Body5, typename Body6, typename Body7, typename Body8> 1614 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1615 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type( 1616 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1617 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1618 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1619 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1620 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1621 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1622 new type_to_key_function_body_leaf<T6, K, Body6>(body6), 1623 new type_to_key_function_body_leaf<T7, K, Body7>(body7), 1624 new type_to_key_function_body_leaf<T8, K, Body8>(body8) 1625 ) ) { 1626 static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers"); 1627 } 1628 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1629 }; 1630 #endif 1631 1632 #if __TBB_VARIADIC_MAX >= 10 1633 template<typename OutputTuple, typename K, typename KHash> 1634 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1635 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1636 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1637 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1638 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1639 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1640 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1641 typedef typename 
std::tuple_element<5, OutputTuple>::type T5; 1642 typedef typename std::tuple_element<6, OutputTuple>::type T6; 1643 typedef typename std::tuple_element<7, OutputTuple>::type T7; 1644 typedef typename std::tuple_element<8, OutputTuple>::type T8; 1645 typedef typename std::tuple_element<9, OutputTuple>::type T9; 1646 public: 1647 typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1648 typedef OutputTuple output_type; 1649 private: 1650 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1651 typedef type_to_key_function_body<T0, K> *f0_p; 1652 typedef type_to_key_function_body<T1, K> *f1_p; 1653 typedef type_to_key_function_body<T2, K> *f2_p; 1654 typedef type_to_key_function_body<T3, K> *f3_p; 1655 typedef type_to_key_function_body<T4, K> *f4_p; 1656 typedef type_to_key_function_body<T5, K> *f5_p; 1657 typedef type_to_key_function_body<T6, K> *f6_p; 1658 typedef type_to_key_function_body<T7, K> *f7_p; 1659 typedef type_to_key_function_body<T8, K> *f8_p; 1660 typedef type_to_key_function_body<T9, K> *f9_p; 1661 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type; 1662 public: 1663 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1664 unfolded_join_node(graph &g) : base_type(g, 1665 func_initializer_type( 1666 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1667 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1668 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1669 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1670 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1671 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> 
>(key_from_message_body<K,T5>()), 1672 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()), 1673 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()), 1674 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()), 1675 new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>()) 1676 ) ) { 1677 } 1678 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1679 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1680 typename Body5, typename Body6, typename Body7, typename Body8, typename Body9> 1681 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1682 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type( 1683 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1684 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1685 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1686 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1687 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1688 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1689 new type_to_key_function_body_leaf<T6, K, Body6>(body6), 1690 new type_to_key_function_body_leaf<T7, K, Body7>(body7), 1691 new type_to_key_function_body_leaf<T8, K, Body8>(body8), 1692 new type_to_key_function_body_leaf<T9, K, Body9>(body9) 1693 ) ) { 1694 static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers"); 1695 } 1696 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1697 }; 1698 #endif 1699 1700 //! 
//! Templated function to refer to input ports of the join node.
//  N   - zero-based index of the requested port.
//  JNT - join node type; it must provide input_ports_type and input_ports().
//  Returns a reference to the N-th element of the tuple returned by jn.input_ports().
template<size_t N, typename JNT>
auto input_port(JNT &jn)
    -> typename std::tuple_element<N, typename JNT::input_ports_type>::type &
{
    auto &all_ports = jn.input_ports();
    return std::get<N>(all_ports);
}

#endif // __TBB__flow_graph_join_impl_H