// xref: /oneTBB/test/tbb/test_join_node.h (revision 5e91b2c0)
/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #ifndef tbb_test_join_node_H
1851c0b2f7Stbbdev #define tbb_test_join_node_H
1951c0b2f7Stbbdev 
2051c0b2f7Stbbdev #if _MSC_VER
2151c0b2f7Stbbdev // Suppress "decorated name length exceeded, name was truncated" warning
2251c0b2f7Stbbdev #if __INTEL_COMPILER
2351c0b2f7Stbbdev #pragma warning( disable: 2586 )
2451c0b2f7Stbbdev #else
2551c0b2f7Stbbdev #pragma warning( disable: 4503 )
2651c0b2f7Stbbdev #endif
2751c0b2f7Stbbdev #endif
2851c0b2f7Stbbdev 
2951c0b2f7Stbbdev #include "tbb/flow_graph.h"
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev #include "common/test.h"
3251c0b2f7Stbbdev #include "common/utils.h"
3351c0b2f7Stbbdev #include "common/checktype.h"
3451c0b2f7Stbbdev #include "common/graph_utils.h"
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev #include <type_traits>
3751c0b2f7Stbbdev 
// Pool of sample names used to build std::string keys
// (see index_to_key<std::string>, which indexes this table modulo NameCnt).
const char *names[] = {
    "Adam", "Bruce", "Charles", "Daniel", "Evan", "Frederich", "George", "Hiram", "Ichabod",
    "John", "Kevin", "Leonard", "Michael", "Ned", "Olin", "Paul", "Quentin", "Ralph", "Steven",
    "Thomas", "Ulysses", "Victor", "Walter", "Xerxes", "Yitzhak", "Zebediah", "Anne", "Bethany",
    "Clarisse", "Dorothy", "Erin", "Fatima", "Gabrielle", "Helen", "Irene", "Jacqueline",
    "Katherine", "Lana", "Marilyn", "Noelle", "Okiilani", "Pauline", "Querida", "Rose", "Sybil",
    "Tatiana", "Umiko", "Victoria", "Wilma", "Xena", "Yolanda", "Zoe", "Algernon", "Benjamin",
    "Caleb", "Dylan", "Ezra", "Felix", "Gabriel", "Henry", "Issac", "Jasper", "Keifer",
    "Lincoln", "Milo", "Nathaniel", "Owen", "Peter", "Quincy", "Ronan", "Silas", "Theodore",
    "Uriah", "Vincent", "Wilbur", "Xavier", "Yoda", "Zachary", "Amelia", "Brielle", "Charlotte",
    "Daphne", "Emma", "Fiona", "Grace", "Hazel", "Isla", "Juliet", "Keira", "Lily", "Mia",
    "Nora", "Olivia", "Penelope", "Quintana", "Ruby", "Sophia", "Tessa", "Ursula", "Violet",
    "Willow", "Xanthe", "Yvonne", "ZsaZsa", "Asher", "Bennett", "Connor", "Dominic", "Ethan",
    "Finn", "Grayson", "Hudson", "Ian", "Jackson", "Kent", "Liam", "Matthew", "Noah", "Oliver",
    "Parker", "Quinn", "Rhys", "Sebastian", "Taylor", "Umberto", "Vito", "William", "Xanto",
    "Yogi", "Zane", "Ava", "Brenda", "Chloe", "Delilah", "Ella", "Felicity", "Genevieve",
    "Hannah", "Isabella", "Josephine", "Kacie", "Lucy", "Madeline", "Natalie", "Octavia",
    "Piper", "Qismah", "Rosalie", "Scarlett", "Tanya", "Uta", "Vivian", "Wendy", "Xola",
    "Yaritza", "Zanthe"};

// Number of entries in `names`. The divisor is the array's own element size,
// so the count stays right even if the element type of `names` changes.
static const int NameCnt = sizeof(names)/sizeof(names[0]);
5951c0b2f7Stbbdev 
// Functor turning a test index into a key of type K.
// The generic form returns 3*indx+1 converted to K, so consecutive indices
// give distinct keys that are spaced three apart.
template<typename K>
struct index_to_key {
    K operator()(const int indx) {
        return static_cast<K>(3 * indx + 1);
    }
};
6651c0b2f7Stbbdev 
6751c0b2f7Stbbdev template<>
6851c0b2f7Stbbdev struct index_to_key<std::string> {
6951c0b2f7Stbbdev     std::string operator()(const int indx) {
7051c0b2f7Stbbdev         return std::string(names[indx % NameCnt]);
7151c0b2f7Stbbdev     }
7251c0b2f7Stbbdev };
7351c0b2f7Stbbdev 
// K_deref<K>::type is K with an lvalue reference removed:
// the primary template passes K through unchanged ...
template<typename K>
struct K_deref {
    typedef K type;
};

