1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_join_impl_H
18 #define __TBB__flow_graph_join_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 // included into namespace tbb::detail::d1
25 
26     struct forwarding_base : no_assign {
27         forwarding_base(graph &g) : graph_ref(g) {}
28         virtual ~forwarding_base() {}
29         graph& graph_ref;
30     };
31 
32     struct queueing_forwarding_base : forwarding_base {
33         using forwarding_base::forwarding_base;
34         // decrement_port_count may create a forwarding task.  If we cannot handle the task
35         // ourselves, ask decrement_port_count to deal with it.
36         virtual graph_task* decrement_port_count(bool handle_task) = 0;
37     };
38 
39     struct reserving_forwarding_base : forwarding_base {
40         using forwarding_base::forwarding_base;
41         // decrement_port_count may create a forwarding task.  If we cannot handle the task
42         // ourselves, ask decrement_port_count to deal with it.
43         virtual graph_task* decrement_port_count() = 0;
44         virtual void increment_port_count() = 0;
45     };
46 
47     // specialization that lets us keep a copy of the current_key for building results.
48     // KeyType can be a reference type.
49     template<typename KeyType>
50     struct matching_forwarding_base : public forwarding_base {
51         typedef typename std::decay<KeyType>::type current_key_type;
52         matching_forwarding_base(graph &g) : forwarding_base(g) { }
53         virtual graph_task* increment_key_count(current_key_type const & /*t*/) = 0;
54         current_key_type current_key; // so ports can refer to FE's desired items
55     };
56 
57     template< int N >
58     struct join_helper {
59 
60         template< typename TupleType, typename PortType >
61         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
62             std::get<N-1>( my_input ).set_join_node_pointer(port);
63             join_helper<N-1>::set_join_node_pointer( my_input, port );
64         }
65         template< typename TupleType >
66         static inline void consume_reservations( TupleType &my_input ) {
67             std::get<N-1>( my_input ).consume();
68             join_helper<N-1>::consume_reservations( my_input );
69         }
70 
71         template< typename TupleType >
72         static inline void release_my_reservation( TupleType &my_input ) {
73             std::get<N-1>( my_input ).release();
74         }
75 
76         template <typename TupleType>
77         static inline void release_reservations( TupleType &my_input) {
78             join_helper<N-1>::release_reservations(my_input);
79             release_my_reservation(my_input);
80         }
81 
82         template< typename InputTuple, typename OutputTuple >
83         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
84             if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
85             if ( !join_helper<N-1>::reserve( my_input, out ) ) {
86                 release_my_reservation( my_input );
87                 return false;
88             }
89             return true;
90         }
91 
92         template<typename InputTuple, typename OutputTuple>
93         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
94             bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail
95             return join_helper<N-1>::get_my_item(my_input, out) && res;       // do get on other inputs before returning
96         }
97 
98         template<typename InputTuple, typename OutputTuple>
99         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
100             return get_my_item(my_input, out);
101         }
102 
103         template<typename InputTuple>
104         static inline void reset_my_port(InputTuple &my_input) {
105             join_helper<N-1>::reset_my_port(my_input);
106             std::get<N-1>(my_input).reset_port();
107         }
108 
109         template<typename InputTuple>
110         static inline void reset_ports(InputTuple& my_input) {
111             reset_my_port(my_input);
112         }
113 
114         template<typename InputTuple, typename KeyFuncTuple>
115         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
116             std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs));
117             std::get<N-1>(my_key_funcs) = nullptr;
118             join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
119         }
120 
121         template< typename KeyFuncTuple>
122         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
123             __TBB_ASSERT(
124                 std::get<N-1>(other_inputs).get_my_key_func(),
125                 "key matching join node should not be instantiated without functors."
126             );
127             std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone());
128             join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
129         }
130 
131         template<typename InputTuple>
132         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
133             join_helper<N-1>::reset_inputs(my_input, f);
134             std::get<N-1>(my_input).reset_receiver(f);
135         }
136     };  // join_helper<N>
137 
138     template< >
139     struct join_helper<1> {
140 
141         template< typename TupleType, typename PortType >
142         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
143             std::get<0>( my_input ).set_join_node_pointer(port);
144         }
145 
146         template< typename TupleType >
147         static inline void consume_reservations( TupleType &my_input ) {
148             std::get<0>( my_input ).consume();
149         }
150 
151         template< typename TupleType >
152         static inline void release_my_reservation( TupleType &my_input ) {
153             std::get<0>( my_input ).release();
154         }
155 
156         template<typename TupleType>
157         static inline void release_reservations( TupleType &my_input) {
158             release_my_reservation(my_input);
159         }
160 
161         template< typename InputTuple, typename OutputTuple >
162         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
163             return std::get<0>( my_input ).reserve( std::get<0>( out ) );
164         }
165 
166         template<typename InputTuple, typename OutputTuple>
167         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
168             return std::get<0>(my_input).get_item(std::get<0>(out));
169         }
170 
171         template<typename InputTuple, typename OutputTuple>
172         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
173             return get_my_item(my_input, out);
174         }
175 
176         template<typename InputTuple>
177         static inline void reset_my_port(InputTuple &my_input) {
178             std::get<0>(my_input).reset_port();
179         }
180 
181         template<typename InputTuple>
182         static inline void reset_ports(InputTuple& my_input) {
183             reset_my_port(my_input);
184         }
185 
186         template<typename InputTuple, typename KeyFuncTuple>
187         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
188             std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs));
189             std::get<0>(my_key_funcs) = nullptr;
190         }
191 
192         template< typename KeyFuncTuple>
193         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
194             __TBB_ASSERT(
195                 std::get<0>(other_inputs).get_my_key_func(),
196                 "key matching join node should not be instantiated without functors."
197             );
198             std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone());
199         }
200         template<typename InputTuple>
201         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
202             std::get<0>(my_input).reset_receiver(f);
203         }
204     };  // join_helper<1>
205 
206     //! The two-phase join port
207     template< typename T >
208     class reserving_port : public receiver<T> {
209     public:
210         typedef T input_type;
211         typedef typename receiver<input_type>::predecessor_type predecessor_type;
212 
213     private:
214         // ----------- Aggregator ------------
215         enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
216         };
217         typedef reserving_port<T> class_type;
218 
219         class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
220         public:
221             char type;
222             union {
223                 T *my_arg;
224                 predecessor_type *my_pred;
225             };
226             reserving_port_operation(const T& e, op_type t) :
227                 type(char(t)), my_arg(const_cast<T*>(&e)) {}
228             reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
229                 my_pred(const_cast<predecessor_type *>(&s)) {}
230             reserving_port_operation(op_type t) : type(char(t)) {}
231         };
232 
233         typedef aggregating_functor<class_type, reserving_port_operation> handler_type;
234         friend class aggregating_functor<class_type, reserving_port_operation>;
235         aggregator<handler_type, reserving_port_operation> my_aggregator;
236 
237         void handle_operations(reserving_port_operation* op_list) {
238             reserving_port_operation *current;
239             bool was_missing_predecessors = false;
240             while(op_list) {
241                 current = op_list;
242                 op_list = op_list->next;
243                 switch(current->type) {
244                 case reg_pred:
245                     was_missing_predecessors = my_predecessors.empty();
246                     my_predecessors.add(*(current->my_pred));
247                     if ( was_missing_predecessors ) {
248                         (void) my_join->decrement_port_count(); // may try to forward
249                     }
250                     current->status.store( SUCCEEDED, std::memory_order_release);
251                     break;
252                 case rem_pred:
253                     if ( !my_predecessors.empty() ) {
254                         my_predecessors.remove(*(current->my_pred));
255                         if ( my_predecessors.empty() ) // was the last predecessor
256                             my_join->increment_port_count();
257                     }
258                     // TODO: consider returning failure if there were no predecessors to remove
259                     current->status.store( SUCCEEDED, std::memory_order_release );
260                     break;
261                 case res_item:
262                     if ( reserved ) {
263                         current->status.store( FAILED, std::memory_order_release);
264                     }
265                     else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
266                         reserved = true;
267                         current->status.store( SUCCEEDED, std::memory_order_release);
268                     } else {
269                         if ( my_predecessors.empty() ) {
270                             my_join->increment_port_count();
271                         }
272                         current->status.store( FAILED, std::memory_order_release);
273                     }
274                     break;
275                 case rel_res:
276                     reserved = false;
277                     my_predecessors.try_release( );
278                     current->status.store( SUCCEEDED, std::memory_order_release);
279                     break;
280                 case con_res:
281                     reserved = false;
282                     my_predecessors.try_consume( );
283                     current->status.store( SUCCEEDED, std::memory_order_release);
284                     break;
285                 }
286             }
287         }
288 
289     protected:
290         template< typename R, typename B > friend class run_and_put_task;
291         template<typename X, typename Y> friend class broadcast_cache;
292         template<typename X, typename Y> friend class round_robin_cache;
293         graph_task* try_put_task( const T & ) override {
294             return nullptr;
295         }
296 
297         graph& graph_reference() const override {
298             return my_join->graph_ref;
299         }
300 
301     public:
302 
303         //! Constructor
304         reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) {
305             my_aggregator.initialize_handler(handler_type(this));
306         }
307 
308         // copy constructor
309         reserving_port(const reserving_port& /* other */) = delete;
310 
311         void set_join_node_pointer(reserving_forwarding_base *join) {
312             my_join = join;
313         }
314 
315         //! Add a predecessor
316         bool register_predecessor( predecessor_type &src ) override {
317             reserving_port_operation op_data(src, reg_pred);
318             my_aggregator.execute(&op_data);
319             return op_data.status == SUCCEEDED;
320         }
321 
322         //! Remove a predecessor
323         bool remove_predecessor( predecessor_type &src ) override {
324             reserving_port_operation op_data(src, rem_pred);
325             my_aggregator.execute(&op_data);
326             return op_data.status == SUCCEEDED;
327         }
328 
329         //! Reserve an item from the port
330         bool reserve( T &v ) {
331             reserving_port_operation op_data(v, res_item);
332             my_aggregator.execute(&op_data);
333             return op_data.status == SUCCEEDED;
334         }
335 
336         //! Release the port
337         void release( ) {
338             reserving_port_operation op_data(rel_res);
339             my_aggregator.execute(&op_data);
340         }
341 
342         //! Complete use of the port
343         void consume( ) {
344             reserving_port_operation op_data(con_res);
345             my_aggregator.execute(&op_data);
346         }
347 
348         void reset_receiver( reset_flags f) {
349             if(f & rf_clear_edges) my_predecessors.clear();
350             else
351             my_predecessors.reset();
352             reserved = false;
353             __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
354         }
355 
356     private:
357 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
358         friend class get_graph_helper;
359 #endif
360 
361         reserving_forwarding_base *my_join;
362         reservable_predecessor_cache< T, null_mutex > my_predecessors;
363         bool reserved;
364     };  // reserving_port
365 
366     //! queueing join_port
367     template<typename T>
368     class queueing_port : public receiver<T>, public item_buffer<T> {
369     public:
370         typedef T input_type;
371         typedef typename receiver<input_type>::predecessor_type predecessor_type;
372         typedef queueing_port<T> class_type;
373 
374     // ----------- Aggregator ------------
375     private:
376         enum op_type { get__item, res_port, try__put_task
377         };
378 
379         class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
380         public:
381             char type;
382             T my_val;
383             T* my_arg;
384             graph_task* bypass_t;
385             // constructor for value parameter
386             queueing_port_operation(const T& e, op_type t) :
387                 type(char(t)), my_val(e)
388                 , bypass_t(nullptr)
389             {}
390             // constructor for pointer parameter
391             queueing_port_operation(const T* p, op_type t) :
392                 type(char(t)), my_arg(const_cast<T*>(p))
393                 , bypass_t(nullptr)
394             {}
395             // constructor with no parameter
396             queueing_port_operation(op_type t) : type(char(t))
397                 , bypass_t(nullptr)
398             {}
399         };
400 
401         typedef aggregating_functor<class_type, queueing_port_operation> handler_type;
402         friend class aggregating_functor<class_type, queueing_port_operation>;
403         aggregator<handler_type, queueing_port_operation> my_aggregator;
404 
405         void handle_operations(queueing_port_operation* op_list) {
406             queueing_port_operation *current;
407             bool was_empty;
408             while(op_list) {
409                 current = op_list;
410                 op_list = op_list->next;
411                 switch(current->type) {
412                 case try__put_task: {
413                         graph_task* rtask = nullptr;
414                         was_empty = this->buffer_empty();
415                         this->push_back(current->my_val);
416                         if (was_empty) rtask = my_join->decrement_port_count(false);
417                         else
418                             rtask = SUCCESSFULLY_ENQUEUED;
419                         current->bypass_t = rtask;
420                         current->status.store( SUCCEEDED, std::memory_order_release);
421                     }
422                     break;
423                 case get__item:
424                     if(!this->buffer_empty()) {
425                         *(current->my_arg) = this->front();
426                         current->status.store( SUCCEEDED, std::memory_order_release);
427                     }
428                     else {
429                         current->status.store( FAILED, std::memory_order_release);
430                     }
431                     break;
432                 case res_port:
433                     __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
434                     this->destroy_front();
435                     if(this->my_item_valid(this->my_head)) {
436                         (void)my_join->decrement_port_count(true);
437                     }
438                     current->status.store( SUCCEEDED, std::memory_order_release);
439                     break;
440                 }
441             }
442         }
443     // ------------ End Aggregator ---------------
444 
445     protected:
446         template< typename R, typename B > friend class run_and_put_task;
447         template<typename X, typename Y> friend class broadcast_cache;
448         template<typename X, typename Y> friend class round_robin_cache;
449         graph_task* try_put_task(const T &v) override {
450             queueing_port_operation op_data(v, try__put_task);
451             my_aggregator.execute(&op_data);
452             __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
453             if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
454             return op_data.bypass_t;
455         }
456 
457         graph& graph_reference() const override {
458             return my_join->graph_ref;
459         }
460 
461     public:
462 
463         //! Constructor
464         queueing_port() : item_buffer<T>() {
465             my_join = nullptr;
466             my_aggregator.initialize_handler(handler_type(this));
467         }
468 
469         //! copy constructor
470         queueing_port(const queueing_port& /* other */) = delete;
471 
472         //! record parent for tallying available items
473         void set_join_node_pointer(queueing_forwarding_base *join) {
474             my_join = join;
475         }
476 
477         bool get_item( T &v ) {
478             queueing_port_operation op_data(&v, get__item);
479             my_aggregator.execute(&op_data);
480             return op_data.status == SUCCEEDED;
481         }
482 
483         // reset_port is called when item is accepted by successor, but
484         // is initiated by join_node.
485         void reset_port() {
486             queueing_port_operation op_data(res_port);
487             my_aggregator.execute(&op_data);
488             return;
489         }
490 
491         void reset_receiver(reset_flags) {
492             item_buffer<T>::reset();
493         }
494 
495     private:
496 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
497         friend class get_graph_helper;
498 #endif
499 
500         queueing_forwarding_base *my_join;
501     };  // queueing_port
502 
503 #include "_flow_graph_tagged_buffer_impl.h"
504 
505     template<typename K>
506     struct count_element {
507         K my_key;
508         size_t my_value;
509     };
510 
511     // method to access the key in the counting table
512     // the ref has already been removed from K
513     template< typename K >
514     struct key_to_count_functor {
515         typedef count_element<K> table_item_type;
516         const K& operator()(const table_item_type& v) { return v.my_key; }
517     };
518 
519     // the ports can have only one template parameter.  We wrap the types needed in
520     // a traits type
521     template< class TraitsType >
522     class key_matching_port :
523         public receiver<typename TraitsType::T>,
524         public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
525                 typename TraitsType::KHash > {
526     public:
527         typedef TraitsType traits;
528         typedef key_matching_port<traits> class_type;
529         typedef typename TraitsType::T input_type;
530         typedef typename TraitsType::K key_type;
531         typedef typename std::decay<key_type>::type noref_key_type;
532         typedef typename receiver<input_type>::predecessor_type predecessor_type;
533         typedef typename TraitsType::TtoK type_to_key_func_type;
534         typedef typename TraitsType::KHash hash_compare_type;
535         typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
536 
537     private:
538 // ----------- Aggregator ------------
539     private:
540         enum op_type { try__put, get__item, res_port
541         };
542 
543         class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
544         public:
545             char type;
546             input_type my_val;
547             input_type *my_arg;
548             // constructor for value parameter
549             key_matching_port_operation(const input_type& e, op_type t) :
550                 type(char(t)), my_val(e) {}
551             // constructor for pointer parameter
552             key_matching_port_operation(const input_type* p, op_type t) :
553                 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
554             // constructor with no parameter
555             key_matching_port_operation(op_type t) : type(char(t)) {}
556         };
557 
558         typedef aggregating_functor<class_type, key_matching_port_operation> handler_type;
559         friend class aggregating_functor<class_type, key_matching_port_operation>;
560         aggregator<handler_type, key_matching_port_operation> my_aggregator;
561 
562         void handle_operations(key_matching_port_operation* op_list) {
563             key_matching_port_operation *current;
564             while(op_list) {
565                 current = op_list;
566                 op_list = op_list->next;
567                 switch(current->type) {
568                 case try__put: {
569                         bool was_inserted = this->insert_with_key(current->my_val);
570                         // return failure if a duplicate insertion occurs
571                         current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release);
572                     }
573                     break;
574                 case get__item:
575                     // use current_key from FE for item
576                     if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
577                         __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
578                     }
579                     current->status.store( SUCCEEDED, std::memory_order_release);
580                     break;
581                 case res_port:
582                     // use current_key from FE for item
583                     this->delete_with_key(my_join->current_key);
584                     current->status.store( SUCCEEDED, std::memory_order_release);
585                     break;
586                 }
587             }
588         }
589 // ------------ End Aggregator ---------------
590     protected:
591         template< typename R, typename B > friend class run_and_put_task;
592         template<typename X, typename Y> friend class broadcast_cache;
593         template<typename X, typename Y> friend class round_robin_cache;
594         graph_task* try_put_task(const input_type& v) override {
595             key_matching_port_operation op_data(v, try__put);
596             graph_task* rtask = nullptr;
597             my_aggregator.execute(&op_data);
598             if(op_data.status == SUCCEEDED) {
599                 rtask = my_join->increment_key_count((*(this->get_key_func()))(v));  // may spawn
600                 // rtask has to reflect the return status of the try_put
601                 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
602             }
603             return rtask;
604         }
605 
606         graph& graph_reference() const override {
607             return my_join->graph_ref;
608         }
609 
610     public:
611 
612         key_matching_port() : receiver<input_type>(), buffer_type() {
613             my_join = nullptr;
614             my_aggregator.initialize_handler(handler_type(this));
615         }
616 
617         // copy constructor
618         key_matching_port(const key_matching_port& /*other*/) = delete;
619 #if __INTEL_COMPILER <= 2021
620         // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
621         // class while the parent class has the virtual keyword for the destrocutor.
622         virtual
623 #endif
624         ~key_matching_port() { }
625 
626         void set_join_node_pointer(forwarding_base *join) {
627             my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
628         }
629 
630         void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
631 
632         type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
633 
634         bool get_item( input_type &v ) {
635             // aggregator uses current_key from FE for Key
636             key_matching_port_operation op_data(&v, get__item);
637             my_aggregator.execute(&op_data);
638             return op_data.status == SUCCEEDED;
639         }
640 
641         // reset_port is called when item is accepted by successor, but
642         // is initiated by join_node.
643         void reset_port() {
644             key_matching_port_operation op_data(res_port);
645             my_aggregator.execute(&op_data);
646             return;
647         }
648 
649         void reset_receiver(reset_flags ) {
650             buffer_type::reset();
651         }
652 
653     private:
654         // my_join forwarding base used to count number of inputs that
655         // received key.
656         matching_forwarding_base<key_type> *my_join;
657     };  // key_matching_port
658 
659     using namespace graph_policy_namespace;
660 
661     template<typename JP, typename InputTuple, typename OutputTuple>
662     class join_node_base;
663 
664     //! join_node_FE : implements input port policy
665     template<typename JP, typename InputTuple, typename OutputTuple>
666     class join_node_FE;
667 
668     template<typename InputTuple, typename OutputTuple>
669     class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base {
670     public:
671         static const int N = std::tuple_size<OutputTuple>::value;
672         typedef OutputTuple output_type;
673         typedef InputTuple input_type;
674         typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding
675 
676         join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) {
677             ports_with_no_inputs = N;
678             join_helper<N>::set_join_node_pointer(my_inputs, this);
679         }
680 
681         join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) {
682             ports_with_no_inputs = N;
683             join_helper<N>::set_join_node_pointer(my_inputs, this);
684         }
685 
686         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
687 
688        void increment_port_count() override {
689             ++ports_with_no_inputs;
690         }
691 
692         // if all input_ports have predecessors, spawn forward to try and consume tuples
693         graph_task* decrement_port_count() override {
694             if(ports_with_no_inputs.fetch_sub(1) == 1) {
695                 if(is_graph_active(this->graph_ref)) {
696                     small_object_allocator allocator{};
697                     typedef forward_task_bypass<base_node_type> task_type;
698                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
699                     graph_ref.reserve_wait();
700                     spawn_in_graph_arena(this->graph_ref, *t);
701                 }
702             }
703             return nullptr;
704         }
705 
706         input_type &input_ports() { return my_inputs; }
707 
708     protected:
709 
710         void reset(  reset_flags f) {
711             // called outside of parallel contexts
712             ports_with_no_inputs = N;
713             join_helper<N>::reset_inputs(my_inputs, f);
714         }
715 
716         // all methods on input ports should be called under mutual exclusion from join_node_base.
717 
718         bool tuple_build_may_succeed() {
719             return !ports_with_no_inputs;
720         }
721 
722         bool try_to_make_tuple(output_type &out) {
723             if(ports_with_no_inputs) return false;
724             return join_helper<N>::reserve(my_inputs, out);
725         }
726 
727         void tuple_accepted() {
728             join_helper<N>::consume_reservations(my_inputs);
729         }
730         void tuple_rejected() {
731             join_helper<N>::release_reservations(my_inputs);
732         }
733 
734         input_type my_inputs;
735         base_node_type *my_node;
736         std::atomic<std::size_t> ports_with_no_inputs;
737     };  // join_node_FE<reserving, ... >
738 
739     template<typename InputTuple, typename OutputTuple>
740     class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base {
741     public:
742         static const int N = std::tuple_size<OutputTuple>::value;
743         typedef OutputTuple output_type;
744         typedef InputTuple input_type;
745         typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding
746 
747         join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) {
748             ports_with_no_items = N;
749             join_helper<N>::set_join_node_pointer(my_inputs, this);
750         }
751 
752         join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) {
753             ports_with_no_items = N;
754             join_helper<N>::set_join_node_pointer(my_inputs, this);
755         }
756 
757         // needed for forwarding
758         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
759 
760         void reset_port_count() {
761             ports_with_no_items = N;
762         }
763 
764         // if all input_ports have items, spawn forward to try and consume tuples
765         graph_task* decrement_port_count(bool handle_task) override
766         {
767             if(ports_with_no_items.fetch_sub(1) == 1) {
768                 if(is_graph_active(this->graph_ref)) {
769                     small_object_allocator allocator{};
770                     typedef forward_task_bypass<base_node_type> task_type;
771                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
772                     graph_ref.reserve_wait();
773                     if( !handle_task )
774                         return t;
775                     spawn_in_graph_arena(this->graph_ref, *t);
776                 }
777             }
778             return nullptr;
779         }
780 
781         input_type &input_ports() { return my_inputs; }
782 
783     protected:
784 
785         void reset(  reset_flags f) {
786             reset_port_count();
787             join_helper<N>::reset_inputs(my_inputs, f );
788         }
789 
790         // all methods on input ports should be called under mutual exclusion from join_node_base.
791 
792         bool tuple_build_may_succeed() {
793             return !ports_with_no_items;
794         }
795 
796         bool try_to_make_tuple(output_type &out) {
797             if(ports_with_no_items) return false;
798             return join_helper<N>::get_items(my_inputs, out);
799         }
800 
801         void tuple_accepted() {
802             reset_port_count();
803             join_helper<N>::reset_ports(my_inputs);
804         }
805         void tuple_rejected() {
806             // nothing to do.
807         }
808 
809         input_type my_inputs;
810         base_node_type *my_node;
811         std::atomic<std::size_t> ports_with_no_items;
812     };  // join_node_FE<queueing, ...>
813 
814     // key_matching join front-end.
815     template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
816     class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
817              // buffer of key value counts
818               public hash_buffer<   // typedefed below to key_to_count_buffer_type
819                   typename std::decay<K>::type&,        // force ref type on K
820                   count_element<typename std::decay<K>::type>,
821                   type_to_key_function_body<
822                       count_element<typename std::decay<K>::type>,
823                       typename std::decay<K>::type& >,
824                   KHash >,
825              // buffer of output items
826              public item_buffer<OutputTuple> {
827     public:
828         static const int N = std::tuple_size<OutputTuple>::value;
829         typedef OutputTuple output_type;
830         typedef InputTuple input_type;
831         typedef K key_type;
832         typedef typename std::decay<key_type>::type unref_key_type;
833         typedef KHash key_hash_compare;
834         // must use K without ref.
835         typedef count_element<unref_key_type> count_element_type;
836         // method that lets us refer to the key of this type.
837         typedef key_to_count_functor<unref_key_type> key_to_count_func;
838         typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
839         typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
840         // this is the type of the special table that keeps track of the number of discrete
841         // elements corresponding to each key that we've seen.
842         typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
843                  key_to_count_buffer_type;
844         typedef item_buffer<output_type> output_buffer_type;
845         typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
846         typedef matching_forwarding_base<key_type> forwarding_base_type;
847 
848 // ----------- Aggregator ------------
849         // the aggregator is only needed to serialize the access to the hash table.
850         // and the output_buffer_type base class
851     private:
852         enum op_type { res_count, inc_count, may_succeed, try_make };
853         typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
854 
855         class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
856         public:
857             char type;
858             unref_key_type my_val;
859             output_type* my_output;
860             graph_task* bypass_t;
861             // constructor for value parameter
862             key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e),
863                  my_output(nullptr), bypass_t(nullptr) {}
864             key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {}
865             // constructor with no parameter
866             key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {}
867         };
868 
869         typedef aggregating_functor<class_type, key_matching_FE_operation> handler_type;
870         friend class aggregating_functor<class_type, key_matching_FE_operation>;
871         aggregator<handler_type, key_matching_FE_operation> my_aggregator;
872 
873         // called from aggregator, so serialized
874         // returns a task pointer if the a task would have been enqueued but we asked that
875         // it be returned.  Otherwise returns nullptr.
876         graph_task* fill_output_buffer(unref_key_type &t) {
877             output_type l_out;
878             graph_task* rtask = nullptr;
879             bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref);
880             this->current_key = t;
881             this->delete_with_key(this->current_key);   // remove the key
882             if(join_helper<N>::get_items(my_inputs, l_out)) {  //  <== call back
883                 this->push_back(l_out);
884                 if(do_fwd) {  // we enqueue if receiving an item from predecessor, not if successor asks for item
885                     small_object_allocator allocator{};
886                     typedef forward_task_bypass<base_node_type> task_type;
887                     rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node);
888                     this->graph_ref.reserve_wait();
889                     do_fwd = false;
890                 }
891                 // retire the input values
892                 join_helper<N>::reset_ports(my_inputs);  //  <== call back
893             }
894             else {
895                 __TBB_ASSERT(false, "should have had something to push");
896             }
897             return rtask;
898         }
899 
900         void handle_operations(key_matching_FE_operation* op_list) {
901             key_matching_FE_operation *current;
902             while(op_list) {
903                 current = op_list;
904                 op_list = op_list->next;
905                 switch(current->type) {
906                 case res_count:  // called from BE
907                     {
908                         this->destroy_front();
909                         current->status.store( SUCCEEDED, std::memory_order_release);
910                     }
911                     break;
912                 case inc_count: {  // called from input ports
913                         count_element_type *p = 0;
914                         unref_key_type &t = current->my_val;
915                         if(!(this->find_ref_with_key(t,p))) {
916                             count_element_type ev;
917                             ev.my_key = t;
918                             ev.my_value = 0;
919                             this->insert_with_key(ev);
920                             bool found = this->find_ref_with_key(t, p);
921                             __TBB_ASSERT_EX(found, "should find key after inserting it");
922                         }
923                         if(++(p->my_value) == size_t(N)) {
924                             current->bypass_t = fill_output_buffer(t);
925                         }
926                     }
927                     current->status.store( SUCCEEDED, std::memory_order_release);
928                     break;
929                 case may_succeed:  // called from BE
930                     current->status.store( this->buffer_empty() ? FAILED : SUCCEEDED, std::memory_order_release);
931                     break;
932                 case try_make:  // called from BE
933                     if(this->buffer_empty()) {
934                         current->status.store( FAILED, std::memory_order_release);
935                     }
936                     else {
937                         *(current->my_output) = this->front();
938                         current->status.store( SUCCEEDED, std::memory_order_release);
939                     }
940                     break;
941                 }
942             }
943         }
944 // ------------ End Aggregator ---------------
945 
946     public:
947         template<typename FunctionTuple>
948         join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) {
949             join_helper<N>::set_join_node_pointer(my_inputs, this);
950             join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
951             my_aggregator.initialize_handler(handler_type(this));
952                     TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
953             this->set_key_func(cfb);
954         }
955 
956         join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
957         output_buffer_type() {
958             my_node = nullptr;
959             join_helper<N>::set_join_node_pointer(my_inputs, this);
960             join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
961             my_aggregator.initialize_handler(handler_type(this));
962             TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
963             this->set_key_func(cfb);
964         }
965 
966         // needed for forwarding
967         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
968 
969         void reset_port_count() {  // called from BE
970             key_matching_FE_operation op_data(res_count);
971             my_aggregator.execute(&op_data);
972             return;
973         }
974 
975         // if all input_ports have items, spawn forward to try and consume tuples
976         // return a task if we are asked and did create one.
977         graph_task *increment_key_count(unref_key_type const & t) override {  // called from input_ports
978             key_matching_FE_operation op_data(t, inc_count);
979             my_aggregator.execute(&op_data);
980             return op_data.bypass_t;
981         }
982 
983         input_type &input_ports() { return my_inputs; }
984 
985     protected:
986 
987         void reset(  reset_flags f ) {
988             // called outside of parallel contexts
989             join_helper<N>::reset_inputs(my_inputs, f);
990 
991             key_to_count_buffer_type::reset();
992             output_buffer_type::reset();
993         }
994 
995         // all methods on input ports should be called under mutual exclusion from join_node_base.
996 
997         bool tuple_build_may_succeed() {  // called from back-end
998             key_matching_FE_operation op_data(may_succeed);
999             my_aggregator.execute(&op_data);
1000             return op_data.status == SUCCEEDED;
1001         }
1002 
1003         // cannot lock while calling back to input_ports.  current_key will only be set
1004         // and reset under the aggregator, so it will remain consistent.
1005         bool try_to_make_tuple(output_type &out) {
1006             key_matching_FE_operation op_data(&out,try_make);
1007             my_aggregator.execute(&op_data);
1008             return op_data.status == SUCCEEDED;
1009         }
1010 
1011         void tuple_accepted() {
1012             reset_port_count();  // reset current_key after ports reset.
1013         }
1014 
1015         void tuple_rejected() {
1016             // nothing to do.
1017         }
1018 
1019         input_type my_inputs;  // input ports
1020         base_node_type *my_node;
1021     }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1022 
1023     //! join_node_base
1024     template<typename JP, typename InputTuple, typename OutputTuple>
1025     class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1026                            public sender<OutputTuple> {
1027     protected:
1028         using graph_node::my_graph;
1029     public:
1030         typedef OutputTuple output_type;
1031 
1032         typedef typename sender<output_type>::successor_type successor_type;
1033         typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1034         using input_ports_type::tuple_build_may_succeed;
1035         using input_ports_type::try_to_make_tuple;
1036         using input_ports_type::tuple_accepted;
1037         using input_ports_type::tuple_rejected;
1038 
1039     private:
1040         // ----------- Aggregator ------------
1041         enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1042         };
1043         typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1044 
1045         class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1046         public:
1047             char type;
1048             union {
1049                 output_type *my_arg;
1050                 successor_type *my_succ;
1051             };
1052             graph_task* bypass_t;
1053             join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1054                 my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr) {}
1055             join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1056                 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {}
1057             join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {}
1058         };
1059 
1060         typedef aggregating_functor<class_type, join_node_base_operation> handler_type;
1061         friend class aggregating_functor<class_type, join_node_base_operation>;
1062         bool forwarder_busy;
1063         aggregator<handler_type, join_node_base_operation> my_aggregator;
1064 
1065         void handle_operations(join_node_base_operation* op_list) {
1066             join_node_base_operation *current;
1067             while(op_list) {
1068                 current = op_list;
1069                 op_list = op_list->next;
1070                 switch(current->type) {
1071                 case reg_succ: {
1072                         my_successors.register_successor(*(current->my_succ));
1073                         if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) {
1074                             small_object_allocator allocator{};
1075                             typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type;
1076                             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1077                             my_graph.reserve_wait();
1078                             spawn_in_graph_arena(my_graph, *t);
1079                             forwarder_busy = true;
1080                         }
1081                         current->status.store( SUCCEEDED, std::memory_order_release);
1082                     }
1083                     break;
1084                 case rem_succ:
1085                     my_successors.remove_successor(*(current->my_succ));
1086                     current->status.store( SUCCEEDED, std::memory_order_release);
1087                     break;
1088                 case try__get:
1089                     if(tuple_build_may_succeed()) {
1090                         if(try_to_make_tuple(*(current->my_arg))) {
1091                             tuple_accepted();
1092                             current->status.store( SUCCEEDED, std::memory_order_release);
1093                         }
1094                         else current->status.store( FAILED, std::memory_order_release);
1095                     }
1096                     else current->status.store( FAILED, std::memory_order_release);
1097                     break;
1098                 case do_fwrd_bypass: {
1099                         bool build_succeeded;
1100                         graph_task *last_task = nullptr;
1101                         output_type out;
1102                         // forwarding must be exclusive, because try_to_make_tuple and tuple_accepted
1103                         // are separate locked methods in the FE.  We could conceivably fetch the front
1104                         // of the FE queue, then be swapped out, have someone else consume the FE's
1105                         // object, then come back, forward, and then try to remove it from the queue
1106                         // again. Without reservation of the FE, the methods accessing it must be locked.
1107                         // We could remember the keys of the objects we forwarded, and then remove
1108                         // them from the input ports after forwarding is complete?
1109                         if(tuple_build_may_succeed()) {  // checks output queue of FE
1110                             do {
1111                                 build_succeeded = try_to_make_tuple(out);  // fetch front_end of queue
1112                                 if(build_succeeded) {
1113                                     graph_task *new_task = my_successors.try_put_task(out);
1114                                     last_task = combine_tasks(my_graph, last_task, new_task);
1115                                     if(new_task) {
1116                                         tuple_accepted();
1117                                     }
1118                                     else {
1119                                         tuple_rejected();
1120                                         build_succeeded = false;
1121                                     }
1122                                 }
1123                             } while(build_succeeded);
1124                         }
1125                         current->bypass_t = last_task;
1126                         current->status.store( SUCCEEDED, std::memory_order_release);
1127                         forwarder_busy = false;
1128                     }
1129                     break;
1130                 }
1131             }
1132         }
1133         // ---------- end aggregator -----------
1134     public:
1135         join_node_base(graph &g)
1136             : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this)
1137         {
1138             input_ports_type::set_my_node(this);
1139             my_aggregator.initialize_handler(handler_type(this));
1140         }
1141 
1142         join_node_base(const join_node_base& other) :
1143             graph_node(other.graph_node::my_graph), input_ports_type(other),
1144             sender<OutputTuple>(), forwarder_busy(false), my_successors(this)
1145         {
1146             input_ports_type::set_my_node(this);
1147             my_aggregator.initialize_handler(handler_type(this));
1148         }
1149 
1150         template<typename FunctionTuple>
1151         join_node_base(graph &g, FunctionTuple f)
1152             : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this)
1153         {
1154             input_ports_type::set_my_node(this);
1155             my_aggregator.initialize_handler(handler_type(this));
1156         }
1157 
1158         bool register_successor(successor_type &r) override {
1159             join_node_base_operation op_data(r, reg_succ);
1160             my_aggregator.execute(&op_data);
1161             return op_data.status == SUCCEEDED;
1162         }
1163 
1164         bool remove_successor( successor_type &r) override {
1165             join_node_base_operation op_data(r, rem_succ);
1166             my_aggregator.execute(&op_data);
1167             return op_data.status == SUCCEEDED;
1168         }
1169 
1170         bool try_get( output_type &v) override {
1171             join_node_base_operation op_data(v, try__get);
1172             my_aggregator.execute(&op_data);
1173             return op_data.status == SUCCEEDED;
1174         }
1175 
1176     protected:
1177         void reset_node(reset_flags f) override {
1178             input_ports_type::reset(f);
1179             if(f & rf_clear_edges) my_successors.clear();
1180         }
1181 
1182     private:
1183         broadcast_cache<output_type, null_rw_mutex> my_successors;
1184 
1185         friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1186         graph_task *forward_task() {
1187             join_node_base_operation op_data(do_fwrd_bypass);
1188             my_aggregator.execute(&op_data);
1189             return op_data.bypass_t;
1190         }
1191 
1192     };  // join_node_base
1193 
1194     // join base class type generator
1195     template<int N, template<class> class PT, typename OutputTuple, typename JP>
1196     struct join_base {
1197         typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1198     };
1199 
1200     template<int N, typename OutputTuple, typename K, typename KHash>
1201     struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1202         typedef key_matching<K, KHash> key_traits_type;
1203         typedef K key_type;
1204         typedef KHash key_hash_compare;
1205         typedef join_node_base< key_traits_type,
1206                 // ports type
1207                 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1208                 OutputTuple > type;
1209     };
1210 
1211     //! unfolded_join_node : passes input_ports_type to join_node_base.  We build the input port type
1212     //  using tuple_element.  The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1213     //  and should match the typename.
1214 
1215     template<int N, template<class> class PT, typename OutputTuple, typename JP>
1216     class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1217     public:
1218         typedef typename wrap_tuple_elements<N, PT, OutputTuple>::type input_ports_type;
1219         typedef OutputTuple output_type;
1220     private:
1221         typedef join_node_base<JP, input_ports_type, output_type > base_type;
1222     public:
1223         unfolded_join_node(graph &g) : base_type(g) {}
1224         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1225     };
1226 
1227 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1228     template <typename K, typename T>
1229     struct key_from_message_body {
1230         K operator()(const T& t) const {
1231             return key_from_message<K>(t);
1232         }
1233     };
1234     // Adds const to reference type
1235     template <typename K, typename T>
1236     struct key_from_message_body<K&,T> {
1237         const K& operator()(const T& t) const {
1238             return key_from_message<const K&>(t);
1239         }
1240     };
1241 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1242     // key_matching unfolded_join_node.  This must be a separate specialization because the constructors
1243     // differ.
1244 
1245     template<typename OutputTuple, typename K, typename KHash>
1246     class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1247             join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1248         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1249         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1250     public:
1251         typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1252         typedef OutputTuple output_type;
1253     private:
1254         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1255         typedef type_to_key_function_body<T0, K> *f0_p;
1256         typedef type_to_key_function_body<T1, K> *f1_p;
1257         typedef std::tuple< f0_p, f1_p > func_initializer_type;
1258     public:
1259 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1260         unfolded_join_node(graph &g) : base_type(g,
1261                 func_initializer_type(
1262                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1263                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1264                     ) ) {
1265         }
1266 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1267         template<typename Body0, typename Body1>
1268         unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1269                 func_initializer_type(
1270                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1271                     new type_to_key_function_body_leaf<T1, K, Body1>(body1)
1272                     ) ) {
1273             static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1274         }
1275         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1276     };
1277 
1278     template<typename OutputTuple, typename K, typename KHash>
1279     class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1280             join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1281         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1282         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1283         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1284     public:
1285         typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1286         typedef OutputTuple output_type;
1287     private:
1288         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1289         typedef type_to_key_function_body<T0, K> *f0_p;
1290         typedef type_to_key_function_body<T1, K> *f1_p;
1291         typedef type_to_key_function_body<T2, K> *f2_p;
1292         typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1293     public:
1294 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1295         unfolded_join_node(graph &g) : base_type(g,
1296                 func_initializer_type(
1297                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1298                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1299                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1300                     ) ) {
1301         }
1302 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1303         template<typename Body0, typename Body1, typename Body2>
1304         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1305                 func_initializer_type(
1306                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1307                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1308                     new type_to_key_function_body_leaf<T2, K, Body2>(body2)
1309                     ) ) {
1310             static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1311         }
1312         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1313     };
1314 
1315     template<typename OutputTuple, typename K, typename KHash>
1316     class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1317             join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1318         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1319         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1320         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1321         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1322     public:
1323         typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1324         typedef OutputTuple output_type;
1325     private:
1326         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1327         typedef type_to_key_function_body<T0, K> *f0_p;
1328         typedef type_to_key_function_body<T1, K> *f1_p;
1329         typedef type_to_key_function_body<T2, K> *f2_p;
1330         typedef type_to_key_function_body<T3, K> *f3_p;
1331         typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1332     public:
1333 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1334         unfolded_join_node(graph &g) : base_type(g,
1335                 func_initializer_type(
1336                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1337                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1338                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1339                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1340                     ) ) {
1341         }
1342 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1343         template<typename Body0, typename Body1, typename Body2, typename Body3>
1344         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1345                 func_initializer_type(
1346                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1347                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1348                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1349                     new type_to_key_function_body_leaf<T3, K, Body3>(body3)
1350                     ) ) {
1351             static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1352         }
1353         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1354     };
1355 
1356     template<typename OutputTuple, typename K, typename KHash>
1357     class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1358             join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1359         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1360         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1361         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1362         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1363         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1364     public:
1365         typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1366         typedef OutputTuple output_type;
1367     private:
1368         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1369         typedef type_to_key_function_body<T0, K> *f0_p;
1370         typedef type_to_key_function_body<T1, K> *f1_p;
1371         typedef type_to_key_function_body<T2, K> *f2_p;
1372         typedef type_to_key_function_body<T3, K> *f3_p;
1373         typedef type_to_key_function_body<T4, K> *f4_p;
1374         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1375     public:
1376 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1377         unfolded_join_node(graph &g) : base_type(g,
1378                 func_initializer_type(
1379                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1380                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1381                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1382                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1383                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1384                     ) ) {
1385         }
1386 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1387         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1388         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1389                 func_initializer_type(
1390                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1391                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1392                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1393                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1394                     new type_to_key_function_body_leaf<T4, K, Body4>(body4)
1395                     ) ) {
1396             static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1397         }
1398         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1399     };
1400 
1401 #if __TBB_VARIADIC_MAX >= 6
1402     template<typename OutputTuple, typename K, typename KHash>
1403     class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1404             join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1405         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1406         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1407         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1408         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1409         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1410         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1411     public:
1412         typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1413         typedef OutputTuple output_type;
1414     private:
1415         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1416         typedef type_to_key_function_body<T0, K> *f0_p;
1417         typedef type_to_key_function_body<T1, K> *f1_p;
1418         typedef type_to_key_function_body<T2, K> *f2_p;
1419         typedef type_to_key_function_body<T3, K> *f3_p;
1420         typedef type_to_key_function_body<T4, K> *f4_p;
1421         typedef type_to_key_function_body<T5, K> *f5_p;
1422         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1423     public:
1424 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1425         unfolded_join_node(graph &g) : base_type(g,
1426                 func_initializer_type(
1427                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1428                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1429                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1430                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1431                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1432                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1433                     ) ) {
1434         }
1435 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1436         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1437         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1438                 : base_type(g, func_initializer_type(
1439                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1440                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1441                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1442                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1443                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1444                     new type_to_key_function_body_leaf<T5, K, Body5>(body5)
1445                     ) ) {
1446             static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1447         }
1448         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1449     };
1450 #endif
1451 
1452 #if __TBB_VARIADIC_MAX >= 7
1453     template<typename OutputTuple, typename K, typename KHash>
1454     class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1455             join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1456         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1457         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1458         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1459         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1460         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1461         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1462         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1463     public:
1464         typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1465         typedef OutputTuple output_type;
1466     private:
1467         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1468         typedef type_to_key_function_body<T0, K> *f0_p;
1469         typedef type_to_key_function_body<T1, K> *f1_p;
1470         typedef type_to_key_function_body<T2, K> *f2_p;
1471         typedef type_to_key_function_body<T3, K> *f3_p;
1472         typedef type_to_key_function_body<T4, K> *f4_p;
1473         typedef type_to_key_function_body<T5, K> *f5_p;
1474         typedef type_to_key_function_body<T6, K> *f6_p;
1475         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1476     public:
1477 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1478         unfolded_join_node(graph &g) : base_type(g,
1479                 func_initializer_type(
1480                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1481                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1482                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1483                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1484                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1485                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1486                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1487                     ) ) {
1488         }
1489 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1490         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1491                  typename Body5, typename Body6>
1492         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1493                 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1494                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1495                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1496                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1497                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1498                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1499                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1500                     new type_to_key_function_body_leaf<T6, K, Body6>(body6)
1501                     ) ) {
1502             static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1503         }
1504         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1505     };
1506 #endif
1507 
1508 #if __TBB_VARIADIC_MAX >= 8
1509     template<typename OutputTuple, typename K, typename KHash>
1510     class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1511             join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1512         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1513         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1514         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1515         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1516         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1517         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1518         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1519         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1520     public:
1521         typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1522         typedef OutputTuple output_type;
1523     private:
1524         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1525         typedef type_to_key_function_body<T0, K> *f0_p;
1526         typedef type_to_key_function_body<T1, K> *f1_p;
1527         typedef type_to_key_function_body<T2, K> *f2_p;
1528         typedef type_to_key_function_body<T3, K> *f3_p;
1529         typedef type_to_key_function_body<T4, K> *f4_p;
1530         typedef type_to_key_function_body<T5, K> *f5_p;
1531         typedef type_to_key_function_body<T6, K> *f6_p;
1532         typedef type_to_key_function_body<T7, K> *f7_p;
1533         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1534     public:
1535 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1536         unfolded_join_node(graph &g) : base_type(g,
1537                 func_initializer_type(
1538                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1539                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1540                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1541                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1542                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1543                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1544                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1545                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1546                     ) ) {
1547         }
1548 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1549         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1550                  typename Body5, typename Body6, typename Body7>
1551         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1552                 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1553                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1554                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1555                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1556                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1557                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1558                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1559                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1560                     new type_to_key_function_body_leaf<T7, K, Body7>(body7)
1561                     ) ) {
1562             static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1563         }
1564         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1565     };
1566 #endif
1567 
1568 #if __TBB_VARIADIC_MAX >= 9
1569     template<typename OutputTuple, typename K, typename KHash>
1570     class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1571             join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1572         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1573         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1574         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1575         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1576         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1577         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1578         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1579         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1580         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1581     public:
1582         typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1583         typedef OutputTuple output_type;
1584     private:
1585         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1586         typedef type_to_key_function_body<T0, K> *f0_p;
1587         typedef type_to_key_function_body<T1, K> *f1_p;
1588         typedef type_to_key_function_body<T2, K> *f2_p;
1589         typedef type_to_key_function_body<T3, K> *f3_p;
1590         typedef type_to_key_function_body<T4, K> *f4_p;
1591         typedef type_to_key_function_body<T5, K> *f5_p;
1592         typedef type_to_key_function_body<T6, K> *f6_p;
1593         typedef type_to_key_function_body<T7, K> *f7_p;
1594         typedef type_to_key_function_body<T8, K> *f8_p;
1595         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1596     public:
1597 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1598         unfolded_join_node(graph &g) : base_type(g,
1599                 func_initializer_type(
1600                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1601                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1602                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1603                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1604                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1605                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1606                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1607                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1608                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1609                     ) ) {
1610         }
1611 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1612         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1613                  typename Body5, typename Body6, typename Body7, typename Body8>
1614         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1615                 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1616                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1617                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1618                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1619                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1620                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1621                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1622                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1623                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1624                     new type_to_key_function_body_leaf<T8, K, Body8>(body8)
1625                     ) ) {
1626             static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1627         }
1628         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1629     };
1630 #endif
1631 
1632 #if __TBB_VARIADIC_MAX >= 10
1633     template<typename OutputTuple, typename K, typename KHash>
1634     class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1635             join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1636         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1637         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1638         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1639         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1640         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1641         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1642         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1643         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1644         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1645         typedef typename std::tuple_element<9, OutputTuple>::type T9;
1646     public:
1647         typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1648         typedef OutputTuple output_type;
1649     private:
1650         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1651         typedef type_to_key_function_body<T0, K> *f0_p;
1652         typedef type_to_key_function_body<T1, K> *f1_p;
1653         typedef type_to_key_function_body<T2, K> *f2_p;
1654         typedef type_to_key_function_body<T3, K> *f3_p;
1655         typedef type_to_key_function_body<T4, K> *f4_p;
1656         typedef type_to_key_function_body<T5, K> *f5_p;
1657         typedef type_to_key_function_body<T6, K> *f6_p;
1658         typedef type_to_key_function_body<T7, K> *f7_p;
1659         typedef type_to_key_function_body<T8, K> *f8_p;
1660         typedef type_to_key_function_body<T9, K> *f9_p;
1661         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1662     public:
1663 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1664         unfolded_join_node(graph &g) : base_type(g,
1665                 func_initializer_type(
1666                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1667                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1668                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1669                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1670                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1671                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1672                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1673                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1674                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1675                     new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1676                     ) ) {
1677         }
1678 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1679         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1680             typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1681         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1682                 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1683                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1684                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1685                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1686                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1687                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1688                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1689                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1690                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1691                     new type_to_key_function_body_leaf<T8, K, Body8>(body8),
1692                     new type_to_key_function_body_leaf<T9, K, Body9>(body9)
1693                     ) ) {
1694             static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1695         }
1696         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1697     };
1698 #endif
1699 
1700     //! templated function to refer to input ports of the join node
1701     template<size_t N, typename JNT>
1702     typename std::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1703         return std::get<N>(jn.input_ports());
1704     }
1705 
1706 #endif // __TBB__flow_graph_join_impl_H
1707