1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_join_impl_H
18 #define __TBB__flow_graph_join_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 // included into namespace tbb::detail::d1
25 
26     struct forwarding_base : no_assign {
forwarding_baseforwarding_base27         forwarding_base(graph &g) : graph_ref(g) {}
~forwarding_baseforwarding_base28         virtual ~forwarding_base() {}
29         graph& graph_ref;
30     };
31 
32     struct queueing_forwarding_base : forwarding_base {
33         using forwarding_base::forwarding_base;
34         // decrement_port_count may create a forwarding task.  If we cannot handle the task
35         // ourselves, ask decrement_port_count to deal with it.
36         virtual graph_task* decrement_port_count(bool handle_task) = 0;
37     };
38 
39     struct reserving_forwarding_base : forwarding_base {
40         using forwarding_base::forwarding_base;
41         // decrement_port_count may create a forwarding task.  If we cannot handle the task
42         // ourselves, ask decrement_port_count to deal with it.
43         virtual graph_task* decrement_port_count() = 0;
44         virtual void increment_port_count() = 0;
45     };
46 
47     // specialization that lets us keep a copy of the current_key for building results.
48     // KeyType can be a reference type.
49     template<typename KeyType>
50     struct matching_forwarding_base : public forwarding_base {
51         typedef typename std::decay<KeyType>::type current_key_type;
matching_forwarding_basematching_forwarding_base52         matching_forwarding_base(graph &g) : forwarding_base(g) { }
53         virtual graph_task* increment_key_count(current_key_type const & /*t*/) = 0;
54         current_key_type current_key; // so ports can refer to FE's desired items
55     };
56 
57     template< int N >
58     struct join_helper {
59 
60         template< typename TupleType, typename PortType >
set_join_node_pointerjoin_helper61         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
62             std::get<N-1>( my_input ).set_join_node_pointer(port);
63             join_helper<N-1>::set_join_node_pointer( my_input, port );
64         }
65         template< typename TupleType >
consume_reservationsjoin_helper66         static inline void consume_reservations( TupleType &my_input ) {
67             std::get<N-1>( my_input ).consume();
68             join_helper<N-1>::consume_reservations( my_input );
69         }
70 
71         template< typename TupleType >
release_my_reservationjoin_helper72         static inline void release_my_reservation( TupleType &my_input ) {
73             std::get<N-1>( my_input ).release();
74         }
75 
76         template <typename TupleType>
release_reservationsjoin_helper77         static inline void release_reservations( TupleType &my_input) {
78             join_helper<N-1>::release_reservations(my_input);
79             release_my_reservation(my_input);
80         }
81 
82         template< typename InputTuple, typename OutputTuple >
reservejoin_helper83         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
84             if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
85             if ( !join_helper<N-1>::reserve( my_input, out ) ) {
86                 release_my_reservation( my_input );
87                 return false;
88             }
89             return true;
90         }
91 
92         template<typename InputTuple, typename OutputTuple>
get_my_itemjoin_helper93         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
94             bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail
95             return join_helper<N-1>::get_my_item(my_input, out) && res;       // do get on other inputs before returning
96         }
97 
98         template<typename InputTuple, typename OutputTuple>
get_itemsjoin_helper99         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
100             return get_my_item(my_input, out);
101         }
102 
103         template<typename InputTuple>
reset_my_portjoin_helper104         static inline void reset_my_port(InputTuple &my_input) {
105             join_helper<N-1>::reset_my_port(my_input);
106             std::get<N-1>(my_input).reset_port();
107         }
108 
109         template<typename InputTuple>
reset_portsjoin_helper110         static inline void reset_ports(InputTuple& my_input) {
111             reset_my_port(my_input);
112         }
113 
114         template<typename InputTuple, typename KeyFuncTuple>
set_key_functorsjoin_helper115         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
116             std::get<N-1>(my_input).set_my_key_func(std::get<N-1>(my_key_funcs));
117             std::get<N-1>(my_key_funcs) = nullptr;
118             join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
119         }
120 
121         template< typename KeyFuncTuple>
copy_key_functorsjoin_helper122         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
123             __TBB_ASSERT(
124                 std::get<N-1>(other_inputs).get_my_key_func(),
125                 "key matching join node should not be instantiated without functors."
126             );
127             std::get<N-1>(my_inputs).set_my_key_func(std::get<N-1>(other_inputs).get_my_key_func()->clone());
128             join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
129         }
130 
131         template<typename InputTuple>
reset_inputsjoin_helper132         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
133             join_helper<N-1>::reset_inputs(my_input, f);
134             std::get<N-1>(my_input).reset_receiver(f);
135         }
136     };  // join_helper<N>
137 
138     template< >
139     struct join_helper<1> {
140 
141         template< typename TupleType, typename PortType >
142         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
143             std::get<0>( my_input ).set_join_node_pointer(port);
144         }
145 
146         template< typename TupleType >
147         static inline void consume_reservations( TupleType &my_input ) {
148             std::get<0>( my_input ).consume();
149         }
150 
151         template< typename TupleType >
152         static inline void release_my_reservation( TupleType &my_input ) {
153             std::get<0>( my_input ).release();
154         }
155 
156         template<typename TupleType>
157         static inline void release_reservations( TupleType &my_input) {
158             release_my_reservation(my_input);
159         }
160 
161         template< typename InputTuple, typename OutputTuple >
162         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
163             return std::get<0>( my_input ).reserve( std::get<0>( out ) );
164         }
165 
166         template<typename InputTuple, typename OutputTuple>
167         static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
168             return std::get<0>(my_input).get_item(std::get<0>(out));
169         }
170 
171         template<typename InputTuple, typename OutputTuple>
172         static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
173             return get_my_item(my_input, out);
174         }
175 
176         template<typename InputTuple>
177         static inline void reset_my_port(InputTuple &my_input) {
178             std::get<0>(my_input).reset_port();
179         }
180 
181         template<typename InputTuple>
182         static inline void reset_ports(InputTuple& my_input) {
183             reset_my_port(my_input);
184         }
185 
186         template<typename InputTuple, typename KeyFuncTuple>
187         static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
188             std::get<0>(my_input).set_my_key_func(std::get<0>(my_key_funcs));
189             std::get<0>(my_key_funcs) = nullptr;
190         }
191 
192         template< typename KeyFuncTuple>
193         static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
194             __TBB_ASSERT(
195                 std::get<0>(other_inputs).get_my_key_func(),
196                 "key matching join node should not be instantiated without functors."
197             );
198             std::get<0>(my_inputs).set_my_key_func(std::get<0>(other_inputs).get_my_key_func()->clone());
199         }
200         template<typename InputTuple>
201         static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
202             std::get<0>(my_input).reset_receiver(f);
203         }
204     };  // join_helper<1>
205 
206     //! The two-phase join port
207     template< typename T >
208     class reserving_port : public receiver<T> {
209     public:
210         typedef T input_type;
211         typedef typename receiver<input_type>::predecessor_type predecessor_type;
212 
213     private:
214         // ----------- Aggregator ------------
215         enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
216         };
217         typedef reserving_port<T> class_type;
218 
219         class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
220         public:
221             char type;
222             union {
223                 T *my_arg;
224                 predecessor_type *my_pred;
225             };
226             reserving_port_operation(const T& e, op_type t) :
227                 type(char(t)), my_arg(const_cast<T*>(&e)) {}
228             reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
229                 my_pred(const_cast<predecessor_type *>(&s)) {}
230             reserving_port_operation(op_type t) : type(char(t)) {}
231         };
232 
233         typedef aggregating_functor<class_type, reserving_port_operation> handler_type;
234         friend class aggregating_functor<class_type, reserving_port_operation>;
235         aggregator<handler_type, reserving_port_operation> my_aggregator;
236 
237         void handle_operations(reserving_port_operation* op_list) {
238             reserving_port_operation *current;
239             bool was_missing_predecessors = false;
240             while(op_list) {
241                 current = op_list;
242                 op_list = op_list->next;
243                 switch(current->type) {
244                 case reg_pred:
245                     was_missing_predecessors = my_predecessors.empty();
246                     my_predecessors.add(*(current->my_pred));
247                     if ( was_missing_predecessors ) {
248                         (void) my_join->decrement_port_count(); // may try to forward
249                     }
250                     current->status.store( SUCCEEDED, std::memory_order_release);
251                     break;
252                 case rem_pred:
253                     if ( !my_predecessors.empty() ) {
254                         my_predecessors.remove(*(current->my_pred));
255                         if ( my_predecessors.empty() ) // was the last predecessor
256                             my_join->increment_port_count();
257                     }
258                     // TODO: consider returning failure if there were no predecessors to remove
259                     current->status.store( SUCCEEDED, std::memory_order_release );
260                     break;
261                 case res_item:
262                     if ( reserved ) {
263                         current->status.store( FAILED, std::memory_order_release);
264                     }
265                     else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
266                         reserved = true;
267                         current->status.store( SUCCEEDED, std::memory_order_release);
268                     } else {
269                         if ( my_predecessors.empty() ) {
270                             my_join->increment_port_count();
271                         }
272                         current->status.store( FAILED, std::memory_order_release);
273                     }
274                     break;
275                 case rel_res:
276                     reserved = false;
277                     my_predecessors.try_release( );
278                     current->status.store( SUCCEEDED, std::memory_order_release);
279                     break;
280                 case con_res:
281                     reserved = false;
282                     my_predecessors.try_consume( );
283                     current->status.store( SUCCEEDED, std::memory_order_release);
284                     break;
285                 }
286             }
287         }
288 
289     protected:
290         template< typename R, typename B > friend class run_and_put_task;
291         template<typename X, typename Y> friend class broadcast_cache;
292         template<typename X, typename Y> friend class round_robin_cache;
293         graph_task* try_put_task( const T & ) override {
294             return nullptr;
295         }
296 
297         graph& graph_reference() const override {
298             return my_join->graph_ref;
299         }
300 
301     public:
302 
303         //! Constructor
304         reserving_port() : my_join(nullptr), my_predecessors(this), reserved(false) {
305             my_aggregator.initialize_handler(handler_type(this));
306         }
307 
308         // copy constructor
309         reserving_port(const reserving_port& /* other */) = delete;
310 
311         void set_join_node_pointer(reserving_forwarding_base *join) {
312             my_join = join;
313         }
314 
315         //! Add a predecessor
316         bool register_predecessor( predecessor_type &src ) override {
317             reserving_port_operation op_data(src, reg_pred);
318             my_aggregator.execute(&op_data);
319             return op_data.status == SUCCEEDED;
320         }
321 
322         //! Remove a predecessor
323         bool remove_predecessor( predecessor_type &src ) override {
324             reserving_port_operation op_data(src, rem_pred);
325             my_aggregator.execute(&op_data);
326             return op_data.status == SUCCEEDED;
327         }
328 
329         //! Reserve an item from the port
330         bool reserve( T &v ) {
331             reserving_port_operation op_data(v, res_item);
332             my_aggregator.execute(&op_data);
333             return op_data.status == SUCCEEDED;
334         }
335 
336         //! Release the port
337         void release( ) {
338             reserving_port_operation op_data(rel_res);
339             my_aggregator.execute(&op_data);
340         }
341 
342         //! Complete use of the port
343         void consume( ) {
344             reserving_port_operation op_data(con_res);
345             my_aggregator.execute(&op_data);
346         }
347 
348         void reset_receiver( reset_flags f) {
349             if(f & rf_clear_edges) my_predecessors.clear();
350             else
351             my_predecessors.reset();
352             reserved = false;
353             __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
354         }
355 
356     private:
357 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
358         friend class get_graph_helper;
359 #endif
360 
361         reserving_forwarding_base *my_join;
362         reservable_predecessor_cache< T, null_mutex > my_predecessors;
363         bool reserved;
364     };  // reserving_port
365 
366     //! queueing join_port
367     template<typename T>
368     class queueing_port : public receiver<T>, public item_buffer<T> {
369     public:
370         typedef T input_type;
371         typedef typename receiver<input_type>::predecessor_type predecessor_type;
372         typedef queueing_port<T> class_type;
373 
374     // ----------- Aggregator ------------
375     private:
376         enum op_type { get__item, res_port, try__put_task
377         };
378 
379         class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
380         public:
381             char type;
382             T my_val;
383             T* my_arg;
384             graph_task* bypass_t;
385             // constructor for value parameter
386             queueing_port_operation(const T& e, op_type t) :
387                 type(char(t)), my_val(e), my_arg(nullptr)
388                 , bypass_t(nullptr)
389             {}
390             // constructor for pointer parameter
391             queueing_port_operation(const T* p, op_type t) :
392                 type(char(t)), my_arg(const_cast<T*>(p))
393                 , bypass_t(nullptr)
394             {}
395             // constructor with no parameter
396             queueing_port_operation(op_type t) : type(char(t)), my_arg(nullptr)
397                 , bypass_t(nullptr)
398             {}
399         };
400 
401         typedef aggregating_functor<class_type, queueing_port_operation> handler_type;
402         friend class aggregating_functor<class_type, queueing_port_operation>;
403         aggregator<handler_type, queueing_port_operation> my_aggregator;
404 
405         void handle_operations(queueing_port_operation* op_list) {
406             queueing_port_operation *current;
407             bool was_empty;
408             while(op_list) {
409                 current = op_list;
410                 op_list = op_list->next;
411                 switch(current->type) {
412                 case try__put_task: {
413                         graph_task* rtask = nullptr;
414                         was_empty = this->buffer_empty();
415                         this->push_back(current->my_val);
416                         if (was_empty) rtask = my_join->decrement_port_count(false);
417                         else
418                             rtask = SUCCESSFULLY_ENQUEUED;
419                         current->bypass_t = rtask;
420                         current->status.store( SUCCEEDED, std::memory_order_release);
421                     }
422                     break;
423                 case get__item:
424                     if(!this->buffer_empty()) {
425                         __TBB_ASSERT(current->my_arg, nullptr);
426                         *(current->my_arg) = this->front();
427                         current->status.store( SUCCEEDED, std::memory_order_release);
428                     }
429                     else {
430                         current->status.store( FAILED, std::memory_order_release);
431                     }
432                     break;
433                 case res_port:
434                     __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
435                     this->destroy_front();
436                     if(this->my_item_valid(this->my_head)) {
437                         (void)my_join->decrement_port_count(true);
438                     }
439                     current->status.store( SUCCEEDED, std::memory_order_release);
440                     break;
441                 }
442             }
443         }
444     // ------------ End Aggregator ---------------
445 
446     protected:
447         template< typename R, typename B > friend class run_and_put_task;
448         template<typename X, typename Y> friend class broadcast_cache;
449         template<typename X, typename Y> friend class round_robin_cache;
450         graph_task* try_put_task(const T &v) override {
451             queueing_port_operation op_data(v, try__put_task);
452             my_aggregator.execute(&op_data);
453             __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
454             if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
455             return op_data.bypass_t;
456         }
457 
458         graph& graph_reference() const override {
459             return my_join->graph_ref;
460         }
461 
462     public:
463 
464         //! Constructor
465         queueing_port() : item_buffer<T>() {
466             my_join = nullptr;
467             my_aggregator.initialize_handler(handler_type(this));
468         }
469 
470         //! copy constructor
471         queueing_port(const queueing_port& /* other */) = delete;
472 
473         //! record parent for tallying available items
474         void set_join_node_pointer(queueing_forwarding_base *join) {
475             my_join = join;
476         }
477 
478         bool get_item( T &v ) {
479             queueing_port_operation op_data(&v, get__item);
480             my_aggregator.execute(&op_data);
481             return op_data.status == SUCCEEDED;
482         }
483 
484         // reset_port is called when item is accepted by successor, but
485         // is initiated by join_node.
486         void reset_port() {
487             queueing_port_operation op_data(res_port);
488             my_aggregator.execute(&op_data);
489             return;
490         }
491 
492         void reset_receiver(reset_flags) {
493             item_buffer<T>::reset();
494         }
495 
496     private:
497 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
498         friend class get_graph_helper;
499 #endif
500 
501         queueing_forwarding_base *my_join;
502     };  // queueing_port
503 
504 #include "_flow_graph_tagged_buffer_impl.h"
505 
506     template<typename K>
507     struct count_element {
508         K my_key;
509         size_t my_value;
510     };
511 
512     // method to access the key in the counting table
513     // the ref has already been removed from K
514     template< typename K >
515     struct key_to_count_functor {
516         typedef count_element<K> table_item_type;
517         const K& operator()(const table_item_type& v) { return v.my_key; }
518     };
519 
520     // the ports can have only one template parameter.  We wrap the types needed in
521     // a traits type
522     template< class TraitsType >
523     class key_matching_port :
524         public receiver<typename TraitsType::T>,
525         public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
526                 typename TraitsType::KHash > {
527     public:
528         typedef TraitsType traits;
529         typedef key_matching_port<traits> class_type;
530         typedef typename TraitsType::T input_type;
531         typedef typename TraitsType::K key_type;
532         typedef typename std::decay<key_type>::type noref_key_type;
533         typedef typename receiver<input_type>::predecessor_type predecessor_type;
534         typedef typename TraitsType::TtoK type_to_key_func_type;
535         typedef typename TraitsType::KHash hash_compare_type;
536         typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
537 
538     private:
539 // ----------- Aggregator ------------
540     private:
541         enum op_type { try__put, get__item, res_port
542         };
543 
544         class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
545         public:
546             char type;
547             input_type my_val;
548             input_type *my_arg;
549             // constructor for value parameter
550             key_matching_port_operation(const input_type& e, op_type t) :
551                 type(char(t)), my_val(e), my_arg(nullptr) {}
552             // constructor for pointer parameter
553             key_matching_port_operation(const input_type* p, op_type t) :
554                 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
555             // constructor with no parameter
556             key_matching_port_operation(op_type t) : type(char(t)), my_arg(nullptr) {}
557         };
558 
559         typedef aggregating_functor<class_type, key_matching_port_operation> handler_type;
560         friend class aggregating_functor<class_type, key_matching_port_operation>;
561         aggregator<handler_type, key_matching_port_operation> my_aggregator;
562 
563         void handle_operations(key_matching_port_operation* op_list) {
564             key_matching_port_operation *current;
565             while(op_list) {
566                 current = op_list;
567                 op_list = op_list->next;
568                 switch(current->type) {
569                 case try__put: {
570                         bool was_inserted = this->insert_with_key(current->my_val);
571                         // return failure if a duplicate insertion occurs
572                         current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release);
573                     }
574                     break;
575                 case get__item:
576                     // use current_key from FE for item
577                     __TBB_ASSERT(current->my_arg, nullptr);
578                     if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
579                         __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
580                     }
581                     current->status.store( SUCCEEDED, std::memory_order_release);
582                     break;
583                 case res_port:
584                     // use current_key from FE for item
585                     this->delete_with_key(my_join->current_key);
586                     current->status.store( SUCCEEDED, std::memory_order_release);
587                     break;
588                 }
589             }
590         }
591 // ------------ End Aggregator ---------------
592     protected:
593         template< typename R, typename B > friend class run_and_put_task;
594         template<typename X, typename Y> friend class broadcast_cache;
595         template<typename X, typename Y> friend class round_robin_cache;
596         graph_task* try_put_task(const input_type& v) override {
597             key_matching_port_operation op_data(v, try__put);
598             graph_task* rtask = nullptr;
599             my_aggregator.execute(&op_data);
600             if(op_data.status == SUCCEEDED) {
601                 rtask = my_join->increment_key_count((*(this->get_key_func()))(v));  // may spawn
602                 // rtask has to reflect the return status of the try_put
603                 if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
604             }
605             return rtask;
606         }
607 
608         graph& graph_reference() const override {
609             return my_join->graph_ref;
610         }
611 
612     public:
613 
614         key_matching_port() : receiver<input_type>(), buffer_type() {
615             my_join = nullptr;
616             my_aggregator.initialize_handler(handler_type(this));
617         }
618 
619         // copy constructor
620         key_matching_port(const key_matching_port& /*other*/) = delete;
621 #if __INTEL_COMPILER <= 2021
622         // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
623         // class while the parent class has the virtual keyword for the destrocutor.
624         virtual
625 #endif
626         ~key_matching_port() { }
627 
628         void set_join_node_pointer(forwarding_base *join) {
629             my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
630         }
631 
632         void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
633 
634         type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
635 
636         bool get_item( input_type &v ) {
637             // aggregator uses current_key from FE for Key
638             key_matching_port_operation op_data(&v, get__item);
639             my_aggregator.execute(&op_data);
640             return op_data.status == SUCCEEDED;
641         }
642 
643         // reset_port is called when item is accepted by successor, but
644         // is initiated by join_node.
645         void reset_port() {
646             key_matching_port_operation op_data(res_port);
647             my_aggregator.execute(&op_data);
648             return;
649         }
650 
651         void reset_receiver(reset_flags ) {
652             buffer_type::reset();
653         }
654 
655     private:
656         // my_join forwarding base used to count number of inputs that
657         // received key.
658         matching_forwarding_base<key_type> *my_join;
659     };  // key_matching_port
660 
661     using namespace graph_policy_namespace;
662 
663     template<typename JP, typename InputTuple, typename OutputTuple>
664     class join_node_base;
665 
666     //! join_node_FE : implements input port policy
667     template<typename JP, typename InputTuple, typename OutputTuple>
668     class join_node_FE;
669 
670     template<typename InputTuple, typename OutputTuple>
671     class join_node_FE<reserving, InputTuple, OutputTuple> : public reserving_forwarding_base {
672     private:
673         static const int N = std::tuple_size<OutputTuple>::value;
674         typedef OutputTuple output_type;
675         typedef InputTuple input_type;
676         typedef join_node_base<reserving, InputTuple, OutputTuple> base_node_type; // for forwarding
677     public:
678         join_node_FE(graph &g) : reserving_forwarding_base(g), my_node(nullptr) {
679             ports_with_no_inputs = N;
680             join_helper<N>::set_join_node_pointer(my_inputs, this);
681         }
682 
683         join_node_FE(const join_node_FE& other) : reserving_forwarding_base((other.reserving_forwarding_base::graph_ref)), my_node(nullptr) {
684             ports_with_no_inputs = N;
685             join_helper<N>::set_join_node_pointer(my_inputs, this);
686         }
687 
688         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
689 
690        void increment_port_count() override {
691             ++ports_with_no_inputs;
692         }
693 
694         // if all input_ports have predecessors, spawn forward to try and consume tuples
695         graph_task* decrement_port_count() override {
696             if(ports_with_no_inputs.fetch_sub(1) == 1) {
697                 if(is_graph_active(this->graph_ref)) {
698                     small_object_allocator allocator{};
699                     typedef forward_task_bypass<base_node_type> task_type;
700                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
701                     graph_ref.reserve_wait();
702                     spawn_in_graph_arena(this->graph_ref, *t);
703                 }
704             }
705             return nullptr;
706         }
707 
708         input_type &input_ports() { return my_inputs; }
709 
710     protected:
711 
712         void reset(  reset_flags f) {
713             // called outside of parallel contexts
714             ports_with_no_inputs = N;
715             join_helper<N>::reset_inputs(my_inputs, f);
716         }
717 
718         // all methods on input ports should be called under mutual exclusion from join_node_base.
719 
720         bool tuple_build_may_succeed() {
721             return !ports_with_no_inputs;
722         }
723 
724         bool try_to_make_tuple(output_type &out) {
725             if(ports_with_no_inputs) return false;
726             return join_helper<N>::reserve(my_inputs, out);
727         }
728 
729         void tuple_accepted() {
730             join_helper<N>::consume_reservations(my_inputs);
731         }
732         void tuple_rejected() {
733             join_helper<N>::release_reservations(my_inputs);
734         }
735 
736         input_type my_inputs;
737         base_node_type *my_node;
738         std::atomic<std::size_t> ports_with_no_inputs;
739     };  // join_node_FE<reserving, ... >
740 
741     template<typename InputTuple, typename OutputTuple>
742     class join_node_FE<queueing, InputTuple, OutputTuple> : public queueing_forwarding_base {
743     public:
744         static const int N = std::tuple_size<OutputTuple>::value;
745         typedef OutputTuple output_type;
746         typedef InputTuple input_type;
747         typedef join_node_base<queueing, InputTuple, OutputTuple> base_node_type; // for forwarding
748 
749         join_node_FE(graph &g) : queueing_forwarding_base(g), my_node(nullptr) {
750             ports_with_no_items = N;
751             join_helper<N>::set_join_node_pointer(my_inputs, this);
752         }
753 
754         join_node_FE(const join_node_FE& other) : queueing_forwarding_base((other.queueing_forwarding_base::graph_ref)), my_node(nullptr) {
755             ports_with_no_items = N;
756             join_helper<N>::set_join_node_pointer(my_inputs, this);
757         }
758 
759         // needed for forwarding
760         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
761 
762         void reset_port_count() {
763             ports_with_no_items = N;
764         }
765 
766         // if all input_ports have items, spawn forward to try and consume tuples
767         graph_task* decrement_port_count(bool handle_task) override
768         {
769             if(ports_with_no_items.fetch_sub(1) == 1) {
770                 if(is_graph_active(this->graph_ref)) {
771                     small_object_allocator allocator{};
772                     typedef forward_task_bypass<base_node_type> task_type;
773                     graph_task* t = allocator.new_object<task_type>(graph_ref, allocator, *my_node);
774                     graph_ref.reserve_wait();
775                     if( !handle_task )
776                         return t;
777                     spawn_in_graph_arena(this->graph_ref, *t);
778                 }
779             }
780             return nullptr;
781         }
782 
783         input_type &input_ports() { return my_inputs; }
784 
785     protected:
786 
787         void reset(  reset_flags f) {
788             reset_port_count();
789             join_helper<N>::reset_inputs(my_inputs, f );
790         }
791 
792         // all methods on input ports should be called under mutual exclusion from join_node_base.
793 
794         bool tuple_build_may_succeed() {
795             return !ports_with_no_items;
796         }
797 
798         bool try_to_make_tuple(output_type &out) {
799             if(ports_with_no_items) return false;
800             return join_helper<N>::get_items(my_inputs, out);
801         }
802 
803         void tuple_accepted() {
804             reset_port_count();
805             join_helper<N>::reset_ports(my_inputs);
806         }
807         void tuple_rejected() {
808             // nothing to do.
809         }
810 
811         input_type my_inputs;
812         base_node_type *my_node;
813         std::atomic<std::size_t> ports_with_no_items;
814     };  // join_node_FE<queueing, ...>
815 
816     // key_matching join front-end.
817     template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
818     class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
819              // buffer of key value counts
820               public hash_buffer<   // typedefed below to key_to_count_buffer_type
821                   typename std::decay<K>::type&,        // force ref type on K
822                   count_element<typename std::decay<K>::type>,
823                   type_to_key_function_body<
824                       count_element<typename std::decay<K>::type>,
825                       typename std::decay<K>::type& >,
826                   KHash >,
827              // buffer of output items
828              public item_buffer<OutputTuple> {
829     public:
830         static const int N = std::tuple_size<OutputTuple>::value;
831         typedef OutputTuple output_type;
832         typedef InputTuple input_type;
833         typedef K key_type;
834         typedef typename std::decay<key_type>::type unref_key_type;
835         typedef KHash key_hash_compare;
836         // must use K without ref.
837         typedef count_element<unref_key_type> count_element_type;
838         // method that lets us refer to the key of this type.
839         typedef key_to_count_functor<unref_key_type> key_to_count_func;
840         typedef type_to_key_function_body< count_element_type, unref_key_type&> TtoK_function_body_type;
841         typedef type_to_key_function_body_leaf<count_element_type, unref_key_type&, key_to_count_func> TtoK_function_body_leaf_type;
842         // this is the type of the special table that keeps track of the number of discrete
843         // elements corresponding to each key that we've seen.
844         typedef hash_buffer< unref_key_type&, count_element_type, TtoK_function_body_type, key_hash_compare >
845                  key_to_count_buffer_type;
846         typedef item_buffer<output_type> output_buffer_type;
847         typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
848         typedef matching_forwarding_base<key_type> forwarding_base_type;
849 
850 // ----------- Aggregator ------------
851         // the aggregator is only needed to serialize the access to the hash table.
852         // and the output_buffer_type base class
853     private:
854         enum op_type { res_count, inc_count, may_succeed, try_make };
855         typedef join_node_FE<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> class_type;
856 
857         class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
858         public:
859             char type;
860             unref_key_type my_val;
861             output_type* my_output;
862             graph_task* bypass_t;
863             // constructor for value parameter
864             key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e),
865                  my_output(nullptr), bypass_t(nullptr) {}
866             key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {}
867             // constructor with no parameter
868             key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {}
869         };
870 
871         typedef aggregating_functor<class_type, key_matching_FE_operation> handler_type;
872         friend class aggregating_functor<class_type, key_matching_FE_operation>;
873         aggregator<handler_type, key_matching_FE_operation> my_aggregator;
874 
875         // called from aggregator, so serialized
876         // returns a task pointer if the a task would have been enqueued but we asked that
877         // it be returned.  Otherwise returns nullptr.
878         graph_task* fill_output_buffer(unref_key_type &t) {
879             output_type l_out;
880             graph_task* rtask = nullptr;
881             bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref);
882             this->current_key = t;
883             this->delete_with_key(this->current_key);   // remove the key
884             if(join_helper<N>::get_items(my_inputs, l_out)) {  //  <== call back
885                 this->push_back(l_out);
886                 if(do_fwd) {  // we enqueue if receiving an item from predecessor, not if successor asks for item
887                     small_object_allocator allocator{};
888                     typedef forward_task_bypass<base_node_type> task_type;
889                     rtask = allocator.new_object<task_type>(this->graph_ref, allocator, *my_node);
890                     this->graph_ref.reserve_wait();
891                     do_fwd = false;
892                 }
893                 // retire the input values
894                 join_helper<N>::reset_ports(my_inputs);  //  <== call back
895             }
896             else {
897                 __TBB_ASSERT(false, "should have had something to push");
898             }
899             return rtask;
900         }
901 
902         void handle_operations(key_matching_FE_operation* op_list) {
903             key_matching_FE_operation *current;
904             while(op_list) {
905                 current = op_list;
906                 op_list = op_list->next;
907                 switch(current->type) {
908                 case res_count:  // called from BE
909                     {
910                         this->destroy_front();
911                         current->status.store( SUCCEEDED, std::memory_order_release);
912                     }
913                     break;
914                 case inc_count: {  // called from input ports
915                         count_element_type *p = nullptr;
916                         unref_key_type &t = current->my_val;
917                         if(!(this->find_ref_with_key(t,p))) {
918                             count_element_type ev;
919                             ev.my_key = t;
920                             ev.my_value = 0;
921                             this->insert_with_key(ev);
922                             bool found = this->find_ref_with_key(t, p);
923                             __TBB_ASSERT_EX(found, "should find key after inserting it");
924                         }
925                         if(++(p->my_value) == size_t(N)) {
926                             current->bypass_t = fill_output_buffer(t);
927                         }
928                     }
929                     current->status.store( SUCCEEDED, std::memory_order_release);
930                     break;
931                 case may_succeed:  // called from BE
932                     current->status.store( this->buffer_empty() ? FAILED : SUCCEEDED, std::memory_order_release);
933                     break;
934                 case try_make:  // called from BE
935                     if(this->buffer_empty()) {
936                         current->status.store( FAILED, std::memory_order_release);
937                     }
938                     else {
939                         *(current->my_output) = this->front();
940                         current->status.store( SUCCEEDED, std::memory_order_release);
941                     }
942                     break;
943                 }
944             }
945         }
946 // ------------ End Aggregator ---------------
947 
948     public:
949         template<typename FunctionTuple>
950         join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(nullptr) {
951             join_helper<N>::set_join_node_pointer(my_inputs, this);
952             join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
953             my_aggregator.initialize_handler(handler_type(this));
954                     TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
955             this->set_key_func(cfb);
956         }
957 
958         join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
959         output_buffer_type() {
960             my_node = nullptr;
961             join_helper<N>::set_join_node_pointer(my_inputs, this);
962             join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
963             my_aggregator.initialize_handler(handler_type(this));
964             TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
965             this->set_key_func(cfb);
966         }
967 
968         // needed for forwarding
969         void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
970 
971         void reset_port_count() {  // called from BE
972             key_matching_FE_operation op_data(res_count);
973             my_aggregator.execute(&op_data);
974             return;
975         }
976 
977         // if all input_ports have items, spawn forward to try and consume tuples
978         // return a task if we are asked and did create one.
979         graph_task *increment_key_count(unref_key_type const & t) override {  // called from input_ports
980             key_matching_FE_operation op_data(t, inc_count);
981             my_aggregator.execute(&op_data);
982             return op_data.bypass_t;
983         }
984 
985         input_type &input_ports() { return my_inputs; }
986 
987     protected:
988 
989         void reset(  reset_flags f ) {
990             // called outside of parallel contexts
991             join_helper<N>::reset_inputs(my_inputs, f);
992 
993             key_to_count_buffer_type::reset();
994             output_buffer_type::reset();
995         }
996 
997         // all methods on input ports should be called under mutual exclusion from join_node_base.
998 
999         bool tuple_build_may_succeed() {  // called from back-end
1000             key_matching_FE_operation op_data(may_succeed);
1001             my_aggregator.execute(&op_data);
1002             return op_data.status == SUCCEEDED;
1003         }
1004 
1005         // cannot lock while calling back to input_ports.  current_key will only be set
1006         // and reset under the aggregator, so it will remain consistent.
1007         bool try_to_make_tuple(output_type &out) {
1008             key_matching_FE_operation op_data(&out,try_make);
1009             my_aggregator.execute(&op_data);
1010             return op_data.status == SUCCEEDED;
1011         }
1012 
1013         void tuple_accepted() {
1014             reset_port_count();  // reset current_key after ports reset.
1015         }
1016 
1017         void tuple_rejected() {
1018             // nothing to do.
1019         }
1020 
1021         input_type my_inputs;  // input ports
1022         base_node_type *my_node;
1023     }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1024 
1025     //! join_node_base
1026     template<typename JP, typename InputTuple, typename OutputTuple>
1027     class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1028                            public sender<OutputTuple> {
1029     protected:
1030         using graph_node::my_graph;
1031     public:
1032         typedef OutputTuple output_type;
1033 
1034         typedef typename sender<output_type>::successor_type successor_type;
1035         typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
1036         using input_ports_type::tuple_build_may_succeed;
1037         using input_ports_type::try_to_make_tuple;
1038         using input_ports_type::tuple_accepted;
1039         using input_ports_type::tuple_rejected;
1040 
1041     private:
1042         // ----------- Aggregator ------------
1043         enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1044         };
1045         typedef join_node_base<JP,InputTuple,OutputTuple> class_type;
1046 
1047         class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1048         public:
1049             char type;
1050             union {
1051                 output_type *my_arg;
1052                 successor_type *my_succ;
1053             };
1054             graph_task* bypass_t;
1055             join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1056                 my_arg(const_cast<output_type*>(&e)), bypass_t(nullptr) {}
1057             join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1058                 my_succ(const_cast<successor_type *>(&s)), bypass_t(nullptr) {}
1059             join_node_base_operation(op_type t) : type(char(t)), bypass_t(nullptr) {}
1060         };
1061 
1062         typedef aggregating_functor<class_type, join_node_base_operation> handler_type;
1063         friend class aggregating_functor<class_type, join_node_base_operation>;
1064         bool forwarder_busy;
1065         aggregator<handler_type, join_node_base_operation> my_aggregator;
1066 
1067         void handle_operations(join_node_base_operation* op_list) {
1068             join_node_base_operation *current;
1069             while(op_list) {
1070                 current = op_list;
1071                 op_list = op_list->next;
1072                 switch(current->type) {
1073                 case reg_succ: {
1074                         my_successors.register_successor(*(current->my_succ));
1075                         if(tuple_build_may_succeed() && !forwarder_busy && is_graph_active(my_graph)) {
1076                             small_object_allocator allocator{};
1077                             typedef forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> > task_type;
1078                             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1079                             my_graph.reserve_wait();
1080                             spawn_in_graph_arena(my_graph, *t);
1081                             forwarder_busy = true;
1082                         }
1083                         current->status.store( SUCCEEDED, std::memory_order_release);
1084                     }
1085                     break;
1086                 case rem_succ:
1087                     my_successors.remove_successor(*(current->my_succ));
1088                     current->status.store( SUCCEEDED, std::memory_order_release);
1089                     break;
1090                 case try__get:
1091                     if(tuple_build_may_succeed()) {
1092                         if(try_to_make_tuple(*(current->my_arg))) {
1093                             tuple_accepted();
1094                             current->status.store( SUCCEEDED, std::memory_order_release);
1095                         }
1096                         else current->status.store( FAILED, std::memory_order_release);
1097                     }
1098                     else current->status.store( FAILED, std::memory_order_release);
1099                     break;
1100                 case do_fwrd_bypass: {
1101                         bool build_succeeded;
1102                         graph_task *last_task = nullptr;
1103                         output_type out;
1104                         // forwarding must be exclusive, because try_to_make_tuple and tuple_accepted
1105                         // are separate locked methods in the FE.  We could conceivably fetch the front
1106                         // of the FE queue, then be swapped out, have someone else consume the FE's
1107                         // object, then come back, forward, and then try to remove it from the queue
1108                         // again. Without reservation of the FE, the methods accessing it must be locked.
1109                         // We could remember the keys of the objects we forwarded, and then remove
1110                         // them from the input ports after forwarding is complete?
1111                         if(tuple_build_may_succeed()) {  // checks output queue of FE
1112                             do {
1113                                 build_succeeded = try_to_make_tuple(out);  // fetch front_end of queue
1114                                 if(build_succeeded) {
1115                                     graph_task *new_task = my_successors.try_put_task(out);
1116                                     last_task = combine_tasks(my_graph, last_task, new_task);
1117                                     if(new_task) {
1118                                         tuple_accepted();
1119                                     }
1120                                     else {
1121                                         tuple_rejected();
1122                                         build_succeeded = false;
1123                                     }
1124                                 }
1125                             } while(build_succeeded);
1126                         }
1127                         current->bypass_t = last_task;
1128                         current->status.store( SUCCEEDED, std::memory_order_release);
1129                         forwarder_busy = false;
1130                     }
1131                     break;
1132                 }
1133             }
1134         }
1135         // ---------- end aggregator -----------
1136     public:
1137         join_node_base(graph &g)
1138             : graph_node(g), input_ports_type(g), forwarder_busy(false), my_successors(this)
1139         {
1140             input_ports_type::set_my_node(this);
1141             my_aggregator.initialize_handler(handler_type(this));
1142         }
1143 
1144         join_node_base(const join_node_base& other) :
1145             graph_node(other.graph_node::my_graph), input_ports_type(other),
1146             sender<OutputTuple>(), forwarder_busy(false), my_successors(this)
1147         {
1148             input_ports_type::set_my_node(this);
1149             my_aggregator.initialize_handler(handler_type(this));
1150         }
1151 
1152         template<typename FunctionTuple>
1153         join_node_base(graph &g, FunctionTuple f)
1154             : graph_node(g), input_ports_type(g, f), forwarder_busy(false), my_successors(this)
1155         {
1156             input_ports_type::set_my_node(this);
1157             my_aggregator.initialize_handler(handler_type(this));
1158         }
1159 
1160         bool register_successor(successor_type &r) override {
1161             join_node_base_operation op_data(r, reg_succ);
1162             my_aggregator.execute(&op_data);
1163             return op_data.status == SUCCEEDED;
1164         }
1165 
1166         bool remove_successor( successor_type &r) override {
1167             join_node_base_operation op_data(r, rem_succ);
1168             my_aggregator.execute(&op_data);
1169             return op_data.status == SUCCEEDED;
1170         }
1171 
1172         bool try_get( output_type &v) override {
1173             join_node_base_operation op_data(v, try__get);
1174             my_aggregator.execute(&op_data);
1175             return op_data.status == SUCCEEDED;
1176         }
1177 
1178     protected:
1179         void reset_node(reset_flags f) override {
1180             input_ports_type::reset(f);
1181             if(f & rf_clear_edges) my_successors.clear();
1182         }
1183 
1184     private:
1185         broadcast_cache<output_type, null_rw_mutex> my_successors;
1186 
1187         friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
1188         graph_task *forward_task() {
1189             join_node_base_operation op_data(do_fwrd_bypass);
1190             my_aggregator.execute(&op_data);
1191             return op_data.bypass_t;
1192         }
1193 
1194     };  // join_node_base
1195 
1196     // join base class type generator
1197     template<int N, template<class> class PT, typename OutputTuple, typename JP>
1198     struct join_base {
1199         typedef join_node_base<JP, typename wrap_tuple_elements<N,PT,OutputTuple>::type, OutputTuple> type;
1200     };
1201 
1202     template<int N, typename OutputTuple, typename K, typename KHash>
1203     struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1204         typedef key_matching<K, KHash> key_traits_type;
1205         typedef K key_type;
1206         typedef KHash key_hash_compare;
1207         typedef join_node_base< key_traits_type,
1208                 // ports type
1209                 typename wrap_key_tuple_elements<N,key_matching_port,key_traits_type,OutputTuple>::type,
1210                 OutputTuple > type;
1211     };
1212 
1213     //! unfolded_join_node : passes input_ports_type to join_node_base.  We build the input port type
1214     //  using tuple_element.  The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1215     //  and should match the typename.
1216 
1217     template<int M, template<class> class PT, typename OutputTuple, typename JP>
1218     class unfolded_join_node : public join_base<M,PT,OutputTuple,JP>::type {
1219     public:
1220         typedef typename wrap_tuple_elements<M, PT, OutputTuple>::type input_ports_type;
1221         typedef OutputTuple output_type;
1222     private:
1223         typedef join_node_base<JP, input_ports_type, output_type > base_type;
1224     public:
1225         unfolded_join_node(graph &g) : base_type(g) {}
1226         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1227     };
1228 
1229 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1230     template <typename K, typename T>
1231     struct key_from_message_body {
1232         K operator()(const T& t) const {
1233             return key_from_message<K>(t);
1234         }
1235     };
1236     // Adds const to reference type
1237     template <typename K, typename T>
1238     struct key_from_message_body<K&,T> {
1239         const K& operator()(const T& t) const {
1240             return key_from_message<const K&>(t);
1241         }
1242     };
1243 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1244     // key_matching unfolded_join_node.  This must be a separate specialization because the constructors
1245     // differ.
1246 
1247     template<typename OutputTuple, typename K, typename KHash>
1248     class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1249             join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1250         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1251         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1252     public:
1253         typedef typename wrap_key_tuple_elements<2,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1254         typedef OutputTuple output_type;
1255     private:
1256         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1257         typedef type_to_key_function_body<T0, K> *f0_p;
1258         typedef type_to_key_function_body<T1, K> *f1_p;
1259         typedef std::tuple< f0_p, f1_p > func_initializer_type;
1260     public:
1261 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1262         unfolded_join_node(graph &g) : base_type(g,
1263                 func_initializer_type(
1264                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1265                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1266                     ) ) {
1267         }
1268 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1269         template<typename Body0, typename Body1>
1270         unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1271                 func_initializer_type(
1272                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1273                     new type_to_key_function_body_leaf<T1, K, Body1>(body1)
1274                     ) ) {
1275             static_assert(std::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1276         }
1277         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1278     };
1279 
1280     template<typename OutputTuple, typename K, typename KHash>
1281     class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1282             join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1283         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1284         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1285         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1286     public:
1287         typedef typename wrap_key_tuple_elements<3,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1288         typedef OutputTuple output_type;
1289     private:
1290         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1291         typedef type_to_key_function_body<T0, K> *f0_p;
1292         typedef type_to_key_function_body<T1, K> *f1_p;
1293         typedef type_to_key_function_body<T2, K> *f2_p;
1294         typedef std::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1295     public:
1296 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1297         unfolded_join_node(graph &g) : base_type(g,
1298                 func_initializer_type(
1299                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1300                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1301                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1302                     ) ) {
1303         }
1304 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1305         template<typename Body0, typename Body1, typename Body2>
1306         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1307                 func_initializer_type(
1308                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1309                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1310                     new type_to_key_function_body_leaf<T2, K, Body2>(body2)
1311                     ) ) {
1312             static_assert(std::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1313         }
1314         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1315     };
1316 
1317     template<typename OutputTuple, typename K, typename KHash>
1318     class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1319             join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1320         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1321         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1322         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1323         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1324     public:
1325         typedef typename wrap_key_tuple_elements<4,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1326         typedef OutputTuple output_type;
1327     private:
1328         typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1329         typedef type_to_key_function_body<T0, K> *f0_p;
1330         typedef type_to_key_function_body<T1, K> *f1_p;
1331         typedef type_to_key_function_body<T2, K> *f2_p;
1332         typedef type_to_key_function_body<T3, K> *f3_p;
1333         typedef std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1334     public:
1335 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1336         unfolded_join_node(graph &g) : base_type(g,
1337                 func_initializer_type(
1338                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1339                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1340                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1341                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1342                     ) ) {
1343         }
1344 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1345         template<typename Body0, typename Body1, typename Body2, typename Body3>
1346         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1347                 func_initializer_type(
1348                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1349                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1350                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1351                     new type_to_key_function_body_leaf<T3, K, Body3>(body3)
1352                     ) ) {
1353             static_assert(std::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1354         }
1355         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1356     };
1357 
1358     template<typename OutputTuple, typename K, typename KHash>
1359     class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1360             join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1361         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1362         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1363         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1364         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1365         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1366     public:
1367         typedef typename wrap_key_tuple_elements<5,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1368         typedef OutputTuple output_type;
1369     private:
1370         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1371         typedef type_to_key_function_body<T0, K> *f0_p;
1372         typedef type_to_key_function_body<T1, K> *f1_p;
1373         typedef type_to_key_function_body<T2, K> *f2_p;
1374         typedef type_to_key_function_body<T3, K> *f3_p;
1375         typedef type_to_key_function_body<T4, K> *f4_p;
1376         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1377     public:
1378 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1379         unfolded_join_node(graph &g) : base_type(g,
1380                 func_initializer_type(
1381                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1382                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1383                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1384                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1385                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1386                     ) ) {
1387         }
1388 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1389         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1390         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1391                 func_initializer_type(
1392                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1393                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1394                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1395                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1396                     new type_to_key_function_body_leaf<T4, K, Body4>(body4)
1397                     ) ) {
1398             static_assert(std::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1399         }
1400         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1401     };
1402 
1403 #if __TBB_VARIADIC_MAX >= 6
1404     template<typename OutputTuple, typename K, typename KHash>
1405     class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1406             join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1407         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1408         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1409         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1410         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1411         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1412         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1413     public:
1414         typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1415         typedef OutputTuple output_type;
1416     private:
1417         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1418         typedef type_to_key_function_body<T0, K> *f0_p;
1419         typedef type_to_key_function_body<T1, K> *f1_p;
1420         typedef type_to_key_function_body<T2, K> *f2_p;
1421         typedef type_to_key_function_body<T3, K> *f3_p;
1422         typedef type_to_key_function_body<T4, K> *f4_p;
1423         typedef type_to_key_function_body<T5, K> *f5_p;
1424         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1425     public:
1426 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1427         unfolded_join_node(graph &g) : base_type(g,
1428                 func_initializer_type(
1429                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1430                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1431                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1432                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1433                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1434                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1435                     ) ) {
1436         }
1437 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1438         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1439         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1440                 : base_type(g, func_initializer_type(
1441                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1442                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1443                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1444                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1445                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1446                     new type_to_key_function_body_leaf<T5, K, Body5>(body5)
1447                     ) ) {
1448             static_assert(std::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1449         }
1450         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1451     };
1452 #endif
1453 
1454 #if __TBB_VARIADIC_MAX >= 7
1455     template<typename OutputTuple, typename K, typename KHash>
1456     class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1457             join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1458         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1459         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1460         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1461         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1462         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1463         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1464         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1465     public:
1466         typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1467         typedef OutputTuple output_type;
1468     private:
1469         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1470         typedef type_to_key_function_body<T0, K> *f0_p;
1471         typedef type_to_key_function_body<T1, K> *f1_p;
1472         typedef type_to_key_function_body<T2, K> *f2_p;
1473         typedef type_to_key_function_body<T3, K> *f3_p;
1474         typedef type_to_key_function_body<T4, K> *f4_p;
1475         typedef type_to_key_function_body<T5, K> *f5_p;
1476         typedef type_to_key_function_body<T6, K> *f6_p;
1477         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1478     public:
1479 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1480         unfolded_join_node(graph &g) : base_type(g,
1481                 func_initializer_type(
1482                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1483                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1484                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1485                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1486                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1487                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1488                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1489                     ) ) {
1490         }
1491 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1492         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1493                  typename Body5, typename Body6>
1494         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1495                 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1496                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1497                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1498                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1499                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1500                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1501                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1502                     new type_to_key_function_body_leaf<T6, K, Body6>(body6)
1503                     ) ) {
1504             static_assert(std::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1505         }
1506         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1507     };
1508 #endif
1509 
1510 #if __TBB_VARIADIC_MAX >= 8
1511     template<typename OutputTuple, typename K, typename KHash>
1512     class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1513             join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1514         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1515         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1516         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1517         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1518         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1519         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1520         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1521         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1522     public:
1523         typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1524         typedef OutputTuple output_type;
1525     private:
1526         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1527         typedef type_to_key_function_body<T0, K> *f0_p;
1528         typedef type_to_key_function_body<T1, K> *f1_p;
1529         typedef type_to_key_function_body<T2, K> *f2_p;
1530         typedef type_to_key_function_body<T3, K> *f3_p;
1531         typedef type_to_key_function_body<T4, K> *f4_p;
1532         typedef type_to_key_function_body<T5, K> *f5_p;
1533         typedef type_to_key_function_body<T6, K> *f6_p;
1534         typedef type_to_key_function_body<T7, K> *f7_p;
1535         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1536     public:
1537 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1538         unfolded_join_node(graph &g) : base_type(g,
1539                 func_initializer_type(
1540                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1541                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1542                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1543                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1544                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1545                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1546                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1547                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1548                     ) ) {
1549         }
1550 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1551         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1552                  typename Body5, typename Body6, typename Body7>
1553         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1554                 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1555                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1556                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1557                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1558                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1559                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1560                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1561                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1562                     new type_to_key_function_body_leaf<T7, K, Body7>(body7)
1563                     ) ) {
1564             static_assert(std::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1565         }
1566         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1567     };
1568 #endif
1569 
1570 #if __TBB_VARIADIC_MAX >= 9
1571     template<typename OutputTuple, typename K, typename KHash>
1572     class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1573             join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1574         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1575         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1576         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1577         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1578         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1579         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1580         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1581         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1582         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1583     public:
1584         typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1585         typedef OutputTuple output_type;
1586     private:
1587         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1588         typedef type_to_key_function_body<T0, K> *f0_p;
1589         typedef type_to_key_function_body<T1, K> *f1_p;
1590         typedef type_to_key_function_body<T2, K> *f2_p;
1591         typedef type_to_key_function_body<T3, K> *f3_p;
1592         typedef type_to_key_function_body<T4, K> *f4_p;
1593         typedef type_to_key_function_body<T5, K> *f5_p;
1594         typedef type_to_key_function_body<T6, K> *f6_p;
1595         typedef type_to_key_function_body<T7, K> *f7_p;
1596         typedef type_to_key_function_body<T8, K> *f8_p;
1597         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1598     public:
1599 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1600         unfolded_join_node(graph &g) : base_type(g,
1601                 func_initializer_type(
1602                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1603                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1604                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1605                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1606                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1607                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1608                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1609                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1610                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1611                     ) ) {
1612         }
1613 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1614         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1615                  typename Body5, typename Body6, typename Body7, typename Body8>
1616         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1617                 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1618                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1619                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1620                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1621                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1622                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1623                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1624                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1625                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1626                     new type_to_key_function_body_leaf<T8, K, Body8>(body8)
1627                     ) ) {
1628             static_assert(std::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1629         }
1630         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1631     };
1632 #endif
1633 
1634 #if __TBB_VARIADIC_MAX >= 10
1635     template<typename OutputTuple, typename K, typename KHash>
1636     class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1637             join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1638         typedef typename std::tuple_element<0, OutputTuple>::type T0;
1639         typedef typename std::tuple_element<1, OutputTuple>::type T1;
1640         typedef typename std::tuple_element<2, OutputTuple>::type T2;
1641         typedef typename std::tuple_element<3, OutputTuple>::type T3;
1642         typedef typename std::tuple_element<4, OutputTuple>::type T4;
1643         typedef typename std::tuple_element<5, OutputTuple>::type T5;
1644         typedef typename std::tuple_element<6, OutputTuple>::type T6;
1645         typedef typename std::tuple_element<7, OutputTuple>::type T7;
1646         typedef typename std::tuple_element<8, OutputTuple>::type T8;
1647         typedef typename std::tuple_element<9, OutputTuple>::type T9;
1648     public:
1649         typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1650         typedef OutputTuple output_type;
1651     private:
1652         typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1653         typedef type_to_key_function_body<T0, K> *f0_p;
1654         typedef type_to_key_function_body<T1, K> *f1_p;
1655         typedef type_to_key_function_body<T2, K> *f2_p;
1656         typedef type_to_key_function_body<T3, K> *f3_p;
1657         typedef type_to_key_function_body<T4, K> *f4_p;
1658         typedef type_to_key_function_body<T5, K> *f5_p;
1659         typedef type_to_key_function_body<T6, K> *f6_p;
1660         typedef type_to_key_function_body<T7, K> *f7_p;
1661         typedef type_to_key_function_body<T8, K> *f8_p;
1662         typedef type_to_key_function_body<T9, K> *f9_p;
1663         typedef std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1664     public:
1665 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1666         unfolded_join_node(graph &g) : base_type(g,
1667                 func_initializer_type(
1668                     new type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1669                     new type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1670                     new type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1671                     new type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1672                     new type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1673                     new type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1674                     new type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1675                     new type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1676                     new type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1677                     new type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1678                     ) ) {
1679         }
1680 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1681         template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1682             typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1683         unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1684                 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1685                     new type_to_key_function_body_leaf<T0, K, Body0>(body0),
1686                     new type_to_key_function_body_leaf<T1, K, Body1>(body1),
1687                     new type_to_key_function_body_leaf<T2, K, Body2>(body2),
1688                     new type_to_key_function_body_leaf<T3, K, Body3>(body3),
1689                     new type_to_key_function_body_leaf<T4, K, Body4>(body4),
1690                     new type_to_key_function_body_leaf<T5, K, Body5>(body5),
1691                     new type_to_key_function_body_leaf<T6, K, Body6>(body6),
1692                     new type_to_key_function_body_leaf<T7, K, Body7>(body7),
1693                     new type_to_key_function_body_leaf<T8, K, Body8>(body8),
1694                     new type_to_key_function_body_leaf<T9, K, Body9>(body9)
1695                     ) ) {
1696             static_assert(std::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1697         }
1698         unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1699     };
1700 #endif
1701 
1702     //! templated function to refer to input ports of the join node
1703     template<size_t N, typename JNT>
1704     typename std::tuple_element<N, typename JNT::input_ports_type>::type &input_port(JNT &jn) {
1705         return std::get<N>(jn.input_ports());
1706     }
1707 
1708 #endif // __TBB__flow_graph_join_impl_H
1709