// ... and the K& specialization yields the referred-to type.
template<typename K>
struct K_deref<K&> {
    typedef K type;
};
8351c0b2f7Stbbdev 
8451c0b2f7Stbbdev template<typename K, typename V>
8551c0b2f7Stbbdev struct MyKeyFirst {
8651c0b2f7Stbbdev     K my_key;
8751c0b2f7Stbbdev     V my_value;
8851c0b2f7Stbbdev     MyKeyFirst(int i = 0, int v = 0): my_key(index_to_key<K>()(i)), my_value((V)v) {
8951c0b2f7Stbbdev     }
9051c0b2f7Stbbdev     void print_val() const {
9151c0b2f7Stbbdev         INFO("MyKeyFirst{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
9251c0b2f7Stbbdev     }
9351c0b2f7Stbbdev     operator int() const { return (int)my_value; }
9451c0b2f7Stbbdev };
9551c0b2f7Stbbdev 
9651c0b2f7Stbbdev template<typename K, typename V>
9751c0b2f7Stbbdev struct MyKeySecond {
9851c0b2f7Stbbdev     V my_value;
9951c0b2f7Stbbdev     K my_key;
10051c0b2f7Stbbdev     MyKeySecond(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
10151c0b2f7Stbbdev     }
10251c0b2f7Stbbdev     void print_val() const {
10351c0b2f7Stbbdev         INFO("MyKeySecond{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
10451c0b2f7Stbbdev     }
10551c0b2f7Stbbdev     operator int() const { return (int)my_value; }
10651c0b2f7Stbbdev };
10751c0b2f7Stbbdev 
10851c0b2f7Stbbdev template<typename K, typename V>
10951c0b2f7Stbbdev struct MyMessageKeyWithoutKey {
11051c0b2f7Stbbdev     V my_value;
11151c0b2f7Stbbdev     K my_message_key;
11251c0b2f7Stbbdev     MyMessageKeyWithoutKey(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
11351c0b2f7Stbbdev     }
11451c0b2f7Stbbdev     void print_val() const {
11551c0b2f7Stbbdev         INFO("MyMessageKeyWithoutKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
11651c0b2f7Stbbdev     }
11751c0b2f7Stbbdev     operator int() const { return (int)my_value; }
11851c0b2f7Stbbdev     const K& key() const {
11951c0b2f7Stbbdev         return my_message_key;
12051c0b2f7Stbbdev     }
12151c0b2f7Stbbdev };
12251c0b2f7Stbbdev 
12351c0b2f7Stbbdev template<typename K, typename V>
12451c0b2f7Stbbdev struct MyMessageKeyWithBrokenKey {
12551c0b2f7Stbbdev     V my_value;
12651c0b2f7Stbbdev     K my_key;
12751c0b2f7Stbbdev     K my_message_key;
12851c0b2f7Stbbdev     MyMessageKeyWithBrokenKey(int i = 0, int v = 0): my_value((V)v), my_key(), my_message_key(index_to_key<K>()(i)) {
12951c0b2f7Stbbdev     }
13051c0b2f7Stbbdev     void print_val() const {
13151c0b2f7Stbbdev         INFO("MyMessageKeyWithBrokenKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
13251c0b2f7Stbbdev     }
13351c0b2f7Stbbdev     operator int() const { return (int)my_value; }
13451c0b2f7Stbbdev     const K& key() const {
13551c0b2f7Stbbdev         return my_message_key;
13651c0b2f7Stbbdev     }
13751c0b2f7Stbbdev 
13851c0b2f7Stbbdev };
13951c0b2f7Stbbdev 
14051c0b2f7Stbbdev template<typename K, typename V>
14151c0b2f7Stbbdev struct MyKeyWithBrokenMessageKey {
14251c0b2f7Stbbdev     V my_value;
14351c0b2f7Stbbdev     K my_key;
14451c0b2f7Stbbdev     MyKeyWithBrokenMessageKey(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
14551c0b2f7Stbbdev     }
14651c0b2f7Stbbdev     void print_val() const {
14751c0b2f7Stbbdev         INFO("MyKeyWithBrokenMessageKey{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
14851c0b2f7Stbbdev     }
14951c0b2f7Stbbdev     operator int() const { return (int)my_value; }
15051c0b2f7Stbbdev     K key() const {
15151c0b2f7Stbbdev         CHECK_MESSAGE( (false), "The method should never be called");
15251c0b2f7Stbbdev         return K();
15351c0b2f7Stbbdev     }
15451c0b2f7Stbbdev };
15551c0b2f7Stbbdev 
15651c0b2f7Stbbdev template<typename K, typename V>
15751c0b2f7Stbbdev struct MyMessageKeyWithoutKeyMethod {
15851c0b2f7Stbbdev     V my_value;
15951c0b2f7Stbbdev     K my_message_key;
16051c0b2f7Stbbdev     MyMessageKeyWithoutKeyMethod(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
16151c0b2f7Stbbdev     }
16251c0b2f7Stbbdev     void print_val() const {
16351c0b2f7Stbbdev         INFO("MyMessageKeyWithoutKeyMethod{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
16451c0b2f7Stbbdev     }
16551c0b2f7Stbbdev     operator int() const { return (int)my_value; }
16651c0b2f7Stbbdev     //K key() const; // Do not define
16751c0b2f7Stbbdev };
16851c0b2f7Stbbdev 
16951c0b2f7Stbbdev // Overload for MyMessageKeyWithoutKeyMethod
17051c0b2f7Stbbdev template <typename K, typename V>
17151c0b2f7Stbbdev K key_from_message(const MyMessageKeyWithoutKeyMethod<typename std::decay<K>::type, V> &m) {
17251c0b2f7Stbbdev     return m.my_message_key;
17351c0b2f7Stbbdev }
17451c0b2f7Stbbdev 
17551c0b2f7Stbbdev 
// Pattern for creating the values fed to tag_matching and key_matching joins,
// given an integer i and the position INDEX of the type inside the tuple.
// Generic form: a TT built from i*(INDEX+1).
template<typename TT, size_t INDEX>
struct make_thingie {
    TT operator()(int const &i) {
        return TT(i * (INDEX+1));
    }
};

// Form for two-parameter class templates T<K, V> (the key/value message
// structs): the object is constructed from the pair (i, i*(INDEX+1)).
template<template <typename, typename> class T, typename K, typename V, size_t INDEX>
struct make_thingie<T<K, V>, INDEX> {
    T<K, V> operator()(int const &i) {
        return T<K, V>(i, i*(INDEX+1));
    }
};
19051c0b2f7Stbbdev 
// cast_from<T>::my_int_val(i) converts a value of type T to int.
// The generic form is a plain conversion of i to int.
template<typename T>
struct cast_from {
    static int my_int_val(T const &i) { return static_cast<int>(i); }
};
19651c0b2f7Stbbdev 
19751c0b2f7Stbbdev template<typename K, typename V>
19851c0b2f7Stbbdev struct cast_from<MyKeyFirst<K, V> > {
19951c0b2f7Stbbdev     static int my_int_val(MyKeyFirst<K, V> const &i) { return (int)(i.my_value); }
20051c0b2f7Stbbdev };
20151c0b2f7Stbbdev 
20251c0b2f7Stbbdev template<typename K, typename V>
20351c0b2f7Stbbdev struct cast_from<MyKeySecond<K, V> > {
20451c0b2f7Stbbdev     static int my_int_val(MyKeySecond<K, V> const &i) { return (int)(i.my_value); }
20551c0b2f7Stbbdev };
20651c0b2f7Stbbdev 
20751c0b2f7Stbbdev template<typename T>
20851c0b2f7Stbbdev void print_my_value(T const &i) {
20951c0b2f7Stbbdev     INFO(" " << cast_from<T>::my_int_val(i) << " " );
21051c0b2f7Stbbdev }
21151c0b2f7Stbbdev 
21251c0b2f7Stbbdev template<typename K, typename V>
21351c0b2f7Stbbdev void print_my_value(MyKeyFirst<K, V> const &i) {
21451c0b2f7Stbbdev     i.print_val();
21551c0b2f7Stbbdev }
21651c0b2f7Stbbdev 
21751c0b2f7Stbbdev template<typename K, typename V>
21851c0b2f7Stbbdev void print_my_value(MyKeySecond<K, V> const &i) {
21951c0b2f7Stbbdev     i.print_val();
22051c0b2f7Stbbdev }
22151c0b2f7Stbbdev 
22251c0b2f7Stbbdev template<>
22351c0b2f7Stbbdev void print_my_value(std::string const &i) {
22451c0b2f7Stbbdev     INFO("\"" << i.c_str() << "\"" );
22551c0b2f7Stbbdev }
22651c0b2f7Stbbdev 
//
// Tests
//
23051c0b2f7Stbbdev 
// my_struct_key<K, V>: key extractor for a type V that has a field named
// my_key of type K. The primary template returns a copy of that field.
template<class K, typename V>
struct my_struct_key {
    K operator()(const V& mv) {
        return mv.my_key;
    }
};

// Specialization for reference keys: returns a reference to the my_key field
// of the argument itself. The argument arrives as const V&, so the const is
// cast away in order to hand back a plain K&.
template<class K, typename V>
struct my_struct_key<K&, V> {
    K& operator()(const V& mv) {
        return const_cast<K&>(mv.my_key);
    }
};
24751c0b2f7Stbbdev 
24851c0b2f7Stbbdev using tbb::detail::d1::type_to_key_function_body;
24951c0b2f7Stbbdev using tbb::detail::d1::hash_buffer;
25051c0b2f7Stbbdev using tbb::detail::d1::tbb_hash_compare;
25151c0b2f7Stbbdev using tbb::detail::d1::type_to_key_function_body_leaf;
25251c0b2f7Stbbdev 
25351c0b2f7Stbbdev template<class K, class V> struct VtoKFB {
25451c0b2f7Stbbdev     typedef type_to_key_function_body<V, K> type;
25551c0b2f7Stbbdev };
25651c0b2f7Stbbdev 
25751c0b2f7Stbbdev template<typename K> struct make_hash_compare { typedef tbb_hash_compare<K> type; };
25851c0b2f7Stbbdev 
25951c0b2f7Stbbdev template<typename K, class V>
26051c0b2f7Stbbdev void hash_buffer_test(const char *sname) {
26151c0b2f7Stbbdev     typedef typename K_deref<K>::type KnoR;
26251c0b2f7Stbbdev     hash_buffer<
26351c0b2f7Stbbdev         K,
26451c0b2f7Stbbdev         V,
26551c0b2f7Stbbdev         typename VtoKFB<K, V>::type,
26651c0b2f7Stbbdev         tbb_hash_compare<KnoR>
26751c0b2f7Stbbdev     > my_hash_buffer;
26851c0b2f7Stbbdev     const bool k_is_ref = std::is_reference<K>::value;
26951c0b2f7Stbbdev     typedef type_to_key_function_body_leaf<
27051c0b2f7Stbbdev         V, K, my_struct_key<K, V> > my_func_body_type;
27151c0b2f7Stbbdev     typename VtoKFB<K, V>::type *kp = new my_func_body_type(my_struct_key<K, V>());
27251c0b2f7Stbbdev     my_hash_buffer.set_key_func(kp);
27351c0b2f7Stbbdev     INFO("Running hash_buffer test on " << sname << "; is ref == " << (k_is_ref ? "true" : "false") << "\n" );
27451c0b2f7Stbbdev     V mv1, mv0;
27551c0b2f7Stbbdev     bool res;
27651c0b2f7Stbbdev     for(int cnt = 0; cnt < 2; ++cnt) {
27751c0b2f7Stbbdev         // insert 50 items after checking they are not already in the table
27851c0b2f7Stbbdev         for(int i = 0; i < 50; ++i) {
27951c0b2f7Stbbdev             KnoR kk = index_to_key<KnoR>()(i);
28051c0b2f7Stbbdev             mv1.my_key = kk;
28151c0b2f7Stbbdev             mv1.my_value = 0.5*i;
28251c0b2f7Stbbdev             res = my_hash_buffer.find_with_key(kk, mv0);
28351c0b2f7Stbbdev             CHECK_MESSAGE( (!res), "Found non-inserted item");
28451c0b2f7Stbbdev             res = my_hash_buffer.insert_with_key(mv1);
28551c0b2f7Stbbdev             CHECK_MESSAGE( (res), "insert failed");
28651c0b2f7Stbbdev             res = my_hash_buffer.find_with_key(kk, mv0);
28751c0b2f7Stbbdev             CHECK_MESSAGE( (res), "not found after insert");
28851c0b2f7Stbbdev             CHECK_MESSAGE( (mv0.my_value==mv1.my_value), "result not correct");
28951c0b2f7Stbbdev         }
29051c0b2f7Stbbdev         // go backwards checking they are still there.
29151c0b2f7Stbbdev         for(int i = 49; i>=0; --i) {
29251c0b2f7Stbbdev             KnoR kk = index_to_key<KnoR>()(i);
29351c0b2f7Stbbdev             double value = 0.5*i;
29451c0b2f7Stbbdev             res = my_hash_buffer.find_with_key(kk, mv0);
29551c0b2f7Stbbdev             CHECK_MESSAGE( (res), "find failed");
29651c0b2f7Stbbdev             CHECK_MESSAGE( (mv0.my_value==value), "result not correct");
29751c0b2f7Stbbdev         }
29851c0b2f7Stbbdev         // delete every third item, check they are gone
29951c0b2f7Stbbdev         for(int i = 0; i < 50; i += 3) {
30051c0b2f7Stbbdev             KnoR kk = index_to_key<KnoR>()(i);
30151c0b2f7Stbbdev             my_hash_buffer.delete_with_key(kk);
30251c0b2f7Stbbdev             res = my_hash_buffer.find_with_key(kk, mv0);
30351c0b2f7Stbbdev             CHECK_MESSAGE( (!res), "Found deleted item");
30451c0b2f7Stbbdev         }
30551c0b2f7Stbbdev         // check the deleted items are gone, the non-deleted items are there.
30651c0b2f7Stbbdev         for(int i = 0; i < 50; ++i) {
30751c0b2f7Stbbdev             KnoR kk = index_to_key<KnoR>()(i);
30851c0b2f7Stbbdev             double value = 0.5*i;
30951c0b2f7Stbbdev             if(i%3==0) {
31051c0b2f7Stbbdev                 res = my_hash_buffer.find_with_key(kk, mv0);
31151c0b2f7Stbbdev                 CHECK_MESSAGE( (!res), "found an item that was previously deleted");
31251c0b2f7Stbbdev             }
31351c0b2f7Stbbdev             else {
31451c0b2f7Stbbdev                 res = my_hash_buffer.find_with_key(kk, mv0);
31551c0b2f7Stbbdev                 CHECK_MESSAGE( (res), "find failed");
31651c0b2f7Stbbdev                 CHECK_MESSAGE( (mv0.my_value==value), "result not correct");
31751c0b2f7Stbbdev             }
31851c0b2f7Stbbdev         }
31951c0b2f7Stbbdev         // insert new items, check the deleted items return true, the non-deleted items return false.
32051c0b2f7Stbbdev         for(int i = 0; i < 50; ++i) {
32151c0b2f7Stbbdev             KnoR kk = index_to_key<KnoR>()(i);
32251c0b2f7Stbbdev             double value = 1.5*i;
32351c0b2f7Stbbdev             mv1.my_key = kk;
32451c0b2f7Stbbdev             mv1.my_value = value;
32551c0b2f7Stbbdev             res = my_hash_buffer.insert_with_key(mv1);
32651c0b2f7Stbbdev             if(i%3==0) {
32751c0b2f7Stbbdev                 CHECK_MESSAGE( (res), "didn't insert in empty slot");
32851c0b2f7Stbbdev             }
32951c0b2f7Stbbdev             else {
33051c0b2f7Stbbdev                 CHECK_MESSAGE( (!res), "slot was empty on insert");
33151c0b2f7Stbbdev             }
33251c0b2f7Stbbdev         }
33351c0b2f7Stbbdev         // delete all items
33451c0b2f7Stbbdev         for(int i = 0; i < 50; ++i) {
33551c0b2f7Stbbdev             KnoR kk = index_to_key<KnoR>()(i);
33651c0b2f7Stbbdev             my_hash_buffer.delete_with_key(kk);
33751c0b2f7Stbbdev             res = my_hash_buffer.find_with_key(kk, mv0);
33851c0b2f7Stbbdev             CHECK_MESSAGE( (!res), "Found deleted item");
33951c0b2f7Stbbdev         }
34051c0b2f7Stbbdev     }  // perform tasks twice
34151c0b2f7Stbbdev }
34251c0b2f7Stbbdev 
34351c0b2f7Stbbdev void
34451c0b2f7Stbbdev TestTaggedBuffers() {
34551c0b2f7Stbbdev     hash_buffer_test<int, MyKeyFirst<int, double> >("MyKeyFirst<int,double>");
34651c0b2f7Stbbdev     hash_buffer_test<int&, MyKeyFirst<int, double> >("MyKeyFirst<int,double> with int&");
34751c0b2f7Stbbdev     hash_buffer_test<int, MyKeySecond<int, double> >("MyKeySecond<int,double>");
34851c0b2f7Stbbdev 
34951c0b2f7Stbbdev     hash_buffer_test<std::string, MyKeyFirst<std::string, double> >("MyKeyFirst<std::string,double>");
35051c0b2f7Stbbdev     hash_buffer_test<std::string&, MyKeySecond<std::string, double> >("MyKeySecond<std::string,double> with std::string&");
35151c0b2f7Stbbdev }
35251c0b2f7Stbbdev 
// Integer-like type made of three separate bytes.
// Construction keeps the low 24 bits of the int (b1 = bits 0-7, b2 = bits
// 8-15, b3 = bits 16-23); conversion back to int reassembles those bytes.
struct threebyte {
    unsigned char b1;
    unsigned char b2;
    unsigned char b3;
    // Intentionally not explicit: ints convert to threebyte implicitly.
    threebyte(int i = 0)
        : b1(static_cast<unsigned char>(i&0xFF)),
          b2(static_cast<unsigned char>((i>>8)&0xFF)),
          b3(static_cast<unsigned char>((i>>16)&0xFF)) {
    }
    operator int() const { return static_cast<int>(b1+(b2<<8)+(b3<<16)); }
};
36451c0b2f7Stbbdev 
// Number of values each input body produces (see my_input_body) and the
// second dimension of outputCheck.
const int Count = 150;

const int Recirc_count = 1000;  // number of tuples to be generated
const int MaxPorts = 10;        // first dimension of outputCheck
const int MaxNInputs = 5; // max # of input_nodes to register for each join_node input in parallel test
// One flag per (port, item index) pair, used for checking output.
bool outputCheck[MaxPorts][Count];
37151c0b2f7Stbbdev 
37251c0b2f7Stbbdev void
37351c0b2f7Stbbdev check_outputCheck(int nUsed, int maxCnt) {
37451c0b2f7Stbbdev     for(int i = 0; i < nUsed; ++i) {
37551c0b2f7Stbbdev         for(int j = 0; j < maxCnt; ++j) {
37651c0b2f7Stbbdev             CHECK_MESSAGE(outputCheck[i][j], "");
37751c0b2f7Stbbdev         }
37851c0b2f7Stbbdev     }
37951c0b2f7Stbbdev }
38051c0b2f7Stbbdev 
38151c0b2f7Stbbdev void
38251c0b2f7Stbbdev reset_outputCheck(int nUsed, int maxCnt) {
38351c0b2f7Stbbdev     for(int i = 0; i < nUsed; ++i) {
38451c0b2f7Stbbdev         for(int j = 0; j < maxCnt; ++j) {
38551c0b2f7Stbbdev             outputCheck[i][j] = false;
38651c0b2f7Stbbdev         }
38751c0b2f7Stbbdev     }
38851c0b2f7Stbbdev }
38951c0b2f7Stbbdev 
// name_of<T>::name() returns a printable name for T.
// Types without a dedicated specialization are reported as "Unknown".
template<typename T>
class name_of {
public:
    static const char* name() { return  "Unknown"; }
};
39551c0b2f7Stbbdev template<typename T>
39651c0b2f7Stbbdev class name_of<CheckType<T> > {
39751c0b2f7Stbbdev public:
39851c0b2f7Stbbdev     static const char* name() { return "checktype"; }
39951c0b2f7Stbbdev };
40051c0b2f7Stbbdev template<>
40151c0b2f7Stbbdev class name_of<int> {
40251c0b2f7Stbbdev public:
40351c0b2f7Stbbdev     static const char* name() { return  "int"; }
40451c0b2f7Stbbdev };
40551c0b2f7Stbbdev template<>
40651c0b2f7Stbbdev class name_of<float> {
40751c0b2f7Stbbdev public:
40851c0b2f7Stbbdev     static const char* name() { return  "float"; }
40951c0b2f7Stbbdev };
41051c0b2f7Stbbdev template<>
41151c0b2f7Stbbdev class name_of<double> {
41251c0b2f7Stbbdev public:
41351c0b2f7Stbbdev     static const char* name() { return  "double"; }
41451c0b2f7Stbbdev };
41551c0b2f7Stbbdev template<>
41651c0b2f7Stbbdev class name_of<long> {
41751c0b2f7Stbbdev public:
41851c0b2f7Stbbdev     static const char* name() { return  "long"; }
41951c0b2f7Stbbdev };
42051c0b2f7Stbbdev template<>
42151c0b2f7Stbbdev class name_of<short> {
42251c0b2f7Stbbdev public:
42351c0b2f7Stbbdev     static const char* name() { return  "short"; }
42451c0b2f7Stbbdev };
42551c0b2f7Stbbdev template<>
42651c0b2f7Stbbdev class name_of<threebyte> {
42751c0b2f7Stbbdev public:
42851c0b2f7Stbbdev     static const char* name() { return "threebyte"; }
42951c0b2f7Stbbdev };
43051c0b2f7Stbbdev template<>
43151c0b2f7Stbbdev class name_of<std::string> {
43251c0b2f7Stbbdev public:
43351c0b2f7Stbbdev     static const char* name() { return "std::string"; }
43451c0b2f7Stbbdev };
43551c0b2f7Stbbdev template<typename K, typename V>
43651c0b2f7Stbbdev class name_of<MyKeyFirst<K, V> > {
43751c0b2f7Stbbdev public:
43851c0b2f7Stbbdev     static const char* name() { return "MyKeyFirst<K,V>"; }
43951c0b2f7Stbbdev };
44051c0b2f7Stbbdev template<typename K, typename V>
44151c0b2f7Stbbdev class name_of<MyKeySecond<K, V> > {
44251c0b2f7Stbbdev public:
44351c0b2f7Stbbdev     static const char* name() { return "MyKeySecond<K,V>"; }
44451c0b2f7Stbbdev };
44551c0b2f7Stbbdev 
44651c0b2f7Stbbdev // The additional policy to differ message based key matching from usual key matching.
44751c0b2f7Stbbdev // It only makes sense for the test because join_node is created with the key_matching policy for the both cases.
44851c0b2f7Stbbdev template <typename K, typename KHash = tbb_hash_compare<typename std::decay<K>::type > >
44951c0b2f7Stbbdev struct message_based_key_matching {};
45051c0b2f7Stbbdev 
// Trait testing whether a join policy JP is a key_matching policy.
// Primary template: `value` is false; key_type still has to name some type,
// so it is set to int.
template<class JP>
struct is_key_matching_join {
    static const bool value;
    typedef int key_type;  // have to define it to something
};

// Out-of-class definition of the static member for the primary template.
template<class JP>
const bool is_key_matching_join<JP>::value = false;
46051c0b2f7Stbbdev 
46151c0b2f7Stbbdev template<class K, class KHash>
46251c0b2f7Stbbdev struct is_key_matching_join<tbb::flow::key_matching<K, KHash> > {
46351c0b2f7Stbbdev     static const bool value;
46451c0b2f7Stbbdev     typedef K key_type;
46551c0b2f7Stbbdev };
46651c0b2f7Stbbdev 
46751c0b2f7Stbbdev template<class K, class KHash>
46851c0b2f7Stbbdev const bool is_key_matching_join<tbb::flow::key_matching<K, KHash> >::value = true;
46951c0b2f7Stbbdev 
47051c0b2f7Stbbdev template<class K, class KHash>
47151c0b2f7Stbbdev struct is_key_matching_join<message_based_key_matching<K, KHash> > {
47251c0b2f7Stbbdev     static const bool value;
47351c0b2f7Stbbdev     typedef K key_type;
47451c0b2f7Stbbdev };
47551c0b2f7Stbbdev 
47651c0b2f7Stbbdev template<class K, class KHash>
47751c0b2f7Stbbdev const bool is_key_matching_join<message_based_key_matching<K, KHash> >::value = true;
47851c0b2f7Stbbdev 
47951c0b2f7Stbbdev // for recirculating tags, input is tuple<index,continue_msg>
48051c0b2f7Stbbdev // output is index*my_mult cast to the right type
48151c0b2f7Stbbdev template<typename TT>
48251c0b2f7Stbbdev class recirc_func_body {
48351c0b2f7Stbbdev     TT my_mult;
48451c0b2f7Stbbdev public:
48551c0b2f7Stbbdev     typedef std::tuple<int, tbb::flow::continue_msg> input_type;
48651c0b2f7Stbbdev     recirc_func_body(TT multiplier): my_mult(multiplier) {}
48751c0b2f7Stbbdev     recirc_func_body(const recirc_func_body &other): my_mult(other.my_mult) { }
48851c0b2f7Stbbdev     void operator=(const recirc_func_body &other) { my_mult = other.my_mult; }
48951c0b2f7Stbbdev     TT operator()(const input_type &v) {
49051c0b2f7Stbbdev         return TT(std::get<0>(v)) * my_mult;
49151c0b2f7Stbbdev     }
49251c0b2f7Stbbdev };
49351c0b2f7Stbbdev 
49451c0b2f7Stbbdev static int input_count;  // input_nodes are serial
49551c0b2f7Stbbdev 
49651c0b2f7Stbbdev // emit input_count continue_msg
49751c0b2f7Stbbdev class recirc_input_node_body {
49851c0b2f7Stbbdev public:
49951c0b2f7Stbbdev     tbb::flow::continue_msg operator()(tbb::flow_control &fc) {
50051c0b2f7Stbbdev         if( --input_count < 0 ){
50151c0b2f7Stbbdev             fc.stop();
50251c0b2f7Stbbdev         }
50351c0b2f7Stbbdev         return tbb::flow::continue_msg();
50451c0b2f7Stbbdev     }
50551c0b2f7Stbbdev };
50651c0b2f7Stbbdev 
50751c0b2f7Stbbdev // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10,
50851c0b2f7Stbbdev // so the max number generated right now is 1500 or so.)  Input will generate a series of TT with value
50951c0b2f7Stbbdev // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body.  We are attaching addend
51051c0b2f7Stbbdev // input nodes to a join_port, and each will generate part of the numerical series the port is expecting
51151c0b2f7Stbbdev // to receive.  If there is only one input node, the series order will be maintained; if more than one,
51251c0b2f7Stbbdev // this is not guaranteed.
51351c0b2f7Stbbdev template<typename TT, size_t INDEX>
51451c0b2f7Stbbdev class my_input_body {
51551c0b2f7Stbbdev     int my_count;
51651c0b2f7Stbbdev     int addend;
51751c0b2f7Stbbdev public:
51851c0b2f7Stbbdev     my_input_body(int init_val, int addto): my_count(init_val), addend(addto) { }
51951c0b2f7Stbbdev     TT operator()(tbb::flow_control& fc) {
52051c0b2f7Stbbdev         int lc = my_count;
52151c0b2f7Stbbdev         TT ret = make_thingie<TT, INDEX>()(my_count);
52251c0b2f7Stbbdev         my_count += addend;
52351c0b2f7Stbbdev         if ( lc < Count){
52451c0b2f7Stbbdev             return ret;
52551c0b2f7Stbbdev         }else{
52651c0b2f7Stbbdev             fc.stop();
52751c0b2f7Stbbdev             return TT();
52851c0b2f7Stbbdev         }
52951c0b2f7Stbbdev     }
53051c0b2f7Stbbdev };
53151c0b2f7Stbbdev 
53251c0b2f7Stbbdev template<typename TT>
53351c0b2f7Stbbdev class tag_func {
53451c0b2f7Stbbdev     TT my_mult;
53551c0b2f7Stbbdev public:
53651c0b2f7Stbbdev     tag_func(TT multiplier): my_mult(multiplier) { }
53751c0b2f7Stbbdev     // operator() will return [0 .. Count)
53851c0b2f7Stbbdev     tbb::flow::tag_value operator()(TT v) {
53951c0b2f7Stbbdev         tbb::flow::tag_value t = tbb::flow::tag_value(v/my_mult);
54051c0b2f7Stbbdev         return t;
54151c0b2f7Stbbdev     }
54251c0b2f7Stbbdev };
54351c0b2f7Stbbdev 
// Maps a test-level policy to the policy actually given to join_node.
// Primary template: the policy is passed through unchanged.
template <class JP>
struct filter_out_message_based_key_matching {
    typedef JP policy;
};
54851c0b2f7Stbbdev 
54951c0b2f7Stbbdev template <typename K, typename KHash>
55051c0b2f7Stbbdev struct filter_out_message_based_key_matching<message_based_key_matching<K, KHash> > {
55151c0b2f7Stbbdev     // To have message based key matching in join_node, the key_matchig policy should be specified.
55251c0b2f7Stbbdev     typedef tbb::flow::key_matching<K, KHash> policy;
55351c0b2f7Stbbdev };
55451c0b2f7Stbbdev 
55551c0b2f7Stbbdev // allocator for join_node.  This is specialized for tag_matching and key_matching joins because they require a variable number
55651c0b2f7Stbbdev // of tag_value methods passed to the constructor
55751c0b2f7Stbbdev 
55851c0b2f7Stbbdev template<int N, typename JType, class JP>
55951c0b2f7Stbbdev class makeJoin {
56051c0b2f7Stbbdev public:
56151c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
56251c0b2f7Stbbdev         JType *temp = new JType(g);
56351c0b2f7Stbbdev         return temp;
56451c0b2f7Stbbdev     }
56551c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
56651c0b2f7Stbbdev };
56751c0b2f7Stbbdev 
56851c0b2f7Stbbdev // for general key_matching case, each type in the tuple is a class that has the my_key field and the my_value field.
56951c0b2f7Stbbdev //
57051c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
57151c0b2f7Stbbdev class makeJoin<2, JType, tbb::flow::key_matching<K, KHash> > {
57251c0b2f7Stbbdev     typedef typename JType::output_type TType;
57351c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
57451c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
57551c0b2f7Stbbdev public:
57651c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
57751c0b2f7Stbbdev         JType *temp = new JType(g,
57851c0b2f7Stbbdev             my_struct_key<K, T0>(),
57951c0b2f7Stbbdev             my_struct_key<K, T1>()
58051c0b2f7Stbbdev         );
58151c0b2f7Stbbdev         return temp;
58251c0b2f7Stbbdev     }
58351c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
58451c0b2f7Stbbdev };
58551c0b2f7Stbbdev 
58651c0b2f7Stbbdev template<typename JType>
58751c0b2f7Stbbdev class makeJoin<2, JType, tbb::flow::tag_matching> {
58851c0b2f7Stbbdev     typedef typename JType::output_type TType;
58951c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
59051c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
59151c0b2f7Stbbdev public:
59251c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
59351c0b2f7Stbbdev         JType *temp = new JType(g,
59451c0b2f7Stbbdev             tag_func<T0>(T0(2)),
59551c0b2f7Stbbdev             tag_func<T1>(T1(3))
59651c0b2f7Stbbdev         );
59751c0b2f7Stbbdev         return temp;
59851c0b2f7Stbbdev     }
59951c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
60051c0b2f7Stbbdev };
60151c0b2f7Stbbdev 
#if MAX_TUPLE_TEST_SIZE >= 3
// Three-port key_matching form: one my_struct_key extractor per port.
template<typename JType, typename K, typename KHash>
class makeJoin<3, JType, tbb::flow::key_matching<K, KHash> > {
    typedef typename JType::output_type TType;
    typedef typename std::tuple_element<0, TType>::type T0;
    typedef typename std::tuple_element<1, TType>::type T1;
    typedef typename std::tuple_element<2, TType>::type T2;
public:
    // Allocates a JType on the heap; release it with destroy().
    static JType *create(tbb::flow::graph& g) {
        return new JType(g,
            my_struct_key<K, T0>(),
            my_struct_key<K, T1>(),
            my_struct_key<K, T2>()
        );
    }
    // Deletes a node obtained from create().
    static void destroy(JType *p) { delete p; }
};

// Three-port tag_matching form: the tag_func multipliers are 2, 3 and 4 for
// ports 0, 1 and 2.
template<typename JType>
class makeJoin<3, JType, tbb::flow::tag_matching> {
    typedef typename JType::output_type TType;
    typedef typename std::tuple_element<0, TType>::type T0;
    typedef typename std::tuple_element<1, TType>::type T1;
    typedef typename std::tuple_element<2, TType>::type T2;
public:
    // Allocates a JType on the heap; release it with destroy().
    static JType *create(tbb::flow::graph& g) {
        return new JType(g,
            tag_func<T0>(T0(2)),
            tag_func<T1>(T1(3)),
            tag_func<T2>(T2(4))
        );
    }
    // Deletes a node obtained from create().
    static void destroy(JType *p) { delete p; }
};

#endif
64051c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 4
64151c0b2f7Stbbdev 
64251c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
64351c0b2f7Stbbdev class makeJoin<4, JType, tbb::flow::key_matching<K, KHash> > {
64451c0b2f7Stbbdev     typedef typename JType::output_type TType;
64551c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
64651c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
64751c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
64851c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
64951c0b2f7Stbbdev public:
65051c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
65151c0b2f7Stbbdev         JType *temp = new JType(g,
65251c0b2f7Stbbdev             my_struct_key<K, T0>(),
65351c0b2f7Stbbdev             my_struct_key<K, T1>(),
65451c0b2f7Stbbdev             my_struct_key<K, T2>(),
65551c0b2f7Stbbdev             my_struct_key<K, T3>()
65651c0b2f7Stbbdev         );
65751c0b2f7Stbbdev         return temp;
65851c0b2f7Stbbdev     }
65951c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
66051c0b2f7Stbbdev };
66151c0b2f7Stbbdev 
66251c0b2f7Stbbdev template<typename JType>
66351c0b2f7Stbbdev class makeJoin<4, JType, tbb::flow::tag_matching> {
66451c0b2f7Stbbdev     typedef typename JType::output_type TType;
66551c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
66651c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
66751c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
66851c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
66951c0b2f7Stbbdev public:
67051c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
67151c0b2f7Stbbdev         JType *temp = new JType(g,
67251c0b2f7Stbbdev             tag_func<T0>(T0(2)),
67351c0b2f7Stbbdev             tag_func<T1>(T1(3)),
67451c0b2f7Stbbdev             tag_func<T2>(T2(4)),
67551c0b2f7Stbbdev             tag_func<T3>(T3(5))
67651c0b2f7Stbbdev         );
67751c0b2f7Stbbdev         return temp;
67851c0b2f7Stbbdev     }
67951c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
68051c0b2f7Stbbdev };
68151c0b2f7Stbbdev 
68251c0b2f7Stbbdev #endif
68351c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 5
68451c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
68551c0b2f7Stbbdev class makeJoin<5, JType, tbb::flow::key_matching<K, KHash> > {
68651c0b2f7Stbbdev     typedef typename JType::output_type TType;
68751c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
68851c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
68951c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
69051c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
69151c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
69251c0b2f7Stbbdev public:
69351c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
69451c0b2f7Stbbdev         JType *temp = new JType(g,
69551c0b2f7Stbbdev             my_struct_key<K, T0>(),
69651c0b2f7Stbbdev             my_struct_key<K, T1>(),
69751c0b2f7Stbbdev             my_struct_key<K, T2>(),
69851c0b2f7Stbbdev             my_struct_key<K, T3>(),
69951c0b2f7Stbbdev             my_struct_key<K, T4>()
70051c0b2f7Stbbdev         );
70151c0b2f7Stbbdev         return temp;
70251c0b2f7Stbbdev     }
70351c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
70451c0b2f7Stbbdev };
70551c0b2f7Stbbdev 
70651c0b2f7Stbbdev template<typename JType>
70751c0b2f7Stbbdev class makeJoin<5, JType, tbb::flow::tag_matching> {
70851c0b2f7Stbbdev     typedef typename JType::output_type TType;
70951c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
71051c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
71151c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
71251c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
71351c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
71451c0b2f7Stbbdev public:
71551c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
71651c0b2f7Stbbdev         JType *temp = new JType(g,
71751c0b2f7Stbbdev             tag_func<T0>(T0(2)),
71851c0b2f7Stbbdev             tag_func<T1>(T1(3)),
71951c0b2f7Stbbdev             tag_func<T2>(T2(4)),
72051c0b2f7Stbbdev             tag_func<T3>(T3(5)),
72151c0b2f7Stbbdev             tag_func<T4>(T4(6))
72251c0b2f7Stbbdev         );
72351c0b2f7Stbbdev         return temp;
72451c0b2f7Stbbdev     }
72551c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
72651c0b2f7Stbbdev };
72751c0b2f7Stbbdev #endif
72851c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 6
72951c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
73051c0b2f7Stbbdev class makeJoin<6, JType, tbb::flow::key_matching<K, KHash> > {
73151c0b2f7Stbbdev     typedef typename JType::output_type TType;
73251c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
73351c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
73451c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
73551c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
73651c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
73751c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
73851c0b2f7Stbbdev public:
73951c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
74051c0b2f7Stbbdev         JType *temp = new JType(g,
74151c0b2f7Stbbdev             my_struct_key<K, T0>(),
74251c0b2f7Stbbdev             my_struct_key<K, T1>(),
74351c0b2f7Stbbdev             my_struct_key<K, T2>(),
74451c0b2f7Stbbdev             my_struct_key<K, T3>(),
74551c0b2f7Stbbdev             my_struct_key<K, T4>(),
74651c0b2f7Stbbdev             my_struct_key<K, T5>()
74751c0b2f7Stbbdev         );
74851c0b2f7Stbbdev         return temp;
74951c0b2f7Stbbdev     }
75051c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
75151c0b2f7Stbbdev };
75251c0b2f7Stbbdev 
75351c0b2f7Stbbdev template<typename JType>
75451c0b2f7Stbbdev class makeJoin<6, JType, tbb::flow::tag_matching> {
75551c0b2f7Stbbdev     typedef typename JType::output_type TType;
75651c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
75751c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
75851c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
75951c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
76051c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
76151c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
76251c0b2f7Stbbdev public:
76351c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
76451c0b2f7Stbbdev         JType *temp = new JType(g,
76551c0b2f7Stbbdev             tag_func<T0>(T0(2)),
76651c0b2f7Stbbdev             tag_func<T1>(T1(3)),
76751c0b2f7Stbbdev             tag_func<T2>(T2(4)),
76851c0b2f7Stbbdev             tag_func<T3>(T3(5)),
76951c0b2f7Stbbdev             tag_func<T4>(T4(6)),
77051c0b2f7Stbbdev             tag_func<T5>(T5(7))
77151c0b2f7Stbbdev         );
77251c0b2f7Stbbdev         return temp;
77351c0b2f7Stbbdev     }
77451c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
77551c0b2f7Stbbdev };
77651c0b2f7Stbbdev #endif
77751c0b2f7Stbbdev 
77851c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 7
77951c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
78051c0b2f7Stbbdev class makeJoin<7, JType, tbb::flow::key_matching<K, KHash> > {
78151c0b2f7Stbbdev     typedef typename JType::output_type TType;
78251c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
78351c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
78451c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
78551c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
78651c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
78751c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
78851c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
78951c0b2f7Stbbdev public:
79051c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
79151c0b2f7Stbbdev         JType *temp = new JType(g,
79251c0b2f7Stbbdev             my_struct_key<K, T0>(),
79351c0b2f7Stbbdev             my_struct_key<K, T1>(),
79451c0b2f7Stbbdev             my_struct_key<K, T2>(),
79551c0b2f7Stbbdev             my_struct_key<K, T3>(),
79651c0b2f7Stbbdev             my_struct_key<K, T4>(),
79751c0b2f7Stbbdev             my_struct_key<K, T5>(),
79851c0b2f7Stbbdev             my_struct_key<K, T6>()
79951c0b2f7Stbbdev         );
80051c0b2f7Stbbdev         return temp;
80151c0b2f7Stbbdev     }
80251c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
80351c0b2f7Stbbdev };
80451c0b2f7Stbbdev 
80551c0b2f7Stbbdev template<typename JType>
80651c0b2f7Stbbdev class makeJoin<7, JType, tbb::flow::tag_matching> {
80751c0b2f7Stbbdev     typedef typename JType::output_type TType;
80851c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
80951c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
81051c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
81151c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
81251c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
81351c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
81451c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
81551c0b2f7Stbbdev public:
81651c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
81751c0b2f7Stbbdev         JType *temp = new JType(g,
81851c0b2f7Stbbdev             tag_func<T0>(T0(2)),
81951c0b2f7Stbbdev             tag_func<T1>(T1(3)),
82051c0b2f7Stbbdev             tag_func<T2>(T2(4)),
82151c0b2f7Stbbdev             tag_func<T3>(T3(5)),
82251c0b2f7Stbbdev             tag_func<T4>(T4(6)),
82351c0b2f7Stbbdev             tag_func<T5>(T5(7)),
82451c0b2f7Stbbdev             tag_func<T6>(T6(8))
82551c0b2f7Stbbdev         );
82651c0b2f7Stbbdev         return temp;
82751c0b2f7Stbbdev     }
82851c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
82951c0b2f7Stbbdev };
83051c0b2f7Stbbdev #endif
83151c0b2f7Stbbdev 
83251c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 8
83351c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
83451c0b2f7Stbbdev class makeJoin<8, JType, tbb::flow::key_matching<K, KHash> > {
83551c0b2f7Stbbdev     typedef typename JType::output_type TType;
83651c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
83751c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
83851c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
83951c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
84051c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
84151c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
84251c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
84351c0b2f7Stbbdev     typedef typename std::tuple_element<7, TType>::type T7;
84451c0b2f7Stbbdev public:
84551c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
84651c0b2f7Stbbdev         JType *temp = new JType(g,
84751c0b2f7Stbbdev             my_struct_key<K, T0>(),
84851c0b2f7Stbbdev             my_struct_key<K, T1>(),
84951c0b2f7Stbbdev             my_struct_key<K, T2>(),
85051c0b2f7Stbbdev             my_struct_key<K, T3>(),
85151c0b2f7Stbbdev             my_struct_key<K, T4>(),
85251c0b2f7Stbbdev             my_struct_key<K, T5>(),
85351c0b2f7Stbbdev             my_struct_key<K, T6>(),
85451c0b2f7Stbbdev             my_struct_key<K, T7>()
85551c0b2f7Stbbdev         );
85651c0b2f7Stbbdev         return temp;
85751c0b2f7Stbbdev     }
85851c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
85951c0b2f7Stbbdev };
86051c0b2f7Stbbdev 
86151c0b2f7Stbbdev template<typename JType>
86251c0b2f7Stbbdev class makeJoin<8, JType, tbb::flow::tag_matching> {
86351c0b2f7Stbbdev     typedef typename JType::output_type TType;
86451c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
86551c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
86651c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
86751c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
86851c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
86951c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
87051c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
87151c0b2f7Stbbdev     typedef typename std::tuple_element<7, TType>::type T7;
87251c0b2f7Stbbdev public:
87351c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
87451c0b2f7Stbbdev         JType *temp = new JType(g,
87551c0b2f7Stbbdev             tag_func<T0>(T0(2)),
87651c0b2f7Stbbdev             tag_func<T1>(T1(3)),
87751c0b2f7Stbbdev             tag_func<T2>(T2(4)),
87851c0b2f7Stbbdev             tag_func<T3>(T3(5)),
87951c0b2f7Stbbdev             tag_func<T4>(T4(6)),
88051c0b2f7Stbbdev             tag_func<T5>(T5(7)),
88151c0b2f7Stbbdev             tag_func<T6>(T6(8)),
88251c0b2f7Stbbdev             tag_func<T7>(T7(9))
88351c0b2f7Stbbdev         );
88451c0b2f7Stbbdev         return temp;
88551c0b2f7Stbbdev     }
88651c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
88751c0b2f7Stbbdev };
88851c0b2f7Stbbdev #endif
88951c0b2f7Stbbdev 
89051c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 9
89151c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
89251c0b2f7Stbbdev class makeJoin<9, JType, tbb::flow::key_matching<K, KHash> > {
89351c0b2f7Stbbdev     typedef typename JType::output_type TType;
89451c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
89551c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
89651c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
89751c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
89851c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
89951c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
90051c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
90151c0b2f7Stbbdev     typedef typename std::tuple_element<7, TType>::type T7;
90251c0b2f7Stbbdev     typedef typename std::tuple_element<8, TType>::type T8;
90351c0b2f7Stbbdev public:
90451c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
90551c0b2f7Stbbdev         JType *temp = new JType(g,
90651c0b2f7Stbbdev             my_struct_key<K, T0>(),
90751c0b2f7Stbbdev             my_struct_key<K, T1>(),
90851c0b2f7Stbbdev             my_struct_key<K, T2>(),
90951c0b2f7Stbbdev             my_struct_key<K, T3>(),
91051c0b2f7Stbbdev             my_struct_key<K, T4>(),
91151c0b2f7Stbbdev             my_struct_key<K, T5>(),
91251c0b2f7Stbbdev             my_struct_key<K, T6>(),
91351c0b2f7Stbbdev             my_struct_key<K, T7>(),
91451c0b2f7Stbbdev             my_struct_key<K, T8>()
91551c0b2f7Stbbdev         );
91651c0b2f7Stbbdev         return temp;
91751c0b2f7Stbbdev     }
91851c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
91951c0b2f7Stbbdev };
92051c0b2f7Stbbdev 
92151c0b2f7Stbbdev template<typename JType>
92251c0b2f7Stbbdev class makeJoin<9, JType, tbb::flow::tag_matching> {
92351c0b2f7Stbbdev     typedef typename JType::output_type TType;
92451c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
92551c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
92651c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
92751c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
92851c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
92951c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
93051c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
93151c0b2f7Stbbdev     typedef typename std::tuple_element<7, TType>::type T7;
93251c0b2f7Stbbdev     typedef typename std::tuple_element<8, TType>::type T8;
93351c0b2f7Stbbdev public:
93451c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
93551c0b2f7Stbbdev         JType *temp = new JType(g,
93651c0b2f7Stbbdev             tag_func<T0>(T0(2)),
93751c0b2f7Stbbdev             tag_func<T1>(T1(3)),
93851c0b2f7Stbbdev             tag_func<T2>(T2(4)),
93951c0b2f7Stbbdev             tag_func<T3>(T3(5)),
94051c0b2f7Stbbdev             tag_func<T4>(T4(6)),
94151c0b2f7Stbbdev             tag_func<T5>(T5(7)),
94251c0b2f7Stbbdev             tag_func<T6>(T6(8)),
94351c0b2f7Stbbdev             tag_func<T7>(T7(9)),
94451c0b2f7Stbbdev             tag_func<T8>(T8(10))
94551c0b2f7Stbbdev         );
94651c0b2f7Stbbdev         return temp;
94751c0b2f7Stbbdev     }
94851c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
94951c0b2f7Stbbdev };
95051c0b2f7Stbbdev #endif
95151c0b2f7Stbbdev 
95251c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 10
95351c0b2f7Stbbdev template<typename JType, typename K, typename KHash>
95451c0b2f7Stbbdev class makeJoin<10, JType, tbb::flow::key_matching<K, KHash> > {
95551c0b2f7Stbbdev     typedef typename JType::output_type TType;
95651c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
95751c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
95851c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
95951c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
96051c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
96151c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
96251c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
96351c0b2f7Stbbdev     typedef typename std::tuple_element<7, TType>::type T7;
96451c0b2f7Stbbdev     typedef typename std::tuple_element<8, TType>::type T8;
96551c0b2f7Stbbdev     typedef typename std::tuple_element<9, TType>::type T9;
96651c0b2f7Stbbdev public:
96751c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
96851c0b2f7Stbbdev         JType *temp = new JType(g,
96951c0b2f7Stbbdev             my_struct_key<K, T0>(),
97051c0b2f7Stbbdev             my_struct_key<K, T1>(),
97151c0b2f7Stbbdev             my_struct_key<K, T2>(),
97251c0b2f7Stbbdev             my_struct_key<K, T3>(),
97351c0b2f7Stbbdev             my_struct_key<K, T4>(),
97451c0b2f7Stbbdev             my_struct_key<K, T5>(),
97551c0b2f7Stbbdev             my_struct_key<K, T6>(),
97651c0b2f7Stbbdev             my_struct_key<K, T7>(),
97751c0b2f7Stbbdev             my_struct_key<K, T8>(),
97851c0b2f7Stbbdev             my_struct_key<K, T9>()
97951c0b2f7Stbbdev         );
98051c0b2f7Stbbdev         return temp;
98151c0b2f7Stbbdev     }
98251c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
98351c0b2f7Stbbdev };
98451c0b2f7Stbbdev 
98551c0b2f7Stbbdev template<typename JType>
98651c0b2f7Stbbdev class makeJoin<10, JType, tbb::flow::tag_matching> {
98751c0b2f7Stbbdev     typedef typename JType::output_type TType;
98851c0b2f7Stbbdev     typedef typename std::tuple_element<0, TType>::type T0;
98951c0b2f7Stbbdev     typedef typename std::tuple_element<1, TType>::type T1;
99051c0b2f7Stbbdev     typedef typename std::tuple_element<2, TType>::type T2;
99151c0b2f7Stbbdev     typedef typename std::tuple_element<3, TType>::type T3;
99251c0b2f7Stbbdev     typedef typename std::tuple_element<4, TType>::type T4;
99351c0b2f7Stbbdev     typedef typename std::tuple_element<5, TType>::type T5;
99451c0b2f7Stbbdev     typedef typename std::tuple_element<6, TType>::type T6;
99551c0b2f7Stbbdev     typedef typename std::tuple_element<7, TType>::type T7;
99651c0b2f7Stbbdev     typedef typename std::tuple_element<8, TType>::type T8;
99751c0b2f7Stbbdev     typedef typename std::tuple_element<9, TType>::type T9;
99851c0b2f7Stbbdev public:
99951c0b2f7Stbbdev     static JType *create(tbb::flow::graph& g) {
100051c0b2f7Stbbdev         JType *temp = new JType(g,
100151c0b2f7Stbbdev             tag_func<T0>(T0(2)),
100251c0b2f7Stbbdev             tag_func<T1>(T1(3)),
100351c0b2f7Stbbdev             tag_func<T2>(T2(4)),
100451c0b2f7Stbbdev             tag_func<T3>(T3(5)),
100551c0b2f7Stbbdev             tag_func<T4>(T4(6)),
100651c0b2f7Stbbdev             tag_func<T5>(T5(7)),
100751c0b2f7Stbbdev             tag_func<T6>(T6(8)),
100851c0b2f7Stbbdev             tag_func<T7>(T7(9)),
100951c0b2f7Stbbdev             tag_func<T8>(T8(10)),
101051c0b2f7Stbbdev             tag_func<T9>(T9(11))
101151c0b2f7Stbbdev         );
101251c0b2f7Stbbdev         return temp;
101351c0b2f7Stbbdev     }
101451c0b2f7Stbbdev     static void destroy(JType *p) { delete p; }
101551c0b2f7Stbbdev };
101651c0b2f7Stbbdev #endif
101751c0b2f7Stbbdev 
// Holder for the node pointers allocated by input_node_helper, kept so the
// nodes can be deleted later. Pointers are stored type-erased as void*:
// add_input_nodes fills slot [port][i] with an input_node, while
// add_recirc_func_nodes fills slot [port][0] with a function_node. The
// matching remove_* function casts the pointer back to the type it stored.

