xref: /oneTBB/test/tbb/test_join_node.h (revision 5e91b2c0)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef tbb_test_join_node_H
18 #define tbb_test_join_node_H
19 
20 #if _MSC_VER
21 // Suppress "decorated name length exceeded, name was truncated" warning
22 #if __INTEL_COMPILER
23 #pragma warning( disable: 2586 )
24 #else
25 #pragma warning( disable: 4503 )
26 #endif
27 #endif
28 
29 #include "tbb/flow_graph.h"
30 
31 #include "common/test.h"
32 #include "common/utils.h"
33 #include "common/checktype.h"
34 #include "common/graph_utils.h"
35 
36 #include <type_traits>
37 
38 const char *names[] = {
39     "Adam", "Bruce", "Charles", "Daniel", "Evan", "Frederich", "George", "Hiram", "Ichabod",
40     "John", "Kevin", "Leonard", "Michael", "Ned", "Olin", "Paul", "Quentin", "Ralph", "Steven",
41     "Thomas", "Ulysses", "Victor", "Walter", "Xerxes", "Yitzhak", "Zebediah", "Anne", "Bethany",
42     "Clarisse", "Dorothy", "Erin", "Fatima", "Gabrielle", "Helen", "Irene", "Jacqueline",
43     "Katherine", "Lana", "Marilyn", "Noelle", "Okiilani", "Pauline", "Querida", "Rose", "Sybil",
44     "Tatiana", "Umiko", "Victoria", "Wilma", "Xena", "Yolanda", "Zoe", "Algernon", "Benjamin",
45     "Caleb", "Dylan", "Ezra", "Felix", "Gabriel", "Henry", "Issac", "Jasper", "Keifer",
46     "Lincoln", "Milo", "Nathaniel", "Owen", "Peter", "Quincy", "Ronan", "Silas", "Theodore",
47     "Uriah", "Vincent", "Wilbur", "Xavier", "Yoda", "Zachary", "Amelia", "Brielle", "Charlotte",
48     "Daphne", "Emma", "Fiona", "Grace", "Hazel", "Isla", "Juliet", "Keira", "Lily", "Mia",
49     "Nora", "Olivia", "Penelope", "Quintana", "Ruby", "Sophia", "Tessa", "Ursula", "Violet",
50     "Willow", "Xanthe", "Yvonne", "ZsaZsa", "Asher", "Bennett", "Connor", "Dominic", "Ethan",
51     "Finn", "Grayson", "Hudson", "Ian", "Jackson", "Kent", "Liam", "Matthew", "Noah", "Oliver",
52     "Parker", "Quinn", "Rhys", "Sebastian", "Taylor", "Umberto", "Vito", "William", "Xanto",
53     "Yogi", "Zane", "Ava", "Brenda", "Chloe", "Delilah", "Ella", "Felicity", "Genevieve",
54     "Hannah", "Isabella", "Josephine", "Kacie", "Lucy", "Madeline", "Natalie", "Octavia",
55     "Piper", "Qismah", "Rosalie", "Scarlett", "Tanya", "Uta", "Vivian", "Wendy", "Xola",
56     "Yaritza", "Zanthe"};
57 
58 static const int NameCnt = sizeof(names)/sizeof(char *);
59 
60 template<typename K>
61 struct index_to_key {
operatorindex_to_key62     K operator()(const int indx) {
63         return (K)(3*indx+1);
64     }
65 };
66 
67 template<>
68 struct index_to_key<std::string> {
69     std::string operator()(const int indx) {
70         return std::string(names[indx % NameCnt]);
71     }
72 };
73 
74 template<typename K>
75 struct K_deref {
76     typedef K type;
77 };
78 
79 template<typename K>
80 struct K_deref<K&> {
81     typedef K type;
82 };
83 
84 template<typename K, typename V>
85 struct MyKeyFirst {
86     K my_key;
87     V my_value;
88     MyKeyFirst(int i = 0, int v = 0): my_key(index_to_key<K>()(i)), my_value((V)v) {
89     }
90     void print_val() const {
91         INFO("MyKeyFirst{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
92     }
93     operator int() const { return (int)my_value; }
94 };
95 
96 template<typename K, typename V>
97 struct MyKeySecond {
98     V my_value;
99     K my_key;
100     MyKeySecond(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
101     }
102     void print_val() const {
103         INFO("MyKeySecond{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
104     }
105     operator int() const { return (int)my_value; }
106 };
107 
108 template<typename K, typename V>
109 struct MyMessageKeyWithoutKey {
110     V my_value;
111     K my_message_key;
112     MyMessageKeyWithoutKey(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
113     }
114     void print_val() const {
115         INFO("MyMessageKeyWithoutKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
116     }
117     operator int() const { return (int)my_value; }
118     const K& key() const {
119         return my_message_key;
120     }
121 };
122 
123 template<typename K, typename V>
124 struct MyMessageKeyWithBrokenKey {
125     V my_value;
126     K my_key;
127     K my_message_key;
128     MyMessageKeyWithBrokenKey(int i = 0, int v = 0): my_value((V)v), my_key(), my_message_key(index_to_key<K>()(i)) {
129     }
130     void print_val() const {
131         INFO("MyMessageKeyWithBrokenKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
132     }
133     operator int() const { return (int)my_value; }
134     const K& key() const {
135         return my_message_key;
136     }
137 
138 };
139 
140 template<typename K, typename V>
141 struct MyKeyWithBrokenMessageKey {
142     V my_value;
143     K my_key;
144     MyKeyWithBrokenMessageKey(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
145     }
146     void print_val() const {
147         INFO("MyKeyWithBrokenMessageKey{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
148     }
149     operator int() const { return (int)my_value; }
150     K key() const {
151         CHECK_MESSAGE( (false), "The method should never be called");
152         return K();
153     }
154 };
155 
156 template<typename K, typename V>
157 struct MyMessageKeyWithoutKeyMethod {
158     V my_value;
159     K my_message_key;
160     MyMessageKeyWithoutKeyMethod(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
161     }
162     void print_val() const {
163         INFO("MyMessageKeyWithoutKeyMethod{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
164     }
165     operator int() const { return (int)my_value; }
166     //K key() const; // Do not define
167 };
168 
169 // Overload for MyMessageKeyWithoutKeyMethod
170 template <typename K, typename V>
171 K key_from_message(const MyMessageKeyWithoutKeyMethod<typename std::decay<K>::type, V> &m) {
172     return m.my_message_key;
173 }
174 
175 
176 // pattern for creating values in the tag_matching and key_matching, given an integer and the index in the tuple
177 template<typename TT, size_t INDEX>
178 struct make_thingie {
179     TT operator()(int const &i) {
180         return TT(i * (INDEX+1));
181     }
182 };
183 
184 template<template <typename, typename> class T, typename K, typename V, size_t INDEX>
185 struct make_thingie<T<K, V>, INDEX> {
186     T<K, V> operator()(int const &i) {
187         return T<K, V>(i, i*(INDEX+1));
188     }
189 };
190 
191 // cast_from<T>::my_int_val(i);
192 template<typename T>
193 struct cast_from {
194     static int my_int_val(T const &i) { return (int)i; }
195 };
196 
197 template<typename K, typename V>
198 struct cast_from<MyKeyFirst<K, V> > {
199     static int my_int_val(MyKeyFirst<K, V> const &i) { return (int)(i.my_value); }
200 };
201 
202 template<typename K, typename V>
203 struct cast_from<MyKeySecond<K, V> > {
204     static int my_int_val(MyKeySecond<K, V> const &i) { return (int)(i.my_value); }
205 };
206 
207 template<typename T>
208 void print_my_value(T const &i) {
209     INFO(" " << cast_from<T>::my_int_val(i) << " " );
210 }
211 
212 template<typename K, typename V>
213 void print_my_value(MyKeyFirst<K, V> const &i) {
214     i.print_val();
215 }
216 
217 template<typename K, typename V>
218 void print_my_value(MyKeySecond<K, V> const &i) {
219     i.print_val();
220 }
221 
222 template<>
223 void print_my_value(std::string const &i) {
224     INFO("\"" << i.c_str() << "\"" );
225 }
226 
227 //
228 // Tests
229 //
230 
231 //!
232 // my_struct_key == given a type V with a field named my_key of type K, will return a copy of my_key
233 template<class K, typename V>
234 struct my_struct_key {
235     K operator()(const V& mv) {
236         return mv.my_key;
237     }
238 };
239 
240 // specialization returning reference to my_key.
241 template<class K, typename V>
242 struct my_struct_key<K&, V> {
243     K& operator()(const V& mv) {
244         return const_cast<K&>(mv.my_key);
245     }
246 };
247 
248 using tbb::detail::d1::type_to_key_function_body;
249 using tbb::detail::d1::hash_buffer;
250 using tbb::detail::d1::tbb_hash_compare;
251 using tbb::detail::d1::type_to_key_function_body_leaf;
252 
253 template<class K, class V> struct VtoKFB {
254     typedef type_to_key_function_body<V, K> type;
255 };
256 
257 template<typename K> struct make_hash_compare { typedef tbb_hash_compare<K> type; };
258 
259 template<typename K, class V>
260 void hash_buffer_test(const char *sname) {
261     typedef typename K_deref<K>::type KnoR;
262     hash_buffer<
263         K,
264         V,
265         typename VtoKFB<K, V>::type,
266         tbb_hash_compare<KnoR>
267     > my_hash_buffer;
268     const bool k_is_ref = std::is_reference<K>::value;
269     typedef type_to_key_function_body_leaf<
270         V, K, my_struct_key<K, V> > my_func_body_type;
271     typename VtoKFB<K, V>::type *kp = new my_func_body_type(my_struct_key<K, V>());
272     my_hash_buffer.set_key_func(kp);
273     INFO("Running hash_buffer test on " << sname << "; is ref == " << (k_is_ref ? "true" : "false") << "\n" );
274     V mv1, mv0;
275     bool res;
276     for(int cnt = 0; cnt < 2; ++cnt) {
277         // insert 50 items after checking they are not already in the table
278         for(int i = 0; i < 50; ++i) {
279             KnoR kk = index_to_key<KnoR>()(i);
280             mv1.my_key = kk;
281             mv1.my_value = 0.5*i;
282             res = my_hash_buffer.find_with_key(kk, mv0);
283             CHECK_MESSAGE( (!res), "Found non-inserted item");
284             res = my_hash_buffer.insert_with_key(mv1);
285             CHECK_MESSAGE( (res), "insert failed");
286             res = my_hash_buffer.find_with_key(kk, mv0);
287             CHECK_MESSAGE( (res), "not found after insert");
288             CHECK_MESSAGE( (mv0.my_value==mv1.my_value), "result not correct");
289         }
290         // go backwards checking they are still there.
291         for(int i = 49; i>=0; --i) {
292             KnoR kk = index_to_key<KnoR>()(i);
293             double value = 0.5*i;
294             res = my_hash_buffer.find_with_key(kk, mv0);
295             CHECK_MESSAGE( (res), "find failed");
296             CHECK_MESSAGE( (mv0.my_value==value), "result not correct");
297         }
298         // delete every third item, check they are gone
299         for(int i = 0; i < 50; i += 3) {
300             KnoR kk = index_to_key<KnoR>()(i);
301             my_hash_buffer.delete_with_key(kk);
302             res = my_hash_buffer.find_with_key(kk, mv0);
303             CHECK_MESSAGE( (!res), "Found deleted item");
304         }
305         // check the deleted items are gone, the non-deleted items are there.
306         for(int i = 0; i < 50; ++i) {
307             KnoR kk = index_to_key<KnoR>()(i);
308             double value = 0.5*i;
309             if(i%3==0) {
310                 res = my_hash_buffer.find_with_key(kk, mv0);
311                 CHECK_MESSAGE( (!res), "found an item that was previously deleted");
312             }
313             else {
314                 res = my_hash_buffer.find_with_key(kk, mv0);
315                 CHECK_MESSAGE( (res), "find failed");
316                 CHECK_MESSAGE( (mv0.my_value==value), "result not correct");
317             }
318         }
319         // insert new items, check the deleted items return true, the non-deleted items return false.
320         for(int i = 0; i < 50; ++i) {
321             KnoR kk = index_to_key<KnoR>()(i);
322             double value = 1.5*i;
323             mv1.my_key = kk;
324             mv1.my_value = value;
325             res = my_hash_buffer.insert_with_key(mv1);
326             if(i%3==0) {
327                 CHECK_MESSAGE( (res), "didn't insert in empty slot");
328             }
329             else {
330                 CHECK_MESSAGE( (!res), "slot was empty on insert");
331             }
332         }
333         // delete all items
334         for(int i = 0; i < 50; ++i) {
335             KnoR kk = index_to_key<KnoR>()(i);
336             my_hash_buffer.delete_with_key(kk);
337             res = my_hash_buffer.find_with_key(kk, mv0);
338             CHECK_MESSAGE( (!res), "Found deleted item");
339         }
340     }  // perform tasks twice
341 }
342 
343 void
344 TestTaggedBuffers() {
345     hash_buffer_test<int, MyKeyFirst<int, double> >("MyKeyFirst<int,double>");
346     hash_buffer_test<int&, MyKeyFirst<int, double> >("MyKeyFirst<int,double> with int&");
347     hash_buffer_test<int, MyKeySecond<int, double> >("MyKeySecond<int,double>");
348 
349     hash_buffer_test<std::string, MyKeyFirst<std::string, double> >("MyKeyFirst<std::string,double>");
350     hash_buffer_test<std::string&, MyKeySecond<std::string, double> >("MyKeySecond<std::string,double> with std::string&");
351 }
352 
353 struct threebyte {
354     unsigned char b1;
355     unsigned char b2;
356     unsigned char b3;
357     threebyte(int i = 0) {
358         b1 = (unsigned char)(i&0xFF);
359         b2 = (unsigned char)((i>>8)&0xFF);
360         b3 = (unsigned char)((i>>16)&0xFF);
361     }
362     operator int() const { return (int)(b1+(b2<<8)+(b3<<16)); }
363 };
364 
365 const int Count = 150;
366 
367 const int Recirc_count = 1000;  // number of tuples to be generated
368 const int MaxPorts = 10;
369 const int MaxNInputs = 5; // max # of input_nodes to register for each join_node input in parallel test
370 bool outputCheck[MaxPorts][Count];  // for checking output
371 
372 void
373 check_outputCheck(int nUsed, int maxCnt) {
374     for(int i = 0; i < nUsed; ++i) {
375         for(int j = 0; j < maxCnt; ++j) {
376             CHECK_MESSAGE(outputCheck[i][j], "");
377         }
378     }
379 }
380 
381 void
382 reset_outputCheck(int nUsed, int maxCnt) {
383     for(int i = 0; i < nUsed; ++i) {
384         for(int j = 0; j < maxCnt; ++j) {
385             outputCheck[i][j] = false;
386         }
387     }
388 }
389 
390 template<typename T>
391 class name_of {
392 public:
393     static const char* name() { return  "Unknown"; }
394 };
395 template<typename T>
396 class name_of<CheckType<T> > {
397 public:
398     static const char* name() { return "checktype"; }
399 };
400 template<>
401 class name_of<int> {
402 public:
403     static const char* name() { return  "int"; }
404 };
405 template<>
406 class name_of<float> {
407 public:
408     static const char* name() { return  "float"; }
409 };
410 template<>
411 class name_of<double> {
412 public:
413     static const char* name() { return  "double"; }
414 };
415 template<>
416 class name_of<long> {
417 public:
418     static const char* name() { return  "long"; }
419 };
420 template<>
421 class name_of<short> {
422 public:
423     static const char* name() { return  "short"; }
424 };
425 template<>
426 class name_of<threebyte> {
427 public:
428     static const char* name() { return "threebyte"; }
429 };
430 template<>
431 class name_of<std::string> {
432 public:
433     static const char* name() { return "std::string"; }
434 };
435 template<typename K, typename V>
436 class name_of<MyKeyFirst<K, V> > {
437 public:
438     static const char* name() { return "MyKeyFirst<K,V>"; }
439 };
440 template<typename K, typename V>
441 class name_of<MyKeySecond<K, V> > {
442 public:
443     static const char* name() { return "MyKeySecond<K,V>"; }
444 };
445 
446 // The additional policy to differ message based key matching from usual key matching.
447 // It only makes sense for the test because join_node is created with the key_matching policy for the both cases.
448 template <typename K, typename KHash = tbb_hash_compare<typename std::decay<K>::type > >
449 struct message_based_key_matching {};
450 
451 // test for key_matching
452 template<class JP>
453 struct is_key_matching_join {
454     static const bool value;
455     typedef int key_type;  // have to define it to something
456 };
457 
458 template<class JP>
459 const bool is_key_matching_join<JP>::value = false;
460 
461 template<class K, class KHash>
462 struct is_key_matching_join<tbb::flow::key_matching<K, KHash> > {
463     static const bool value;
464     typedef K key_type;
465 };
466 
467 template<class K, class KHash>
468 const bool is_key_matching_join<tbb::flow::key_matching<K, KHash> >::value = true;
469 
470 template<class K, class KHash>
471 struct is_key_matching_join<message_based_key_matching<K, KHash> > {
472     static const bool value;
473     typedef K key_type;
474 };
475 
476 template<class K, class KHash>
477 const bool is_key_matching_join<message_based_key_matching<K, KHash> >::value = true;
478 
479 // for recirculating tags, input is tuple<index,continue_msg>
480 // output is index*my_mult cast to the right type
481 template<typename TT>
482 class recirc_func_body {
483     TT my_mult;
484 public:
485     typedef std::tuple<int, tbb::flow::continue_msg> input_type;
486     recirc_func_body(TT multiplier): my_mult(multiplier) {}
487     recirc_func_body(const recirc_func_body &other): my_mult(other.my_mult) { }
488     void operator=(const recirc_func_body &other) { my_mult = other.my_mult; }
489     TT operator()(const input_type &v) {
490         return TT(std::get<0>(v)) * my_mult;
491     }
492 };
493 
494 static int input_count;  // input_nodes are serial
495 
496 // emit input_count continue_msg
497 class recirc_input_node_body {
498 public:
499     tbb::flow::continue_msg operator()(tbb::flow_control &fc) {
500         if( --input_count < 0 ){
501             fc.stop();
502         }
503         return tbb::flow::continue_msg();
504     }
505 };
506 
507 // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10,
508 // so the max number generated right now is 1500 or so.)  Input will generate a series of TT with value
509 // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body.  We are attaching addend
510 // input nodes to a join_port, and each will generate part of the numerical series the port is expecting
511 // to receive.  If there is only one input node, the series order will be maintained; if more than one,
512 // this is not guaranteed.
513 template<typename TT, size_t INDEX>
514 class my_input_body {
515     int my_count;
516     int addend;
517 public:
518     my_input_body(int init_val, int addto): my_count(init_val), addend(addto) { }
519     TT operator()(tbb::flow_control& fc) {
520         int lc = my_count;
521         TT ret = make_thingie<TT, INDEX>()(my_count);
522         my_count += addend;
523         if ( lc < Count){
524             return ret;
525         }else{
526             fc.stop();
527             return TT();
528         }
529     }
530 };
531 
532 template<typename TT>
533 class tag_func {
534     TT my_mult;
535 public:
536     tag_func(TT multiplier): my_mult(multiplier) { }
537     // operator() will return [0 .. Count)
538     tbb::flow::tag_value operator()(TT v) {
539         tbb::flow::tag_value t = tbb::flow::tag_value(v/my_mult);
540         return t;
541     }
542 };
543 
544 template <class JP>
545 struct filter_out_message_based_key_matching {
546     typedef JP policy;
547 };
548 
549 template <typename K, typename KHash>
550 struct filter_out_message_based_key_matching<message_based_key_matching<K, KHash> > {
551     // To have message based key matching in join_node, the key_matchig policy should be specified.
552     typedef tbb::flow::key_matching<K, KHash> policy;
553 };
554 
555 // allocator for join_node.  This is specialized for tag_matching and key_matching joins because they require a variable number
556 // of tag_value methods passed to the constructor
557 
558 template<int N, typename JType, class JP>
559 class makeJoin {
560 public:
561     static JType *create(tbb::flow::graph& g) {
562         JType *temp = new JType(g);
563         return temp;
564     }
565     static void destroy(JType *p) { delete p; }
566 };
567 
568 // for general key_matching case, each type in the tuple is a class that has the my_key field and the my_value field.
569 //
570 template<typename JType, typename K, typename KHash>
571 class makeJoin<2, JType, tbb::flow::key_matching<K, KHash> > {
572     typedef typename JType::output_type TType;
573     typedef typename std::tuple_element<0, TType>::type T0;
574     typedef typename std::tuple_element<1, TType>::type T1;
575 public:
576     static JType *create(tbb::flow::graph& g) {
577         JType *temp = new JType(g,
578             my_struct_key<K, T0>(),
579             my_struct_key<K, T1>()
580         );
581         return temp;
582     }
583     static void destroy(JType *p) { delete p; }
584 };
585 
586 template<typename JType>
587 class makeJoin<2, JType, tbb::flow::tag_matching> {
588     typedef typename JType::output_type TType;
589     typedef typename std::tuple_element<0, TType>::type T0;
590     typedef typename std::tuple_element<1, TType>::type T1;
591 public:
592     static JType *create(tbb::flow::graph& g) {
593         JType *temp = new JType(g,
594             tag_func<T0>(T0(2)),
595             tag_func<T1>(T1(3))
596         );
597         return temp;
598     }
599     static void destroy(JType *p) { delete p; }
600 };
601 
602 #if MAX_TUPLE_TEST_SIZE >= 3
603 template<typename JType, typename K, typename KHash>
604 class makeJoin<3, JType, tbb::flow::key_matching<K, KHash> > {
605     typedef typename JType::output_type TType;
606     typedef typename std::tuple_element<0, TType>::type T0;
607     typedef typename std::tuple_element<1, TType>::type T1;
608     typedef typename std::tuple_element<2, TType>::type T2;
609 public:
610     static JType *create(tbb::flow::graph& g) {
611         JType *temp = new JType(g,
612             my_struct_key<K, T0>(),
613             my_struct_key<K, T1>(),
614             my_struct_key<K, T2>()
615         );
616         return temp;
617     }
618     static void destroy(JType *p) { delete p; }
619 };
620 
621 template<typename JType>
622 class makeJoin<3, JType, tbb::flow::tag_matching> {
623     typedef typename JType::output_type TType;
624     typedef typename std::tuple_element<0, TType>::type T0;
625     typedef typename std::tuple_element<1, TType>::type T1;
626     typedef typename std::tuple_element<2, TType>::type T2;
627 public:
628     static JType *create(tbb::flow::graph& g) {
629         JType *temp = new JType(g,
630             tag_func<T0>(T0(2)),
631             tag_func<T1>(T1(3)),
632             tag_func<T2>(T2(4))
633         );
634         return temp;
635     }
636     static void destroy(JType *p) { delete p; }
637 };
638 
639 #endif
640 #if MAX_TUPLE_TEST_SIZE >= 4
641 
642 template<typename JType, typename K, typename KHash>
643 class makeJoin<4, JType, tbb::flow::key_matching<K, KHash> > {
644     typedef typename JType::output_type TType;
645     typedef typename std::tuple_element<0, TType>::type T0;
646     typedef typename std::tuple_element<1, TType>::type T1;
647     typedef typename std::tuple_element<2, TType>::type T2;
648     typedef typename std::tuple_element<3, TType>::type T3;
649 public:
650     static JType *create(tbb::flow::graph& g) {
651         JType *temp = new JType(g,
652             my_struct_key<K, T0>(),
653             my_struct_key<K, T1>(),
654             my_struct_key<K, T2>(),
655             my_struct_key<K, T3>()
656         );
657         return temp;
658     }
659     static void destroy(JType *p) { delete p; }
660 };
661 
662 template<typename JType>
663 class makeJoin<4, JType, tbb::flow::tag_matching> {
664     typedef typename JType::output_type TType;
665     typedef typename std::tuple_element<0, TType>::type T0;
666     typedef typename std::tuple_element<1, TType>::type T1;
667     typedef typename std::tuple_element<2, TType>::type T2;
668     typedef typename std::tuple_element<3, TType>::type T3;
669 public:
670     static JType *create(tbb::flow::graph& g) {
671         JType *temp = new JType(g,
672             tag_func<T0>(T0(2)),
673             tag_func<T1>(T1(3)),
674             tag_func<T2>(T2(4)),
675             tag_func<T3>(T3(5))
676         );
677         return temp;
678     }
679     static void destroy(JType *p) { delete p; }
680 };
681 
682 #endif
683 #if MAX_TUPLE_TEST_SIZE >= 5
684 template<typename JType, typename K, typename KHash>
685 class makeJoin<5, JType, tbb::flow::key_matching<K, KHash> > {
686     typedef typename JType::output_type TType;
687     typedef typename std::tuple_element<0, TType>::type T0;
688     typedef typename std::tuple_element<1, TType>::type T1;
689     typedef typename std::tuple_element<2, TType>::type T2;
690     typedef typename std::tuple_element<3, TType>::type T3;
691     typedef typename std::tuple_element<4, TType>::type T4;
692 public:
693     static JType *create(tbb::flow::graph& g) {
694         JType *temp = new JType(g,
695             my_struct_key<K, T0>(),
696             my_struct_key<K, T1>(),
697             my_struct_key<K, T2>(),
698             my_struct_key<K, T3>(),
699             my_struct_key<K, T4>()
700         );
701         return temp;
702     }
703     static void destroy(JType *p) { delete p; }
704 };
705 
706 template<typename JType>
707 class makeJoin<5, JType, tbb::flow::tag_matching> {
708     typedef typename JType::output_type TType;
709     typedef typename std::tuple_element<0, TType>::type T0;
710     typedef typename std::tuple_element<1, TType>::type T1;
711     typedef typename std::tuple_element<2, TType>::type T2;
712     typedef typename std::tuple_element<3, TType>::type T3;
713     typedef typename std::tuple_element<4, TType>::type T4;
714 public:
715     static JType *create(tbb::flow::graph& g) {
716         JType *temp = new JType(g,
717             tag_func<T0>(T0(2)),
718             tag_func<T1>(T1(3)),
719             tag_func<T2>(T2(4)),
720             tag_func<T3>(T3(5)),
721             tag_func<T4>(T4(6))
722         );
723         return temp;
724     }
725     static void destroy(JType *p) { delete p; }
726 };
727 #endif
728 #if MAX_TUPLE_TEST_SIZE >= 6
729 template<typename JType, typename K, typename KHash>
730 class makeJoin<6, JType, tbb::flow::key_matching<K, KHash> > {
731     typedef typename JType::output_type TType;
732     typedef typename std::tuple_element<0, TType>::type T0;
733     typedef typename std::tuple_element<1, TType>::type T1;
734     typedef typename std::tuple_element<2, TType>::type T2;
735     typedef typename std::tuple_element<3, TType>::type T3;
736     typedef typename std::tuple_element<4, TType>::type T4;
737     typedef typename std::tuple_element<5, TType>::type T5;
738 public:
739     static JType *create(tbb::flow::graph& g) {
740         JType *temp = new JType(g,
741             my_struct_key<K, T0>(),
742             my_struct_key<K, T1>(),
743             my_struct_key<K, T2>(),
744             my_struct_key<K, T3>(),
745             my_struct_key<K, T4>(),
746             my_struct_key<K, T5>()
747         );
748         return temp;
749     }
750     static void destroy(JType *p) { delete p; }
751 };
752 
753 template<typename JType>
754 class makeJoin<6, JType, tbb::flow::tag_matching> {
755     typedef typename JType::output_type TType;
756     typedef typename std::tuple_element<0, TType>::type T0;
757     typedef typename std::tuple_element<1, TType>::type T1;
758     typedef typename std::tuple_element<2, TType>::type T2;
759     typedef typename std::tuple_element<3, TType>::type T3;
760     typedef typename std::tuple_element<4, TType>::type T4;
761     typedef typename std::tuple_element<5, TType>::type T5;
762 public:
763     static JType *create(tbb::flow::graph& g) {
764         JType *temp = new JType(g,
765             tag_func<T0>(T0(2)),
766             tag_func<T1>(T1(3)),
767             tag_func<T2>(T2(4)),
768             tag_func<T3>(T3(5)),
769             tag_func<T4>(T4(6)),
770             tag_func<T5>(T5(7))
771         );
772         return temp;
773     }
774     static void destroy(JType *p) { delete p; }
775 };
776 #endif
777 
778 #if MAX_TUPLE_TEST_SIZE >= 7
779 template<typename JType, typename K, typename KHash>
780 class makeJoin<7, JType, tbb::flow::key_matching<K, KHash> > {
781     typedef typename JType::output_type TType;
782     typedef typename std::tuple_element<0, TType>::type T0;
783     typedef typename std::tuple_element<1, TType>::type T1;
784     typedef typename std::tuple_element<2, TType>::type T2;
785     typedef typename std::tuple_element<3, TType>::type T3;
786     typedef typename std::tuple_element<4, TType>::type T4;
787     typedef typename std::tuple_element<5, TType>::type T5;
788     typedef typename std::tuple_element<6, TType>::type T6;
789 public:
790     static JType *create(tbb::flow::graph& g) {
791         JType *temp = new JType(g,
792             my_struct_key<K, T0>(),
793             my_struct_key<K, T1>(),
794             my_struct_key<K, T2>(),
795             my_struct_key<K, T3>(),
796             my_struct_key<K, T4>(),
797             my_struct_key<K, T5>(),
798             my_struct_key<K, T6>()
799         );
800         return temp;
801     }
802     static void destroy(JType *p) { delete p; }
803 };
804 
805 template<typename JType>
806 class makeJoin<7, JType, tbb::flow::tag_matching> {
807     typedef typename JType::output_type TType;
808     typedef typename std::tuple_element<0, TType>::type T0;
809     typedef typename std::tuple_element<1, TType>::type T1;
810     typedef typename std::tuple_element<2, TType>::type T2;
811     typedef typename std::tuple_element<3, TType>::type T3;
812     typedef typename std::tuple_element<4, TType>::type T4;
813     typedef typename std::tuple_element<5, TType>::type T5;
814     typedef typename std::tuple_element<6, TType>::type T6;
815 public:
816     static JType *create(tbb::flow::graph& g) {
817         JType *temp = new JType(g,
818             tag_func<T0>(T0(2)),
819             tag_func<T1>(T1(3)),
820             tag_func<T2>(T2(4)),
821             tag_func<T3>(T3(5)),
822             tag_func<T4>(T4(6)),
823             tag_func<T5>(T5(7)),
824             tag_func<T6>(T6(8))
825         );
826         return temp;
827     }
828     static void destroy(JType *p) { delete p; }
829 };
830 #endif
831 
832 #if MAX_TUPLE_TEST_SIZE >= 8
833 template<typename JType, typename K, typename KHash>
834 class makeJoin<8, JType, tbb::flow::key_matching<K, KHash> > {
835     typedef typename JType::output_type TType;
836     typedef typename std::tuple_element<0, TType>::type T0;
837     typedef typename std::tuple_element<1, TType>::type T1;
838     typedef typename std::tuple_element<2, TType>::type T2;
839     typedef typename std::tuple_element<3, TType>::type T3;
840     typedef typename std::tuple_element<4, TType>::type T4;
841     typedef typename std::tuple_element<5, TType>::type T5;
842     typedef typename std::tuple_element<6, TType>::type T6;
843     typedef typename std::tuple_element<7, TType>::type T7;
844 public:
845     static JType *create(tbb::flow::graph& g) {
846         JType *temp = new JType(g,
847             my_struct_key<K, T0>(),
848             my_struct_key<K, T1>(),
849             my_struct_key<K, T2>(),
850             my_struct_key<K, T3>(),
851             my_struct_key<K, T4>(),
852             my_struct_key<K, T5>(),
853             my_struct_key<K, T6>(),
854             my_struct_key<K, T7>()
855         );
856         return temp;
857     }
858     static void destroy(JType *p) { delete p; }
859 };
860 
861 template<typename JType>
862 class makeJoin<8, JType, tbb::flow::tag_matching> {
863     typedef typename JType::output_type TType;
864     typedef typename std::tuple_element<0, TType>::type T0;
865     typedef typename std::tuple_element<1, TType>::type T1;
866     typedef typename std::tuple_element<2, TType>::type T2;
867     typedef typename std::tuple_element<3, TType>::type T3;
868     typedef typename std::tuple_element<4, TType>::type T4;
869     typedef typename std::tuple_element<5, TType>::type T5;
870     typedef typename std::tuple_element<6, TType>::type T6;
871     typedef typename std::tuple_element<7, TType>::type T7;
872 public:
873     static JType *create(tbb::flow::graph& g) {
874         JType *temp = new JType(g,
875             tag_func<T0>(T0(2)),
876             tag_func<T1>(T1(3)),
877             tag_func<T2>(T2(4)),
878             tag_func<T3>(T3(5)),
879             tag_func<T4>(T4(6)),
880             tag_func<T5>(T5(7)),
881             tag_func<T6>(T6(8)),
882             tag_func<T7>(T7(9))
883         );
884         return temp;
885     }
886     static void destroy(JType *p) { delete p; }
887 };
888 #endif
889 
890 #if MAX_TUPLE_TEST_SIZE >= 9
891 template<typename JType, typename K, typename KHash>
892 class makeJoin<9, JType, tbb::flow::key_matching<K, KHash> > {
893     typedef typename JType::output_type TType;
894     typedef typename std::tuple_element<0, TType>::type T0;
895     typedef typename std::tuple_element<1, TType>::type T1;
896     typedef typename std::tuple_element<2, TType>::type T2;
897     typedef typename std::tuple_element<3, TType>::type T3;
898     typedef typename std::tuple_element<4, TType>::type T4;
899     typedef typename std::tuple_element<5, TType>::type T5;
900     typedef typename std::tuple_element<6, TType>::type T6;
901     typedef typename std::tuple_element<7, TType>::type T7;
902     typedef typename std::tuple_element<8, TType>::type T8;
903 public:
904     static JType *create(tbb::flow::graph& g) {
905         JType *temp = new JType(g,
906             my_struct_key<K, T0>(),
907             my_struct_key<K, T1>(),
908             my_struct_key<K, T2>(),
909             my_struct_key<K, T3>(),
910             my_struct_key<K, T4>(),
911             my_struct_key<K, T5>(),
912             my_struct_key<K, T6>(),
913             my_struct_key<K, T7>(),
914             my_struct_key<K, T8>()
915         );
916         return temp;
917     }
918     static void destroy(JType *p) { delete p; }
919 };
920 
921 template<typename JType>
922 class makeJoin<9, JType, tbb::flow::tag_matching> {
923     typedef typename JType::output_type TType;
924     typedef typename std::tuple_element<0, TType>::type T0;
925     typedef typename std::tuple_element<1, TType>::type T1;
926     typedef typename std::tuple_element<2, TType>::type T2;
927     typedef typename std::tuple_element<3, TType>::type T3;
928     typedef typename std::tuple_element<4, TType>::type T4;
929     typedef typename std::tuple_element<5, TType>::type T5;
930     typedef typename std::tuple_element<6, TType>::type T6;
931     typedef typename std::tuple_element<7, TType>::type T7;
932     typedef typename std::tuple_element<8, TType>::type T8;
933 public:
934     static JType *create(tbb::flow::graph& g) {
935         JType *temp = new JType(g,
936             tag_func<T0>(T0(2)),
937             tag_func<T1>(T1(3)),
938             tag_func<T2>(T2(4)),
939             tag_func<T3>(T3(5)),
940             tag_func<T4>(T4(6)),
941             tag_func<T5>(T5(7)),
942             tag_func<T6>(T6(8)),
943             tag_func<T7>(T7(9)),
944             tag_func<T8>(T8(10))
945         );
946         return temp;
947     }
948     static void destroy(JType *p) { delete p; }
949 };
950 #endif
951 
952 #if MAX_TUPLE_TEST_SIZE >= 10
953 template<typename JType, typename K, typename KHash>
954 class makeJoin<10, JType, tbb::flow::key_matching<K, KHash> > {
955     typedef typename JType::output_type TType;
956     typedef typename std::tuple_element<0, TType>::type T0;
957     typedef typename std::tuple_element<1, TType>::type T1;
958     typedef typename std::tuple_element<2, TType>::type T2;
959     typedef typename std::tuple_element<3, TType>::type T3;
960     typedef typename std::tuple_element<4, TType>::type T4;
961     typedef typename std::tuple_element<5, TType>::type T5;
962     typedef typename std::tuple_element<6, TType>::type T6;
963     typedef typename std::tuple_element<7, TType>::type T7;
964     typedef typename std::tuple_element<8, TType>::type T8;
965     typedef typename std::tuple_element<9, TType>::type T9;
966 public:
967     static JType *create(tbb::flow::graph& g) {
968         JType *temp = new JType(g,
969             my_struct_key<K, T0>(),
970             my_struct_key<K, T1>(),
971             my_struct_key<K, T2>(),
972             my_struct_key<K, T3>(),
973             my_struct_key<K, T4>(),
974             my_struct_key<K, T5>(),
975             my_struct_key<K, T6>(),
976             my_struct_key<K, T7>(),
977             my_struct_key<K, T8>(),
978             my_struct_key<K, T9>()
979         );
980         return temp;
981     }
982     static void destroy(JType *p) { delete p; }
983 };
984 
985 template<typename JType>
986 class makeJoin<10, JType, tbb::flow::tag_matching> {
987     typedef typename JType::output_type TType;
988     typedef typename std::tuple_element<0, TType>::type T0;
989     typedef typename std::tuple_element<1, TType>::type T1;
990     typedef typename std::tuple_element<2, TType>::type T2;
991     typedef typename std::tuple_element<3, TType>::type T3;
992     typedef typename std::tuple_element<4, TType>::type T4;
993     typedef typename std::tuple_element<5, TType>::type T5;
994     typedef typename std::tuple_element<6, TType>::type T6;
995     typedef typename std::tuple_element<7, TType>::type T7;
996     typedef typename std::tuple_element<8, TType>::type T8;
997     typedef typename std::tuple_element<9, TType>::type T9;
998 public:
999     static JType *create(tbb::flow::graph& g) {
1000         JType *temp = new JType(g,
1001             tag_func<T0>(T0(2)),
1002             tag_func<T1>(T1(3)),
1003             tag_func<T2>(T2(4)),
1004             tag_func<T3>(T3(5)),
1005             tag_func<T4>(T4(6)),
1006             tag_func<T5>(T5(7)),
1007             tag_func<T6>(T6(8)),
1008             tag_func<T7>(T7(9)),
1009             tag_func<T8>(T8(10)),
1010             tag_func<T9>(T9(11))
1011         );
1012         return temp;
1013     }
1014     static void destroy(JType *p) { delete p; }
1015 };
1016 #endif
1017 
1018 // holder for input_node pointers for eventual deletion
1019 
1020 static void* all_input_nodes[MaxPorts][MaxNInputs];
1021 
1022 template<int ELEM, typename JNT>
1023 class input_node_helper {
1024 public:
1025     typedef JNT join_node_type;
1026     typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
1027     typedef typename join_node_type::output_type TT;
1028 
1029     typedef typename std::tuple_element<ELEM-1, TT>::type IT;
1030     typedef typename tbb::flow::input_node<IT> my_input_node_type;
1031     typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
1032     static void print_remark(const char * str) {
1033         input_node_helper<ELEM-1, JNT>::print_remark(str);
1034         INFO(", " << name_of<IT>::name());
1035     }
1036     static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
1037         for(int i = 0; i < nInputs; ++i) {
1038             my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, ELEM>(i, nInputs));
1039             tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
1040             all_input_nodes[ELEM-1][i] = (void *)new_node;
1041             new_node->activate();
1042         }
1043         // add the next input_node
1044         input_node_helper<ELEM-1, JNT>::add_input_nodes(my_join, g, nInputs);
1045     }
1046 
1047     static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
1048         my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(ELEM+1)));
1049         tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
1050         tbb::flow::make_edge(my_input, *new_node);
1051         all_input_nodes[ELEM-1][0] = (void *)new_node;
1052         input_node_helper<ELEM-1, JNT>::add_recirc_func_nodes(my_join, my_input, g);
1053     }
1054 
1055     static void only_check_value(const int i, const TT &v) {
1056         CHECK_MESSAGE(std::get<ELEM-1>(v)==(IT)(i*(ELEM+1)), "");
1057         input_node_helper<ELEM-1, JNT>::only_check_value(i, v);
1058     }
1059 
1060     static void check_value(int i, TT &v, bool is_serial) {
1061         // the fetched value will match only if there is only one input_node.
1062         bool is_correct = !is_serial||std::get<ELEM-1>(v)==(IT)(i*(ELEM+1));
1063         CHECK_MESSAGE(is_correct, "");
1064         // tally the fetched value.
1065         int ival = (int)std::get<ELEM-1>(v);
1066         CHECK_MESSAGE(!(ival%(ELEM+1)), "");
1067         ival /= (ELEM+1);
1068         CHECK_MESSAGE(!outputCheck[ELEM-1][ival], "");
1069         outputCheck[ELEM-1][ival] = true;
1070         input_node_helper<ELEM-1, JNT>::check_value(i, v, is_serial);
1071     }
1072     static void remove_input_nodes(join_node_type& my_join, int nInputs) {
1073         for(int i = 0; i< nInputs; ++i) {
1074             my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[ELEM-1][i]);
1075             tbb::flow::remove_edge(*dp, tbb::flow::input_port<ELEM-1>(my_join));
1076             delete dp;
1077         }
1078         input_node_helper<ELEM-1, JNT>::remove_input_nodes(my_join, nInputs);
1079     }
1080 
1081     static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
1082         my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[ELEM-1][0]);
1083         tbb::flow::remove_edge(*fn, tbb::flow::input_port<ELEM-1>(my_join));
1084         tbb::flow::remove_edge(my_input, *fn);
1085         delete fn;
1086         input_node_helper<ELEM-1, JNT>::remove_recirc_func_nodes(my_join, my_input);
1087     }
1088 };
1089 
1090 template<typename JNT>
1091 class input_node_helper<1, JNT> {
1092     typedef JNT join_node_type;
1093     typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
1094     typedef typename join_node_type::output_type TT;
1095 
1096     typedef typename std::tuple_element<0, TT>::type IT;
1097     typedef typename tbb::flow::input_node<IT> my_input_node_type;
1098     typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
1099 public:
1100     static void print_remark(const char * str) {
1101         INFO(str << "< " << name_of<IT>::name());
1102     }
1103     static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
1104         for(int i = 0; i < nInputs; ++i) {
1105             my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, 1>(i, nInputs));
1106             tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1107             all_input_nodes[0][i] = (void *)new_node;
1108             new_node->activate();
1109         }
1110     }
1111 
1112     static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
1113         my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(2)));
1114         tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1115         tbb::flow::make_edge(my_input, *new_node);
1116         all_input_nodes[0][0] = (void *)new_node;
1117     }
1118 
1119     static void only_check_value(const int i, const TT &v) {
1120         CHECK_MESSAGE(std::get<0>(v)==(IT)(i*2), "");
1121     }
1122 
1123     static void check_value(int i, TT &v, bool is_serial) {
1124         bool is_correct = !is_serial||std::get<0>(v)==(IT)(i*(2));
1125         CHECK_MESSAGE(is_correct, "");
1126         int ival = (int)std::get<0>(v);
1127         CHECK_MESSAGE(!(ival%2), "");
1128         ival /= 2;
1129         CHECK_MESSAGE(!outputCheck[0][ival], "");
1130         outputCheck[0][ival] = true;
1131     }
1132     static void remove_input_nodes(join_node_type& my_join, int nInputs) {
1133         for(int i = 0; i < nInputs; ++i) {
1134             my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[0][i]);
1135             tbb::flow::remove_edge(*dp, tbb::flow::input_port<0>(my_join));
1136             delete dp;
1137         }
1138     }
1139 
1140     static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
1141         my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[0][0]);
1142         tbb::flow::remove_edge(*fn, tbb::flow::input_port<0>(my_join));
1143         tbb::flow::remove_edge(my_input, *fn);
1144         delete fn;
1145     }
1146 };
1147 
1148 #if _MSC_VER && !defined(__INTEL_COMPILER)
1149 // Suppress "conditional expression is constant" warning.
1150 #pragma warning( push )
1151 #pragma warning( disable: 4127 )
1152 #endif
1153 
1154 template<typename JType, class JP>
1155 class parallel_test {
1156 public:
1157     typedef typename JType::output_type TType;
1158     typedef typename is_key_matching_join<JP>::key_type            key_type;
1159     static void test() {
1160         const int TUPLE_SIZE = std::tuple_size<TType>::value;
1161         const bool is_key_matching = is_key_matching_join<JP>::value;
1162 
1163         TType v;
1164         input_node_helper<TUPLE_SIZE, JType>::print_remark("Parallel test of join_node");
1165         INFO(" > ");
1166         if(is_key_matching) {
1167             INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
1168             if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) {
1169                 INFO("&");
1170             }
1171         }
1172         INFO("\n");
1173         for(int i = 0; i < MaxPorts; ++i) {
1174             for(int j = 0; j < MaxNInputs; ++j) {
1175                 all_input_nodes[i][j] = nullptr;
1176             }
1177         }
1178         for(int nInputs = 1; nInputs<=MaxNInputs; ++nInputs) {
1179             tbb::flow::graph g;
1180             bool not_out_of_order = (nInputs==1)&&(!is_key_matching);
1181             JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
1182             tbb::flow::queue_node<TType> outq1(g);
1183             tbb::flow::queue_node<TType> outq2(g);
1184 
1185             tbb::flow::make_edge(*my_join, outq1);
1186             tbb::flow::make_edge(*my_join, outq2);
1187 
1188             input_node_helper<TUPLE_SIZE, JType>::add_input_nodes((*my_join), g, nInputs);
1189 
1190             g.wait_for_all();
1191 
1192             reset_outputCheck(TUPLE_SIZE, Count);
1193             for(int i = 0; i < Count; ++i) {
1194                 CHECK_MESSAGE(outq1.try_get(v), "");
1195                 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
1196             }
1197 
1198             check_outputCheck(TUPLE_SIZE, Count);
1199             reset_outputCheck(TUPLE_SIZE, Count);
1200 
1201             for(int i = 0; i < Count; i++) {
1202                 CHECK_MESSAGE(outq2.try_get(v), "");
1203                 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
1204             }
1205             check_outputCheck(TUPLE_SIZE, Count);
1206 
1207             CHECK_MESSAGE(!outq1.try_get(v), "");
1208             CHECK_MESSAGE(!outq2.try_get(v), "");
1209 
1210             input_node_helper<TUPLE_SIZE, JType>::remove_input_nodes((*my_join), nInputs);
1211             tbb::flow::remove_edge(*my_join, outq1);
1212             tbb::flow::remove_edge(*my_join, outq2);
1213             makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
1214         }
1215     }
1216 };
1217 
1218 
1219 template<int ELEM, typename JType>
1220 class serial_queue_helper {
1221 public:
1222     typedef typename JType::output_type TT;
1223     typedef typename std::tuple_element<ELEM-1, TT>::type IT;
1224     typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
1225     static void print_remark() {
1226         serial_queue_helper<ELEM-1, JType>::print_remark();
1227         INFO(", " << name_of<IT>::name());
1228     }
1229     static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
1230         serial_queue_helper<ELEM-1, JType>::add_queue_nodes(g, my_join);
1231         my_queue_node_type *new_node = new my_queue_node_type(g);
1232         tbb::flow::make_edge(*new_node, std::get<ELEM-1>(my_join.input_ports()));
1233         all_input_nodes[ELEM-1][0] = (void *)new_node;
1234     }
1235 
1236     static void fill_one_queue(int maxVal) {
1237         // fill queue to "left" of me
1238         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
1239         serial_queue_helper<ELEM-1, JType>::fill_one_queue(maxVal);
1240         for(int i = 0; i < maxVal; ++i) {
1241             CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(i)), "");
1242         }
1243     }
1244 
1245     static void put_one_queue_val(int myVal) {
1246         // put this val to my "left".
1247         serial_queue_helper<ELEM-1, JType>::put_one_queue_val(myVal);
1248         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
1249         CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(myVal)), "");
1250     }
1251 
1252     static void check_queue_value(int i, TT &v) {
1253         serial_queue_helper<ELEM-1, JType>::check_queue_value(i, v);
1254         CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<ELEM-1>(v))==i * (ELEM+1), "");
1255     }
1256 
1257     static void remove_queue_nodes(JType &my_join) {
1258         my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
1259         tbb::flow::remove_edge(*vptr, std::get<ELEM-1>(my_join.input_ports()));
1260         serial_queue_helper<ELEM-1, JType>::remove_queue_nodes(my_join);
1261         delete vptr;
1262     }
1263 };
1264 
1265 template<typename JType>
1266 class serial_queue_helper<1, JType> {
1267 public:
1268     typedef typename JType::output_type TT;
1269     typedef typename std::tuple_element<0, TT>::type IT;
1270     typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
1271     static void print_remark() {
1272         INFO("Serial test of join_node< " << name_of<IT>::name());
1273     }
1274 
1275     static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
1276         my_queue_node_type *new_node = new my_queue_node_type(g);
1277         tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1278         all_input_nodes[0][0] = (void *)new_node;
1279     }
1280 
1281     static void fill_one_queue(int maxVal) {
1282         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
1283         for(int i = 0; i < maxVal; ++i) {
1284             CHECK_MESSAGE(qptr->try_put(make_thingie<IT, 1>()(i)), "");
1285         }
1286     }
1287 
1288     static void put_one_queue_val(int myVal) {
1289         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
1290         IT my_val = make_thingie<IT, 1>()(myVal);
1291         CHECK_MESSAGE(qptr->try_put(my_val), "");
1292     }
1293 
1294     static void check_queue_value(int i, TT &v) {
1295         CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<0>(v))==i*2, "");
1296     }
1297 
1298     static void remove_queue_nodes(JType &my_join) {
1299         my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
1300         tbb::flow::remove_edge(*vptr, std::get<0>(my_join.input_ports()));
1301         delete vptr;
1302     }
1303 };
1304 
1305 //
1306 // Single reservable predecessor at each port, single accepting and rejecting successor
1307 //   * put to buffer before port0, then put to buffer before port1, ...
1308 //   * fill buffer before port0 then fill buffer before port1, ...
1309 
1310 template<typename JType, class JP>
1311 void test_one_serial(JType &my_join, tbb::flow::graph &g) {
1312     typedef typename JType::output_type TType;
1313     static const int TUPLE_SIZE = std::tuple_size<TType>::value;
1314     bool is_key_matching = is_key_matching_join<JP>::value;
1315     std::vector<bool> flags;
1316     serial_queue_helper<TUPLE_SIZE, JType>::add_queue_nodes(g, my_join);
1317     typedef TType q3_input_type;
1318     tbb::flow::queue_node< q3_input_type > q3(g);
1319 
1320     tbb::flow::make_edge(my_join, q3);
1321 
1322     // fill each queue with its value one-at-a-time
1323     flags.clear();
1324     for(int i = 0; i < Count; ++i) {
1325         serial_queue_helper<TUPLE_SIZE, JType>::put_one_queue_val(i);
1326         flags.push_back(false);
1327     }
1328 
1329     g.wait_for_all();
1330     for(int i = 0; i < Count; ++i) {
1331         q3_input_type v;
1332         g.wait_for_all();
1333         CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()");
1334         if(is_key_matching) {
1335             // because we look up tags in the hash table, the output may be out of order.
1336             int j = int(std::get<0>(v))/2;  // figure what the index should be
1337             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
1338             flags[j] = true;
1339         }
1340         else {
1341             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
1342         }
1343     }
1344 
1345     if(is_key_matching) {
1346         for(int i = 0; i < Count; ++i) {
1347             CHECK_MESSAGE(flags[i], "");
1348             flags[i] = false;
1349         }
1350     }
1351 
1352     tbb::flow::remove_edge(my_join, q3);
1353     tbb::flow::limiter_node<q3_input_type> limiter(g, Count / 2);
1354     tbb::flow::make_edge(my_join, limiter);
1355     tbb::flow::make_edge(limiter, q3);
1356 
1357     // fill each queue completely before filling the next.
1358     serial_queue_helper<TUPLE_SIZE, JType>::fill_one_queue(Count);
1359 
1360     g.wait_for_all();
1361     for(int i = 0; i < Count / 2; ++i) {
1362         q3_input_type v;
1363         g.wait_for_all();
1364         CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()");
1365         if(is_key_matching) {
1366             int j = int(std::get<0>(v))/2;
1367             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
1368             flags[j] = true;
1369         }
1370         else {
1371             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
1372         }
1373     }
1374 
1375     if(is_key_matching) {
1376         CHECK(std::count(flags.begin(), flags.end(), true) == Count / 2);
1377     }
1378 
1379     serial_queue_helper<TUPLE_SIZE, JType>::remove_queue_nodes(my_join);
1380 }
1381 
1382 template<typename JType, class JP>
1383 class serial_test {
1384     typedef typename JType::output_type TType;
1385 public:
1386     static void test() {
1387         tbb::flow::graph g;
1388         std::vector<bool> flags;
1389         bool is_key_matching = is_key_matching_join<JP>::value;
1390         flags.reserve(Count);
1391 
1392         const int TUPLE_SIZE = std::tuple_size<TType>::value;
1393         static const int ELEMS = 3;
1394 
1395         JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
1396         test_input_ports_return_ref(*my_join);
1397         serial_queue_helper<TUPLE_SIZE, JType>::print_remark(); INFO(" >");
1398         if(is_key_matching) {
1399             INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
1400             if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) {
1401                 INFO("&");
1402             }
1403         }
1404         INFO("\n");
1405 
1406         test_one_serial<JType, JP>(*my_join, g);
1407         // build the vector with copy construction from the used join node.
1408         std::vector<JType>join_vector(ELEMS, *my_join);
1409         // destroy the tired old join_node in case we're accidentally reusing pieces of it.
1410         makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
1411 
1412         for(int e = 0; e < ELEMS; ++e) {  // exercise each of the vector elements
1413             test_one_serial<JType, JP>(join_vector[e], g);
1414         }
1415     }
1416 
1417 }; // serial_test
1418 
1419 #if _MSC_VER && !defined(__INTEL_COMPILER)
1420 #pragma warning( pop )
1421 #endif
1422 
1423 template<
1424     template<typename, class > class TestType,  // serial_test or parallel_test
1425     typename OutputTupleType,           // type of the output of the join
1426     class J>                 // graph_buffer_policy (reserving, queueing, tag_matching or key_matching)
1427     class generate_test {
1428     public:
1429         typedef tbb::flow::join_node<OutputTupleType, typename filter_out_message_based_key_matching<J>::policy> join_node_type;
1430         static void do_test() {
1431             TestType<join_node_type, J>::test();
1432         }
1433 };
1434 
1435 template<class JP>
1436 void test_input_port_policies();
1437 
1438 // join_node (reserving) does not consume inputs until an item is available at
1439 // every input.  It tries to reserve each input, and if any fails it releases the
1440 // reservation.  When it builds a tuple it broadcasts to all its successors and
1441 // consumes all the inputs.
1442 //
1443 // So our test will put an item at one input port, then attach another node to the
1444 // same node (a queue node in this case).  The second successor should receive the
1445 // item in the queue, emptying it.
1446 //
1447 // We then place an item in the second input queue, and check the output queues; they
1448 // should still be empty.  Then we place an item in the first queue; the output queues
1449 // should then receive a tuple.
1450 //
1451 // we then attach another function node to the second input.  It should not receive
1452 // an item, verifying that the item in the queue is consumed.
1453 template<>
1454 void test_input_port_policies<tbb::flow::reserving>() {
1455     tbb::flow::graph g;
1456     typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::reserving > JType; // two-phase is the default policy
1457                                                                                            // create join_node<type0,type1> jn
1458     JType jn(g);
1459     // create output_queue oq0, oq1
1460     typedef JType::output_type OQType;
1461     tbb::flow::queue_node<OQType> oq0(g);
1462     tbb::flow::queue_node<OQType> oq1(g);
1463     // create iq0, iq1
1464     typedef tbb::flow::queue_node<int> IQType;
1465     IQType iq0(g);
1466     IQType iq1(g);
1467     // create qnp, qnq
1468     IQType qnp(g);
1469     IQType qnq(g);
1470     INFO("Testing policies of join_node<reserving> input ports\n");
1471     // attach jn to oq0, oq1
1472     tbb::flow::make_edge(jn, oq0);
1473     tbb::flow::make_edge(jn, oq1);
1474 
1475     // attach iq0, iq1 to jn
1476     tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports()));
1477     tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports()));
1478 
1479     for(int loop = 0; loop < 3; ++loop) {
1480         // place one item in iq0
1481         CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1");
1482         // attach iq0 to qnp
1483         tbb::flow::make_edge(iq0, qnp);
1484         // qnp should have an item in it.
1485         g.wait_for_all();
1486         {
1487             int i;
1488             CHECK_MESSAGE( (qnp.try_get(i)&&i==1), "Error in item fetched by qnp");
1489         }
1490         // place item in iq1
1491         CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1");
1492         // oq0, oq1 should be empty
1493         g.wait_for_all();
1494         {
1495             OQType t1;
1496             CHECK_MESSAGE( (!oq0.try_get(t1)&&!oq1.try_get(t1)), "oq0 and oq1 not empty");
1497         }
1498         // detach qnp from iq0
1499         tbb::flow::remove_edge(iq0, qnp); // if we don't remove qnp it will gobble any values we put in iq0
1500                                           // place item in iq0
1501         CHECK_MESSAGE( (iq0.try_put(3)), "Error on second put to iq0");
1502         // oq0, oq1 should have items in them
1503         g.wait_for_all();
1504         {
1505             OQType t0;
1506             OQType t1;
1507             CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==3&&std::get<1>(t0)==2), "Error in oq0 output");
1508             CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==3&&std::get<1>(t1)==2), "Error in oq1 output");
1509         }
1510         // attach qnp to iq0, qnq to iq1
1511         // qnp and qnq should be empty
1512         tbb::flow::make_edge(iq0, qnp);
1513         tbb::flow::make_edge(iq1, qnq);
1514         g.wait_for_all();
1515         {
1516             int i;
1517             CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it");
1518             CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it");
1519         }
1520         tbb::flow::remove_edge(iq0, qnp);
1521         tbb::flow::remove_edge(iq1, qnq);
1522     } // for ( int loop ...
1523 }
1524 
1525 // join_node (queueing) consumes inputs as soon as they are available at
1526 // any input.  When it builds a tuple it broadcasts to all its successors and
1527 // discards the broadcast values.
1528 //
1529 // So our test will put an item at one input port, then attach another node to the
1530 // same node (a queue node in this case).  The second successor should not receive
1531 // an item (because the join consumed it).
1532 //
1533 // We then place an item in the second input queue, and check the output queues; they
1534 // should each have a tuple.
1535 //
1536 // we then attach another function node to the second input.  It should not receive
1537 // an item, verifying that the item in the queue is consumed.
1538 template<>
1539 void test_input_port_policies<tbb::flow::queueing>() {
1540     tbb::flow::graph g;
1541     typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::queueing > JType;
1542     // create join_node<type0,type1> jn
1543     JType jn(g);
1544     // create output_queue oq0, oq1
1545     typedef JType::output_type OQType;
1546     tbb::flow::queue_node<OQType> oq0(g);
1547     tbb::flow::queue_node<OQType> oq1(g);
1548     // create iq0, iq1
1549     typedef tbb::flow::queue_node<int> IQType;
1550     IQType iq0(g);
1551     IQType iq1(g);
1552     // create qnp, qnq
1553     IQType qnp(g);
1554     IQType qnq(g);
1555     INFO("Testing policies of join_node<queueing> input ports\n");
1556     // attach jn to oq0, oq1
1557     tbb::flow::make_edge(jn, oq0);
1558     tbb::flow::make_edge(jn, oq1);
1559 
1560     // attach iq0, iq1 to jn
1561     tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports()));
1562     tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports()));
1563 
1564     for(int loop = 0; loop < 3; ++loop) {
1565         // place one item in iq0
1566         CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1");
1567         // attach iq0 to qnp
1568         tbb::flow::make_edge(iq0, qnp);
1569         // qnp should have an item in it.
1570         g.wait_for_all();
1571         {
1572             int i;
1573             CHECK_MESSAGE( (!qnp.try_get(i)), "Item was received by qnp");
1574         }
1575         // place item in iq1
1576         CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1");
1577         // oq0, oq1 should have items
1578         g.wait_for_all();
1579         {
1580             OQType t0;
1581             OQType t1;
1582             CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==1&&std::get<1>(t0)==2), "Error in oq0 output");
1583             CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==1&&std::get<1>(t1)==2), "Error in oq1 output");
1584         }
1585         // attach qnq to iq1
1586         // qnp and qnq should be empty
1587         tbb::flow::make_edge(iq1, qnq);
1588         g.wait_for_all();
1589         {
1590             int i;
1591             CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it");
1592             CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it");
1593         }
1594         tbb::flow::remove_edge(iq0, qnp);
1595         tbb::flow::remove_edge(iq1, qnq);
1596     } // for ( int loop ...
1597 }
1598 
1599 template<typename T>
1600 struct myTagValue {
1601     tbb::flow::tag_value operator()(T i) { return tbb::flow::tag_value(i); }
1602 };
1603 
1604 template<>
1605 struct myTagValue<CheckType<int> > {
1606     tbb::flow::tag_value operator()(CheckType<int> i) { return tbb::flow::tag_value((int)i); }
1607 };
1608 
1609 // join_node (tag_matching) consumes inputs as soon as they are available at
1610 // any input.  When it builds a tuple it broadcasts to all its successors and
1611 // discards the broadcast values.
1612 //
1613 // It chooses the tuple it broadcasts by matching the tag values returned by the
1614 // methods given the constructor of the join, in this case the method just casts
1615 // the value in each port to tag_value.
1616 //
1617 // So our test will put an item at one input port, then attach another node to the
1618 // same node (a queue node in this case).  The second successor should not receive
1619 // an item (because the join consumed it).
1620 //
1621 // We then place an item in the second input queue, and check the output queues; they
1622 // should each have a tuple.
1623 //
1624 // we then attach another queue node to the second input.  It should not receive
1625 // an item, verifying that the item in the queue is consumed.
1626 //
1627 // We will then exercise the join with a bunch of values, and the output order should
1628 // be determined by the order we insert items into the second queue.  (Each tuple set
1629 // corresponding to a tag will be complete when the second item is inserted.)
1630 template<>
1631 void test_input_port_policies<tbb::flow::tag_matching>() {
1632     tbb::flow::graph g;
1633     typedef tbb::flow::join_node<std::tuple<int, CheckType<int> >, tbb::flow::tag_matching > JoinNodeType;
1634     typedef JoinNodeType::output_type CheckTupleType;
1635     JoinNodeType testJoinNode(g, myTagValue<int>(), myTagValue<CheckType<int> >());
1636     tbb::flow::queue_node<CheckTupleType> checkTupleQueue0(g);
1637     tbb::flow::queue_node<CheckTupleType> checkTupleQueue1(g);
1638     {
1639         Checker<CheckType<int> > my_check;
1640 
1641 
1642         typedef tbb::flow::queue_node<int> IntQueueType;
1643         typedef tbb::flow::queue_node<CheckType<int> > CheckQueueType;
1644         IntQueueType intInputQueue(g);
1645         CheckQueueType checkInputQueue(g);
1646         IntQueueType intEmptyTestQueue(g);
1647         CheckQueueType checkEmptyTestQueue(g);
1648         INFO("Testing policies of join_node<tag_matching> input ports\n");
1649         // attach testJoinNode to checkTupleQueue0, checkTupleQueue1
1650         tbb::flow::make_edge(testJoinNode, checkTupleQueue0);
1651         tbb::flow::make_edge(testJoinNode, checkTupleQueue1);
1652 
1653         // attach intInputQueue, checkInputQueue to testJoinNode
1654         tbb::flow::make_edge(intInputQueue, tbb::flow::input_port<0>(testJoinNode));
1655         tbb::flow::make_edge(checkInputQueue, tbb::flow::input_port<1>(testJoinNode));
1656 
1657         // we'll put four discrete values in the inputs to the join_node.  Each
1658         // set of inputs should result in one output.
1659         for(int loop = 0; loop < 4; ++loop) {
1660             // place one item in intInputQueue
1661             CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue");
1662             // attach intInputQueue to intEmptyTestQueue
1663             tbb::flow::make_edge(intInputQueue, intEmptyTestQueue);
1664             // intEmptyTestQueue should not have an item in it.  (the join consumed it.)
1665             g.wait_for_all();
1666             {
1667                 int intVal0;
1668                 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal0)), "Item was received by intEmptyTestQueue");
1669             }
1670             // place item in checkInputQueue
1671             CheckType<int> checkVal0(loop);
1672             CHECK_MESSAGE( (checkInputQueue.try_put(checkVal0)), "Error putting to checkInputQueue");
1673             // checkTupleQueue0, checkTupleQueue1 should have items
1674             g.wait_for_all();
1675             {
1676                 CheckTupleType t0;
1677                 CheckTupleType t1;
1678                 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==loop&&(int)std::get<1>(t0)==loop), "Error in checkTupleQueue0 output");
1679                 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==loop&&(int)std::get<1>(t1)==loop), "Error in checkTupleQueue1 output");
1680                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0");
1681                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1");
1682             }
1683             // attach checkEmptyTestQueue to checkInputQueue
1684             // intEmptyTestQueue and checkEmptyTestQueue should be empty
1685             tbb::flow::make_edge(checkInputQueue, checkEmptyTestQueue);
1686             g.wait_for_all();
1687             {
1688                 int intVal1;
1689                 CheckType<int> checkVal1;
1690                 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal1)), "intInputQueue still had value in it");
1691                 CHECK_MESSAGE( (!checkEmptyTestQueue.try_get(checkVal1)), "checkInputQueue still had value in it");
1692             }
1693             tbb::flow::remove_edge(intInputQueue, intEmptyTestQueue);
1694             tbb::flow::remove_edge(checkInputQueue, checkEmptyTestQueue);
1695         } // for ( int loop ...
1696 
1697           // Now we'll put [4 .. nValues - 1] in intInputQueue, and then put [4 .. nValues - 1] in checkInputQueue in
1698           // a different order.  We should see tuples in the output queues in the order we inserted
1699           // the integers into checkInputQueue.
1700         const int nValues = 100;
1701         const int nIncr = 31;  // relatively prime to nValues
1702 
1703         for(int loop = 4; loop < 4+nValues; ++loop) {
1704             // place one item in intInputQueue
1705             CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue");
1706             g.wait_for_all();
1707             {
1708                 CheckTupleType t3;
1709                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t3)), "Object in output queue");
1710                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t3)), "Object in output queue");
1711             }
1712         } // for ( int loop ...
1713 
1714         for(int loop = 1; loop<=nValues; ++loop) {
1715             int lp1 = 4+(loop * nIncr)%nValues;
1716             // place item in checkInputQueue
1717             CHECK_MESSAGE( (checkInputQueue.try_put(lp1)), "Error putting to checkInputQueue");
1718             // checkTupleQueue0, checkTupleQueue1 should have items
1719             g.wait_for_all();
1720             {
1721                 CheckTupleType t0;
1722                 CheckTupleType t1;
1723                 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==lp1 && std::get<1>(t0)==lp1), "Error in checkTupleQueue0 output");
1724                 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==lp1 && std::get<1>(t1)==lp1), "Error in checkTupleQueue1 output");
1725                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0");
1726                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1");
1727             }
1728         } // for ( int loop ...
1729     } // Check
1730 }
1731 
1732 template<typename Policy> struct policy_name {};
1733 
1734 template<> struct policy_name<tbb::flow::queueing> {
1735 const char* msg_beg() { return "queueing\n";}
1736 const char* msg_end() { return "test queueing extract\n";}
1737 };
1738 
1739 template<> struct policy_name<tbb::flow::reserving> {
1740 const char* msg_beg() { return "reserving\n";}
1741 const char* msg_end() { return "test reserving extract\n";}
1742 };
1743 
1744 template<> struct policy_name<tbb::flow::tag_matching> {
1745 const char* msg_beg() { return "tag_matching\n";}
1746 const char* msg_end() { return "test tag_matching extract\n";}
1747 };
1748 
1749 template<typename Policy>
1750 void test_main() {
1751     test_input_port_policies<Policy>();
1752     for(int p = 0; p < 2; ++p) {
1753         INFO(policy_name<Policy>().msg_beg());
1754         generate_test<serial_test, std::tuple<threebyte, double>, Policy>::do_test();
1755 #if MAX_TUPLE_TEST_SIZE >= 4
1756         {
1757             Checker<CheckType<int> > my_check;
1758             generate_test<serial_test, std::tuple<float, double, CheckType<int>, long>, Policy>::do_test();
1759         }
1760 #endif
1761 #if MAX_TUPLE_TEST_SIZE >= 6
1762         generate_test<serial_test, std::tuple<double, double, int, long, int, short>, Policy>::do_test();
1763 #endif
1764 #if MAX_TUPLE_TEST_SIZE >= 8
1765         generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long>, Policy>::do_test();
1766 #endif
1767 #if MAX_TUPLE_TEST_SIZE >= 10
1768         generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long>, Policy>::do_test();
1769 #endif
1770         {
1771             Checker<CheckType<int> > my_check1;
1772             generate_test<parallel_test, std::tuple<float, CheckType<int> >, Policy>::do_test();
1773         }
1774 #if MAX_TUPLE_TEST_SIZE >= 3
1775         generate_test<parallel_test, std::tuple<float, int, long>, Policy>::do_test();
1776 #endif
1777 #if MAX_TUPLE_TEST_SIZE >= 5
1778         generate_test<parallel_test, std::tuple<double, double, int, int, short>, Policy>::do_test();
1779 #endif
1780 #if MAX_TUPLE_TEST_SIZE >= 7
1781         generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long>, Policy>::do_test();
1782 #endif
1783 #if MAX_TUPLE_TEST_SIZE >= 9
1784         generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long>, Policy>::do_test();
1785 #endif
1786     }
1787 }
1788 
1789 #endif /* tbb_test_join_node_H */
1790