1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB__flow_graph_join_impl_H 18 #define __TBB__flow_graph_join_impl_H 19 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 22 #endif 23 24 // included into namespace tbb::detail::d1 25 26 struct forwarding_base : no_assign { forwarding_baseforwarding_base27 forwarding_base(graph &g) : graph_ref(g) {} ~forwarding_baseforwarding_base28 virtual ~forwarding_base() {} 29 graph& graph_ref; 30 }; 31 32 struct queueing_forwarding_base : forwarding_base { 33 using forwarding_base::forwarding_base; 34 // decrement_port_count may create a forwarding task. If we cannot handle the task 35 // ourselves, ask decrement_port_count to deal with it. 36 virtual graph_task* decrement_port_count(bool handle_task) = 0; 37 }; 38 39 struct reserving_forwarding_base : forwarding_base { 40 using forwarding_base::forwarding_base; 41 // decrement_port_count may create a forwarding task. If we cannot handle the task 42 // ourselves, ask decrement_port_count to deal with it. 43 virtual graph_task* decrement_port_count() = 0; 44 virtual void increment_port_count() = 0; 45 }; 46 47 // specialization that lets us keep a copy of the current_key for building results. 48 // KeyType can be a reference type. 49 template<typename KeyType> 50 struct matching_forwarding_base : public forwarding_base { 51 typedef typename std::decay<KeyType>::type current_key_type; matching_forwarding_basematching_forwarding_base52 matching_forwarding_base(graph &g) : forwarding_base(g) { } 53 virtual graph_task* increment_key_count(current_key_type const & /*t*/) = 0; 54 current_key_type current_key; // so ports can refer to FE's desired items 55 }; 56 57 template< int N > 58 struct join_helper { 59 60 template< typename TupleType, typename PortType > set_join_node_pointerjoin_helper61 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) { 62 std::get<N-1>( my_input ).set_join_node_pointer(port); 63 join_helper<N-1>::set_join_node_pointer( my_input, port ); 64 } 65 template< typename TupleType > consume_reservationsjoin_helper66 static inline void consume_reservations( TupleType &my_input ) { 67 std::get<N-1>( my_input ).consume(); 68 join_helper<N-1>::consume_reservations( my_input ); 69 } 70 71 template< typename TupleType > release_my_reservationjoin_helper72 static inline void release_my_reservation( TupleType &my_input ) { 73 std::get<N-1>( my_input ).release(); 74 } 75 76 template <typename TupleType> release_reservationsjoin_helper77 static inline void release_reservations( TupleType &my_input) { 78 join_helper<N-1>::release_reservations(my_input); 79 release_my_reservation(my_input); 80 } 81 82 template< typename InputTuple, typename OutputTuple > reservejoin_helper83 static inline bool reserve( InputTuple &my_input, OutputTuple &out) { 84 if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false; 85 if ( !join_helper<N-1>::reserve( my_input, out ) ) { 86 release_my_reservation( my_input ); 87 return false; 88 } 89 return true; 90 } 91 92 template<typename InputTuple, typename OutputTuple> get_my_itemjoin_helper93 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) { 94 bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail 95 return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning 96 } 97 98 template<typename InputTuple, typename OutputTuple> get_itemsjoin_helper99 static inline bool get_items(InputTuple &my_input, OutputTuple &out) { 100 return get_my_item(my_input, out); 101 } 102 103 template<typename InputTuple> reset_my_portjoin_helper104 static inline void reset_my_port(InputTuple &my_input) { 105 join_helper<N-1>::reset_my_port(my_input); 106 std::get<N-1>(my_input).reset_port(); 107 } 108 109 template<typename InputTuple> reset_portsjoin_helper110 static inline void reset_ports(InputTuple& my_input) { 111 reset_my_port(my_input); 112 } 113 114 template<typename InputTuple, typename KeyFuncTuple> set_key_functorsjoin_helper115 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) { 116 std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs)); 117 std::get<N-1>(my_key_funcs) = nullptr; 118 join_helper<N-1>::set_key_functors(my_input, my_key_funcs); 119 } 120 121 template< typename KeyFuncTuple> copy_key_functorsjoin_helper122 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) { 123 __TBB_ASSERT( 124 std::get<N-1>(other_inputs).get_my_key_func(), 125 "key matching join node should not be instantiated without functors." 126 ); 127 std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone()); 128 join_helper<N-1>::copy_key_functors(my_inputs, other_inputs); 129 } 130 131 template<typename InputTuple> reset_inputsjoin_helper132 static inline void reset_inputs(InputTuple &my_input, reset_flags f) { 133 join_helper<N-1>::reset_inputs(my_input, f); 134 std::get<N-1>(my_input).reset_receiver(f); 135 } 136 }; // join_helper<N> 137 138 template< > 139 struct join_helper<1> { 140 141 template< typename TupleType, typename PortType > 142 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) { 143 std::get<0>( my_input ).set_join_node_pointer(port); 144 } 145 146 template< typename TupleType > 147 static inline void consume_reservations( TupleType &my_input ) { 148 std::get<0>( my_input ).consume(); 149 } 150 151 template< typename TupleType > 152 static inline void release_my_reservation( TupleType &my_input ) { 153 std::get<0>( my_input ).release(); 154 } 155 156 template<typename TupleType> 157 static inline void release_reservations( TupleType &my_input) { 158 release_my_reservation(my_input); 159 } 160 161 template< typename InputTuple, typename OutputTuple > 162 static inline bool reserve( InputTuple &my_input, OutputTuple &out) { 163 return std::get<0>( my_input ).reserve( std::get<0>( out ) ); 164 } 165 166 template<typename InputTuple, typename OutputTuple> 167 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) { 168 return std::get<0>(my_input).get_item(std::get<0>(out)); 169 } 170 171 template<typename InputTuple, typename OutputTuple> 172 static inline bool get_items(InputTuple &my_input, OutputTuple &out) { 173 return get_my_item(my_input, out); 174 } 175 176 template<typename InputTuple> 177 static inline void reset_my_port(InputTuple &my_input) { 178 std::get<0>(my_input).reset_port(); 179 } 180 181 template<typename InputTuple> 182 static inline void reset_ports(InputTuple& my_input) { 183 reset_my_port(my_input); 184 } 185 186 template<typename InputTuple, typename KeyFuncTuple> 187 static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) { 188 std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs)); 189 std::get<0>(my_key_funcs) = nullptr; 190 } 191 192 template< typename KeyFuncTuple> 193 static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) { 194 __TBB_ASSERT( 195 std::get<0>(other_inputs).get_my_key_func(), 196 "key matching join node should not be instantiated without functors." 197 ); 198 std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone()); 199 } 200 template<typename InputTuple> 201 static inline void reset_inputs(InputTuple &my_input, reset_flags f) { 202 std::get<0>(my_input).reset_receiver(f); 203 } 204 }; // join_helper<1> 205 206 //! The two-phase join port 207 template< typename T > 208 class reserving_port : public receiver<T> { 209 public: 210 typedef T input_type; 211 typedef typename receiver<input_type>::predecessor_type predecessor_type; 212 213 private: 214 // ----------- Aggregator ------------ 215 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res 216 }; 217 typedef reserving_port<T> class_type; 218 219 class reserving_port_operation : public aggregated_operation<reserving_port_operation> { 220 public: 221 char type; 222 union { 223 T *my_arg; 224 predecessor_type *my_pred; 225 }; 226 reserving_port_operation(const T& e, op_type t) : 227 type(char(t)), my_arg(const_cast<T*>(&e)) {} 228 reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)), 229 my_pred(const_cast<predecessor_type *>(&s)) {} 230 reserving_port_operation(op_type t) : type(char(t)) {} 231 }; 232 233 typedef aggregating_functor<class_type, reserving_port_operation> handler_type; 234 friend class aggregating_functor<class_type, reserving_port_operation>; 235 aggregator<handler_type, reserving_port_operation> my_aggregator; 236 237 void handle_operations(reserving_port_operation* op_list) { 238 reserving_port_operation *current; 239 bool was_missing_predecessors = false; 240 while(op_list) { 241 current = op_list; 242 op_list = op_list->next; 243 switch(current->type) { 244 case reg_pred: 245 was_missing_predecessors = my_predecessors.empty(); 246 my_predecessors.add(*(current->my_pred)); 247 if ( was_missing_predecessors ) { 248 (void) my_join->decrement_port_count(); // may try to forward 249 } 250 current->status.store( SUCCEEDED, std::memory_order_release); 251 break; 252 case rem_pred: 253 if ( !my_predecessors.empty() ) { 254 my_predecessors.remove(*(current->my_pred)); 255 if ( my_predecessors.empty() ) // was the last predecessor 256 my_join->increment_port_count(); 257 } 258 // TODO: consider returning failure if there were no predecessors to remove 259 current->status.store( SUCCEEDED, std::memory_order_release ); 260 break; 261 case res_item: 262 if ( reserved ) { 263 current->status.store( FAILED, std::memory_order_release); 264 } 265 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) { 266 reserved = true; 267 current->status.store( SUCCEEDED, std::memory_order_release); 268 } else { 269 if ( my_predecessors.empty() ) { 270 my_join->increment_port_count(); 271 } 272 current->status.store( FAILED, std::memory_order_release); 273 } 274 break; 275 case rel_res: 276 reserved = false; 277 my_predecessors.try_release( ); 278 current->status.store( SUCCEEDED, std::memory_order_release); 279 break; 280 case con_res: 281 reserved = false; 282 my_predecessors.try_consume( ); 283 current->status.store( SUCCEEDED, std::memory_order_release); 284 break; 285 } 286 } 287 } 288 289 protected: 290 template< typename R, typename B > friend class run_and_put_task; 291 template<typename X, typename Y> friend class broadcast_cache; 292 template<typename X, typename Y> friend class round_robin_cache; 293 graph_task* try_put_task( const T & ) override { 294 return nullptr; 295 } 296 297 graph& graph_reference() const override { 298 return my_join->graph_ref; 299 } 300 301 public: 302 303 //! Constructor 304 reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) { 305 my_aggregator.initialize_handler(handler_type(this)); 306 } 307 308 // copy constructor 309 reserving_port(const reserving_port& /* other */) = delete; 310 311 void set_join_node_pointer(reserving_forwarding_base *join) { 312 my_join = join; 313 } 314 315 //! Add a predecessor 316 bool register_predecessor( predecessor_type &src ) override { 317 reserving_port_operation op_data(src, reg_pred); 318 my_aggregator.execute(&op_data); 319 return op_data.status == SUCCEEDED; 320 } 321 322 //! Remove a predecessor 323 bool remove_predecessor( predecessor_type &src ) override { 324 reserving_port_operation op_data(src, rem_pred); 325 my_aggregator.execute(&op_data); 326 return op_data.status == SUCCEEDED; 327 } 328 329 //! Reserve an item from the port 330 bool reserve( T &v ) { 331 reserving_port_operation op_data(v, res_item); 332 my_aggregator.execute(&op_data); 333 return op_data.status == SUCCEEDED; 334 } 335 336 //! Release the port 337 void release( ) { 338 reserving_port_operation op_data(rel_res); 339 my_aggregator.execute(&op_data); 340 } 341 342 //! Complete use of the port 343 void consume( ) { 344 reserving_port_operation op_data(con_res); 345 my_aggregator.execute(&op_data); 346 } 347 348 void reset_receiver( reset_flags f) { 349 if(f & rf_clear_edges) my_predecessors.clear(); 350 else 351 my_predecessors.reset(); 352 reserved = false; 353 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed"); 354 } 355 356 private: 357 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 358 friend class get_graph_helper; 359 #endif 360 361 reserving_forwarding_base *my_join; 362 reservable_predecessor_cache< T, null_mutex > my_predecessors; 363 bool reserved; 364 }; // reserving_port 365 366 //! queueing join_port 367 template<typename T> 368 class queueing_port : public receiver<T>, public item_buffer<T> { 369 public: 370 typedef T input_type; 371 typedef typename receiver<input_type>::predecessor_type predecessor_type; 372 typedef queueing_port<T> class_type; 373 374 // ----------- Aggregator ------------ 375 private: 376 enum op_type { get__item, res_port, try__put_task 377 }; 378 379 class queueing_port_operation : public aggregated_operation<queueing_port_operation> { 380 public: 381 char type; 382 T my_val; 383 T* my_arg; 384 graph_task* bypass_t; 385 // constructor for value parameter 386 queueing_port_operation(const T& e, op_type t) : 387 type(char(t)), my_val(e), my_arg(nullptr) 388 , bypass_t(nullptr) 389 {} 390 // constructor for pointer parameter 391 queueing_port_operation(const T* p, op_type t) : 392 type(char(t)), my_arg(const_cast<T*>(p)) 393 , bypass_t(nullptr) 394 {} 395 // constructor with no parameter 396 queueing_port_operation(op_type t) : type(char(t)), my_arg(nullptr) 397 , bypass_t(nullptr) 398 {} 399 }; 400 401 typedef aggregating_functor<class_type, queueing_port_operation> handler_type; 402 friend class aggregating_functor<class_type, queueing_port_operation>; 403 aggregator<handler_type, queueing_port_operation> my_aggregator; 404 405 void handle_operations(queueing_port_operation* op_list) { 406 queueing_port_operation *current; 407 bool was_empty; 408 while(op_list) { 409 current = op_list; 410 op_list = op_list->next; 411 switch(current->type) { 412 case try__put_task: { 413 graph_task* rtask = nullptr; 414 was_empty = this->buffer_empty(); 415 this->push_back(current->my_val); 416 if (was_empty) rtask = my_join->decrement_port_count(false); 417 else 418 rtask = SUCCESSFULLY_ENQUEUED; 419 current->bypass_t = rtask; 420 current->status.store( SUCCEEDED, std::memory_order_release); 421 } 422 break; 423 case get__item: 424 if(!this->buffer_empty()) { 425 __TBB_ASSERT(current->my_arg, nullptr); 426 *(current->my_arg) = this->front(); 427 current->status.store( SUCCEEDED, std::memory_order_release); 428 } 429 else { 430 current->status.store( FAILED, std::memory_order_release); 431 } 432 break; 433 case res_port: 434 __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset"); 435 this->destroy_front(); 436 if(this->my_item_valid(this->my_head)) { 437 (void)my_join->decrement_port_count(true); 438 } 439 current->status.store( SUCCEEDED, std::memory_order_release); 440 break; 441 } 442 } 443 } 444 // ------------ End Aggregator --------------- 445 446 protected: 447 template< typename R, typename B > friend class run_and_put_task; 448 template<typename X, typename Y> friend class broadcast_cache; 449 template<typename X, typename Y> friend class round_robin_cache; 450 graph_task* try_put_task(const T &v) override { 451 queueing_port_operation op_data(v, try__put_task); 452 my_aggregator.execute(&op_data); 453 __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator"); 454 if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED; 455 return op_data.bypass_t; 456 } 457 458 graph& graph_reference() const override { 459 return my_join->graph_ref; 460 } 461 462 public: 463 464 //! Constructor 465 queueing_port() : item_buffer<T>() { 466 my_join = nullptr; 467 my_aggregator.initialize_handler(handler_type(this)); 468 } 469 470 //! copy constructor 471 queueing_port(const queueing_port& /* other */) = delete; 472 473 //! record parent for tallying available items 474 void set_join_node_pointer(queueing_forwarding_base *join) { 475 my_join = join; 476 } 477 478 bool get_item( T &v ) { 479 queueing_port_operation op_data(&v, get__item); 480 my_aggregator.execute(&op_data); 481 return op_data.status == SUCCEEDED; 482 } 483 484 // reset_port is called when item is accepted by successor, but 485 // is initiated by join_node. 486 void reset_port() { 487 queueing_port_operation op_data(res_port); 488 my_aggregator.execute(&op_data); 489 return; 490 } 491 492 void reset_receiver(reset_flags) { 493 item_buffer<T>::reset(); 494 } 495 496 private: 497 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 498 friend class get_graph_helper; 499 #endif 500 501 queueing_forwarding_base *my_join; 502 }; // queueing_port 503 504 #include "_flow_graph_tagged_buffer_impl.h" 505 506 template<typename K> 507 struct count_element { 508 K my_key; 509 size_t my_value; 510 }; 511 512 // method to access the key in the counting table 513 // the ref has already been removed from K 514 template< typename K > 515 struct key_to_count_functor { 516 typedef count_element<K> table_item_type; 517 const K& operator()(const table_item_type& v) { return v.my_key; } 518 }; 519 520 // the ports can have only one template parameter. We wrap the types needed in 521 // a traits type 522 template< class TraitsType > 523 class key_matching_port : 524 public receiver<typename TraitsType::T>, 525 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK, 526 typename TraitsType::KHash > { 527 public: 528 typedef TraitsType traits; 529 typedef key_matching_port<traits> class_type; 530 typedef typename TraitsType::T input_type; 531 typedef typename TraitsType::K key_type; 532 typedef typename std::decay<key_type>::type noref_key_type; 533 typedef typename receiver<input_type>::predecessor_type predecessor_type; 534 typedef typename TraitsType::TtoK type_to_key_func_type; 535 typedef typename TraitsType::KHash hash_compare_type; 536 typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type; 537 538 private: 539 // ----------- Aggregator ------------ 540 private: 541 enum op_type { try__put, get__item, res_port 542 }; 543 544 class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> { 545 public: 546 char type; 547 input_type my_val; 548 input_type *my_arg; 549 // constructor for value parameter 550 key_matching_port_operation(const input_type& e, op_type t) : 551 type(char(t)), my_val(e), my_arg(nullptr) {} 552 // constructor for pointer parameter 553 key_matching_port_operation(const input_type* p, op_type t) : 554 type(char(t)), my_arg(const_cast<input_type*>(p)) {} 555 // constructor with no parameter 556 key_matching_port_operation(op_type t) : type(char(t)), my_arg(nullptr) {} 557 }; 558 559 typedef aggregating_functor<class_type, key_matching_port_operation> handler_type; 560 friend class aggregating_functor<class_type, key_matching_port_operation>; 561 aggregator<handler_type, key_matching_port_operation> my_aggregator; 562 563 void handle_operations(key_matching_port_operation* op_list) { 564 key_matching_port_operation *current; 565 while(op_list) { 566 current = op_list; 567 op_list = op_list->next; 568 switch(current->type) { 569 case try__put: { 570 bool was_inserted = this->insert_with_key(current->my_val); 571 // return failure if a duplicate insertion occurs 572 current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release); 573 } 574 break; 575 case get__item: 576 // use current_key from FE for item 577 __TBB_ASSERT(current->my_arg, nullptr); 578 if(!this->find_with_key(my_join->current_key, *(current->my_arg))) { 579 __TBB_ASSERT(false, "Failed to find item corresponding to current_key."); 580 } 581 current->status.store( SUCCEEDED, std::memory_order_release); 582 break; 583 case res_port: 584 // use current_key from FE for item 585 this->delete_with_key(my_join->current_key); 586 current->status.store( SUCCEEDED, std::memory_order_release); 587 break; 588 } 589 } 590 } 591 // ------------ End Aggregator --------------- 592 protected: 593 template< typename R, typename B > friend class run_and_put_task; 594 template<typename X, typename Y> friend class broadcast_cache; 595 template<typename X, typename Y> friend class round_robin_cache; 596 graph_task* try_put_task(const input_type& v) override { 597 key_matching_port_operation op_data(v, try__put); 598 graph_task* rtask = nullptr; 599 my_aggregator.execute(&op_data); 600 if(op_data.status == SUCCEEDED) { 601 rtask = my_join->increment_key_count((*(this->get_key_func()))(v)); // may spawn 602 // rtask has to reflect the return status of the try_put 603 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED; 604 } 605 return rtask; 606 } 607 608 graph& graph_reference() const override { 609 return my_join->graph_ref; 610 } 611 612 public: 613 614 key_matching_port() : receiver<input_type>(), buffer_type() { 615 my_join = nullptr; 616 my_aggregator.initialize_handler(handler_type(this)); 617 } 618 619 // copy constructor 620 key_matching_port(const key_matching_port& /*other*/) = delete; 621 #if __INTEL_COMPILER <= 2021 622 // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited 623 // class while the parent class has the virtual keyword for the destrocutor. 624 virtual 625 #endif 626 ~key_matching_port() { } 627 628 void set_join_node_pointer(forwarding_base *join) { 629 my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join); 630 } 631 632 void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); } 633 634 type_to_key_func_type* get_my_key_func() { return this->get_key_func(); } 635 636 bool get_item( input_type &v ) { 637 // aggregator uses current_key from FE for Key 638 key_matching_port_operation op_data(&v, get__item); 639 my_aggregator.execute(&op_data); 640 return op_data.status == SUCCEEDED; 641 } 642 643 // reset_port is called when item is accepted by successor, but 644 // is initiated by join_node. 645 void reset_port() { 646 key_matching_port_operation op_data(res_port); 647 my_aggregator.execute(&op_data); 648 return; 649 } 650 651 void reset_receiver(reset_flags ) { 652 buffer_type::reset(); 653 } 654 655 private: 656 // my_join forwarding base used to count number of inputs that 657 // received key. 658 matching_forwarding_base<key_type> *my_join; 659 }; // key_matching_port 660 661 using namespace graph_policy_namespace; 662 663 template<typename JP, typename InputTuple, typename OutputTuple> 664 class join_node_base; 665 666 //! join_node_FE : implements input port policy 667 template<typename JP, typename InputTuple, typename OutputTuple> 668 class join_node_FE; 669 670 template<typename InputTuple, typename OutputTuple> 671 class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base { 672 private: 673 static const int N = std::tuple_size<OutputTuple>::value; 674 typedef OutputTuple output_type; 675 typedef InputTuple input_type; 676 typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding 677 public: 678 join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) { 679 ports_with_no_inputs = N; 680 join_helper<N>::set_join_node_pointer(my_inputs, this); 681 } 682 683 join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) { 684 ports_with_no_inputs = N; 685 join_helper<N>::set_join_node_pointer(my_inputs, this); 686 } 687 688 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; } 689 690 void increment_port_count() override { 691 ++ports_with_no_inputs; 692 } 693 694 // if all input_ports have predecessors, spawn forward to try and consume tuples 695 graph_task* decrement_port_count() override { 696 if(ports_with_no_inputs.fetch_sub(1) == 1) { 697 if(is_graph_active(this->graph_ref)) { 698 small_object_allocator allocator{}; 699 typedef forward_task_bypass<base_node_type> task_type; 700 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node); 701 graph_ref.reserve_wait(); 702 spawn_in_graph_arena(this->graph_ref, *t); 703 } 704 } 705 return nullptr; 706 } 707 708 input_type &input_ports() { return my_inputs; } 709 710 protected: 711 712 void reset( reset_flags f) { 713 // called outside of parallel contexts 714 ports_with_no_inputs = N; 715 join_helper<N>::reset_inputs(my_inputs, f); 716 } 717 718 // all methods on input ports should be called under mutual exclusion from join_node_base. 719 720 bool tuple_build_may_succeed() { 721 return !ports_with_no_inputs; 722 } 723 724 bool try_to_make_tuple(output_type &out) { 725 if(ports_with_no_inputs) return false; 726 return join_helper<N>::reserve(my_inputs, out); 727 } 728 729 void tuple_accepted() { 730 join_helper<N>::consume_reservations(my_inputs); 731 } 732 void tuple_rejected() { 733 join_helper<N>::release_reservations(my_inputs); 734 } 735 736 input_type my_inputs; 737 base_node_type *my_node; 738 std::atomic<std::size_t> ports_with_no_inputs; 739 }; // join_node_FE<reserving, ... > 740 741 template<typename InputTuple, typename OutputTuple> 742 class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base { 743 public: 744 static const int N = std::tuple_size<OutputTuple>::value; 745 typedef OutputTuple output_type; 746 typedef InputTuple input_type; 747 typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding 748 749 join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) { 750 ports_with_no_items = N; 751 join_helper<N>::set_join_node_pointer(my_inputs, this); 752 } 753 754 join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) { 755 ports_with_no_items = N; 756 join_helper<N>::set_join_node_pointer(my_inputs, this); 757 } 758 759 // needed for forwarding 760 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; } 761 762 void reset_port_count() { 763 ports_with_no_items = N; 764 } 765 766 // if all input_ports have items, spawn forward to try and consume tuples 767 graph_task* decrement_port_count(bool handle_task) override 768 { 769 if(ports_with_no_items.fetch_sub(1) == 1) { 770 if(is_graph_active(this->graph_ref)) { 771 small_object_allocator allocator{}; 772 typedef forward_task_bypass<base_node_type> task_type; 773 graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node); 774 graph_ref.reserve_wait(); 775 if( !handle_task ) 776 return t; 777 spawn_in_graph_arena(this->graph_ref, *t); 778 } 779 } 780 return nullptr; 781 } 782 783 input_type &input_ports() { return my_inputs; } 784 785 protected: 786 787 void reset( reset_flags f) { 788 reset_port_count(); 789 join_helper<N>::reset_inputs(my_inputs, f ); 790 } 791 792 // all methods on input ports should be called under mutual exclusion from join_node_base. 793 794 bool tuple_build_may_succeed() { 795 return !ports_with_no_items; 796 } 797 798 bool try_to_make_tuple(output_type &out) { 799 if(ports_with_no_items) return false; 800 return join_helper<N>::get_items(my_inputs, out); 801 } 802 803 void tuple_accepted() { 804 reset_port_count(); 805 join_helper<N>::reset_ports(my_inputs); 806 } 807 void tuple_rejected() { 808 // nothing to do. 809 } 810 811 input_type my_inputs; 812 base_node_type *my_node; 813 std::atomic<std::size_t> ports_with_no_items; 814 }; // join_node_FE<queueing, ...> 815 816 // key_matching join front-end. 817 template<typename InputTuple, typename OutputTuple, typename K, typename KHash> 818 class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>, 819 // buffer of key value counts 820 public hash_buffer< // typedefed below to key_to_count_buffer_type 821 typename std::decay<K>::type&, // force ref type on K 822 count_element<typename std::decay<K>::type>, 823 type_to_key_function_body< 824 count_element<typename std::decay<K>::type>, 825 typename std::decay<K>::type& >, 826 KHash >, 827 // buffer of output items 828 public item_buffer<OutputTuple> { 829 public: 830 static const int N = std::tuple_size<OutputTuple>::value; 831 typedef OutputTuple output_type; 832 typedef InputTuple input_type; 833 typedef K key_type; 834 typedef typename std::decay<key_type>::type unref_key_type; 835 typedef KHash key_hash_compare; 836 // must use K without ref. 837 typedef count_element<unref_key_type> count_element_type; 838 // method that lets us refer to the key of this type. 839 typedef key_to_count_functor<unref_key_type> key_to_count_func; 840 typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type; 841 typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type; 842 // this is the type of the special table that keeps track of the number of discrete 843 // elements corresponding to each key that we've seen. 844 typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare > 845 key_to_count_buffer_type; 846 typedef item_buffer<output_type> output_buffer_type; 847 typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding 848 typedef matching_forwarding_base<key_type> forwarding_base_type; 849 850 // ----------- Aggregator ------------ 851 // the aggregator is only needed to serialize the access to the hash table. 852 // and the output_buffer_type base class 853 private: 854 enum op_type { res_count, inc_count, may_succeed, try_make }; 855 typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type; 856 857 class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> { 858 public: 859 char type; 860 unref_key_type my_val; 861 output_type* my_output; 862 graph_task* bypass_t; 863 // constructor for value parameter 864 key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e), 865 my_output(nullptr), bypass_t(nullptr) {} 866 key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {} 867 // constructor with no parameter 868 key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {} 869 }; 870 871 typedef aggregating_functor<class_type, key_matching_FE_operation> handler_type; 872 friend class aggregating_functor<class_type, key_matching_FE_operation>; 873 aggregator<handler_type, key_matching_FE_operation> my_aggregator; 874 875 // called from aggregator, so serialized 876 // returns a task pointer if the a task would have been enqueued but we asked that 877 // it be returned. Otherwise returns nullptr. 878 graph_task* fill_output_buffer(unref_key_type &t) { 879 output_type l_out; 880 graph_task* rtask = nullptr; 881 bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref); 882 this->current_key = t; 883 this->delete_with_key(this->current_key); // remove the key 884 if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back 885 this->push_back(l_out); 886 if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item 887 small_object_allocator allocator{}; 888 typedef forward_task_bypass<base_node_type> task_type; 889 rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node); 890 this->graph_ref.reserve_wait(); 891 do_fwd = false; 892 } 893 // retire the input values 894 join_helper<N>::reset_ports(my_inputs); // <== call back 895 } 896 else { 897 __TBB_ASSERT(false, "should have had something to push"); 898 } 899 return rtask; 900 } 901 902 void handle_operations(key_matching_FE_operation* op_list) { 903 key_matching_FE_operation *current; 904 while(op_list) { 905 current = op_list; 906 op_list = op_list->next; 907 switch(current->type) { 908 case res_count: // called from BE 909 { 910 this->destroy_front(); 911 current->status.store( SUCCEEDED, std::memory_order_release); 912 } 913 break; 914 case inc_count: { // called from input ports 915 count_element_type *p = nullptr; 916 unref_key_type &t = current->my_val; 917 if(!(this->find_ref_with_key(t,p))) { 918 count_element_type ev; 919 ev.my_key = t; 920 ev.my_value = 0; 921 this->insert_with_key(ev); 922 bool found = this->find_ref_with_key(t, p); 923 __TBB_ASSERT_EX(found, "should find key after inserting it"); 924 } 925 if(++(p->my_value) == size_t(N)) { 926 current->bypass_t = fill_output_buffer(t); 927 } 928 } 929 current->status.store( SUCCEEDED, std::memory_order_release); 930 break; 931 case may_succeed: // called from BE 932 current->status.store( this->buffer_empty() ? FAILED : SUCCEEDED, std::memory_order_release); 933 break; 934 case try_make: // called from BE 935 if(this->buffer_empty()) { 936 current->status.store( FAILED, std::memory_order_release); 937 } 938 else { 939 *(current->my_output) = this->front(); 940 current->status.store( SUCCEEDED, std::memory_order_release); 941 } 942 break; 943 } 944 } 945 } 946 // ------------ End Aggregator --------------- 947 948 public: 949 template<typename FunctionTuple> 950 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) { 951 join_helper<N>::set_join_node_pointer(my_inputs, this); 952 join_helper<N>::set_key_functors(my_inputs, TtoK_funcs); 953 my_aggregator.initialize_handler(handler_type(this)); 954 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func()); 955 this->set_key_func(cfb); 956 } 957 958 join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(), 959 output_buffer_type() { 960 my_node = nullptr; 961 join_helper<N>::set_join_node_pointer(my_inputs, this); 962 join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs)); 963 my_aggregator.initialize_handler(handler_type(this)); 964 TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func()); 965 this->set_key_func(cfb); 966 } 967 968 // needed for forwarding 969 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; } 970 971 void reset_port_count() { // called from BE 972 key_matching_FE_operation op_data(res_count); 973 my_aggregator.execute(&op_data); 974 return; 975 } 976 977 // if all input_ports have items, spawn forward to try and consume tuples 978 // return a task if we are asked and did create one. 979 graph_task *increment_key_count(unref_key_type const & t) override { // called from input_ports 980 key_matching_FE_operation op_data(t, inc_count); 981 my_aggregator.execute(&op_data); 982 return op_data.bypass_t; 983 } 984 985 input_type &input_ports() { return my_inputs; } 986 987 protected: 988 989 void reset( reset_flags f ) { 990 // called outside of parallel contexts 991 join_helper<N>::reset_inputs(my_inputs, f); 992 993 key_to_count_buffer_type::reset(); 994 output_buffer_type::reset(); 995 } 996 997 // all methods on input ports should be called under mutual exclusion from join_node_base. 998 999 bool tuple_build_may_succeed() { // called from back-end 1000 key_matching_FE_operation op_data(may_succeed); 1001 my_aggregator.execute(&op_data); 1002 return op_data.status == SUCCEEDED; 1003 } 1004 1005 // cannot lock while calling back to input_ports. current_key will only be set 1006 // and reset under the aggregator, so it will remain consistent. 1007 bool try_to_make_tuple(output_type &out) { 1008 key_matching_FE_operation op_data(&out,try_make); 1009 my_aggregator.execute(&op_data); 1010 return op_data.status == SUCCEEDED; 1011 } 1012 1013 void tuple_accepted() { 1014 reset_port_count(); // reset current_key after ports reset. 1015 } 1016 1017 void tuple_rejected() { 1018 // nothing to do. 1019 } 1020 1021 input_type my_inputs; // input ports 1022 base_node_type *my_node; 1023 }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> 1024 1025 //! join_node_base 1026 template<typename JP, typename InputTuple, typename OutputTuple> 1027 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>, 1028 public sender<OutputTuple> { 1029 protected: 1030 using graph_node::my_graph; 1031 public: 1032 typedef OutputTuple output_type; 1033 1034 typedef typename sender<output_type>::successor_type successor_type; 1035 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type; 1036 using input_ports_type::tuple_build_may_succeed; 1037 using input_ports_type::try_to_make_tuple; 1038 using input_ports_type::tuple_accepted; 1039 using input_ports_type::tuple_rejected; 1040 1041 private: 1042 // ----------- Aggregator ------------ 1043 enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass 1044 }; 1045 typedef join_node_base<JP,InputTuple,OutputTuple> class_type; 1046 1047 class join_node_base_operation : public aggregated_operation<join_node_base_operation> { 1048 public: 1049 char type; 1050 union { 1051 output_type *my_arg; 1052 successor_type *my_succ; 1053 }; 1054 graph_task* bypass_t; 1055 join_node_base_operation(const output_type& e, op_type t) : type(char(t)), 1056 my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr) {} 1057 join_node_base_operation(const successor_type &s, op_type t) : type(char(t)), 1058 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {} 1059 join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {} 1060 }; 1061 1062 typedef aggregating_functor<class_type, join_node_base_operation> handler_type; 1063 friend class aggregating_functor<class_type, join_node_base_operation>; 1064 bool forwarder_busy; 1065 aggregator<handler_type, join_node_base_operation> my_aggregator; 1066 1067 void handle_operations(join_node_base_operation* op_list) { 1068 join_node_base_operation *current; 1069 while(op_list) { 1070 current = op_list; 1071 op_list = op_list->next; 1072 switch(current->type) { 1073 case reg_succ: { 1074 my_successors.register_successor(*(current->my_succ)); 1075 if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) { 1076 small_object_allocator allocator{}; 1077 typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type; 1078 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this); 1079 my_graph.reserve_wait(); 1080 spawn_in_graph_arena(my_graph, *t); 1081 forwarder_busy = true; 1082 } 1083 current->status.store( SUCCEEDED, std::memory_order_release); 1084 } 1085 break; 1086 case rem_succ: 1087 my_successors.remove_successor(*(current->my_succ)); 1088 current->status.store( SUCCEEDED, std::memory_order_release); 1089 break; 1090 case try__get: 1091 if(tuple_build_may_succeed()) { 1092 if(try_to_make_tuple(*(current->my_arg))) { 1093 tuple_accepted(); 1094 current->status.store( SUCCEEDED, std::memory_order_release); 1095 } 1096 else current->status.store( FAILED, std::memory_order_release); 1097 } 1098 else current->status.store( FAILED, std::memory_order_release); 1099 break; 1100 case do_fwrd_bypass: { 1101 bool build_succeeded; 1102 graph_task *last_task = nullptr; 1103 output_type out; 1104 // forwarding must be exclusive, because try_to_make_tuple and tuple_accepted 1105 // are separate locked methods in the FE. We could conceivably fetch the front 1106 // of the FE queue, then be swapped out, have someone else consume the FE's 1107 // object, then come back, forward, and then try to remove it from the queue 1108 // again. Without reservation of the FE, the methods accessing it must be locked. 1109 // We could remember the keys of the objects we forwarded, and then remove 1110 // them from the input ports after forwarding is complete? 1111 if(tuple_build_may_succeed()) { // checks output queue of FE 1112 do { 1113 build_succeeded = try_to_make_tuple(out); // fetch front_end of queue 1114 if(build_succeeded) { 1115 graph_task *new_task = my_successors.try_put_task(out); 1116 last_task = combine_tasks(my_graph, last_task, new_task); 1117 if(new_task) { 1118 tuple_accepted(); 1119 } 1120 else { 1121 tuple_rejected(); 1122 build_succeeded = false; 1123 } 1124 } 1125 } while(build_succeeded); 1126 } 1127 current->bypass_t = last_task; 1128 current->status.store( SUCCEEDED, std::memory_order_release); 1129 forwarder_busy = false; 1130 } 1131 break; 1132 } 1133 } 1134 } 1135 // ---------- end aggregator ----------- 1136 public: 1137 join_node_base(graph &g) 1138 : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this) 1139 { 1140 input_ports_type::set_my_node(this); 1141 my_aggregator.initialize_handler(handler_type(this)); 1142 } 1143 1144 join_node_base(const join_node_base& other) : 1145 graph_node(other.graph_node::my_graph), input_ports_type(other), 1146 sender<OutputTuple>(), forwarder_busy(false), my_successors(this) 1147 { 1148 input_ports_type::set_my_node(this); 1149 my_aggregator.initialize_handler(handler_type(this)); 1150 } 1151 1152 template<typename FunctionTuple> 1153 join_node_base(graph &g, FunctionTuple f) 1154 : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this) 1155 { 1156 input_ports_type::set_my_node(this); 1157 my_aggregator.initialize_handler(handler_type(this)); 1158 } 1159 1160 bool register_successor(successor_type &r) override { 1161 join_node_base_operation op_data(r, reg_succ); 1162 my_aggregator.execute(&op_data); 1163 return op_data.status == SUCCEEDED; 1164 } 1165 1166 bool remove_successor( successor_type &r) override { 1167 join_node_base_operation op_data(r, rem_succ); 1168 my_aggregator.execute(&op_data); 1169 return op_data.status == SUCCEEDED; 1170 } 1171 1172 bool try_get( output_type &v) override { 1173 join_node_base_operation op_data(v, try__get); 1174 my_aggregator.execute(&op_data); 1175 return op_data.status == SUCCEEDED; 1176 } 1177 1178 protected: 1179 void reset_node(reset_flags f) override { 1180 input_ports_type::reset(f); 1181 if(f & rf_clear_edges) my_successors.clear(); 1182 } 1183 1184 private: 1185 broadcast_cache<output_type, null_rw_mutex> my_successors; 1186 1187 friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >; 1188 graph_task *forward_task() { 1189 join_node_base_operation op_data(do_fwrd_bypass); 1190 my_aggregator.execute(&op_data); 1191 return op_data.bypass_t; 1192 } 1193 1194 }; // join_node_base 1195 1196 // join base class type generator 1197 template<int N, template<class> class PT, typename OutputTuple, typename JP> 1198 struct join_base { 1199 typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type; 1200 }; 1201 1202 template<int N, typename OutputTuple, typename K, typename KHash> 1203 struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > { 1204 typedef key_matching<K, KHash> key_traits_type; 1205 typedef K key_type; 1206 typedef KHash key_hash_compare; 1207 typedef join_node_base< key_traits_type, 1208 // ports type 1209 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type, 1210 OutputTuple > type; 1211 }; 1212 1213 //! unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type 1214 // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port) 1215 // and should match the typename. 1216 1217 template<int M, template<class> class PT, typename OutputTuple, typename JP> 1218 class unfolded_join_node : public join_base<M,PT,OutputTuple,JP>::type { 1219 public: 1220 typedef typename wrap_tuple_elements<M, PT, OutputTuple>::type input_ports_type; 1221 typedef OutputTuple output_type; 1222 private: 1223 typedef join_node_base<JP, input_ports_type, output_type > base_type; 1224 public: 1225 unfolded_join_node(graph &g) : base_type(g) {} 1226 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1227 }; 1228 1229 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1230 template <typename K, typename T> 1231 struct key_from_message_body { 1232 K operator()(const T& t) const { 1233 return key_from_message<K>(t); 1234 } 1235 }; 1236 // Adds const to reference type 1237 template <typename K, typename T> 1238 struct key_from_message_body<K&,T> { 1239 const K& operator()(const T& t) const { 1240 return key_from_message<const K&>(t); 1241 } 1242 }; 1243 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1244 // key_matching unfolded_join_node. This must be a separate specialization because the constructors 1245 // differ. 1246 1247 template<typename OutputTuple, typename K, typename KHash> 1248 class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1249 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1250 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1251 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1252 public: 1253 typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1254 typedef OutputTuple output_type; 1255 private: 1256 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type; 1257 typedef type_to_key_function_body<T0, K> *f0_p; 1258 typedef type_to_key_function_body<T1, K> *f1_p; 1259 typedef std::tuple< f0_p, f1_p > func_initializer_type; 1260 public: 1261 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1262 unfolded_join_node(graph &g) : base_type(g, 1263 func_initializer_type( 1264 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1265 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()) 1266 ) ) { 1267 } 1268 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1269 template<typename Body0, typename Body1> 1270 unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g, 1271 func_initializer_type( 1272 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1273 new type_to_key_function_body_leaf<T1, K, Body1>(body1) 1274 ) ) { 1275 static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers"); 1276 } 1277 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1278 }; 1279 1280 template<typename OutputTuple, typename K, typename KHash> 1281 class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1282 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1283 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1284 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1285 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1286 public: 1287 typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1288 typedef OutputTuple output_type; 1289 private: 1290 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type; 1291 typedef type_to_key_function_body<T0, K> *f0_p; 1292 typedef type_to_key_function_body<T1, K> *f1_p; 1293 typedef type_to_key_function_body<T2, K> *f2_p; 1294 typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type; 1295 public: 1296 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1297 unfolded_join_node(graph &g) : base_type(g, 1298 func_initializer_type( 1299 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1300 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1301 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()) 1302 ) ) { 1303 } 1304 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1305 template<typename Body0, typename Body1, typename Body2> 1306 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g, 1307 func_initializer_type( 1308 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1309 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1310 new type_to_key_function_body_leaf<T2, K, Body2>(body2) 1311 ) ) { 1312 static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers"); 1313 } 1314 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1315 }; 1316 1317 template<typename OutputTuple, typename K, typename KHash> 1318 class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1319 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1320 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1321 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1322 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1323 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1324 public: 1325 typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1326 typedef OutputTuple output_type; 1327 private: 1328 typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type; 1329 typedef type_to_key_function_body<T0, K> *f0_p; 1330 typedef type_to_key_function_body<T1, K> *f1_p; 1331 typedef type_to_key_function_body<T2, K> *f2_p; 1332 typedef type_to_key_function_body<T3, K> *f3_p; 1333 typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type; 1334 public: 1335 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1336 unfolded_join_node(graph &g) : base_type(g, 1337 func_initializer_type( 1338 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1339 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1340 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1341 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()) 1342 ) ) { 1343 } 1344 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1345 template<typename Body0, typename Body1, typename Body2, typename Body3> 1346 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g, 1347 func_initializer_type( 1348 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1349 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1350 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1351 new type_to_key_function_body_leaf<T3, K, Body3>(body3) 1352 ) ) { 1353 static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers"); 1354 } 1355 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1356 }; 1357 1358 template<typename OutputTuple, typename K, typename KHash> 1359 class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1360 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1361 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1362 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1363 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1364 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1365 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1366 public: 1367 typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1368 typedef OutputTuple output_type; 1369 private: 1370 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1371 typedef type_to_key_function_body<T0, K> *f0_p; 1372 typedef type_to_key_function_body<T1, K> *f1_p; 1373 typedef type_to_key_function_body<T2, K> *f2_p; 1374 typedef type_to_key_function_body<T3, K> *f3_p; 1375 typedef type_to_key_function_body<T4, K> *f4_p; 1376 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type; 1377 public: 1378 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1379 unfolded_join_node(graph &g) : base_type(g, 1380 func_initializer_type( 1381 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1382 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1383 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1384 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1385 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()) 1386 ) ) { 1387 } 1388 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1389 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4> 1390 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g, 1391 func_initializer_type( 1392 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1393 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1394 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1395 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1396 new type_to_key_function_body_leaf<T4, K, Body4>(body4) 1397 ) ) { 1398 static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers"); 1399 } 1400 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1401 }; 1402 1403 #if __TBB_VARIADIC_MAX >= 6 1404 template<typename OutputTuple, typename K, typename KHash> 1405 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1406 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1407 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1408 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1409 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1410 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1411 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1412 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1413 public: 1414 typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1415 typedef OutputTuple output_type; 1416 private: 1417 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1418 typedef type_to_key_function_body<T0, K> *f0_p; 1419 typedef type_to_key_function_body<T1, K> *f1_p; 1420 typedef type_to_key_function_body<T2, K> *f2_p; 1421 typedef type_to_key_function_body<T3, K> *f3_p; 1422 typedef type_to_key_function_body<T4, K> *f4_p; 1423 typedef type_to_key_function_body<T5, K> *f5_p; 1424 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type; 1425 public: 1426 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1427 unfolded_join_node(graph &g) : base_type(g, 1428 func_initializer_type( 1429 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1430 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1431 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1432 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1433 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1434 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()) 1435 ) ) { 1436 } 1437 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1438 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5> 1439 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5) 1440 : base_type(g, func_initializer_type( 1441 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1442 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1443 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1444 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1445 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1446 new type_to_key_function_body_leaf<T5, K, Body5>(body5) 1447 ) ) { 1448 static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers"); 1449 } 1450 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1451 }; 1452 #endif 1453 1454 #if __TBB_VARIADIC_MAX >= 7 1455 template<typename OutputTuple, typename K, typename KHash> 1456 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1457 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1458 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1459 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1460 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1461 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1462 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1463 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1464 typedef typename std::tuple_element<6, OutputTuple>::type T6; 1465 public: 1466 typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1467 typedef OutputTuple output_type; 1468 private: 1469 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1470 typedef type_to_key_function_body<T0, K> *f0_p; 1471 typedef type_to_key_function_body<T1, K> *f1_p; 1472 typedef type_to_key_function_body<T2, K> *f2_p; 1473 typedef type_to_key_function_body<T3, K> *f3_p; 1474 typedef type_to_key_function_body<T4, K> *f4_p; 1475 typedef type_to_key_function_body<T5, K> *f5_p; 1476 typedef type_to_key_function_body<T6, K> *f6_p; 1477 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type; 1478 public: 1479 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1480 unfolded_join_node(graph &g) : base_type(g, 1481 func_initializer_type( 1482 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1483 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1484 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1485 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1486 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1487 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()), 1488 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()) 1489 ) ) { 1490 } 1491 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1492 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1493 typename Body5, typename Body6> 1494 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1495 Body5 body5, Body6 body6) : base_type(g, func_initializer_type( 1496 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1497 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1498 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1499 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1500 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1501 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1502 new type_to_key_function_body_leaf<T6, K, Body6>(body6) 1503 ) ) { 1504 static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers"); 1505 } 1506 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1507 }; 1508 #endif 1509 1510 #if __TBB_VARIADIC_MAX >= 8 1511 template<typename OutputTuple, typename K, typename KHash> 1512 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1513 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1514 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1515 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1516 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1517 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1518 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1519 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1520 typedef typename std::tuple_element<6, OutputTuple>::type T6; 1521 typedef typename std::tuple_element<7, OutputTuple>::type T7; 1522 public: 1523 typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1524 typedef OutputTuple output_type; 1525 private: 1526 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1527 typedef type_to_key_function_body<T0, K> *f0_p; 1528 typedef type_to_key_function_body<T1, K> *f1_p; 1529 typedef type_to_key_function_body<T2, K> *f2_p; 1530 typedef type_to_key_function_body<T3, K> *f3_p; 1531 typedef type_to_key_function_body<T4, K> *f4_p; 1532 typedef type_to_key_function_body<T5, K> *f5_p; 1533 typedef type_to_key_function_body<T6, K> *f6_p; 1534 typedef type_to_key_function_body<T7, K> *f7_p; 1535 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type; 1536 public: 1537 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1538 unfolded_join_node(graph &g) : base_type(g, 1539 func_initializer_type( 1540 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1541 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1542 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1543 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1544 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1545 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()), 1546 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()), 1547 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()) 1548 ) ) { 1549 } 1550 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1551 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1552 typename Body5, typename Body6, typename Body7> 1553 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1554 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type( 1555 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1556 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1557 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1558 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1559 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1560 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1561 new type_to_key_function_body_leaf<T6, K, Body6>(body6), 1562 new type_to_key_function_body_leaf<T7, K, Body7>(body7) 1563 ) ) { 1564 static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers"); 1565 } 1566 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1567 }; 1568 #endif 1569 1570 #if __TBB_VARIADIC_MAX >= 9 1571 template<typename OutputTuple, typename K, typename KHash> 1572 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1573 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1574 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1575 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1576 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1577 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1578 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1579 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1580 typedef typename std::tuple_element<6, OutputTuple>::type T6; 1581 typedef typename std::tuple_element<7, OutputTuple>::type T7; 1582 typedef typename std::tuple_element<8, OutputTuple>::type T8; 1583 public: 1584 typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1585 typedef OutputTuple output_type; 1586 private: 1587 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1588 typedef type_to_key_function_body<T0, K> *f0_p; 1589 typedef type_to_key_function_body<T1, K> *f1_p; 1590 typedef type_to_key_function_body<T2, K> *f2_p; 1591 typedef type_to_key_function_body<T3, K> *f3_p; 1592 typedef type_to_key_function_body<T4, K> *f4_p; 1593 typedef type_to_key_function_body<T5, K> *f5_p; 1594 typedef type_to_key_function_body<T6, K> *f6_p; 1595 typedef type_to_key_function_body<T7, K> *f7_p; 1596 typedef type_to_key_function_body<T8, K> *f8_p; 1597 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type; 1598 public: 1599 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1600 unfolded_join_node(graph &g) : base_type(g, 1601 func_initializer_type( 1602 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1603 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1604 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1605 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1606 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1607 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()), 1608 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()), 1609 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()), 1610 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()) 1611 ) ) { 1612 } 1613 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1614 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1615 typename Body5, typename Body6, typename Body7, typename Body8> 1616 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1617 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type( 1618 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1619 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1620 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1621 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1622 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1623 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1624 new type_to_key_function_body_leaf<T6, K, Body6>(body6), 1625 new type_to_key_function_body_leaf<T7, K, Body7>(body7), 1626 new type_to_key_function_body_leaf<T8, K, Body8>(body8) 1627 ) ) { 1628 static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers"); 1629 } 1630 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1631 }; 1632 #endif 1633 1634 #if __TBB_VARIADIC_MAX >= 10 1635 template<typename OutputTuple, typename K, typename KHash> 1636 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public 1637 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type { 1638 typedef typename std::tuple_element<0, OutputTuple>::type T0; 1639 typedef typename std::tuple_element<1, OutputTuple>::type T1; 1640 typedef typename std::tuple_element<2, OutputTuple>::type T2; 1641 typedef typename std::tuple_element<3, OutputTuple>::type T3; 1642 typedef typename std::tuple_element<4, OutputTuple>::type T4; 1643 typedef typename std::tuple_element<5, OutputTuple>::type T5; 1644 typedef typename std::tuple_element<6, OutputTuple>::type T6; 1645 typedef typename std::tuple_element<7, OutputTuple>::type T7; 1646 typedef typename std::tuple_element<8, OutputTuple>::type T8; 1647 typedef typename std::tuple_element<9, OutputTuple>::type T9; 1648 public: 1649 typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type; 1650 typedef OutputTuple output_type; 1651 private: 1652 typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type; 1653 typedef type_to_key_function_body<T0, K> *f0_p; 1654 typedef type_to_key_function_body<T1, K> *f1_p; 1655 typedef type_to_key_function_body<T2, K> *f2_p; 1656 typedef type_to_key_function_body<T3, K> *f3_p; 1657 typedef type_to_key_function_body<T4, K> *f4_p; 1658 typedef type_to_key_function_body<T5, K> *f5_p; 1659 typedef type_to_key_function_body<T6, K> *f6_p; 1660 typedef type_to_key_function_body<T7, K> *f7_p; 1661 typedef type_to_key_function_body<T8, K> *f8_p; 1662 typedef type_to_key_function_body<T9, K> *f9_p; 1663 typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type; 1664 public: 1665 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1666 unfolded_join_node(graph &g) : base_type(g, 1667 func_initializer_type( 1668 new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()), 1669 new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()), 1670 new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()), 1671 new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()), 1672 new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()), 1673 new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()), 1674 new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()), 1675 new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()), 1676 new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()), 1677 new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>()) 1678 ) ) { 1679 } 1680 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 1681 template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, 1682 typename Body5, typename Body6, typename Body7, typename Body8, typename Body9> 1683 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, 1684 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type( 1685 new type_to_key_function_body_leaf<T0, K, Body0>(body0), 1686 new type_to_key_function_body_leaf<T1, K, Body1>(body1), 1687 new type_to_key_function_body_leaf<T2, K, Body2>(body2), 1688 new type_to_key_function_body_leaf<T3, K, Body3>(body3), 1689 new type_to_key_function_body_leaf<T4, K, Body4>(body4), 1690 new type_to_key_function_body_leaf<T5, K, Body5>(body5), 1691 new type_to_key_function_body_leaf<T6, K, Body6>(body6), 1692 new type_to_key_function_body_leaf<T7, K, Body7>(body7), 1693 new type_to_key_function_body_leaf<T8, K, Body8>(body8), 1694 new type_to_key_function_body_leaf<T9, K, Body9>(body9) 1695 ) ) { 1696 static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers"); 1697 } 1698 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {} 1699 }; 1700 #endif 1701 1702 //! templated function to refer to input ports of the join node 1703 template<size_t N, typename JNT> 1704 typename std::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) { 1705 return std::get<N>(jn.input_ports()); 1706 } 1707 1708 #endif // __TBB__flow_graph_join_impl_H 1709