static void* all_input_nodes[MaxPorts][MaxNInputs];
102151c0b2f7Stbbdev 
102251c0b2f7Stbbdev template<int ELEM, typename JNT>
102351c0b2f7Stbbdev class input_node_helper {
102451c0b2f7Stbbdev public:
102551c0b2f7Stbbdev     typedef JNT join_node_type;
102651c0b2f7Stbbdev     typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
102751c0b2f7Stbbdev     typedef typename join_node_type::output_type TT;
102851c0b2f7Stbbdev 
102951c0b2f7Stbbdev     typedef typename std::tuple_element<ELEM-1, TT>::type IT;
103051c0b2f7Stbbdev     typedef typename tbb::flow::input_node<IT> my_input_node_type;
103151c0b2f7Stbbdev     typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
103251c0b2f7Stbbdev     static void print_remark(const char * str) {
103351c0b2f7Stbbdev         input_node_helper<ELEM-1, JNT>::print_remark(str);
103451c0b2f7Stbbdev         INFO(", " << name_of<IT>::name());
103551c0b2f7Stbbdev     }
103651c0b2f7Stbbdev     static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
103751c0b2f7Stbbdev         for(int i = 0; i < nInputs; ++i) {
103851c0b2f7Stbbdev             my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, ELEM>(i, nInputs));
103951c0b2f7Stbbdev             tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
104051c0b2f7Stbbdev             all_input_nodes[ELEM-1][i] = (void *)new_node;
104151c0b2f7Stbbdev             new_node->activate();
104251c0b2f7Stbbdev         }
104351c0b2f7Stbbdev         // add the next input_node
104451c0b2f7Stbbdev         input_node_helper<ELEM-1, JNT>::add_input_nodes(my_join, g, nInputs);
104551c0b2f7Stbbdev     }
104651c0b2f7Stbbdev 
104751c0b2f7Stbbdev     static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
104851c0b2f7Stbbdev         my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(ELEM+1)));
104951c0b2f7Stbbdev         tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
105051c0b2f7Stbbdev         tbb::flow::make_edge(my_input, *new_node);
105151c0b2f7Stbbdev         all_input_nodes[ELEM-1][0] = (void *)new_node;
105251c0b2f7Stbbdev         input_node_helper<ELEM-1, JNT>::add_recirc_func_nodes(my_join, my_input, g);
105351c0b2f7Stbbdev     }
105451c0b2f7Stbbdev 
105551c0b2f7Stbbdev     static void only_check_value(const int i, const TT &v) {
105651c0b2f7Stbbdev         CHECK_MESSAGE(std::get<ELEM-1>(v)==(IT)(i*(ELEM+1)), "");
105751c0b2f7Stbbdev         input_node_helper<ELEM-1, JNT>::only_check_value(i, v);
105851c0b2f7Stbbdev     }
105951c0b2f7Stbbdev 
106051c0b2f7Stbbdev     static void check_value(int i, TT &v, bool is_serial) {
106151c0b2f7Stbbdev         // the fetched value will match only if there is only one input_node.
106251c0b2f7Stbbdev         bool is_correct = !is_serial||std::get<ELEM-1>(v)==(IT)(i*(ELEM+1));
106351c0b2f7Stbbdev         CHECK_MESSAGE(is_correct, "");
106451c0b2f7Stbbdev         // tally the fetched value.
106551c0b2f7Stbbdev         int ival = (int)std::get<ELEM-1>(v);
106651c0b2f7Stbbdev         CHECK_MESSAGE(!(ival%(ELEM+1)), "");
106751c0b2f7Stbbdev         ival /= (ELEM+1);
106851c0b2f7Stbbdev         CHECK_MESSAGE(!outputCheck[ELEM-1][ival], "");
106951c0b2f7Stbbdev         outputCheck[ELEM-1][ival] = true;
107051c0b2f7Stbbdev         input_node_helper<ELEM-1, JNT>::check_value(i, v, is_serial);
107151c0b2f7Stbbdev     }
107251c0b2f7Stbbdev     static void remove_input_nodes(join_node_type& my_join, int nInputs) {
107351c0b2f7Stbbdev         for(int i = 0; i< nInputs; ++i) {
107451c0b2f7Stbbdev             my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[ELEM-1][i]);
107551c0b2f7Stbbdev             tbb::flow::remove_edge(*dp, tbb::flow::input_port<ELEM-1>(my_join));
107651c0b2f7Stbbdev             delete dp;
107751c0b2f7Stbbdev         }
107851c0b2f7Stbbdev         input_node_helper<ELEM-1, JNT>::remove_input_nodes(my_join, nInputs);
107951c0b2f7Stbbdev     }
108051c0b2f7Stbbdev 
108151c0b2f7Stbbdev     static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
108251c0b2f7Stbbdev         my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[ELEM-1][0]);
108351c0b2f7Stbbdev         tbb::flow::remove_edge(*fn, tbb::flow::input_port<ELEM-1>(my_join));
108451c0b2f7Stbbdev         tbb::flow::remove_edge(my_input, *fn);
108551c0b2f7Stbbdev         delete fn;
108651c0b2f7Stbbdev         input_node_helper<ELEM-1, JNT>::remove_recirc_func_nodes(my_join, my_input);
108751c0b2f7Stbbdev     }
108851c0b2f7Stbbdev };
108951c0b2f7Stbbdev 
109051c0b2f7Stbbdev template<typename JNT>
109151c0b2f7Stbbdev class input_node_helper<1, JNT> {
109251c0b2f7Stbbdev     typedef JNT join_node_type;
109351c0b2f7Stbbdev     typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
109451c0b2f7Stbbdev     typedef typename join_node_type::output_type TT;
109551c0b2f7Stbbdev 
109651c0b2f7Stbbdev     typedef typename std::tuple_element<0, TT>::type IT;
109751c0b2f7Stbbdev     typedef typename tbb::flow::input_node<IT> my_input_node_type;
109851c0b2f7Stbbdev     typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
109951c0b2f7Stbbdev public:
110051c0b2f7Stbbdev     static void print_remark(const char * str) {
110151c0b2f7Stbbdev         INFO(str << "< " << name_of<IT>::name());
110251c0b2f7Stbbdev     }
110351c0b2f7Stbbdev     static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
110451c0b2f7Stbbdev         for(int i = 0; i < nInputs; ++i) {
110551c0b2f7Stbbdev             my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, 1>(i, nInputs));
110651c0b2f7Stbbdev             tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
110751c0b2f7Stbbdev             all_input_nodes[0][i] = (void *)new_node;
110851c0b2f7Stbbdev             new_node->activate();
110951c0b2f7Stbbdev         }
111051c0b2f7Stbbdev     }
111151c0b2f7Stbbdev 
111251c0b2f7Stbbdev     static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
111351c0b2f7Stbbdev         my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(2)));
111451c0b2f7Stbbdev         tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
111551c0b2f7Stbbdev         tbb::flow::make_edge(my_input, *new_node);
111651c0b2f7Stbbdev         all_input_nodes[0][0] = (void *)new_node;
111751c0b2f7Stbbdev     }
111851c0b2f7Stbbdev 
111951c0b2f7Stbbdev     static void only_check_value(const int i, const TT &v) {
112051c0b2f7Stbbdev         CHECK_MESSAGE(std::get<0>(v)==(IT)(i*2), "");
112151c0b2f7Stbbdev     }
112251c0b2f7Stbbdev 
112351c0b2f7Stbbdev     static void check_value(int i, TT &v, bool is_serial) {
112451c0b2f7Stbbdev         bool is_correct = !is_serial||std::get<0>(v)==(IT)(i*(2));
112551c0b2f7Stbbdev         CHECK_MESSAGE(is_correct, "");
112651c0b2f7Stbbdev         int ival = (int)std::get<0>(v);
112751c0b2f7Stbbdev         CHECK_MESSAGE(!(ival%2), "");
112851c0b2f7Stbbdev         ival /= 2;
112951c0b2f7Stbbdev         CHECK_MESSAGE(!outputCheck[0][ival], "");
113051c0b2f7Stbbdev         outputCheck[0][ival] = true;
113151c0b2f7Stbbdev     }
113251c0b2f7Stbbdev     static void remove_input_nodes(join_node_type& my_join, int nInputs) {
113351c0b2f7Stbbdev         for(int i = 0; i < nInputs; ++i) {
113451c0b2f7Stbbdev             my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[0][i]);
113551c0b2f7Stbbdev             tbb::flow::remove_edge(*dp, tbb::flow::input_port<0>(my_join));
113651c0b2f7Stbbdev             delete dp;
113751c0b2f7Stbbdev         }
113851c0b2f7Stbbdev     }
113951c0b2f7Stbbdev 
114051c0b2f7Stbbdev     static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
114151c0b2f7Stbbdev         my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[0][0]);
114251c0b2f7Stbbdev         tbb::flow::remove_edge(*fn, tbb::flow::input_port<0>(my_join));
114351c0b2f7Stbbdev         tbb::flow::remove_edge(my_input, *fn);
114451c0b2f7Stbbdev         delete fn;
114551c0b2f7Stbbdev     }
114651c0b2f7Stbbdev };
114751c0b2f7Stbbdev 
114851c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
114951c0b2f7Stbbdev // Suppress "conditional expression is constant" warning.
115051c0b2f7Stbbdev #pragma warning( push )
115151c0b2f7Stbbdev #pragma warning( disable: 4127 )
115251c0b2f7Stbbdev #endif
115351c0b2f7Stbbdev 
115451c0b2f7Stbbdev template<typename JType, class JP>
115551c0b2f7Stbbdev class parallel_test {
115651c0b2f7Stbbdev public:
115751c0b2f7Stbbdev     typedef typename JType::output_type TType;
115851c0b2f7Stbbdev     typedef typename is_key_matching_join<JP>::key_type            key_type;
115951c0b2f7Stbbdev     static void test() {
116051c0b2f7Stbbdev         const int TUPLE_SIZE = std::tuple_size<TType>::value;
116151c0b2f7Stbbdev         const bool is_key_matching = is_key_matching_join<JP>::value;
116251c0b2f7Stbbdev 
116351c0b2f7Stbbdev         TType v;
116451c0b2f7Stbbdev         input_node_helper<TUPLE_SIZE, JType>::print_remark("Parallel test of join_node");
116551c0b2f7Stbbdev         INFO(" > ");
116651c0b2f7Stbbdev         if(is_key_matching) {
116751c0b2f7Stbbdev             INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
116851c0b2f7Stbbdev             if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) {
116951c0b2f7Stbbdev                 INFO("&");
117051c0b2f7Stbbdev             }
117151c0b2f7Stbbdev         }
117251c0b2f7Stbbdev         INFO("\n");
117351c0b2f7Stbbdev         for(int i = 0; i < MaxPorts; ++i) {
117451c0b2f7Stbbdev             for(int j = 0; j < MaxNInputs; ++j) {
117557f524caSIlya Isaev                 all_input_nodes[i][j] = nullptr;
117651c0b2f7Stbbdev             }
117751c0b2f7Stbbdev         }
117851c0b2f7Stbbdev         for(int nInputs = 1; nInputs<=MaxNInputs; ++nInputs) {
117951c0b2f7Stbbdev             tbb::flow::graph g;
118051c0b2f7Stbbdev             bool not_out_of_order = (nInputs==1)&&(!is_key_matching);
118151c0b2f7Stbbdev             JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
118251c0b2f7Stbbdev             tbb::flow::queue_node<TType> outq1(g);
118351c0b2f7Stbbdev             tbb::flow::queue_node<TType> outq2(g);
118451c0b2f7Stbbdev 
118551c0b2f7Stbbdev             tbb::flow::make_edge(*my_join, outq1);
118651c0b2f7Stbbdev             tbb::flow::make_edge(*my_join, outq2);
118751c0b2f7Stbbdev 
118851c0b2f7Stbbdev             input_node_helper<TUPLE_SIZE, JType>::add_input_nodes((*my_join), g, nInputs);
118951c0b2f7Stbbdev 
119051c0b2f7Stbbdev             g.wait_for_all();
119151c0b2f7Stbbdev 
119251c0b2f7Stbbdev             reset_outputCheck(TUPLE_SIZE, Count);
119351c0b2f7Stbbdev             for(int i = 0; i < Count; ++i) {
119451c0b2f7Stbbdev                 CHECK_MESSAGE(outq1.try_get(v), "");
119551c0b2f7Stbbdev                 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
119651c0b2f7Stbbdev             }
119751c0b2f7Stbbdev 
119851c0b2f7Stbbdev             check_outputCheck(TUPLE_SIZE, Count);
119951c0b2f7Stbbdev             reset_outputCheck(TUPLE_SIZE, Count);
120051c0b2f7Stbbdev 
120151c0b2f7Stbbdev             for(int i = 0; i < Count; i++) {
1202*5e91b2c0SVladislav Shchapov                 CHECK_MESSAGE(outq2.try_get(v), "");
120351c0b2f7Stbbdev                 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
120451c0b2f7Stbbdev             }
120551c0b2f7Stbbdev             check_outputCheck(TUPLE_SIZE, Count);
120651c0b2f7Stbbdev 
120751c0b2f7Stbbdev             CHECK_MESSAGE(!outq1.try_get(v), "");
120851c0b2f7Stbbdev             CHECK_MESSAGE(!outq2.try_get(v), "");
120951c0b2f7Stbbdev 
121051c0b2f7Stbbdev             input_node_helper<TUPLE_SIZE, JType>::remove_input_nodes((*my_join), nInputs);
121151c0b2f7Stbbdev             tbb::flow::remove_edge(*my_join, outq1);
121251c0b2f7Stbbdev             tbb::flow::remove_edge(*my_join, outq2);
121351c0b2f7Stbbdev             makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
121451c0b2f7Stbbdev         }
121551c0b2f7Stbbdev     }
121651c0b2f7Stbbdev };
121751c0b2f7Stbbdev 
121851c0b2f7Stbbdev 
121951c0b2f7Stbbdev template<int ELEM, typename JType>
122051c0b2f7Stbbdev class serial_queue_helper {
122151c0b2f7Stbbdev public:
122251c0b2f7Stbbdev     typedef typename JType::output_type TT;
122351c0b2f7Stbbdev     typedef typename std::tuple_element<ELEM-1, TT>::type IT;
122451c0b2f7Stbbdev     typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
122551c0b2f7Stbbdev     static void print_remark() {
122651c0b2f7Stbbdev         serial_queue_helper<ELEM-1, JType>::print_remark();
122751c0b2f7Stbbdev         INFO(", " << name_of<IT>::name());
122851c0b2f7Stbbdev     }
122951c0b2f7Stbbdev     static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
123051c0b2f7Stbbdev         serial_queue_helper<ELEM-1, JType>::add_queue_nodes(g, my_join);
123151c0b2f7Stbbdev         my_queue_node_type *new_node = new my_queue_node_type(g);
123251c0b2f7Stbbdev         tbb::flow::make_edge(*new_node, std::get<ELEM-1>(my_join.input_ports()));
123351c0b2f7Stbbdev         all_input_nodes[ELEM-1][0] = (void *)new_node;
123451c0b2f7Stbbdev     }
123551c0b2f7Stbbdev 
123651c0b2f7Stbbdev     static void fill_one_queue(int maxVal) {
123751c0b2f7Stbbdev         // fill queue to "left" of me
123851c0b2f7Stbbdev         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
123951c0b2f7Stbbdev         serial_queue_helper<ELEM-1, JType>::fill_one_queue(maxVal);
124051c0b2f7Stbbdev         for(int i = 0; i < maxVal; ++i) {
124151c0b2f7Stbbdev             CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(i)), "");
124251c0b2f7Stbbdev         }
124351c0b2f7Stbbdev     }
124451c0b2f7Stbbdev 
124551c0b2f7Stbbdev     static void put_one_queue_val(int myVal) {
124651c0b2f7Stbbdev         // put this val to my "left".
124751c0b2f7Stbbdev         serial_queue_helper<ELEM-1, JType>::put_one_queue_val(myVal);
124851c0b2f7Stbbdev         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
124951c0b2f7Stbbdev         CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(myVal)), "");
125051c0b2f7Stbbdev     }
125151c0b2f7Stbbdev 
125251c0b2f7Stbbdev     static void check_queue_value(int i, TT &v) {
125351c0b2f7Stbbdev         serial_queue_helper<ELEM-1, JType>::check_queue_value(i, v);
125451c0b2f7Stbbdev         CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<ELEM-1>(v))==i * (ELEM+1), "");
125551c0b2f7Stbbdev     }
125651c0b2f7Stbbdev 
125751c0b2f7Stbbdev     static void remove_queue_nodes(JType &my_join) {
125851c0b2f7Stbbdev         my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
125951c0b2f7Stbbdev         tbb::flow::remove_edge(*vptr, std::get<ELEM-1>(my_join.input_ports()));
126051c0b2f7Stbbdev         serial_queue_helper<ELEM-1, JType>::remove_queue_nodes(my_join);
126151c0b2f7Stbbdev         delete vptr;
126251c0b2f7Stbbdev     }
126351c0b2f7Stbbdev };
126451c0b2f7Stbbdev 
126551c0b2f7Stbbdev template<typename JType>
126651c0b2f7Stbbdev class serial_queue_helper<1, JType> {
126751c0b2f7Stbbdev public:
126851c0b2f7Stbbdev     typedef typename JType::output_type TT;
126951c0b2f7Stbbdev     typedef typename std::tuple_element<0, TT>::type IT;
127051c0b2f7Stbbdev     typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
127151c0b2f7Stbbdev     static void print_remark() {
127251c0b2f7Stbbdev         INFO("Serial test of join_node< " << name_of<IT>::name());
127351c0b2f7Stbbdev     }
127451c0b2f7Stbbdev 
127551c0b2f7Stbbdev     static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
127651c0b2f7Stbbdev         my_queue_node_type *new_node = new my_queue_node_type(g);
127751c0b2f7Stbbdev         tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
127851c0b2f7Stbbdev         all_input_nodes[0][0] = (void *)new_node;
127951c0b2f7Stbbdev     }
128051c0b2f7Stbbdev 
128151c0b2f7Stbbdev     static void fill_one_queue(int maxVal) {
128251c0b2f7Stbbdev         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
128351c0b2f7Stbbdev         for(int i = 0; i < maxVal; ++i) {
128451c0b2f7Stbbdev             CHECK_MESSAGE(qptr->try_put(make_thingie<IT, 1>()(i)), "");
128551c0b2f7Stbbdev         }
128651c0b2f7Stbbdev     }
128751c0b2f7Stbbdev 
128851c0b2f7Stbbdev     static void put_one_queue_val(int myVal) {
128951c0b2f7Stbbdev         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
129051c0b2f7Stbbdev         IT my_val = make_thingie<IT, 1>()(myVal);
129151c0b2f7Stbbdev         CHECK_MESSAGE(qptr->try_put(my_val), "");
129251c0b2f7Stbbdev     }
129351c0b2f7Stbbdev 
129451c0b2f7Stbbdev     static void check_queue_value(int i, TT &v) {
129551c0b2f7Stbbdev         CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<0>(v))==i*2, "");
129651c0b2f7Stbbdev     }
129751c0b2f7Stbbdev 
129851c0b2f7Stbbdev     static void remove_queue_nodes(JType &my_join) {
129951c0b2f7Stbbdev         my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
130051c0b2f7Stbbdev         tbb::flow::remove_edge(*vptr, std::get<0>(my_join.input_ports()));
130151c0b2f7Stbbdev         delete vptr;
130251c0b2f7Stbbdev     }
130351c0b2f7Stbbdev };
130451c0b2f7Stbbdev 
130551c0b2f7Stbbdev //
130651c0b2f7Stbbdev // Single reservable predecessor at each port, single accepting and rejecting successor
130751c0b2f7Stbbdev //   * put to buffer before port0, then put to buffer before port1, ...
130851c0b2f7Stbbdev //   * fill buffer before port0 then fill buffer before port1, ...
130951c0b2f7Stbbdev 
131051c0b2f7Stbbdev template<typename JType, class JP>
131151c0b2f7Stbbdev void test_one_serial(JType &my_join, tbb::flow::graph &g) {
131251c0b2f7Stbbdev     typedef typename JType::output_type TType;
131351c0b2f7Stbbdev     static const int TUPLE_SIZE = std::tuple_size<TType>::value;
131451c0b2f7Stbbdev     bool is_key_matching = is_key_matching_join<JP>::value;
131551c0b2f7Stbbdev     std::vector<bool> flags;
131651c0b2f7Stbbdev     serial_queue_helper<TUPLE_SIZE, JType>::add_queue_nodes(g, my_join);
131751c0b2f7Stbbdev     typedef TType q3_input_type;
131851c0b2f7Stbbdev     tbb::flow::queue_node< q3_input_type > q3(g);
131951c0b2f7Stbbdev 
132051c0b2f7Stbbdev     tbb::flow::make_edge(my_join, q3);
132151c0b2f7Stbbdev 
132251c0b2f7Stbbdev     // fill each queue with its value one-at-a-time
132351c0b2f7Stbbdev     flags.clear();
132451c0b2f7Stbbdev     for(int i = 0; i < Count; ++i) {
132551c0b2f7Stbbdev         serial_queue_helper<TUPLE_SIZE, JType>::put_one_queue_val(i);
132651c0b2f7Stbbdev         flags.push_back(false);
132751c0b2f7Stbbdev     }
132851c0b2f7Stbbdev 
132951c0b2f7Stbbdev     g.wait_for_all();
133051c0b2f7Stbbdev     for(int i = 0; i < Count; ++i) {
133151c0b2f7Stbbdev         q3_input_type v;
133251c0b2f7Stbbdev         g.wait_for_all();
133351c0b2f7Stbbdev         CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()");
133451c0b2f7Stbbdev         if(is_key_matching) {
133551c0b2f7Stbbdev             // because we look up tags in the hash table, the output may be out of order.
133651c0b2f7Stbbdev             int j = int(std::get<0>(v))/2;  // figure what the index should be
133751c0b2f7Stbbdev             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
133851c0b2f7Stbbdev             flags[j] = true;
133951c0b2f7Stbbdev         }
134051c0b2f7Stbbdev         else {
134151c0b2f7Stbbdev             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
134251c0b2f7Stbbdev         }
134351c0b2f7Stbbdev     }
134451c0b2f7Stbbdev 
134551c0b2f7Stbbdev     if(is_key_matching) {
134651c0b2f7Stbbdev         for(int i = 0; i < Count; ++i) {
134751c0b2f7Stbbdev             CHECK_MESSAGE(flags[i], "");
134851c0b2f7Stbbdev             flags[i] = false;
134951c0b2f7Stbbdev         }
135051c0b2f7Stbbdev     }
135151c0b2f7Stbbdev 
135251c0b2f7Stbbdev     tbb::flow::remove_edge(my_join, q3);
135351c0b2f7Stbbdev     tbb::flow::limiter_node<q3_input_type> limiter(g, Count / 2);
135451c0b2f7Stbbdev     tbb::flow::make_edge(my_join, limiter);
135551c0b2f7Stbbdev     tbb::flow::make_edge(limiter, q3);
135651c0b2f7Stbbdev 
135751c0b2f7Stbbdev     // fill each queue completely before filling the next.
135851c0b2f7Stbbdev     serial_queue_helper<TUPLE_SIZE, JType>::fill_one_queue(Count);
135951c0b2f7Stbbdev 
136051c0b2f7Stbbdev     g.wait_for_all();
136151c0b2f7Stbbdev     for(int i = 0; i < Count / 2; ++i) {
136251c0b2f7Stbbdev         q3_input_type v;
136351c0b2f7Stbbdev         g.wait_for_all();
136451c0b2f7Stbbdev         CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()");
136551c0b2f7Stbbdev         if(is_key_matching) {
136651c0b2f7Stbbdev             int j = int(std::get<0>(v))/2;
136751c0b2f7Stbbdev             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
136851c0b2f7Stbbdev             flags[j] = true;
136951c0b2f7Stbbdev         }
137051c0b2f7Stbbdev         else {
137151c0b2f7Stbbdev             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
137251c0b2f7Stbbdev         }
137351c0b2f7Stbbdev     }
137451c0b2f7Stbbdev 
137551c0b2f7Stbbdev     if(is_key_matching) {
137651c0b2f7Stbbdev         CHECK(std::count(flags.begin(), flags.end(), true) == Count / 2);
137751c0b2f7Stbbdev     }
137851c0b2f7Stbbdev 
137951c0b2f7Stbbdev     serial_queue_helper<TUPLE_SIZE, JType>::remove_queue_nodes(my_join);
138051c0b2f7Stbbdev }
138151c0b2f7Stbbdev 
138251c0b2f7Stbbdev template<typename JType, class JP>
138351c0b2f7Stbbdev class serial_test {
138451c0b2f7Stbbdev     typedef typename JType::output_type TType;
138551c0b2f7Stbbdev public:
138651c0b2f7Stbbdev     static void test() {
138751c0b2f7Stbbdev         tbb::flow::graph g;
138851c0b2f7Stbbdev         std::vector<bool> flags;
138951c0b2f7Stbbdev         bool is_key_matching = is_key_matching_join<JP>::value;
139051c0b2f7Stbbdev         flags.reserve(Count);
139151c0b2f7Stbbdev 
139251c0b2f7Stbbdev         const int TUPLE_SIZE = std::tuple_size<TType>::value;
139351c0b2f7Stbbdev         static const int ELEMS = 3;
139451c0b2f7Stbbdev 
139551c0b2f7Stbbdev         JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
139651c0b2f7Stbbdev         test_input_ports_return_ref(*my_join);
139751c0b2f7Stbbdev         serial_queue_helper<TUPLE_SIZE, JType>::print_remark(); INFO(" >");
139851c0b2f7Stbbdev         if(is_key_matching) {
139951c0b2f7Stbbdev             INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
140051c0b2f7Stbbdev             if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) {
140151c0b2f7Stbbdev                 INFO("&");
140251c0b2f7Stbbdev             }
140351c0b2f7Stbbdev         }
140451c0b2f7Stbbdev         INFO("\n");
140551c0b2f7Stbbdev 
140651c0b2f7Stbbdev         test_one_serial<JType, JP>(*my_join, g);
140751c0b2f7Stbbdev         // build the vector with copy construction from the used join node.
140851c0b2f7Stbbdev         std::vector<JType>join_vector(ELEMS, *my_join);
140951c0b2f7Stbbdev         // destroy the tired old join_node in case we're accidentally reusing pieces of it.
141051c0b2f7Stbbdev         makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
141151c0b2f7Stbbdev 
141251c0b2f7Stbbdev         for(int e = 0; e < ELEMS; ++e) {  // exercise each of the vector elements
141351c0b2f7Stbbdev             test_one_serial<JType, JP>(join_vector[e], g);
141451c0b2f7Stbbdev         }
141551c0b2f7Stbbdev     }
141651c0b2f7Stbbdev 
141751c0b2f7Stbbdev }; // serial_test
141851c0b2f7Stbbdev 
141951c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
142051c0b2f7Stbbdev #pragma warning( pop )
142151c0b2f7Stbbdev #endif
142251c0b2f7Stbbdev 
142351c0b2f7Stbbdev template<
142451c0b2f7Stbbdev     template<typename, class > class TestType,  // serial_test or parallel_test
142551c0b2f7Stbbdev     typename OutputTupleType,           // type of the output of the join
142651c0b2f7Stbbdev     class J>                 // graph_buffer_policy (reserving, queueing, tag_matching or key_matching)
142751c0b2f7Stbbdev     class generate_test {
142851c0b2f7Stbbdev     public:
142951c0b2f7Stbbdev         typedef tbb::flow::join_node<OutputTupleType, typename filter_out_message_based_key_matching<J>::policy> join_node_type;
143051c0b2f7Stbbdev         static void do_test() {
143151c0b2f7Stbbdev             TestType<join_node_type, J>::test();
143251c0b2f7Stbbdev         }
143351c0b2f7Stbbdev };
143451c0b2f7Stbbdev 
// Tests how the input ports of a join_node with buffering policy JP handle the
// items offered to them.  Only declared here; each tested policy supplies its own
// explicit specialization.
template<typename JP>
void test_input_port_policies();
143751c0b2f7Stbbdev 
143851c0b2f7Stbbdev // join_node (reserving) does not consume inputs until an item is available at
143951c0b2f7Stbbdev // every input.  It tries to reserve each input, and if any fails it releases the
144051c0b2f7Stbbdev // reservation.  When it builds a tuple it broadcasts to all its successors and
144151c0b2f7Stbbdev // consumes all the inputs.
144251c0b2f7Stbbdev //
// So our test will put an item in the queue feeding the first input port, then attach
// a second successor (another queue_node) to that same input queue.  The second
// successor should receive the item, emptying the input queue.
144651c0b2f7Stbbdev //
144751c0b2f7Stbbdev // We then place an item in the second input queue, and check the output queues; they
144851c0b2f7Stbbdev // should still be empty.  Then we place an item in the first queue; the output queues
144951c0b2f7Stbbdev // should then receive a tuple.
145051c0b2f7Stbbdev //
// We then attach an extra queue_node to each input queue.  Neither should receive
// an item, verifying that the items in the input queues were consumed.
145351c0b2f7Stbbdev template<>
145451c0b2f7Stbbdev void test_input_port_policies<tbb::flow::reserving>() {
145551c0b2f7Stbbdev     tbb::flow::graph g;
145651c0b2f7Stbbdev     typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::reserving > JType; // two-phase is the default policy
145751c0b2f7Stbbdev                                                                                            // create join_node<type0,type1> jn
145851c0b2f7Stbbdev     JType jn(g);
145951c0b2f7Stbbdev     // create output_queue oq0, oq1
146051c0b2f7Stbbdev     typedef JType::output_type OQType;
146151c0b2f7Stbbdev     tbb::flow::queue_node<OQType> oq0(g);
146251c0b2f7Stbbdev     tbb::flow::queue_node<OQType> oq1(g);
146351c0b2f7Stbbdev     // create iq0, iq1
146451c0b2f7Stbbdev     typedef tbb::flow::queue_node<int> IQType;
146551c0b2f7Stbbdev     IQType iq0(g);
146651c0b2f7Stbbdev     IQType iq1(g);
146751c0b2f7Stbbdev     // create qnp, qnq
146851c0b2f7Stbbdev     IQType qnp(g);
146951c0b2f7Stbbdev     IQType qnq(g);
147051c0b2f7Stbbdev     INFO("Testing policies of join_node<reserving> input ports\n");
147151c0b2f7Stbbdev     // attach jn to oq0, oq1
147251c0b2f7Stbbdev     tbb::flow::make_edge(jn, oq0);
147351c0b2f7Stbbdev     tbb::flow::make_edge(jn, oq1);
147451c0b2f7Stbbdev 
147551c0b2f7Stbbdev     // attach iq0, iq1 to jn
147651c0b2f7Stbbdev     tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports()));
147751c0b2f7Stbbdev     tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports()));
147851c0b2f7Stbbdev 
147951c0b2f7Stbbdev     for(int loop = 0; loop < 3; ++loop) {
148051c0b2f7Stbbdev         // place one item in iq0
148151c0b2f7Stbbdev         CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1");
148251c0b2f7Stbbdev         // attach iq0 to qnp
148351c0b2f7Stbbdev         tbb::flow::make_edge(iq0, qnp);
148451c0b2f7Stbbdev         // qnp should have an item in it.
148551c0b2f7Stbbdev         g.wait_for_all();
148651c0b2f7Stbbdev         {
148751c0b2f7Stbbdev             int i;
148851c0b2f7Stbbdev             CHECK_MESSAGE( (qnp.try_get(i)&&i==1), "Error in item fetched by qnp");
148951c0b2f7Stbbdev         }
149051c0b2f7Stbbdev         // place item in iq1
149151c0b2f7Stbbdev         CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1");
149251c0b2f7Stbbdev         // oq0, oq1 should be empty
149351c0b2f7Stbbdev         g.wait_for_all();
149451c0b2f7Stbbdev         {
149551c0b2f7Stbbdev             OQType t1;
149651c0b2f7Stbbdev             CHECK_MESSAGE( (!oq0.try_get(t1)&&!oq1.try_get(t1)), "oq0 and oq1 not empty");
149751c0b2f7Stbbdev         }
149851c0b2f7Stbbdev         // detach qnp from iq0
149951c0b2f7Stbbdev         tbb::flow::remove_edge(iq0, qnp); // if we don't remove qnp it will gobble any values we put in iq0
150051c0b2f7Stbbdev                                           // place item in iq0
150151c0b2f7Stbbdev         CHECK_MESSAGE( (iq0.try_put(3)), "Error on second put to iq0");
150251c0b2f7Stbbdev         // oq0, oq1 should have items in them
150351c0b2f7Stbbdev         g.wait_for_all();
150451c0b2f7Stbbdev         {
150551c0b2f7Stbbdev             OQType t0;
150651c0b2f7Stbbdev             OQType t1;
150751c0b2f7Stbbdev             CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==3&&std::get<1>(t0)==2), "Error in oq0 output");
150851c0b2f7Stbbdev             CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==3&&std::get<1>(t1)==2), "Error in oq1 output");
150951c0b2f7Stbbdev         }
151051c0b2f7Stbbdev         // attach qnp to iq0, qnq to iq1
151151c0b2f7Stbbdev         // qnp and qnq should be empty
151251c0b2f7Stbbdev         tbb::flow::make_edge(iq0, qnp);
151351c0b2f7Stbbdev         tbb::flow::make_edge(iq1, qnq);
151451c0b2f7Stbbdev         g.wait_for_all();
151551c0b2f7Stbbdev         {
151651c0b2f7Stbbdev             int i;
151751c0b2f7Stbbdev             CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it");
151851c0b2f7Stbbdev             CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it");
151951c0b2f7Stbbdev         }
152051c0b2f7Stbbdev         tbb::flow::remove_edge(iq0, qnp);
152151c0b2f7Stbbdev         tbb::flow::remove_edge(iq1, qnq);
152251c0b2f7Stbbdev     } // for ( int loop ...
152351c0b2f7Stbbdev }
152451c0b2f7Stbbdev 
152551c0b2f7Stbbdev // join_node (queueing) consumes inputs as soon as they are available at
152651c0b2f7Stbbdev // any input.  When it builds a tuple it broadcasts to all its successors and
152751c0b2f7Stbbdev // discards the broadcast values.
152851c0b2f7Stbbdev //
152951c0b2f7Stbbdev // So our test will put an item at one input port, then attach another node to the
153051c0b2f7Stbbdev // same node (a queue node in this case).  The second successor should not receive
153151c0b2f7Stbbdev // an item (because the join consumed it).
153251c0b2f7Stbbdev //
153351c0b2f7Stbbdev // We then place an item in the second input queue, and check the output queues; they
153451c0b2f7Stbbdev // should each have a tuple.
153551c0b2f7Stbbdev //
// We then attach another queue_node to the second input queue.  It should not receive
// an item, verifying that the item in the queue is consumed.
153851c0b2f7Stbbdev template<>
153951c0b2f7Stbbdev void test_input_port_policies<tbb::flow::queueing>() {
154051c0b2f7Stbbdev     tbb::flow::graph g;
154151c0b2f7Stbbdev     typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::queueing > JType;
154251c0b2f7Stbbdev     // create join_node<type0,type1> jn
154351c0b2f7Stbbdev     JType jn(g);
154451c0b2f7Stbbdev     // create output_queue oq0, oq1
154551c0b2f7Stbbdev     typedef JType::output_type OQType;
154651c0b2f7Stbbdev     tbb::flow::queue_node<OQType> oq0(g);
154751c0b2f7Stbbdev     tbb::flow::queue_node<OQType> oq1(g);
154851c0b2f7Stbbdev     // create iq0, iq1
154951c0b2f7Stbbdev     typedef tbb::flow::queue_node<int> IQType;
155051c0b2f7Stbbdev     IQType iq0(g);
155151c0b2f7Stbbdev     IQType iq1(g);
155251c0b2f7Stbbdev     // create qnp, qnq
155351c0b2f7Stbbdev     IQType qnp(g);
155451c0b2f7Stbbdev     IQType qnq(g);
155551c0b2f7Stbbdev     INFO("Testing policies of join_node<queueing> input ports\n");
155651c0b2f7Stbbdev     // attach jn to oq0, oq1
155751c0b2f7Stbbdev     tbb::flow::make_edge(jn, oq0);
155851c0b2f7Stbbdev     tbb::flow::make_edge(jn, oq1);
155951c0b2f7Stbbdev 
156051c0b2f7Stbbdev     // attach iq0, iq1 to jn
156151c0b2f7Stbbdev     tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports()));
156251c0b2f7Stbbdev     tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports()));
156351c0b2f7Stbbdev 
156451c0b2f7Stbbdev     for(int loop = 0; loop < 3; ++loop) {
156551c0b2f7Stbbdev         // place one item in iq0
156651c0b2f7Stbbdev         CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1");
156751c0b2f7Stbbdev         // attach iq0 to qnp
156851c0b2f7Stbbdev         tbb::flow::make_edge(iq0, qnp);
156951c0b2f7Stbbdev         // qnp should have an item in it.
157051c0b2f7Stbbdev         g.wait_for_all();
157151c0b2f7Stbbdev         {
157251c0b2f7Stbbdev             int i;
157351c0b2f7Stbbdev             CHECK_MESSAGE( (!qnp.try_get(i)), "Item was received by qnp");
157451c0b2f7Stbbdev         }
157551c0b2f7Stbbdev         // place item in iq1
157651c0b2f7Stbbdev         CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1");
157751c0b2f7Stbbdev         // oq0, oq1 should have items
157851c0b2f7Stbbdev         g.wait_for_all();
157951c0b2f7Stbbdev         {
158051c0b2f7Stbbdev             OQType t0;
158151c0b2f7Stbbdev             OQType t1;
158251c0b2f7Stbbdev             CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==1&&std::get<1>(t0)==2), "Error in oq0 output");
158351c0b2f7Stbbdev             CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==1&&std::get<1>(t1)==2), "Error in oq1 output");
158451c0b2f7Stbbdev         }
158551c0b2f7Stbbdev         // attach qnq to iq1
158651c0b2f7Stbbdev         // qnp and qnq should be empty
158751c0b2f7Stbbdev         tbb::flow::make_edge(iq1, qnq);
158851c0b2f7Stbbdev         g.wait_for_all();
158951c0b2f7Stbbdev         {
159051c0b2f7Stbbdev             int i;
159151c0b2f7Stbbdev             CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it");
159251c0b2f7Stbbdev             CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it");
159351c0b2f7Stbbdev         }
159451c0b2f7Stbbdev         tbb::flow::remove_edge(iq0, qnp);
159551c0b2f7Stbbdev         tbb::flow::remove_edge(iq1, qnq);
159651c0b2f7Stbbdev     } // for ( int loop ...
159751c0b2f7Stbbdev }
159851c0b2f7Stbbdev 
159951c0b2f7Stbbdev template<typename T>
160051c0b2f7Stbbdev struct myTagValue {
160151c0b2f7Stbbdev     tbb::flow::tag_value operator()(T i) { return tbb::flow::tag_value(i); }
160251c0b2f7Stbbdev };
160351c0b2f7Stbbdev 
160451c0b2f7Stbbdev template<>
160551c0b2f7Stbbdev struct myTagValue<CheckType<int> > {
160651c0b2f7Stbbdev     tbb::flow::tag_value operator()(CheckType<int> i) { return tbb::flow::tag_value((int)i); }
160751c0b2f7Stbbdev };
160851c0b2f7Stbbdev 
160951c0b2f7Stbbdev // join_node (tag_matching) consumes inputs as soon as they are available at
161051c0b2f7Stbbdev // any input.  When it builds a tuple it broadcasts to all its successors and
161151c0b2f7Stbbdev // discards the broadcast values.
161251c0b2f7Stbbdev //
// It chooses the tuple it broadcasts by matching the tag values returned by the
// function objects given to the constructor of the join; in this case each function
// object just casts the value at its port to tag_value.
161651c0b2f7Stbbdev //
161751c0b2f7Stbbdev // So our test will put an item at one input port, then attach another node to the
161851c0b2f7Stbbdev // same node (a queue node in this case).  The second successor should not receive
161951c0b2f7Stbbdev // an item (because the join consumed it).
162051c0b2f7Stbbdev //
162151c0b2f7Stbbdev // We then place an item in the second input queue, and check the output queues; they
162251c0b2f7Stbbdev // should each have a tuple.
162351c0b2f7Stbbdev //
162451c0b2f7Stbbdev // we then attach another queue node to the second input.  It should not receive
162551c0b2f7Stbbdev // an item, verifying that the item in the queue is consumed.
162651c0b2f7Stbbdev //
162751c0b2f7Stbbdev // We will then exercise the join with a bunch of values, and the output order should
162851c0b2f7Stbbdev // be determined by the order we insert items into the second queue.  (Each tuple set
162951c0b2f7Stbbdev // corresponding to a tag will be complete when the second item is inserted.)
163051c0b2f7Stbbdev template<>
163151c0b2f7Stbbdev void test_input_port_policies<tbb::flow::tag_matching>() {
163251c0b2f7Stbbdev     tbb::flow::graph g;
163351c0b2f7Stbbdev     typedef tbb::flow::join_node<std::tuple<int, CheckType<int> >, tbb::flow::tag_matching > JoinNodeType;
163451c0b2f7Stbbdev     typedef JoinNodeType::output_type CheckTupleType;
163551c0b2f7Stbbdev     JoinNodeType testJoinNode(g, myTagValue<int>(), myTagValue<CheckType<int> >());
163651c0b2f7Stbbdev     tbb::flow::queue_node<CheckTupleType> checkTupleQueue0(g);
163751c0b2f7Stbbdev     tbb::flow::queue_node<CheckTupleType> checkTupleQueue1(g);
163851c0b2f7Stbbdev     {
163951c0b2f7Stbbdev         Checker<CheckType<int> > my_check;
164051c0b2f7Stbbdev 
164151c0b2f7Stbbdev 
164251c0b2f7Stbbdev         typedef tbb::flow::queue_node<int> IntQueueType;
164351c0b2f7Stbbdev         typedef tbb::flow::queue_node<CheckType<int> > CheckQueueType;
164451c0b2f7Stbbdev         IntQueueType intInputQueue(g);
164551c0b2f7Stbbdev         CheckQueueType checkInputQueue(g);
164651c0b2f7Stbbdev         IntQueueType intEmptyTestQueue(g);
164751c0b2f7Stbbdev         CheckQueueType checkEmptyTestQueue(g);
164851c0b2f7Stbbdev         INFO("Testing policies of join_node<tag_matching> input ports\n");
164951c0b2f7Stbbdev         // attach testJoinNode to checkTupleQueue0, checkTupleQueue1
165051c0b2f7Stbbdev         tbb::flow::make_edge(testJoinNode, checkTupleQueue0);
165151c0b2f7Stbbdev         tbb::flow::make_edge(testJoinNode, checkTupleQueue1);
165251c0b2f7Stbbdev 
165351c0b2f7Stbbdev         // attach intInputQueue, checkInputQueue to testJoinNode
165451c0b2f7Stbbdev         tbb::flow::make_edge(intInputQueue, tbb::flow::input_port<0>(testJoinNode));
165551c0b2f7Stbbdev         tbb::flow::make_edge(checkInputQueue, tbb::flow::input_port<1>(testJoinNode));
165651c0b2f7Stbbdev 
165751c0b2f7Stbbdev         // we'll put four discrete values in the inputs to the join_node.  Each
165851c0b2f7Stbbdev         // set of inputs should result in one output.
165951c0b2f7Stbbdev         for(int loop = 0; loop < 4; ++loop) {
166051c0b2f7Stbbdev             // place one item in intInputQueue
166151c0b2f7Stbbdev             CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue");
166251c0b2f7Stbbdev             // attach intInputQueue to intEmptyTestQueue
166351c0b2f7Stbbdev             tbb::flow::make_edge(intInputQueue, intEmptyTestQueue);
166451c0b2f7Stbbdev             // intEmptyTestQueue should not have an item in it.  (the join consumed it.)
166551c0b2f7Stbbdev             g.wait_for_all();
166651c0b2f7Stbbdev             {
166751c0b2f7Stbbdev                 int intVal0;
166851c0b2f7Stbbdev                 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal0)), "Item was received by intEmptyTestQueue");
166951c0b2f7Stbbdev             }
167051c0b2f7Stbbdev             // place item in checkInputQueue
167151c0b2f7Stbbdev             CheckType<int> checkVal0(loop);
167251c0b2f7Stbbdev             CHECK_MESSAGE( (checkInputQueue.try_put(checkVal0)), "Error putting to checkInputQueue");
167351c0b2f7Stbbdev             // checkTupleQueue0, checkTupleQueue1 should have items
167451c0b2f7Stbbdev             g.wait_for_all();
167551c0b2f7Stbbdev             {
167651c0b2f7Stbbdev                 CheckTupleType t0;
167751c0b2f7Stbbdev                 CheckTupleType t1;
167851c0b2f7Stbbdev                 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==loop&&(int)std::get<1>(t0)==loop), "Error in checkTupleQueue0 output");
167951c0b2f7Stbbdev                 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==loop&&(int)std::get<1>(t1)==loop), "Error in checkTupleQueue1 output");
168051c0b2f7Stbbdev                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0");
168151c0b2f7Stbbdev                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1");
168251c0b2f7Stbbdev             }
168351c0b2f7Stbbdev             // attach checkEmptyTestQueue to checkInputQueue
168451c0b2f7Stbbdev             // intEmptyTestQueue and checkEmptyTestQueue should be empty
168551c0b2f7Stbbdev             tbb::flow::make_edge(checkInputQueue, checkEmptyTestQueue);
168651c0b2f7Stbbdev             g.wait_for_all();
168751c0b2f7Stbbdev             {
168851c0b2f7Stbbdev                 int intVal1;
168951c0b2f7Stbbdev                 CheckType<int> checkVal1;
169051c0b2f7Stbbdev                 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal1)), "intInputQueue still had value in it");
169151c0b2f7Stbbdev                 CHECK_MESSAGE( (!checkEmptyTestQueue.try_get(checkVal1)), "checkInputQueue still had value in it");
169251c0b2f7Stbbdev             }
169351c0b2f7Stbbdev             tbb::flow::remove_edge(intInputQueue, intEmptyTestQueue);
169451c0b2f7Stbbdev             tbb::flow::remove_edge(checkInputQueue, checkEmptyTestQueue);
169551c0b2f7Stbbdev         } // for ( int loop ...
169651c0b2f7Stbbdev 
          // Now we'll put [4 .. nValues + 3] in intInputQueue, and then put the same values in
          // checkInputQueue in a different order.  We should see tuples in the output queues in
          // the order we inserted the integers into checkInputQueue.
