xref: /oneTBB/test/tbb/test_join_node.h (revision 0a2b3987)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef tbb_test_join_node_H
18 #define tbb_test_join_node_H
19 
20 #if _MSC_VER
21 // Suppress "decorated name length exceeded, name was truncated" warning
22 #if __INTEL_COMPILER
23 #pragma warning( disable: 2586 )
24 #else
25 #pragma warning( disable: 4503 )
26 #endif
27 #endif
28 
29 #include "tbb/flow_graph.h"
30 
31 #include "common/test.h"
32 #include "common/utils.h"
33 #include "common/checktype.h"
34 #include "common/graph_utils.h"
35 #include "common/test_follows_and_precedes_api.h"
36 
37 #include <type_traits>
38 
39 const char *names[] = {
40     "Adam", "Bruce", "Charles", "Daniel", "Evan", "Frederich", "George", "Hiram", "Ichabod",
41     "John", "Kevin", "Leonard", "Michael", "Ned", "Olin", "Paul", "Quentin", "Ralph", "Steven",
42     "Thomas", "Ulysses", "Victor", "Walter", "Xerxes", "Yitzhak", "Zebediah", "Anne", "Bethany",
43     "Clarisse", "Dorothy", "Erin", "Fatima", "Gabrielle", "Helen", "Irene", "Jacqueline",
44     "Katherine", "Lana", "Marilyn", "Noelle", "Okiilani", "Pauline", "Querida", "Rose", "Sybil",
45     "Tatiana", "Umiko", "Victoria", "Wilma", "Xena", "Yolanda", "Zoe", "Algernon", "Benjamin",
46     "Caleb", "Dylan", "Ezra", "Felix", "Gabriel", "Henry", "Issac", "Jasper", "Keifer",
47     "Lincoln", "Milo", "Nathaniel", "Owen", "Peter", "Quincy", "Ronan", "Silas", "Theodore",
48     "Uriah", "Vincent", "Wilbur", "Xavier", "Yoda", "Zachary", "Amelia", "Brielle", "Charlotte",
49     "Daphne", "Emma", "Fiona", "Grace", "Hazel", "Isla", "Juliet", "Keira", "Lily", "Mia",
50     "Nora", "Olivia", "Penelope", "Quintana", "Ruby", "Sophia", "Tessa", "Ursula", "Violet",
51     "Willow", "Xanthe", "Yvonne", "ZsaZsa", "Asher", "Bennett", "Connor", "Dominic", "Ethan",
52     "Finn", "Grayson", "Hudson", "Ian", "Jackson", "Kent", "Liam", "Matthew", "Noah", "Oliver",
53     "Parker", "Quinn", "Rhys", "Sebastian", "Taylor", "Umberto", "Vito", "William", "Xanto",
54     "Yogi", "Zane", "Ava", "Brenda", "Chloe", "Delilah", "Ella", "Felicity", "Genevieve",
55     "Hannah", "Isabella", "Josephine", "Kacie", "Lucy", "Madeline", "Natalie", "Octavia",
56     "Piper", "Qismah", "Rosalie", "Scarlett", "Tanya", "Uta", "Vivian", "Wendy", "Xola",
57     "Yaritza", "Zanthe"};
58 
59 static const int NameCnt = sizeof(names)/sizeof(char *);
60 
61 template<typename K>
62 struct index_to_key {
63     K operator()(const int indx) {
64         return (K)(3*indx+1);
65     }
66 };
67 
68 template<>
69 struct index_to_key<std::string> {
70     std::string operator()(const int indx) {
71         return std::string(names[indx % NameCnt]);
72     }
73 };
74 
75 template<typename K>
76 struct K_deref {
77     typedef K type;
78 };
79 
80 template<typename K>
81 struct K_deref<K&> {
82     typedef K type;
83 };
84 
85 template<typename K, typename V>
86 struct MyKeyFirst {
87     K my_key;
88     V my_value;
89     MyKeyFirst(int i = 0, int v = 0): my_key(index_to_key<K>()(i)), my_value((V)v) {
90     }
91     void print_val() const {
92         INFO("MyKeyFirst{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
93     }
94     operator int() const { return (int)my_value; }
95 };
96 
97 template<typename K, typename V>
98 struct MyKeySecond {
99     V my_value;
100     K my_key;
101     MyKeySecond(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
102     }
103     void print_val() const {
104         INFO("MyKeySecond{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
105     }
106     operator int() const { return (int)my_value; }
107 };
108 
109 template<typename K, typename V>
110 struct MyMessageKeyWithoutKey {
111     V my_value;
112     K my_message_key;
113     MyMessageKeyWithoutKey(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
114     }
115     void print_val() const {
116         INFO("MyMessageKeyWithoutKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
117     }
118     operator int() const { return (int)my_value; }
119     const K& key() const {
120         return my_message_key;
121     }
122 };
123 
124 template<typename K, typename V>
125 struct MyMessageKeyWithBrokenKey {
126     V my_value;
127     K my_key;
128     K my_message_key;
129     MyMessageKeyWithBrokenKey(int i = 0, int v = 0): my_value((V)v), my_key(), my_message_key(index_to_key<K>()(i)) {
130     }
131     void print_val() const {
132         INFO("MyMessageKeyWithBrokenKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
133     }
134     operator int() const { return (int)my_value; }
135     const K& key() const {
136         return my_message_key;
137     }
138 
139 };
140 
141 template<typename K, typename V>
142 struct MyKeyWithBrokenMessageKey {
143     V my_value;
144     K my_key;
145     MyKeyWithBrokenMessageKey(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) {
146     }
147     void print_val() const {
148         INFO("MyKeyWithBrokenMessageKey{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}");
149     }
150     operator int() const { return (int)my_value; }
151     K key() const {
152         CHECK_MESSAGE( (false), "The method should never be called");
153         return K();
154     }
155 };
156 
157 template<typename K, typename V>
158 struct MyMessageKeyWithoutKeyMethod {
159     V my_value;
160     K my_message_key;
161     MyMessageKeyWithoutKeyMethod(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) {
162     }
163     void print_val() const {
164         INFO("MyMessageKeyWithoutKeyMethod{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}");
165     }
166     operator int() const { return (int)my_value; }
167     //K key() const; // Do not define
168 };
169 
170 // Overload for MyMessageKeyWithoutKeyMethod
171 template <typename K, typename V>
172 K key_from_message(const MyMessageKeyWithoutKeyMethod<typename std::decay<K>::type, V> &m) {
173     return m.my_message_key;
174 }
175 
176 
177 // pattern for creating values in the tag_matching and key_matching, given an integer and the index in the tuple
178 template<typename TT, size_t INDEX>
179 struct make_thingie {
180     TT operator()(int const &i) {
181         return TT(i * (INDEX+1));
182     }
183 };
184 
185 template<template <typename, typename> class T, typename K, typename V, size_t INDEX>
186 struct make_thingie<T<K, V>, INDEX> {
187     T<K, V> operator()(int const &i) {
188         return T<K, V>(i, i*(INDEX+1));
189     }
190 };
191 
192 // cast_from<T>::my_int_val(i);
193 template<typename T>
194 struct cast_from {
195     static int my_int_val(T const &i) { return (int)i; }
196 };
197 
198 template<typename K, typename V>
199 struct cast_from<MyKeyFirst<K, V> > {
200     static int my_int_val(MyKeyFirst<K, V> const &i) { return (int)(i.my_value); }
201 };
202 
203 template<typename K, typename V>
204 struct cast_from<MyKeySecond<K, V> > {
205     static int my_int_val(MyKeySecond<K, V> const &i) { return (int)(i.my_value); }
206 };
207 
208 template<typename T>
209 void print_my_value(T const &i) {
210     INFO(" " << cast_from<T>::my_int_val(i) << " " );
211 }
212 
213 template<typename K, typename V>
214 void print_my_value(MyKeyFirst<K, V> const &i) {
215     i.print_val();
216 }
217 
218 template<typename K, typename V>
219 void print_my_value(MyKeySecond<K, V> const &i) {
220     i.print_val();
221 }
222 
223 template<>
224 void print_my_value(std::string const &i) {
225     INFO("\"" << i.c_str() << "\"" );
226 }
227 
228 //
229 // Tests
230 //
231 
232 //!
233 // my_struct_key == given a type V with a field named my_key of type K, will return a copy of my_key
234 template<class K, typename V>
235 struct my_struct_key {
236     K operator()(const V& mv) {
237         return mv.my_key;
238     }
239 };
240 
241 // specialization returning reference to my_key.
242 template<class K, typename V>
243 struct my_struct_key<K&, V> {
244     K& operator()(const V& mv) {
245         return const_cast<K&>(mv.my_key);
246     }
247 };
248 
249 using tbb::detail::d1::type_to_key_function_body;
250 using tbb::detail::d1::hash_buffer;
251 using tbb::detail::d1::tbb_hash_compare;
252 using tbb::detail::d1::type_to_key_function_body_leaf;
253 
254 template<class K, class V> struct VtoKFB {
255     typedef type_to_key_function_body<V, K> type;
256 };
257 
258 template<typename K> struct make_hash_compare { typedef tbb_hash_compare<K> type; };
259 
260 template<typename K, class V>
261 void hash_buffer_test(const char *sname) {
262     typedef typename K_deref<K>::type KnoR;
263     hash_buffer<
264         K,
265         V,
266         typename VtoKFB<K, V>::type,
267         tbb_hash_compare<KnoR>
268     > my_hash_buffer;
269     const bool k_is_ref = std::is_reference<K>::value;
270     typedef type_to_key_function_body_leaf<
271         V, K, my_struct_key<K, V> > my_func_body_type;
272     typename VtoKFB<K, V>::type *kp = new my_func_body_type(my_struct_key<K, V>());
273     my_hash_buffer.set_key_func(kp);
274     INFO("Running hash_buffer test on " << sname << "; is ref == " << (k_is_ref ? "true" : "false") << "\n" );
275     V mv1, mv0;
276     bool res;
277     for(int cnt = 0; cnt < 2; ++cnt) {
278         // insert 50 items after checking they are not already in the table
279         for(int i = 0; i < 50; ++i) {
280             KnoR kk = index_to_key<KnoR>()(i);
281             mv1.my_key = kk;
282             mv1.my_value = 0.5*i;
283             res = my_hash_buffer.find_with_key(kk, mv0);
284             CHECK_MESSAGE( (!res), "Found non-inserted item");
285             res = my_hash_buffer.insert_with_key(mv1);
286             CHECK_MESSAGE( (res), "insert failed");
287             res = my_hash_buffer.find_with_key(kk, mv0);
288             CHECK_MESSAGE( (res), "not found after insert");
289             CHECK_MESSAGE( (mv0.my_value==mv1.my_value), "result not correct");
290         }
291         // go backwards checking they are still there.
292         for(int i = 49; i>=0; --i) {
293             KnoR kk = index_to_key<KnoR>()(i);
294             double value = 0.5*i;
295             res = my_hash_buffer.find_with_key(kk, mv0);
296             CHECK_MESSAGE( (res), "find failed");
297             CHECK_MESSAGE( (mv0.my_value==value), "result not correct");
298         }
299         // delete every third item, check they are gone
300         for(int i = 0; i < 50; i += 3) {
301             KnoR kk = index_to_key<KnoR>()(i);
302             my_hash_buffer.delete_with_key(kk);
303             res = my_hash_buffer.find_with_key(kk, mv0);
304             CHECK_MESSAGE( (!res), "Found deleted item");
305         }
306         // check the deleted items are gone, the non-deleted items are there.
307         for(int i = 0; i < 50; ++i) {
308             KnoR kk = index_to_key<KnoR>()(i);
309             double value = 0.5*i;
310             if(i%3==0) {
311                 res = my_hash_buffer.find_with_key(kk, mv0);
312                 CHECK_MESSAGE( (!res), "found an item that was previously deleted");
313             }
314             else {
315                 res = my_hash_buffer.find_with_key(kk, mv0);
316                 CHECK_MESSAGE( (res), "find failed");
317                 CHECK_MESSAGE( (mv0.my_value==value), "result not correct");
318             }
319         }
320         // insert new items, check the deleted items return true, the non-deleted items return false.
321         for(int i = 0; i < 50; ++i) {
322             KnoR kk = index_to_key<KnoR>()(i);
323             double value = 1.5*i;
324             mv1.my_key = kk;
325             mv1.my_value = value;
326             res = my_hash_buffer.insert_with_key(mv1);
327             if(i%3==0) {
328                 CHECK_MESSAGE( (res), "didn't insert in empty slot");
329             }
330             else {
331                 CHECK_MESSAGE( (!res), "slot was empty on insert");
332             }
333         }
334         // delete all items
335         for(int i = 0; i < 50; ++i) {
336             KnoR kk = index_to_key<KnoR>()(i);
337             my_hash_buffer.delete_with_key(kk);
338             res = my_hash_buffer.find_with_key(kk, mv0);
339             CHECK_MESSAGE( (!res), "Found deleted item");
340         }
341     }  // perform tasks twice
342 }
343 
344 void
345 TestTaggedBuffers() {
346     hash_buffer_test<int, MyKeyFirst<int, double> >("MyKeyFirst<int,double>");
347     hash_buffer_test<int&, MyKeyFirst<int, double> >("MyKeyFirst<int,double> with int&");
348     hash_buffer_test<int, MyKeySecond<int, double> >("MyKeySecond<int,double>");
349 
350     hash_buffer_test<std::string, MyKeyFirst<std::string, double> >("MyKeyFirst<std::string,double>");
351     hash_buffer_test<std::string&, MyKeySecond<std::string, double> >("MyKeySecond<std::string,double> with std::string&");
352 }
353 
354 struct threebyte {
355     unsigned char b1;
356     unsigned char b2;
357     unsigned char b3;
358     threebyte(int i = 0) {
359         b1 = (unsigned char)(i&0xFF);
360         b2 = (unsigned char)((i>>8)&0xFF);
361         b3 = (unsigned char)((i>>16)&0xFF);
362     }
363     operator int() const { return (int)(b1+(b2<<8)+(b3<<16)); }
364 };
365 
366 const int Count = 150;
367 
368 const int Recirc_count = 1000;  // number of tuples to be generated
369 const int MaxPorts = 10;
370 const int MaxNInputs = 5; // max # of input_nodes to register for each join_node input in parallel test
371 bool outputCheck[MaxPorts][Count];  // for checking output
372 
373 void
374 check_outputCheck(int nUsed, int maxCnt) {
375     for(int i = 0; i < nUsed; ++i) {
376         for(int j = 0; j < maxCnt; ++j) {
377             CHECK_MESSAGE(outputCheck[i][j], "");
378         }
379     }
380 }
381 
382 void
383 reset_outputCheck(int nUsed, int maxCnt) {
384     for(int i = 0; i < nUsed; ++i) {
385         for(int j = 0; j < maxCnt; ++j) {
386             outputCheck[i][j] = false;
387         }
388     }
389 }
390 
391 template<typename T>
392 class name_of {
393 public:
394     static const char* name() { return  "Unknown"; }
395 };
396 template<typename T>
397 class name_of<CheckType<T> > {
398 public:
399     static const char* name() { return "checktype"; }
400 };
401 template<>
402 class name_of<int> {
403 public:
404     static const char* name() { return  "int"; }
405 };
406 template<>
407 class name_of<float> {
408 public:
409     static const char* name() { return  "float"; }
410 };
411 template<>
412 class name_of<double> {
413 public:
414     static const char* name() { return  "double"; }
415 };
416 template<>
417 class name_of<long> {
418 public:
419     static const char* name() { return  "long"; }
420 };
421 template<>
422 class name_of<short> {
423 public:
424     static const char* name() { return  "short"; }
425 };
426 template<>
427 class name_of<threebyte> {
428 public:
429     static const char* name() { return "threebyte"; }
430 };
431 template<>
432 class name_of<std::string> {
433 public:
434     static const char* name() { return "std::string"; }
435 };
436 template<typename K, typename V>
437 class name_of<MyKeyFirst<K, V> > {
438 public:
439     static const char* name() { return "MyKeyFirst<K,V>"; }
440 };
441 template<typename K, typename V>
442 class name_of<MyKeySecond<K, V> > {
443 public:
444     static const char* name() { return "MyKeySecond<K,V>"; }
445 };
446 
447 // The additional policy to differ message based key matching from usual key matching.
448 // It only makes sense for the test because join_node is created with the key_matching policy for the both cases.
449 template <typename K, typename KHash = tbb_hash_compare<typename std::decay<K>::type > >
450 struct message_based_key_matching {};
451 
452 // test for key_matching
453 template<class JP>
454 struct is_key_matching_join {
455     static const bool value;
456     typedef int key_type;  // have to define it to something
457 };
458 
459 template<class JP>
460 const bool is_key_matching_join<JP>::value = false;
461 
462 template<class K, class KHash>
463 struct is_key_matching_join<tbb::flow::key_matching<K, KHash> > {
464     static const bool value;
465     typedef K key_type;
466 };
467 
468 template<class K, class KHash>
469 const bool is_key_matching_join<tbb::flow::key_matching<K, KHash> >::value = true;
470 
471 template<class K, class KHash>
472 struct is_key_matching_join<message_based_key_matching<K, KHash> > {
473     static const bool value;
474     typedef K key_type;
475 };
476 
477 template<class K, class KHash>
478 const bool is_key_matching_join<message_based_key_matching<K, KHash> >::value = true;
479 
480 // for recirculating tags, input is tuple<index,continue_msg>
481 // output is index*my_mult cast to the right type
482 template<typename TT>
483 class recirc_func_body {
484     TT my_mult;
485 public:
486     typedef std::tuple<int, tbb::flow::continue_msg> input_type;
487     recirc_func_body(TT multiplier): my_mult(multiplier) {}
488     recirc_func_body(const recirc_func_body &other): my_mult(other.my_mult) { }
489     void operator=(const recirc_func_body &other) { my_mult = other.my_mult; }
490     TT operator()(const input_type &v) {
491         return TT(std::get<0>(v)) * my_mult;
492     }
493 };
494 
495 static int input_count;  // input_nodes are serial
496 
497 // emit input_count continue_msg
498 class recirc_input_node_body {
499 public:
500     tbb::flow::continue_msg operator()(tbb::flow_control &fc) {
501         if( --input_count < 0 ){
502             fc.stop();
503         }
504         return tbb::flow::continue_msg();
505     }
506 };
507 
508 // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10,
509 // so the max number generated right now is 1500 or so.)  Input will generate a series of TT with value
510 // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body.  We are attaching addend
511 // input nodes to a join_port, and each will generate part of the numerical series the port is expecting
512 // to receive.  If there is only one input node, the series order will be maintained; if more than one,
513 // this is not guaranteed.
514 template<typename TT, size_t INDEX>
515 class my_input_body {
516     int my_count;
517     int addend;
518 public:
519     my_input_body(int init_val, int addto): my_count(init_val), addend(addto) { }
520     TT operator()(tbb::flow_control& fc) {
521         int lc = my_count;
522         TT ret = make_thingie<TT, INDEX>()(my_count);
523         my_count += addend;
524         if ( lc < Count){
525             return ret;
526         }else{
527             fc.stop();
528             return TT();
529         }
530     }
531 };
532 
533 template<typename TT>
534 class tag_func {
535     TT my_mult;
536 public:
537     tag_func(TT multiplier): my_mult(multiplier) { }
538     // operator() will return [0 .. Count)
539     tbb::flow::tag_value operator()(TT v) {
540         tbb::flow::tag_value t = tbb::flow::tag_value(v/my_mult);
541         return t;
542     }
543 };
544 
545 template <class JP>
546 struct filter_out_message_based_key_matching {
547     typedef JP policy;
548 };
549 
550 template <typename K, typename KHash>
551 struct filter_out_message_based_key_matching<message_based_key_matching<K, KHash> > {
552     // To have message based key matching in join_node, the key_matchig policy should be specified.
553     typedef tbb::flow::key_matching<K, KHash> policy;
554 };
555 
556 // allocator for join_node.  This is specialized for tag_matching and key_matching joins because they require a variable number
557 // of tag_value methods passed to the constructor
558 
559 template<int N, typename JType, class JP>
560 class makeJoin {
561 public:
562     static JType *create(tbb::flow::graph& g) {
563         JType *temp = new JType(g);
564         return temp;
565     }
566     static void destroy(JType *p) { delete p; }
567 };
568 
569 // for general key_matching case, each type in the tuple is a class that has the my_key field and the my_value field.
570 //
571 template<typename JType, typename K, typename KHash>
572 class makeJoin<2, JType, tbb::flow::key_matching<K, KHash> > {
573     typedef typename JType::output_type TType;
574     typedef typename std::tuple_element<0, TType>::type T0;
575     typedef typename std::tuple_element<1, TType>::type T1;
576 public:
577     static JType *create(tbb::flow::graph& g) {
578         JType *temp = new JType(g,
579             my_struct_key<K, T0>(),
580             my_struct_key<K, T1>()
581         );
582         return temp;
583     }
584     static void destroy(JType *p) { delete p; }
585 };
586 
587 template<typename JType>
588 class makeJoin<2, JType, tbb::flow::tag_matching> {
589     typedef typename JType::output_type TType;
590     typedef typename std::tuple_element<0, TType>::type T0;
591     typedef typename std::tuple_element<1, TType>::type T1;
592 public:
593     static JType *create(tbb::flow::graph& g) {
594         JType *temp = new JType(g,
595             tag_func<T0>(T0(2)),
596             tag_func<T1>(T1(3))
597         );
598         return temp;
599     }
600     static void destroy(JType *p) { delete p; }
601 };
602 
603 #if MAX_TUPLE_TEST_SIZE >= 3
604 template<typename JType, typename K, typename KHash>
605 class makeJoin<3, JType, tbb::flow::key_matching<K, KHash> > {
606     typedef typename JType::output_type TType;
607     typedef typename std::tuple_element<0, TType>::type T0;
608     typedef typename std::tuple_element<1, TType>::type T1;
609     typedef typename std::tuple_element<2, TType>::type T2;
610 public:
611     static JType *create(tbb::flow::graph& g) {
612         JType *temp = new JType(g,
613             my_struct_key<K, T0>(),
614             my_struct_key<K, T1>(),
615             my_struct_key<K, T2>()
616         );
617         return temp;
618     }
619     static void destroy(JType *p) { delete p; }
620 };
621 
622 template<typename JType>
623 class makeJoin<3, JType, tbb::flow::tag_matching> {
624     typedef typename JType::output_type TType;
625     typedef typename std::tuple_element<0, TType>::type T0;
626     typedef typename std::tuple_element<1, TType>::type T1;
627     typedef typename std::tuple_element<2, TType>::type T2;
628 public:
629     static JType *create(tbb::flow::graph& g) {
630         JType *temp = new JType(g,
631             tag_func<T0>(T0(2)),
632             tag_func<T1>(T1(3)),
633             tag_func<T2>(T2(4))
634         );
635         return temp;
636     }
637     static void destroy(JType *p) { delete p; }
638 };
639 
640 #endif
641 #if MAX_TUPLE_TEST_SIZE >= 4
642 
643 template<typename JType, typename K, typename KHash>
644 class makeJoin<4, JType, tbb::flow::key_matching<K, KHash> > {
645     typedef typename JType::output_type TType;
646     typedef typename std::tuple_element<0, TType>::type T0;
647     typedef typename std::tuple_element<1, TType>::type T1;
648     typedef typename std::tuple_element<2, TType>::type T2;
649     typedef typename std::tuple_element<3, TType>::type T3;
650 public:
651     static JType *create(tbb::flow::graph& g) {
652         JType *temp = new JType(g,
653             my_struct_key<K, T0>(),
654             my_struct_key<K, T1>(),
655             my_struct_key<K, T2>(),
656             my_struct_key<K, T3>()
657         );
658         return temp;
659     }
660     static void destroy(JType *p) { delete p; }
661 };
662 
663 template<typename JType>
664 class makeJoin<4, JType, tbb::flow::tag_matching> {
665     typedef typename JType::output_type TType;
666     typedef typename std::tuple_element<0, TType>::type T0;
667     typedef typename std::tuple_element<1, TType>::type T1;
668     typedef typename std::tuple_element<2, TType>::type T2;
669     typedef typename std::tuple_element<3, TType>::type T3;
670 public:
671     static JType *create(tbb::flow::graph& g) {
672         JType *temp = new JType(g,
673             tag_func<T0>(T0(2)),
674             tag_func<T1>(T1(3)),
675             tag_func<T2>(T2(4)),
676             tag_func<T3>(T3(5))
677         );
678         return temp;
679     }
680     static void destroy(JType *p) { delete p; }
681 };
682 
683 #endif
684 #if MAX_TUPLE_TEST_SIZE >= 5
685 template<typename JType, typename K, typename KHash>
686 class makeJoin<5, JType, tbb::flow::key_matching<K, KHash> > {
687     typedef typename JType::output_type TType;
688     typedef typename std::tuple_element<0, TType>::type T0;
689     typedef typename std::tuple_element<1, TType>::type T1;
690     typedef typename std::tuple_element<2, TType>::type T2;
691     typedef typename std::tuple_element<3, TType>::type T3;
692     typedef typename std::tuple_element<4, TType>::type T4;
693 public:
694     static JType *create(tbb::flow::graph& g) {
695         JType *temp = new JType(g,
696             my_struct_key<K, T0>(),
697             my_struct_key<K, T1>(),
698             my_struct_key<K, T2>(),
699             my_struct_key<K, T3>(),
700             my_struct_key<K, T4>()
701         );
702         return temp;
703     }
704     static void destroy(JType *p) { delete p; }
705 };
706 
707 template<typename JType>
708 class makeJoin<5, JType, tbb::flow::tag_matching> {
709     typedef typename JType::output_type TType;
710     typedef typename std::tuple_element<0, TType>::type T0;
711     typedef typename std::tuple_element<1, TType>::type T1;
712     typedef typename std::tuple_element<2, TType>::type T2;
713     typedef typename std::tuple_element<3, TType>::type T3;
714     typedef typename std::tuple_element<4, TType>::type T4;
715 public:
716     static JType *create(tbb::flow::graph& g) {
717         JType *temp = new JType(g,
718             tag_func<T0>(T0(2)),
719             tag_func<T1>(T1(3)),
720             tag_func<T2>(T2(4)),
721             tag_func<T3>(T3(5)),
722             tag_func<T4>(T4(6))
723         );
724         return temp;
725     }
726     static void destroy(JType *p) { delete p; }
727 };
728 #endif
729 #if MAX_TUPLE_TEST_SIZE >= 6
730 template<typename JType, typename K, typename KHash>
731 class makeJoin<6, JType, tbb::flow::key_matching<K, KHash> > {
732     typedef typename JType::output_type TType;
733     typedef typename std::tuple_element<0, TType>::type T0;
734     typedef typename std::tuple_element<1, TType>::type T1;
735     typedef typename std::tuple_element<2, TType>::type T2;
736     typedef typename std::tuple_element<3, TType>::type T3;
737     typedef typename std::tuple_element<4, TType>::type T4;
738     typedef typename std::tuple_element<5, TType>::type T5;
739 public:
740     static JType *create(tbb::flow::graph& g) {
741         JType *temp = new JType(g,
742             my_struct_key<K, T0>(),
743             my_struct_key<K, T1>(),
744             my_struct_key<K, T2>(),
745             my_struct_key<K, T3>(),
746             my_struct_key<K, T4>(),
747             my_struct_key<K, T5>()
748         );
749         return temp;
750     }
751     static void destroy(JType *p) { delete p; }
752 };
753 
754 template<typename JType>
755 class makeJoin<6, JType, tbb::flow::tag_matching> {
756     typedef typename JType::output_type TType;
757     typedef typename std::tuple_element<0, TType>::type T0;
758     typedef typename std::tuple_element<1, TType>::type T1;
759     typedef typename std::tuple_element<2, TType>::type T2;
760     typedef typename std::tuple_element<3, TType>::type T3;
761     typedef typename std::tuple_element<4, TType>::type T4;
762     typedef typename std::tuple_element<5, TType>::type T5;
763 public:
764     static JType *create(tbb::flow::graph& g) {
765         JType *temp = new JType(g,
766             tag_func<T0>(T0(2)),
767             tag_func<T1>(T1(3)),
768             tag_func<T2>(T2(4)),
769             tag_func<T3>(T3(5)),
770             tag_func<T4>(T4(6)),
771             tag_func<T5>(T5(7))
772         );
773         return temp;
774     }
775     static void destroy(JType *p) { delete p; }
776 };
777 #endif
778 
779 #if MAX_TUPLE_TEST_SIZE >= 7
780 template<typename JType, typename K, typename KHash>
781 class makeJoin<7, JType, tbb::flow::key_matching<K, KHash> > {
782     typedef typename JType::output_type TType;
783     typedef typename std::tuple_element<0, TType>::type T0;
784     typedef typename std::tuple_element<1, TType>::type T1;
785     typedef typename std::tuple_element<2, TType>::type T2;
786     typedef typename std::tuple_element<3, TType>::type T3;
787     typedef typename std::tuple_element<4, TType>::type T4;
788     typedef typename std::tuple_element<5, TType>::type T5;
789     typedef typename std::tuple_element<6, TType>::type T6;
790 public:
791     static JType *create(tbb::flow::graph& g) {
792         JType *temp = new JType(g,
793             my_struct_key<K, T0>(),
794             my_struct_key<K, T1>(),
795             my_struct_key<K, T2>(),
796             my_struct_key<K, T3>(),
797             my_struct_key<K, T4>(),
798             my_struct_key<K, T5>(),
799             my_struct_key<K, T6>()
800         );
801         return temp;
802     }
803     static void destroy(JType *p) { delete p; }
804 };
805 
806 template<typename JType>
807 class makeJoin<7, JType, tbb::flow::tag_matching> {
808     typedef typename JType::output_type TType;
809     typedef typename std::tuple_element<0, TType>::type T0;
810     typedef typename std::tuple_element<1, TType>::type T1;
811     typedef typename std::tuple_element<2, TType>::type T2;
812     typedef typename std::tuple_element<3, TType>::type T3;
813     typedef typename std::tuple_element<4, TType>::type T4;
814     typedef typename std::tuple_element<5, TType>::type T5;
815     typedef typename std::tuple_element<6, TType>::type T6;
816 public:
817     static JType *create(tbb::flow::graph& g) {
818         JType *temp = new JType(g,
819             tag_func<T0>(T0(2)),
820             tag_func<T1>(T1(3)),
821             tag_func<T2>(T2(4)),
822             tag_func<T3>(T3(5)),
823             tag_func<T4>(T4(6)),
824             tag_func<T5>(T5(7)),
825             tag_func<T6>(T6(8))
826         );
827         return temp;
828     }
829     static void destroy(JType *p) { delete p; }
830 };
831 #endif
832 
833 #if MAX_TUPLE_TEST_SIZE >= 8
834 template<typename JType, typename K, typename KHash>
835 class makeJoin<8, JType, tbb::flow::key_matching<K, KHash> > {
836     typedef typename JType::output_type TType;
837     typedef typename std::tuple_element<0, TType>::type T0;
838     typedef typename std::tuple_element<1, TType>::type T1;
839     typedef typename std::tuple_element<2, TType>::type T2;
840     typedef typename std::tuple_element<3, TType>::type T3;
841     typedef typename std::tuple_element<4, TType>::type T4;
842     typedef typename std::tuple_element<5, TType>::type T5;
843     typedef typename std::tuple_element<6, TType>::type T6;
844     typedef typename std::tuple_element<7, TType>::type T7;
845 public:
846     static JType *create(tbb::flow::graph& g) {
847         JType *temp = new JType(g,
848             my_struct_key<K, T0>(),
849             my_struct_key<K, T1>(),
850             my_struct_key<K, T2>(),
851             my_struct_key<K, T3>(),
852             my_struct_key<K, T4>(),
853             my_struct_key<K, T5>(),
854             my_struct_key<K, T6>(),
855             my_struct_key<K, T7>()
856         );
857         return temp;
858     }
859     static void destroy(JType *p) { delete p; }
860 };
861 
862 template<typename JType>
863 class makeJoin<8, JType, tbb::flow::tag_matching> {
864     typedef typename JType::output_type TType;
865     typedef typename std::tuple_element<0, TType>::type T0;
866     typedef typename std::tuple_element<1, TType>::type T1;
867     typedef typename std::tuple_element<2, TType>::type T2;
868     typedef typename std::tuple_element<3, TType>::type T3;
869     typedef typename std::tuple_element<4, TType>::type T4;
870     typedef typename std::tuple_element<5, TType>::type T5;
871     typedef typename std::tuple_element<6, TType>::type T6;
872     typedef typename std::tuple_element<7, TType>::type T7;
873 public:
874     static JType *create(tbb::flow::graph& g) {
875         JType *temp = new JType(g,
876             tag_func<T0>(T0(2)),
877             tag_func<T1>(T1(3)),
878             tag_func<T2>(T2(4)),
879             tag_func<T3>(T3(5)),
880             tag_func<T4>(T4(6)),
881             tag_func<T5>(T5(7)),
882             tag_func<T6>(T6(8)),
883             tag_func<T7>(T7(9))
884         );
885         return temp;
886     }
887     static void destroy(JType *p) { delete p; }
888 };
889 #endif
890 
891 #if MAX_TUPLE_TEST_SIZE >= 9
892 template<typename JType, typename K, typename KHash>
893 class makeJoin<9, JType, tbb::flow::key_matching<K, KHash> > {
894     typedef typename JType::output_type TType;
895     typedef typename std::tuple_element<0, TType>::type T0;
896     typedef typename std::tuple_element<1, TType>::type T1;
897     typedef typename std::tuple_element<2, TType>::type T2;
898     typedef typename std::tuple_element<3, TType>::type T3;
899     typedef typename std::tuple_element<4, TType>::type T4;
900     typedef typename std::tuple_element<5, TType>::type T5;
901     typedef typename std::tuple_element<6, TType>::type T6;
902     typedef typename std::tuple_element<7, TType>::type T7;
903     typedef typename std::tuple_element<8, TType>::type T8;
904 public:
905     static JType *create(tbb::flow::graph& g) {
906         JType *temp = new JType(g,
907             my_struct_key<K, T0>(),
908             my_struct_key<K, T1>(),
909             my_struct_key<K, T2>(),
910             my_struct_key<K, T3>(),
911             my_struct_key<K, T4>(),
912             my_struct_key<K, T5>(),
913             my_struct_key<K, T6>(),
914             my_struct_key<K, T7>(),
915             my_struct_key<K, T8>()
916         );
917         return temp;
918     }
919     static void destroy(JType *p) { delete p; }
920 };
921 
922 template<typename JType>
923 class makeJoin<9, JType, tbb::flow::tag_matching> {
924     typedef typename JType::output_type TType;
925     typedef typename std::tuple_element<0, TType>::type T0;
926     typedef typename std::tuple_element<1, TType>::type T1;
927     typedef typename std::tuple_element<2, TType>::type T2;
928     typedef typename std::tuple_element<3, TType>::type T3;
929     typedef typename std::tuple_element<4, TType>::type T4;
930     typedef typename std::tuple_element<5, TType>::type T5;
931     typedef typename std::tuple_element<6, TType>::type T6;
932     typedef typename std::tuple_element<7, TType>::type T7;
933     typedef typename std::tuple_element<8, TType>::type T8;
934 public:
935     static JType *create(tbb::flow::graph& g) {
936         JType *temp = new JType(g,
937             tag_func<T0>(T0(2)),
938             tag_func<T1>(T1(3)),
939             tag_func<T2>(T2(4)),
940             tag_func<T3>(T3(5)),
941             tag_func<T4>(T4(6)),
942             tag_func<T5>(T5(7)),
943             tag_func<T6>(T6(8)),
944             tag_func<T7>(T7(9)),
945             tag_func<T8>(T8(10))
946         );
947         return temp;
948     }
949     static void destroy(JType *p) { delete p; }
950 };
951 #endif
952 
953 #if MAX_TUPLE_TEST_SIZE >= 10
954 template<typename JType, typename K, typename KHash>
955 class makeJoin<10, JType, tbb::flow::key_matching<K, KHash> > {
956     typedef typename JType::output_type TType;
957     typedef typename std::tuple_element<0, TType>::type T0;
958     typedef typename std::tuple_element<1, TType>::type T1;
959     typedef typename std::tuple_element<2, TType>::type T2;
960     typedef typename std::tuple_element<3, TType>::type T3;
961     typedef typename std::tuple_element<4, TType>::type T4;
962     typedef typename std::tuple_element<5, TType>::type T5;
963     typedef typename std::tuple_element<6, TType>::type T6;
964     typedef typename std::tuple_element<7, TType>::type T7;
965     typedef typename std::tuple_element<8, TType>::type T8;
966     typedef typename std::tuple_element<9, TType>::type T9;
967 public:
968     static JType *create(tbb::flow::graph& g) {
969         JType *temp = new JType(g,
970             my_struct_key<K, T0>(),
971             my_struct_key<K, T1>(),
972             my_struct_key<K, T2>(),
973             my_struct_key<K, T3>(),
974             my_struct_key<K, T4>(),
975             my_struct_key<K, T5>(),
976             my_struct_key<K, T6>(),
977             my_struct_key<K, T7>(),
978             my_struct_key<K, T8>(),
979             my_struct_key<K, T9>()
980         );
981         return temp;
982     }
983     static void destroy(JType *p) { delete p; }
984 };
985 
986 template<typename JType>
987 class makeJoin<10, JType, tbb::flow::tag_matching> {
988     typedef typename JType::output_type TType;
989     typedef typename std::tuple_element<0, TType>::type T0;
990     typedef typename std::tuple_element<1, TType>::type T1;
991     typedef typename std::tuple_element<2, TType>::type T2;
992     typedef typename std::tuple_element<3, TType>::type T3;
993     typedef typename std::tuple_element<4, TType>::type T4;
994     typedef typename std::tuple_element<5, TType>::type T5;
995     typedef typename std::tuple_element<6, TType>::type T6;
996     typedef typename std::tuple_element<7, TType>::type T7;
997     typedef typename std::tuple_element<8, TType>::type T8;
998     typedef typename std::tuple_element<9, TType>::type T9;
999 public:
1000     static JType *create(tbb::flow::graph& g) {
1001         JType *temp = new JType(g,
1002             tag_func<T0>(T0(2)),
1003             tag_func<T1>(T1(3)),
1004             tag_func<T2>(T2(4)),
1005             tag_func<T3>(T3(5)),
1006             tag_func<T4>(T4(6)),
1007             tag_func<T5>(T5(7)),
1008             tag_func<T6>(T6(8)),
1009             tag_func<T7>(T7(9)),
1010             tag_func<T8>(T8(10)),
1011             tag_func<T9>(T9(11))
1012         );
1013         return temp;
1014     }
1015     static void destroy(JType *p) { delete p; }
1016 };
1017 #endif
1018 
1019 // holder for input_node pointers for eventual deletion
1020 
1021 static void* all_input_nodes[MaxPorts][MaxNInputs];
1022 
1023 template<int ELEM, typename JNT>
1024 class input_node_helper {
1025 public:
1026     typedef JNT join_node_type;
1027     typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
1028     typedef typename join_node_type::output_type TT;
1029 
1030     typedef typename std::tuple_element<ELEM-1, TT>::type IT;
1031     typedef typename tbb::flow::input_node<IT> my_input_node_type;
1032     typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
1033     static void print_remark(const char * str) {
1034         input_node_helper<ELEM-1, JNT>::print_remark(str);
1035         INFO(", " << name_of<IT>::name());
1036     }
1037     static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
1038         for(int i = 0; i < nInputs; ++i) {
1039             my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, ELEM>(i, nInputs));
1040             tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
1041             all_input_nodes[ELEM-1][i] = (void *)new_node;
1042             new_node->activate();
1043         }
1044         // add the next input_node
1045         input_node_helper<ELEM-1, JNT>::add_input_nodes(my_join, g, nInputs);
1046     }
1047 
1048     static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
1049         my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(ELEM+1)));
1050         tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join));
1051         tbb::flow::make_edge(my_input, *new_node);
1052         all_input_nodes[ELEM-1][0] = (void *)new_node;
1053         input_node_helper<ELEM-1, JNT>::add_recirc_func_nodes(my_join, my_input, g);
1054     }
1055 
1056     static void only_check_value(const int i, const TT &v) {
1057         CHECK_MESSAGE(std::get<ELEM-1>(v)==(IT)(i*(ELEM+1)), "");
1058         input_node_helper<ELEM-1, JNT>::only_check_value(i, v);
1059     }
1060 
1061     static void check_value(int i, TT &v, bool is_serial) {
1062         // the fetched value will match only if there is only one input_node.
1063         bool is_correct = !is_serial||std::get<ELEM-1>(v)==(IT)(i*(ELEM+1));
1064         CHECK_MESSAGE(is_correct, "");
1065         // tally the fetched value.
1066         int ival = (int)std::get<ELEM-1>(v);
1067         CHECK_MESSAGE(!(ival%(ELEM+1)), "");
1068         ival /= (ELEM+1);
1069         CHECK_MESSAGE(!outputCheck[ELEM-1][ival], "");
1070         outputCheck[ELEM-1][ival] = true;
1071         input_node_helper<ELEM-1, JNT>::check_value(i, v, is_serial);
1072     }
1073     static void remove_input_nodes(join_node_type& my_join, int nInputs) {
1074         for(int i = 0; i< nInputs; ++i) {
1075             my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[ELEM-1][i]);
1076             tbb::flow::remove_edge(*dp, tbb::flow::input_port<ELEM-1>(my_join));
1077             delete dp;
1078         }
1079         input_node_helper<ELEM-1, JNT>::remove_input_nodes(my_join, nInputs);
1080     }
1081 
1082     static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
1083         my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[ELEM-1][0]);
1084         tbb::flow::remove_edge(*fn, tbb::flow::input_port<ELEM-1>(my_join));
1085         tbb::flow::remove_edge(my_input, *fn);
1086         delete fn;
1087         input_node_helper<ELEM-1, JNT>::remove_recirc_func_nodes(my_join, my_input);
1088     }
1089 };
1090 
1091 template<typename JNT>
1092 class input_node_helper<1, JNT> {
1093     typedef JNT join_node_type;
1094     typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type;
1095     typedef typename join_node_type::output_type TT;
1096 
1097     typedef typename std::tuple_element<0, TT>::type IT;
1098     typedef typename tbb::flow::input_node<IT> my_input_node_type;
1099     typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type;
1100 public:
1101     static void print_remark(const char * str) {
1102         INFO(str << "< " << name_of<IT>::name());
1103     }
1104     static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) {
1105         for(int i = 0; i < nInputs; ++i) {
1106             my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, 1>(i, nInputs));
1107             tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1108             all_input_nodes[0][i] = (void *)new_node;
1109             new_node->activate();
1110         }
1111     }
1112 
1113     static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) {
1114         my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(2)));
1115         tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1116         tbb::flow::make_edge(my_input, *new_node);
1117         all_input_nodes[0][0] = (void *)new_node;
1118     }
1119 
1120     static void only_check_value(const int i, const TT &v) {
1121         CHECK_MESSAGE(std::get<0>(v)==(IT)(i*2), "");
1122     }
1123 
1124     static void check_value(int i, TT &v, bool is_serial) {
1125         bool is_correct = !is_serial||std::get<0>(v)==(IT)(i*(2));
1126         CHECK_MESSAGE(is_correct, "");
1127         int ival = (int)std::get<0>(v);
1128         CHECK_MESSAGE(!(ival%2), "");
1129         ival /= 2;
1130         CHECK_MESSAGE(!outputCheck[0][ival], "");
1131         outputCheck[0][ival] = true;
1132     }
1133     static void remove_input_nodes(join_node_type& my_join, int nInputs) {
1134         for(int i = 0; i < nInputs; ++i) {
1135             my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[0][i]);
1136             tbb::flow::remove_edge(*dp, tbb::flow::input_port<0>(my_join));
1137             delete dp;
1138         }
1139     }
1140 
1141     static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) {
1142         my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[0][0]);
1143         tbb::flow::remove_edge(*fn, tbb::flow::input_port<0>(my_join));
1144         tbb::flow::remove_edge(my_input, *fn);
1145         delete fn;
1146     }
1147 };
1148 
1149 #if _MSC_VER && !defined(__INTEL_COMPILER)
1150 // Suppress "conditional expression is constant" warning.
1151 #pragma warning( push )
1152 #pragma warning( disable: 4127 )
1153 #endif
1154 
1155 template<typename JType, class JP>
1156 class parallel_test {
1157 public:
1158     typedef typename JType::output_type TType;
1159     typedef typename is_key_matching_join<JP>::key_type            key_type;
1160     static void test() {
1161         const int TUPLE_SIZE = std::tuple_size<TType>::value;
1162         const bool is_key_matching = is_key_matching_join<JP>::value;
1163 
1164         TType v;
1165         input_node_helper<TUPLE_SIZE, JType>::print_remark("Parallel test of join_node");
1166         INFO(" > ");
1167         if(is_key_matching) {
1168             INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
1169             if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) {
1170                 INFO("&");
1171             }
1172         }
1173         INFO("\n");
1174         for(int i = 0; i < MaxPorts; ++i) {
1175             for(int j = 0; j < MaxNInputs; ++j) {
1176                 all_input_nodes[i][j] = NULL;
1177             }
1178         }
1179         for(int nInputs = 1; nInputs<=MaxNInputs; ++nInputs) {
1180             tbb::flow::graph g;
1181             bool not_out_of_order = (nInputs==1)&&(!is_key_matching);
1182             JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
1183             tbb::flow::queue_node<TType> outq1(g);
1184             tbb::flow::queue_node<TType> outq2(g);
1185 
1186             tbb::flow::make_edge(*my_join, outq1);
1187             tbb::flow::make_edge(*my_join, outq2);
1188 
1189             input_node_helper<TUPLE_SIZE, JType>::add_input_nodes((*my_join), g, nInputs);
1190 
1191             g.wait_for_all();
1192 
1193             reset_outputCheck(TUPLE_SIZE, Count);
1194             for(int i = 0; i < Count; ++i) {
1195                 CHECK_MESSAGE(outq1.try_get(v), "");
1196                 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
1197             }
1198 
1199             check_outputCheck(TUPLE_SIZE, Count);
1200             reset_outputCheck(TUPLE_SIZE, Count);
1201 
1202             for(int i = 0; i < Count; i++) {
1203                 CHECK_MESSAGE(outq2.try_get(v), "");;
1204                 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order);
1205             }
1206             check_outputCheck(TUPLE_SIZE, Count);
1207 
1208             CHECK_MESSAGE(!outq1.try_get(v), "");
1209             CHECK_MESSAGE(!outq2.try_get(v), "");
1210 
1211             input_node_helper<TUPLE_SIZE, JType>::remove_input_nodes((*my_join), nInputs);
1212             tbb::flow::remove_edge(*my_join, outq1);
1213             tbb::flow::remove_edge(*my_join, outq2);
1214             makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
1215         }
1216     }
1217 };
1218 
1219 
1220 template<int ELEM, typename JType>
1221 class serial_queue_helper {
1222 public:
1223     typedef typename JType::output_type TT;
1224     typedef typename std::tuple_element<ELEM-1, TT>::type IT;
1225     typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
1226     static void print_remark() {
1227         serial_queue_helper<ELEM-1, JType>::print_remark();
1228         INFO(", " << name_of<IT>::name());
1229     }
1230     static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
1231         serial_queue_helper<ELEM-1, JType>::add_queue_nodes(g, my_join);
1232         my_queue_node_type *new_node = new my_queue_node_type(g);
1233         tbb::flow::make_edge(*new_node, std::get<ELEM-1>(my_join.input_ports()));
1234         all_input_nodes[ELEM-1][0] = (void *)new_node;
1235     }
1236 
1237     static void fill_one_queue(int maxVal) {
1238         // fill queue to "left" of me
1239         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
1240         serial_queue_helper<ELEM-1, JType>::fill_one_queue(maxVal);
1241         for(int i = 0; i < maxVal; ++i) {
1242             CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(i)), "");
1243         }
1244     }
1245 
1246     static void put_one_queue_val(int myVal) {
1247         // put this val to my "left".
1248         serial_queue_helper<ELEM-1, JType>::put_one_queue_val(myVal);
1249         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
1250         CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(myVal)), "");
1251     }
1252 
1253     static void check_queue_value(int i, TT &v) {
1254         serial_queue_helper<ELEM-1, JType>::check_queue_value(i, v);
1255         CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<ELEM-1>(v))==i * (ELEM+1), "");
1256     }
1257 
1258     static void remove_queue_nodes(JType &my_join) {
1259         my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]);
1260         tbb::flow::remove_edge(*vptr, std::get<ELEM-1>(my_join.input_ports()));
1261         serial_queue_helper<ELEM-1, JType>::remove_queue_nodes(my_join);
1262         delete vptr;
1263     }
1264 };
1265 
1266 template<typename JType>
1267 class serial_queue_helper<1, JType> {
1268 public:
1269     typedef typename JType::output_type TT;
1270     typedef typename std::tuple_element<0, TT>::type IT;
1271     typedef typename tbb::flow::queue_node<IT> my_queue_node_type;
1272     static void print_remark() {
1273         INFO("Serial test of join_node< " << name_of<IT>::name());
1274     }
1275 
1276     static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) {
1277         my_queue_node_type *new_node = new my_queue_node_type(g);
1278         tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join));
1279         all_input_nodes[0][0] = (void *)new_node;
1280     }
1281 
1282     static void fill_one_queue(int maxVal) {
1283         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
1284         for(int i = 0; i < maxVal; ++i) {
1285             CHECK_MESSAGE(qptr->try_put(make_thingie<IT, 1>()(i)), "");
1286         }
1287     }
1288 
1289     static void put_one_queue_val(int myVal) {
1290         my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
1291         IT my_val = make_thingie<IT, 1>()(myVal);
1292         CHECK_MESSAGE(qptr->try_put(my_val), "");
1293     }
1294 
1295     static void check_queue_value(int i, TT &v) {
1296         CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<0>(v))==i*2, "");
1297     }
1298 
1299     static void remove_queue_nodes(JType &my_join) {
1300         my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]);
1301         tbb::flow::remove_edge(*vptr, std::get<0>(my_join.input_ports()));
1302         delete vptr;
1303     }
1304 };
1305 
1306 //
1307 // Single reservable predecessor at each port, single accepting and rejecting successor
1308 //   * put to buffer before port0, then put to buffer before port1, ...
1309 //   * fill buffer before port0 then fill buffer before port1, ...
1310 
1311 template<typename JType, class JP>
1312 void test_one_serial(JType &my_join, tbb::flow::graph &g) {
1313     typedef typename JType::output_type TType;
1314     static const int TUPLE_SIZE = std::tuple_size<TType>::value;
1315     bool is_key_matching = is_key_matching_join<JP>::value;
1316     std::vector<bool> flags;
1317     serial_queue_helper<TUPLE_SIZE, JType>::add_queue_nodes(g, my_join);
1318     typedef TType q3_input_type;
1319     tbb::flow::queue_node< q3_input_type > q3(g);
1320 
1321     tbb::flow::make_edge(my_join, q3);
1322 
1323     // fill each queue with its value one-at-a-time
1324     flags.clear();
1325     for(int i = 0; i < Count; ++i) {
1326         serial_queue_helper<TUPLE_SIZE, JType>::put_one_queue_val(i);
1327         flags.push_back(false);
1328     }
1329 
1330     g.wait_for_all();
1331     for(int i = 0; i < Count; ++i) {
1332         q3_input_type v;
1333         g.wait_for_all();
1334         CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()");
1335         if(is_key_matching) {
1336             // because we look up tags in the hash table, the output may be out of order.
1337             int j = int(std::get<0>(v))/2;  // figure what the index should be
1338             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
1339             flags[j] = true;
1340         }
1341         else {
1342             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
1343         }
1344     }
1345 
1346     if(is_key_matching) {
1347         for(int i = 0; i < Count; ++i) {
1348             CHECK_MESSAGE(flags[i], "");
1349             flags[i] = false;
1350         }
1351     }
1352 
1353     tbb::flow::remove_edge(my_join, q3);
1354     tbb::flow::limiter_node<q3_input_type> limiter(g, Count / 2);
1355     tbb::flow::make_edge(my_join, limiter);
1356     tbb::flow::make_edge(limiter, q3);
1357 
1358     // fill each queue completely before filling the next.
1359     serial_queue_helper<TUPLE_SIZE, JType>::fill_one_queue(Count);
1360 
1361     g.wait_for_all();
1362     for(int i = 0; i < Count / 2; ++i) {
1363         q3_input_type v;
1364         g.wait_for_all();
1365         CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()");
1366         if(is_key_matching) {
1367             int j = int(std::get<0>(v))/2;
1368             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v);
1369             flags[j] = true;
1370         }
1371         else {
1372             serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v);
1373         }
1374     }
1375 
1376     if(is_key_matching) {
1377         CHECK(std::count(flags.begin(), flags.end(), true) == Count / 2);
1378     }
1379 
1380     serial_queue_helper<TUPLE_SIZE, JType>::remove_queue_nodes(my_join);
1381 }
1382 
1383 template<typename JType, class JP>
1384 class serial_test {
1385     typedef typename JType::output_type TType;
1386 public:
1387     static void test() {
1388         tbb::flow::graph g;
1389         std::vector<bool> flags;
1390         bool is_key_matching = is_key_matching_join<JP>::value;
1391         flags.reserve(Count);
1392 
1393         const int TUPLE_SIZE = std::tuple_size<TType>::value;
1394         static const int ELEMS = 3;
1395 
1396         JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g);
1397         test_input_ports_return_ref(*my_join);
1398         serial_queue_helper<TUPLE_SIZE, JType>::print_remark(); INFO(" >");
1399         if(is_key_matching) {
1400             INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name());
1401             if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) {
1402                 INFO("&");
1403             }
1404         }
1405         INFO("\n");
1406 
1407         test_one_serial<JType, JP>(*my_join, g);
1408         // build the vector with copy construction from the used join node.
1409         std::vector<JType>join_vector(ELEMS, *my_join);
1410         // destroy the tired old join_node in case we're accidentally reusing pieces of it.
1411         makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join);
1412 
1413         for(int e = 0; e < ELEMS; ++e) {  // exercise each of the vector elements
1414             test_one_serial<JType, JP>(join_vector[e], g);
1415         }
1416     }
1417 
1418 }; // serial_test
1419 
1420 #if _MSC_VER && !defined(__INTEL_COMPILER)
1421 #pragma warning( pop )
1422 #endif
1423 
1424 template<
1425     template<typename, class > class TestType,  // serial_test or parallel_test
1426     typename OutputTupleType,           // type of the output of the join
1427     class J>                 // graph_buffer_policy (reserving, queueing, tag_matching or key_matching)
1428     class generate_test {
1429     public:
1430         typedef tbb::flow::join_node<OutputTupleType, typename filter_out_message_based_key_matching<J>::policy> join_node_type;
1431         static void do_test() {
1432             TestType<join_node_type, J>::test();
1433         }
1434 };
1435 
1436 template<class JP>
1437 void test_input_port_policies();
1438 
1439 // join_node (reserving) does not consume inputs until an item is available at
1440 // every input.  It tries to reserve each input, and if any fails it releases the
1441 // reservation.  When it builds a tuple it broadcasts to all its successors and
1442 // consumes all the inputs.
1443 //
1444 // So our test will put an item at one input port, then attach another node to the
1445 // same node (a queue node in this case).  The second successor should receive the
1446 // item in the queue, emptying it.
1447 //
1448 // We then place an item in the second input queue, and check the output queues; they
1449 // should still be empty.  Then we place an item in the first queue; the output queues
1450 // should then receive a tuple.
1451 //
1452 // we then attach another function node to the second input.  It should not receive
1453 // an item, verifying that the item in the queue is consumed.
1454 template<>
1455 void test_input_port_policies<tbb::flow::reserving>() {
1456     tbb::flow::graph g;
1457     typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::reserving > JType; // two-phase is the default policy
1458                                                                                            // create join_node<type0,type1> jn
1459     JType jn(g);
1460     // create output_queue oq0, oq1
1461     typedef JType::output_type OQType;
1462     tbb::flow::queue_node<OQType> oq0(g);
1463     tbb::flow::queue_node<OQType> oq1(g);
1464     // create iq0, iq1
1465     typedef tbb::flow::queue_node<int> IQType;
1466     IQType iq0(g);
1467     IQType iq1(g);
1468     // create qnp, qnq
1469     IQType qnp(g);
1470     IQType qnq(g);
1471     INFO("Testing policies of join_node<reserving> input ports\n");
1472     // attach jn to oq0, oq1
1473     tbb::flow::make_edge(jn, oq0);
1474     tbb::flow::make_edge(jn, oq1);
1475 
1476     // attach iq0, iq1 to jn
1477     tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports()));
1478     tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports()));
1479 
1480     for(int loop = 0; loop < 3; ++loop) {
1481         // place one item in iq0
1482         CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1");
1483         // attach iq0 to qnp
1484         tbb::flow::make_edge(iq0, qnp);
1485         // qnp should have an item in it.
1486         g.wait_for_all();
1487         {
1488             int i;
1489             CHECK_MESSAGE( (qnp.try_get(i)&&i==1), "Error in item fetched by qnp");
1490         }
1491         // place item in iq1
1492         CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1");
1493         // oq0, oq1 should be empty
1494         g.wait_for_all();
1495         {
1496             OQType t1;
1497             CHECK_MESSAGE( (!oq0.try_get(t1)&&!oq1.try_get(t1)), "oq0 and oq1 not empty");
1498         }
1499         // detach qnp from iq0
1500         tbb::flow::remove_edge(iq0, qnp); // if we don't remove qnp it will gobble any values we put in iq0
1501                                           // place item in iq0
1502         CHECK_MESSAGE( (iq0.try_put(3)), "Error on second put to iq0");
1503         // oq0, oq1 should have items in them
1504         g.wait_for_all();
1505         {
1506             OQType t0;
1507             OQType t1;
1508             CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==3&&std::get<1>(t0)==2), "Error in oq0 output");
1509             CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==3&&std::get<1>(t1)==2), "Error in oq1 output");
1510         }
1511         // attach qnp to iq0, qnq to iq1
1512         // qnp and qnq should be empty
1513         tbb::flow::make_edge(iq0, qnp);
1514         tbb::flow::make_edge(iq1, qnq);
1515         g.wait_for_all();
1516         {
1517             int i;
1518             CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it");
1519             CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it");
1520         }
1521         tbb::flow::remove_edge(iq0, qnp);
1522         tbb::flow::remove_edge(iq1, qnq);
1523     } // for ( int loop ...
1524 }
1525 
1526 // join_node (queueing) consumes inputs as soon as they are available at
1527 // any input.  When it builds a tuple it broadcasts to all its successors and
1528 // discards the broadcast values.
1529 //
1530 // So our test will put an item at one input port, then attach another node to the
1531 // same node (a queue node in this case).  The second successor should not receive
1532 // an item (because the join consumed it).
1533 //
1534 // We then place an item in the second input queue, and check the output queues; they
1535 // should each have a tuple.
1536 //
1537 // we then attach another function node to the second input.  It should not receive
1538 // an item, verifying that the item in the queue is consumed.
1539 template<>
1540 void test_input_port_policies<tbb::flow::queueing>() {
1541     tbb::flow::graph g;
1542     typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::queueing > JType;
1543     // create join_node<type0,type1> jn
1544     JType jn(g);
1545     // create output_queue oq0, oq1
1546     typedef JType::output_type OQType;
1547     tbb::flow::queue_node<OQType> oq0(g);
1548     tbb::flow::queue_node<OQType> oq1(g);
1549     // create iq0, iq1
1550     typedef tbb::flow::queue_node<int> IQType;
1551     IQType iq0(g);
1552     IQType iq1(g);
1553     // create qnp, qnq
1554     IQType qnp(g);
1555     IQType qnq(g);
1556     INFO("Testing policies of join_node<queueing> input ports\n");
1557     // attach jn to oq0, oq1
1558     tbb::flow::make_edge(jn, oq0);
1559     tbb::flow::make_edge(jn, oq1);
1560 
1561     // attach iq0, iq1 to jn
1562     tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports()));
1563     tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports()));
1564 
1565     for(int loop = 0; loop < 3; ++loop) {
1566         // place one item in iq0
1567         CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1");
1568         // attach iq0 to qnp
1569         tbb::flow::make_edge(iq0, qnp);
1570         // qnp should have an item in it.
1571         g.wait_for_all();
1572         {
1573             int i;
1574             CHECK_MESSAGE( (!qnp.try_get(i)), "Item was received by qnp");
1575         }
1576         // place item in iq1
1577         CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1");
1578         // oq0, oq1 should have items
1579         g.wait_for_all();
1580         {
1581             OQType t0;
1582             OQType t1;
1583             CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==1&&std::get<1>(t0)==2), "Error in oq0 output");
1584             CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==1&&std::get<1>(t1)==2), "Error in oq1 output");
1585         }
1586         // attach qnq to iq1
1587         // qnp and qnq should be empty
1588         tbb::flow::make_edge(iq1, qnq);
1589         g.wait_for_all();
1590         {
1591             int i;
1592             CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it");
1593             CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it");
1594         }
1595         tbb::flow::remove_edge(iq0, qnp);
1596         tbb::flow::remove_edge(iq1, qnq);
1597     } // for ( int loop ...
1598 }
1599 
1600 template<typename T>
1601 struct myTagValue {
1602     tbb::flow::tag_value operator()(T i) { return tbb::flow::tag_value(i); }
1603 };
1604 
1605 template<>
1606 struct myTagValue<CheckType<int> > {
1607     tbb::flow::tag_value operator()(CheckType<int> i) { return tbb::flow::tag_value((int)i); }
1608 };
1609 
1610 // join_node (tag_matching) consumes inputs as soon as they are available at
1611 // any input.  When it builds a tuple it broadcasts to all its successors and
1612 // discards the broadcast values.
1613 //
1614 // It chooses the tuple it broadcasts by matching the tag values returned by the
1615 // methods given the constructor of the join, in this case the method just casts
1616 // the value in each port to tag_value.
1617 //
1618 // So our test will put an item at one input port, then attach another node to the
1619 // same node (a queue node in this case).  The second successor should not receive
1620 // an item (because the join consumed it).
1621 //
1622 // We then place an item in the second input queue, and check the output queues; they
1623 // should each have a tuple.
1624 //
1625 // we then attach another queue node to the second input.  It should not receive
1626 // an item, verifying that the item in the queue is consumed.
1627 //
1628 // We will then exercise the join with a bunch of values, and the output order should
1629 // be determined by the order we insert items into the second queue.  (Each tuple set
1630 // corresponding to a tag will be complete when the second item is inserted.)
1631 template<>
1632 void test_input_port_policies<tbb::flow::tag_matching>() {
1633     tbb::flow::graph g;
1634     typedef tbb::flow::join_node<std::tuple<int, CheckType<int> >, tbb::flow::tag_matching > JoinNodeType;
1635     typedef JoinNodeType::output_type CheckTupleType;
1636     JoinNodeType testJoinNode(g, myTagValue<int>(), myTagValue<CheckType<int> >());
1637     tbb::flow::queue_node<CheckTupleType> checkTupleQueue0(g);
1638     tbb::flow::queue_node<CheckTupleType> checkTupleQueue1(g);
1639     {
1640         Checker<CheckType<int> > my_check;
1641 
1642 
1643         typedef tbb::flow::queue_node<int> IntQueueType;
1644         typedef tbb::flow::queue_node<CheckType<int> > CheckQueueType;
1645         IntQueueType intInputQueue(g);
1646         CheckQueueType checkInputQueue(g);
1647         IntQueueType intEmptyTestQueue(g);
1648         CheckQueueType checkEmptyTestQueue(g);
1649         INFO("Testing policies of join_node<tag_matching> input ports\n");
1650         // attach testJoinNode to checkTupleQueue0, checkTupleQueue1
1651         tbb::flow::make_edge(testJoinNode, checkTupleQueue0);
1652         tbb::flow::make_edge(testJoinNode, checkTupleQueue1);
1653 
1654         // attach intInputQueue, checkInputQueue to testJoinNode
1655         tbb::flow::make_edge(intInputQueue, tbb::flow::input_port<0>(testJoinNode));
1656         tbb::flow::make_edge(checkInputQueue, tbb::flow::input_port<1>(testJoinNode));
1657 
1658         // we'll put four discrete values in the inputs to the join_node.  Each
1659         // set of inputs should result in one output.
1660         for(int loop = 0; loop < 4; ++loop) {
1661             // place one item in intInputQueue
1662             CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue");
1663             // attach intInputQueue to intEmptyTestQueue
1664             tbb::flow::make_edge(intInputQueue, intEmptyTestQueue);
1665             // intEmptyTestQueue should not have an item in it.  (the join consumed it.)
1666             g.wait_for_all();
1667             {
1668                 int intVal0;
1669                 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal0)), "Item was received by intEmptyTestQueue");
1670             }
1671             // place item in checkInputQueue
1672             CheckType<int> checkVal0(loop);
1673             CHECK_MESSAGE( (checkInputQueue.try_put(checkVal0)), "Error putting to checkInputQueue");
1674             // checkTupleQueue0, checkTupleQueue1 should have items
1675             g.wait_for_all();
1676             {
1677                 CheckTupleType t0;
1678                 CheckTupleType t1;
1679                 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==loop&&(int)std::get<1>(t0)==loop), "Error in checkTupleQueue0 output");
1680                 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==loop&&(int)std::get<1>(t1)==loop), "Error in checkTupleQueue1 output");
1681                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0");
1682                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1");
1683             }
1684             // attach checkEmptyTestQueue to checkInputQueue
1685             // intEmptyTestQueue and checkEmptyTestQueue should be empty
1686             tbb::flow::make_edge(checkInputQueue, checkEmptyTestQueue);
1687             g.wait_for_all();
1688             {
1689                 int intVal1;
1690                 CheckType<int> checkVal1;
1691                 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal1)), "intInputQueue still had value in it");
1692                 CHECK_MESSAGE( (!checkEmptyTestQueue.try_get(checkVal1)), "checkInputQueue still had value in it");
1693             }
1694             tbb::flow::remove_edge(intInputQueue, intEmptyTestQueue);
1695             tbb::flow::remove_edge(checkInputQueue, checkEmptyTestQueue);
1696         } // for ( int loop ...
1697 
1698           // Now we'll put [4 .. nValues - 1] in intInputQueue, and then put [4 .. nValues - 1] in checkInputQueue in
1699           // a different order.  We should see tuples in the output queues in the order we inserted
1700           // the integers into checkInputQueue.
1701         const int nValues = 100;
1702         const int nIncr = 31;  // relatively prime to nValues
1703 
1704         for(int loop = 4; loop < 4+nValues; ++loop) {
1705             // place one item in intInputQueue
1706             CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue");
1707             g.wait_for_all();
1708             {
1709                 CheckTupleType t3;
1710                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t3)), "Object in output queue");
1711                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t3)), "Object in output queue");
1712             }
1713         } // for ( int loop ...
1714 
1715         for(int loop = 1; loop<=nValues; ++loop) {
1716             int lp1 = 4+(loop * nIncr)%nValues;
1717             // place item in checkInputQueue
1718             CHECK_MESSAGE( (checkInputQueue.try_put(lp1)), "Error putting to checkInputQueue");
1719             // checkTupleQueue0, checkTupleQueue1 should have items
1720             g.wait_for_all();
1721             {
1722                 CheckTupleType t0;
1723                 CheckTupleType t1;
1724                 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==lp1 && std::get<1>(t0)==lp1), "Error in checkTupleQueue0 output");
1725                 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==lp1 && std::get<1>(t1)==lp1), "Error in checkTupleQueue1 output");
1726                 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0");
1727                 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1");
1728             }
1729         } // for ( int loop ...
1730     } // Check
1731 }
1732 
1733 template<typename Policy> struct policy_name {};
1734 
1735 template<> struct policy_name<tbb::flow::queueing> {
1736 const char* msg_beg() { return "queueing\n";}
1737 const char* msg_end() { return "test queueing extract\n";}
1738 };
1739 
1740 template<> struct policy_name<tbb::flow::reserving> {
1741 const char* msg_beg() { return "reserving\n";}
1742 const char* msg_end() { return "test reserving extract\n";}
1743 };
1744 
1745 template<> struct policy_name<tbb::flow::tag_matching> {
1746 const char* msg_beg() { return "tag_matching\n";}
1747 const char* msg_end() { return "test tag_matching extract\n";}
1748 };
1749 
1750 template<typename Policy>
1751 void test_main() {
1752     test_input_port_policies<Policy>();
1753     for(int p = 0; p < 2; ++p) {
1754         INFO(policy_name<Policy>().msg_beg());
1755         generate_test<serial_test, std::tuple<threebyte, double>, Policy>::do_test();
1756 #if MAX_TUPLE_TEST_SIZE >= 4
1757         {
1758             Checker<CheckType<int> > my_check;
1759             generate_test<serial_test, std::tuple<float, double, CheckType<int>, long>, Policy>::do_test();
1760         }
1761 #endif
1762 #if MAX_TUPLE_TEST_SIZE >= 6
1763         generate_test<serial_test, std::tuple<double, double, int, long, int, short>, Policy>::do_test();
1764 #endif
1765 #if MAX_TUPLE_TEST_SIZE >= 8
1766         generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long>, Policy>::do_test();
1767 #endif
1768 #if MAX_TUPLE_TEST_SIZE >= 10
1769         generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long>, Policy>::do_test();
1770 #endif
1771         {
1772             Checker<CheckType<int> > my_check1;
1773             generate_test<parallel_test, std::tuple<float, CheckType<int> >, Policy>::do_test();
1774         }
1775 #if MAX_TUPLE_TEST_SIZE >= 3
1776         generate_test<parallel_test, std::tuple<float, int, long>, Policy>::do_test();
1777 #endif
1778 #if MAX_TUPLE_TEST_SIZE >= 5
1779         generate_test<parallel_test, std::tuple<double, double, int, int, short>, Policy>::do_test();
1780 #endif
1781 #if MAX_TUPLE_TEST_SIZE >= 7
1782         generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long>, Policy>::do_test();
1783 #endif
1784 #if MAX_TUPLE_TEST_SIZE >= 9
1785         generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long>, Policy>::do_test();
1786 #endif
1787     }
1788 }
1789 
1790 #endif /* tbb_test_join_node_H */
1791