170051c0b2f7Stbbdev         const int nValues = 100;
170151c0b2f7Stbbdev         const int nIncr = 31;  // relatively prime to nValues
170251c0b2f7Stbbdev 
170351c0b2f7Stbbdev         for(int loop = 4; loop < 4+nValues; ++loop) {
170451c0b2f7Stbbdev             // place one item in intInputQueue
170551c0b2f7Stbbdev             CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue");
170651c0b2f7Stbbdev             g.wait_for_all();
170751c0b2f7Stbbdev             {
170851c0b2f7Stbbdev                 CheckTupleType t3;
170951c0b2f7Stbbdev                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t3)), "Object in output queue");
171051c0b2f7Stbbdev                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t3)), "Object in output queue");
171151c0b2f7Stbbdev             }
171251c0b2f7Stbbdev         } // for ( int loop ...
171351c0b2f7Stbbdev 
171451c0b2f7Stbbdev         for(int loop = 1; loop<=nValues; ++loop) {
171551c0b2f7Stbbdev             int lp1 = 4+(loop * nIncr)%nValues;
171651c0b2f7Stbbdev             // place item in checkInputQueue
171751c0b2f7Stbbdev             CHECK_MESSAGE( (checkInputQueue.try_put(lp1)), "Error putting to checkInputQueue");
171851c0b2f7Stbbdev             // checkTupleQueue0, checkTupleQueue1 should have items
171951c0b2f7Stbbdev             g.wait_for_all();
172051c0b2f7Stbbdev             {
172151c0b2f7Stbbdev                 CheckTupleType t0;
172251c0b2f7Stbbdev                 CheckTupleType t1;
172351c0b2f7Stbbdev                 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==lp1 && std::get<1>(t0)==lp1), "Error in checkTupleQueue0 output");
172451c0b2f7Stbbdev                 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==lp1 && std::get<1>(t1)==lp1), "Error in checkTupleQueue1 output");
172551c0b2f7Stbbdev                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0");
172651c0b2f7Stbbdev                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1");
172751c0b2f7Stbbdev             }
172851c0b2f7Stbbdev         } // for ( int loop ...
172951c0b2f7Stbbdev     } // Check
173051c0b2f7Stbbdev }
173151c0b2f7Stbbdev 
// Maps a join policy tag type to the name strings used in test messages.
// The primary template has no members; msg_beg()/msg_end() are supplied by
// the explicit specializations for the individual policy tags.
template <typename Policy>
struct policy_name {
};
173351c0b2f7Stbbdev 
173451c0b2f7Stbbdev template<> struct policy_name<tbb::flow::queueing> {
173551c0b2f7Stbbdev const char* msg_beg() { return "queueing\n";}
173651c0b2f7Stbbdev const char* msg_end() { return "test queueing extract\n";}
173751c0b2f7Stbbdev };
173851c0b2f7Stbbdev 
173951c0b2f7Stbbdev template<> struct policy_name<tbb::flow::reserving> {
174051c0b2f7Stbbdev const char* msg_beg() { return "reserving\n";}
174151c0b2f7Stbbdev const char* msg_end() { return "test reserving extract\n";}
174251c0b2f7Stbbdev };
174351c0b2f7Stbbdev 
174451c0b2f7Stbbdev template<> struct policy_name<tbb::flow::tag_matching> {
174551c0b2f7Stbbdev const char* msg_beg() { return "tag_matching\n";}
174651c0b2f7Stbbdev const char* msg_end() { return "test tag_matching extract\n";}
174751c0b2f7Stbbdev };
174851c0b2f7Stbbdev 
174951c0b2f7Stbbdev template<typename Policy>
175051c0b2f7Stbbdev void test_main() {
175151c0b2f7Stbbdev     test_input_port_policies<Policy>();
175251c0b2f7Stbbdev     for(int p = 0; p < 2; ++p) {
175351c0b2f7Stbbdev         INFO(policy_name<Policy>().msg_beg());
175451c0b2f7Stbbdev         generate_test<serial_test, std::tuple<threebyte, double>, Policy>::do_test();
175551c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 4
175651c0b2f7Stbbdev         {
175751c0b2f7Stbbdev             Checker<CheckType<int> > my_check;
175851c0b2f7Stbbdev             generate_test<serial_test, std::tuple<float, double, CheckType<int>, long>, Policy>::do_test();
175951c0b2f7Stbbdev         }
176051c0b2f7Stbbdev #endif
176151c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 6
176251c0b2f7Stbbdev         generate_test<serial_test, std::tuple<double, double, int, long, int, short>, Policy>::do_test();
176351c0b2f7Stbbdev #endif
176451c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 8
176551c0b2f7Stbbdev         generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long>, Policy>::do_test();
176651c0b2f7Stbbdev #endif
176751c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 10
176851c0b2f7Stbbdev         generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long>, Policy>::do_test();
176951c0b2f7Stbbdev #endif
177051c0b2f7Stbbdev         {
177151c0b2f7Stbbdev             Checker<CheckType<int> > my_check1;
177251c0b2f7Stbbdev             generate_test<parallel_test, std::tuple<float, CheckType<int> >, Policy>::do_test();
177351c0b2f7Stbbdev         }
177451c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 3
177551c0b2f7Stbbdev         generate_test<parallel_test, std::tuple<float, int, long>, Policy>::do_test();
177651c0b2f7Stbbdev #endif
177751c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 5
177851c0b2f7Stbbdev         generate_test<parallel_test, std::tuple<double, double, int, int, short>, Policy>::do_test();
177951c0b2f7Stbbdev #endif
178051c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 7
178151c0b2f7Stbbdev         generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long>, Policy>::do_test();
178251c0b2f7Stbbdev #endif
178351c0b2f7Stbbdev #if MAX_TUPLE_TEST_SIZE >= 9
178451c0b2f7Stbbdev         generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long>, Policy>::do_test();
178551c0b2f7Stbbdev #endif
178651c0b2f7Stbbdev     }
178751c0b2f7Stbbdev }
178851c0b2f7Stbbdev 
178951c0b2f7Stbbdev #endif /* tbb_test_join_node_H */
1790