1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef tbb_test_join_node_H 18 #define tbb_test_join_node_H 19 20 #if _MSC_VER 21 // Suppress "decorated name length exceeded, name was truncated" warning 22 #if __INTEL_COMPILER 23 #pragma warning( disable: 2586 ) 24 #else 25 #pragma warning( disable: 4503 ) 26 #endif 27 #endif 28 29 #include "tbb/flow_graph.h" 30 31 #include "common/test.h" 32 #include "common/utils.h" 33 #include "common/checktype.h" 34 #include "common/graph_utils.h" 35 36 #include <type_traits> 37 38 const char *names[] = { 39 "Adam", "Bruce", "Charles", "Daniel", "Evan", "Frederich", "George", "Hiram", "Ichabod", 40 "John", "Kevin", "Leonard", "Michael", "Ned", "Olin", "Paul", "Quentin", "Ralph", "Steven", 41 "Thomas", "Ulysses", "Victor", "Walter", "Xerxes", "Yitzhak", "Zebediah", "Anne", "Bethany", 42 "Clarisse", "Dorothy", "Erin", "Fatima", "Gabrielle", "Helen", "Irene", "Jacqueline", 43 "Katherine", "Lana", "Marilyn", "Noelle", "Okiilani", "Pauline", "Querida", "Rose", "Sybil", 44 "Tatiana", "Umiko", "Victoria", "Wilma", "Xena", "Yolanda", "Zoe", "Algernon", "Benjamin", 45 "Caleb", "Dylan", "Ezra", "Felix", "Gabriel", "Henry", "Issac", "Jasper", "Keifer", 46 "Lincoln", "Milo", "Nathaniel", "Owen", "Peter", "Quincy", "Ronan", "Silas", "Theodore", 47 "Uriah", "Vincent", "Wilbur", "Xavier", "Yoda", "Zachary", "Amelia", "Brielle", "Charlotte", 48 "Daphne", "Emma", "Fiona", "Grace", "Hazel", "Isla", "Juliet", "Keira", "Lily", "Mia", 49 "Nora", "Olivia", "Penelope", "Quintana", "Ruby", "Sophia", "Tessa", "Ursula", "Violet", 50 "Willow", "Xanthe", "Yvonne", "ZsaZsa", "Asher", "Bennett", "Connor", "Dominic", "Ethan", 51 "Finn", "Grayson", "Hudson", "Ian", "Jackson", "Kent", "Liam", "Matthew", "Noah", "Oliver", 52 "Parker", "Quinn", "Rhys", "Sebastian", "Taylor", "Umberto", "Vito", "William", "Xanto", 53 "Yogi", "Zane", "Ava", "Brenda", "Chloe", "Delilah", "Ella", "Felicity", "Genevieve", 54 "Hannah", "Isabella", "Josephine", "Kacie", "Lucy", "Madeline", "Natalie", "Octavia", 55 "Piper", "Qismah", "Rosalie", "Scarlett", "Tanya", "Uta", "Vivian", "Wendy", "Xola", 56 "Yaritza", "Zanthe"}; 57 58 static const int NameCnt = sizeof(names)/sizeof(char *); 59 60 template<typename K> 61 struct index_to_key { operatorindex_to_key62 K operator()(const int indx) { 63 return (K)(3*indx+1); 64 } 65 }; 66 67 template<> 68 struct index_to_key<std::string> { 69 std::string operator()(const int indx) { 70 return std::string(names[indx % NameCnt]); 71 } 72 }; 73 74 template<typename K> 75 struct K_deref { 76 typedef K type; 77 }; 78 79 template<typename K> 80 struct K_deref<K&> { 81 typedef K type; 82 }; 83 84 template<typename K, typename V> 85 struct MyKeyFirst { 86 K my_key; 87 V my_value; 88 MyKeyFirst(int i = 0, int v = 0): my_key(index_to_key<K>()(i)), my_value((V)v) { 89 } 90 void print_val() const { 91 INFO("MyKeyFirst{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}"); 92 } 93 operator int() const { return (int)my_value; } 94 }; 95 96 template<typename K, typename V> 97 struct MyKeySecond { 98 V my_value; 99 K my_key; 100 MyKeySecond(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) { 101 } 102 void print_val() const { 103 INFO("MyKeySecond{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}"); 104 } 105 operator int() const { return (int)my_value; } 106 }; 107 108 template<typename K, typename V> 109 struct MyMessageKeyWithoutKey { 110 V my_value; 111 K my_message_key; 112 MyMessageKeyWithoutKey(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) { 113 } 114 void print_val() const { 115 INFO("MyMessageKeyWithoutKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}"); 116 } 117 operator int() const { return (int)my_value; } 118 const K& key() const { 119 return my_message_key; 120 } 121 }; 122 123 template<typename K, typename V> 124 struct MyMessageKeyWithBrokenKey { 125 V my_value; 126 K my_key; 127 K my_message_key; 128 MyMessageKeyWithBrokenKey(int i = 0, int v = 0): my_value((V)v), my_key(), my_message_key(index_to_key<K>()(i)) { 129 } 130 void print_val() const { 131 INFO("MyMessageKeyWithBrokenKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}"); 132 } 133 operator int() const { return (int)my_value; } 134 const K& key() const { 135 return my_message_key; 136 } 137 138 }; 139 140 template<typename K, typename V> 141 struct MyKeyWithBrokenMessageKey { 142 V my_value; 143 K my_key; 144 MyKeyWithBrokenMessageKey(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) { 145 } 146 void print_val() const { 147 INFO("MyKeyWithBrokenMessageKey{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}"); 148 } 149 operator int() const { return (int)my_value; } 150 K key() const { 151 CHECK_MESSAGE( (false), "The method should never be called"); 152 return K(); 153 } 154 }; 155 156 template<typename K, typename V> 157 struct MyMessageKeyWithoutKeyMethod { 158 V my_value; 159 K my_message_key; 160 MyMessageKeyWithoutKeyMethod(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) { 161 } 162 void print_val() const { 163 INFO("MyMessageKeyWithoutKeyMethod{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}"); 164 } 165 operator int() const { return (int)my_value; } 166 //K key() const; // Do not define 167 }; 168 169 // Overload for MyMessageKeyWithoutKeyMethod 170 template <typename K, typename V> 171 K key_from_message(const MyMessageKeyWithoutKeyMethod<typename std::decay<K>::type, V> &m) { 172 return m.my_message_key; 173 } 174 175 176 // pattern for creating values in the tag_matching and key_matching, given an integer and the index in the tuple 177 template<typename TT, size_t INDEX> 178 struct make_thingie { 179 TT operator()(int const &i) { 180 return TT(i * (INDEX+1)); 181 } 182 }; 183 184 template<template <typename, typename> class T, typename K, typename V, size_t INDEX> 185 struct make_thingie<T<K, V>, INDEX> { 186 T<K, V> operator()(int const &i) { 187 return T<K, V>(i, i*(INDEX+1)); 188 } 189 }; 190 191 // cast_from<T>::my_int_val(i); 192 template<typename T> 193 struct cast_from { 194 static int my_int_val(T const &i) { return (int)i; } 195 }; 196 197 template<typename K, typename V> 198 struct cast_from<MyKeyFirst<K, V> > { 199 static int my_int_val(MyKeyFirst<K, V> const &i) { return (int)(i.my_value); } 200 }; 201 202 template<typename K, typename V> 203 struct cast_from<MyKeySecond<K, V> > { 204 static int my_int_val(MyKeySecond<K, V> const &i) { return (int)(i.my_value); } 205 }; 206 207 template<typename T> 208 void print_my_value(T const &i) { 209 INFO(" " << cast_from<T>::my_int_val(i) << " " ); 210 } 211 212 template<typename K, typename V> 213 void print_my_value(MyKeyFirst<K, V> const &i) { 214 i.print_val(); 215 } 216 217 template<typename K, typename V> 218 void print_my_value(MyKeySecond<K, V> const &i) { 219 i.print_val(); 220 } 221 222 template<> 223 void print_my_value(std::string const &i) { 224 INFO("\"" << i.c_str() << "\"" ); 225 } 226 227 // 228 // Tests 229 // 230 231 //! 232 // my_struct_key == given a type V with a field named my_key of type K, will return a copy of my_key 233 template<class K, typename V> 234 struct my_struct_key { 235 K operator()(const V& mv) { 236 return mv.my_key; 237 } 238 }; 239 240 // specialization returning reference to my_key. 241 template<class K, typename V> 242 struct my_struct_key<K&, V> { 243 K& operator()(const V& mv) { 244 return const_cast<K&>(mv.my_key); 245 } 246 }; 247 248 using tbb::detail::d1::type_to_key_function_body; 249 using tbb::detail::d1::hash_buffer; 250 using tbb::detail::d1::tbb_hash_compare; 251 using tbb::detail::d1::type_to_key_function_body_leaf; 252 253 template<class K, class V> struct VtoKFB { 254 typedef type_to_key_function_body<V, K> type; 255 }; 256 257 template<typename K> struct make_hash_compare { typedef tbb_hash_compare<K> type; }; 258 259 template<typename K, class V> 260 void hash_buffer_test(const char *sname) { 261 typedef typename K_deref<K>::type KnoR; 262 hash_buffer< 263 K, 264 V, 265 typename VtoKFB<K, V>::type, 266 tbb_hash_compare<KnoR> 267 > my_hash_buffer; 268 const bool k_is_ref = std::is_reference<K>::value; 269 typedef type_to_key_function_body_leaf< 270 V, K, my_struct_key<K, V> > my_func_body_type; 271 typename VtoKFB<K, V>::type *kp = new my_func_body_type(my_struct_key<K, V>()); 272 my_hash_buffer.set_key_func(kp); 273 INFO("Running hash_buffer test on " << sname << "; is ref == " << (k_is_ref ? "true" : "false") << "\n" ); 274 V mv1, mv0; 275 bool res; 276 for(int cnt = 0; cnt < 2; ++cnt) { 277 // insert 50 items after checking they are not already in the table 278 for(int i = 0; i < 50; ++i) { 279 KnoR kk = index_to_key<KnoR>()(i); 280 mv1.my_key = kk; 281 mv1.my_value = 0.5*i; 282 res = my_hash_buffer.find_with_key(kk, mv0); 283 CHECK_MESSAGE( (!res), "Found non-inserted item"); 284 res = my_hash_buffer.insert_with_key(mv1); 285 CHECK_MESSAGE( (res), "insert failed"); 286 res = my_hash_buffer.find_with_key(kk, mv0); 287 CHECK_MESSAGE( (res), "not found after insert"); 288 CHECK_MESSAGE( (mv0.my_value==mv1.my_value), "result not correct"); 289 } 290 // go backwards checking they are still there. 291 for(int i = 49; i>=0; --i) { 292 KnoR kk = index_to_key<KnoR>()(i); 293 double value = 0.5*i; 294 res = my_hash_buffer.find_with_key(kk, mv0); 295 CHECK_MESSAGE( (res), "find failed"); 296 CHECK_MESSAGE( (mv0.my_value==value), "result not correct"); 297 } 298 // delete every third item, check they are gone 299 for(int i = 0; i < 50; i += 3) { 300 KnoR kk = index_to_key<KnoR>()(i); 301 my_hash_buffer.delete_with_key(kk); 302 res = my_hash_buffer.find_with_key(kk, mv0); 303 CHECK_MESSAGE( (!res), "Found deleted item"); 304 } 305 // check the deleted items are gone, the non-deleted items are there. 306 for(int i = 0; i < 50; ++i) { 307 KnoR kk = index_to_key<KnoR>()(i); 308 double value = 0.5*i; 309 if(i%3==0) { 310 res = my_hash_buffer.find_with_key(kk, mv0); 311 CHECK_MESSAGE( (!res), "found an item that was previously deleted"); 312 } 313 else { 314 res = my_hash_buffer.find_with_key(kk, mv0); 315 CHECK_MESSAGE( (res), "find failed"); 316 CHECK_MESSAGE( (mv0.my_value==value), "result not correct"); 317 } 318 } 319 // insert new items, check the deleted items return true, the non-deleted items return false. 320 for(int i = 0; i < 50; ++i) { 321 KnoR kk = index_to_key<KnoR>()(i); 322 double value = 1.5*i; 323 mv1.my_key = kk; 324 mv1.my_value = value; 325 res = my_hash_buffer.insert_with_key(mv1); 326 if(i%3==0) { 327 CHECK_MESSAGE( (res), "didn't insert in empty slot"); 328 } 329 else { 330 CHECK_MESSAGE( (!res), "slot was empty on insert"); 331 } 332 } 333 // delete all items 334 for(int i = 0; i < 50; ++i) { 335 KnoR kk = index_to_key<KnoR>()(i); 336 my_hash_buffer.delete_with_key(kk); 337 res = my_hash_buffer.find_with_key(kk, mv0); 338 CHECK_MESSAGE( (!res), "Found deleted item"); 339 } 340 } // perform tasks twice 341 } 342 343 void 344 TestTaggedBuffers() { 345 hash_buffer_test<int, MyKeyFirst<int, double> >("MyKeyFirst<int,double>"); 346 hash_buffer_test<int&, MyKeyFirst<int, double> >("MyKeyFirst<int,double> with int&"); 347 hash_buffer_test<int, MyKeySecond<int, double> >("MyKeySecond<int,double>"); 348 349 hash_buffer_test<std::string, MyKeyFirst<std::string, double> >("MyKeyFirst<std::string,double>"); 350 hash_buffer_test<std::string&, MyKeySecond<std::string, double> >("MyKeySecond<std::string,double> with std::string&"); 351 } 352 353 struct threebyte { 354 unsigned char b1; 355 unsigned char b2; 356 unsigned char b3; 357 threebyte(int i = 0) { 358 b1 = (unsigned char)(i&0xFF); 359 b2 = (unsigned char)((i>>8)&0xFF); 360 b3 = (unsigned char)((i>>16)&0xFF); 361 } 362 operator int() const { return (int)(b1+(b2<<8)+(b3<<16)); } 363 }; 364 365 const int Count = 150; 366 367 const int Recirc_count = 1000; // number of tuples to be generated 368 const int MaxPorts = 10; 369 const int MaxNInputs = 5; // max # of input_nodes to register for each join_node input in parallel test 370 bool outputCheck[MaxPorts][Count]; // for checking output 371 372 void 373 check_outputCheck(int nUsed, int maxCnt) { 374 for(int i = 0; i < nUsed; ++i) { 375 for(int j = 0; j < maxCnt; ++j) { 376 CHECK_MESSAGE(outputCheck[i][j], ""); 377 } 378 } 379 } 380 381 void 382 reset_outputCheck(int nUsed, int maxCnt) { 383 for(int i = 0; i < nUsed; ++i) { 384 for(int j = 0; j < maxCnt; ++j) { 385 outputCheck[i][j] = false; 386 } 387 } 388 } 389 390 template<typename T> 391 class name_of { 392 public: 393 static const char* name() { return "Unknown"; } 394 }; 395 template<typename T> 396 class name_of<CheckType<T> > { 397 public: 398 static const char* name() { return "checktype"; } 399 }; 400 template<> 401 class name_of<int> { 402 public: 403 static const char* name() { return "int"; } 404 }; 405 template<> 406 class name_of<float> { 407 public: 408 static const char* name() { return "float"; } 409 }; 410 template<> 411 class name_of<double> { 412 public: 413 static const char* name() { return "double"; } 414 }; 415 template<> 416 class name_of<long> { 417 public: 418 static const char* name() { return "long"; } 419 }; 420 template<> 421 class name_of<short> { 422 public: 423 static const char* name() { return "short"; } 424 }; 425 template<> 426 class name_of<threebyte> { 427 public: 428 static const char* name() { return "threebyte"; } 429 }; 430 template<> 431 class name_of<std::string> { 432 public: 433 static const char* name() { return "std::string"; } 434 }; 435 template<typename K, typename V> 436 class name_of<MyKeyFirst<K, V> > { 437 public: 438 static const char* name() { return "MyKeyFirst<K,V>"; } 439 }; 440 template<typename K, typename V> 441 class name_of<MyKeySecond<K, V> > { 442 public: 443 static const char* name() { return "MyKeySecond<K,V>"; } 444 }; 445 446 // The additional policy to differ message based key matching from usual key matching. 447 // It only makes sense for the test because join_node is created with the key_matching policy for the both cases. 448 template <typename K, typename KHash = tbb_hash_compare<typename std::decay<K>::type > > 449 struct message_based_key_matching {}; 450 451 // test for key_matching 452 template<class JP> 453 struct is_key_matching_join { 454 static const bool value; 455 typedef int key_type; // have to define it to something 456 }; 457 458 template<class JP> 459 const bool is_key_matching_join<JP>::value = false; 460 461 template<class K, class KHash> 462 struct is_key_matching_join<tbb::flow::key_matching<K, KHash> > { 463 static const bool value; 464 typedef K key_type; 465 }; 466 467 template<class K, class KHash> 468 const bool is_key_matching_join<tbb::flow::key_matching<K, KHash> >::value = true; 469 470 template<class K, class KHash> 471 struct is_key_matching_join<message_based_key_matching<K, KHash> > { 472 static const bool value; 473 typedef K key_type; 474 }; 475 476 template<class K, class KHash> 477 const bool is_key_matching_join<message_based_key_matching<K, KHash> >::value = true; 478 479 // for recirculating tags, input is tuple<index,continue_msg> 480 // output is index*my_mult cast to the right type 481 template<typename TT> 482 class recirc_func_body { 483 TT my_mult; 484 public: 485 typedef std::tuple<int, tbb::flow::continue_msg> input_type; 486 recirc_func_body(TT multiplier): my_mult(multiplier) {} 487 recirc_func_body(const recirc_func_body &other): my_mult(other.my_mult) { } 488 void operator=(const recirc_func_body &other) { my_mult = other.my_mult; } 489 TT operator()(const input_type &v) { 490 return TT(std::get<0>(v)) * my_mult; 491 } 492 }; 493 494 static int input_count; // input_nodes are serial 495 496 // emit input_count continue_msg 497 class recirc_input_node_body { 498 public: 499 tbb::flow::continue_msg operator()(tbb::flow_control &fc) { 500 if( --input_count < 0 ){ 501 fc.stop(); 502 } 503 return tbb::flow::continue_msg(); 504 } 505 }; 506 507 // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10, 508 // so the max number generated right now is 1500 or so.) Input will generate a series of TT with value 509 // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body. We are attaching addend 510 // input nodes to a join_port, and each will generate part of the numerical series the port is expecting 511 // to receive. If there is only one input node, the series order will be maintained; if more than one, 512 // this is not guaranteed. 513 template<typename TT, size_t INDEX> 514 class my_input_body { 515 int my_count; 516 int addend; 517 public: 518 my_input_body(int init_val, int addto): my_count(init_val), addend(addto) { } 519 TT operator()(tbb::flow_control& fc) { 520 int lc = my_count; 521 TT ret = make_thingie<TT, INDEX>()(my_count); 522 my_count += addend; 523 if ( lc < Count){ 524 return ret; 525 }else{ 526 fc.stop(); 527 return TT(); 528 } 529 } 530 }; 531 532 template<typename TT> 533 class tag_func { 534 TT my_mult; 535 public: 536 tag_func(TT multiplier): my_mult(multiplier) { } 537 // operator() will return [0 .. Count) 538 tbb::flow::tag_value operator()(TT v) { 539 tbb::flow::tag_value t = tbb::flow::tag_value(v/my_mult); 540 return t; 541 } 542 }; 543 544 template <class JP> 545 struct filter_out_message_based_key_matching { 546 typedef JP policy; 547 }; 548 549 template <typename K, typename KHash> 550 struct filter_out_message_based_key_matching<message_based_key_matching<K, KHash> > { 551 // To have message based key matching in join_node, the key_matchig policy should be specified. 552 typedef tbb::flow::key_matching<K, KHash> policy; 553 }; 554 555 // allocator for join_node. This is specialized for tag_matching and key_matching joins because they require a variable number 556 // of tag_value methods passed to the constructor 557 558 template<int N, typename JType, class JP> 559 class makeJoin { 560 public: 561 static JType *create(tbb::flow::graph& g) { 562 JType *temp = new JType(g); 563 return temp; 564 } 565 static void destroy(JType *p) { delete p; } 566 }; 567 568 // for general key_matching case, each type in the tuple is a class that has the my_key field and the my_value field. 569 // 570 template<typename JType, typename K, typename KHash> 571 class makeJoin<2, JType, tbb::flow::key_matching<K, KHash> > { 572 typedef typename JType::output_type TType; 573 typedef typename std::tuple_element<0, TType>::type T0; 574 typedef typename std::tuple_element<1, TType>::type T1; 575 public: 576 static JType *create(tbb::flow::graph& g) { 577 JType *temp = new JType(g, 578 my_struct_key<K, T0>(), 579 my_struct_key<K, T1>() 580 ); 581 return temp; 582 } 583 static void destroy(JType *p) { delete p; } 584 }; 585 586 template<typename JType> 587 class makeJoin<2, JType, tbb::flow::tag_matching> { 588 typedef typename JType::output_type TType; 589 typedef typename std::tuple_element<0, TType>::type T0; 590 typedef typename std::tuple_element<1, TType>::type T1; 591 public: 592 static JType *create(tbb::flow::graph& g) { 593 JType *temp = new JType(g, 594 tag_func<T0>(T0(2)), 595 tag_func<T1>(T1(3)) 596 ); 597 return temp; 598 } 599 static void destroy(JType *p) { delete p; } 600 }; 601 602 #if MAX_TUPLE_TEST_SIZE >= 3 603 template<typename JType, typename K, typename KHash> 604 class makeJoin<3, JType, tbb::flow::key_matching<K, KHash> > { 605 typedef typename JType::output_type TType; 606 typedef typename std::tuple_element<0, TType>::type T0; 607 typedef typename std::tuple_element<1, TType>::type T1; 608 typedef typename std::tuple_element<2, TType>::type T2; 609 public: 610 static JType *create(tbb::flow::graph& g) { 611 JType *temp = new JType(g, 612 my_struct_key<K, T0>(), 613 my_struct_key<K, T1>(), 614 my_struct_key<K, T2>() 615 ); 616 return temp; 617 } 618 static void destroy(JType *p) { delete p; } 619 }; 620 621 template<typename JType> 622 class makeJoin<3, JType, tbb::flow::tag_matching> { 623 typedef typename JType::output_type TType; 624 typedef typename std::tuple_element<0, TType>::type T0; 625 typedef typename std::tuple_element<1, TType>::type T1; 626 typedef typename std::tuple_element<2, TType>::type T2; 627 public: 628 static JType *create(tbb::flow::graph& g) { 629 JType *temp = new JType(g, 630 tag_func<T0>(T0(2)), 631 tag_func<T1>(T1(3)), 632 tag_func<T2>(T2(4)) 633 ); 634 return temp; 635 } 636 static void destroy(JType *p) { delete p; } 637 }; 638 639 #endif 640 #if MAX_TUPLE_TEST_SIZE >= 4 641 642 template<typename JType, typename K, typename KHash> 643 class makeJoin<4, JType, tbb::flow::key_matching<K, KHash> > { 644 typedef typename JType::output_type TType; 645 typedef typename std::tuple_element<0, TType>::type T0; 646 typedef typename std::tuple_element<1, TType>::type T1; 647 typedef typename std::tuple_element<2, TType>::type T2; 648 typedef typename std::tuple_element<3, TType>::type T3; 649 public: 650 static JType *create(tbb::flow::graph& g) { 651 JType *temp = new JType(g, 652 my_struct_key<K, T0>(), 653 my_struct_key<K, T1>(), 654 my_struct_key<K, T2>(), 655 my_struct_key<K, T3>() 656 ); 657 return temp; 658 } 659 static void destroy(JType *p) { delete p; } 660 }; 661 662 template<typename JType> 663 class makeJoin<4, JType, tbb::flow::tag_matching> { 664 typedef typename JType::output_type TType; 665 typedef typename std::tuple_element<0, TType>::type T0; 666 typedef typename std::tuple_element<1, TType>::type T1; 667 typedef typename std::tuple_element<2, TType>::type T2; 668 typedef typename std::tuple_element<3, TType>::type T3; 669 public: 670 static JType *create(tbb::flow::graph& g) { 671 JType *temp = new JType(g, 672 tag_func<T0>(T0(2)), 673 tag_func<T1>(T1(3)), 674 tag_func<T2>(T2(4)), 675 tag_func<T3>(T3(5)) 676 ); 677 return temp; 678 } 679 static void destroy(JType *p) { delete p; } 680 }; 681 682 #endif 683 #if MAX_TUPLE_TEST_SIZE >= 5 684 template<typename JType, typename K, typename KHash> 685 class makeJoin<5, JType, tbb::flow::key_matching<K, KHash> > { 686 typedef typename JType::output_type TType; 687 typedef typename std::tuple_element<0, TType>::type T0; 688 typedef typename std::tuple_element<1, TType>::type T1; 689 typedef typename std::tuple_element<2, TType>::type T2; 690 typedef typename std::tuple_element<3, TType>::type T3; 691 typedef typename std::tuple_element<4, TType>::type T4; 692 public: 693 static JType *create(tbb::flow::graph& g) { 694 JType *temp = new JType(g, 695 my_struct_key<K, T0>(), 696 my_struct_key<K, T1>(), 697 my_struct_key<K, T2>(), 698 my_struct_key<K, T3>(), 699 my_struct_key<K, T4>() 700 ); 701 return temp; 702 } 703 static void destroy(JType *p) { delete p; } 704 }; 705 706 template<typename JType> 707 class makeJoin<5, JType, tbb::flow::tag_matching> { 708 typedef typename JType::output_type TType; 709 typedef typename std::tuple_element<0, TType>::type T0; 710 typedef typename std::tuple_element<1, TType>::type T1; 711 typedef typename std::tuple_element<2, TType>::type T2; 712 typedef typename std::tuple_element<3, TType>::type T3; 713 typedef typename std::tuple_element<4, TType>::type T4; 714 public: 715 static JType *create(tbb::flow::graph& g) { 716 JType *temp = new JType(g, 717 tag_func<T0>(T0(2)), 718 tag_func<T1>(T1(3)), 719 tag_func<T2>(T2(4)), 720 tag_func<T3>(T3(5)), 721 tag_func<T4>(T4(6)) 722 ); 723 return temp; 724 } 725 static void destroy(JType *p) { delete p; } 726 }; 727 #endif 728 #if MAX_TUPLE_TEST_SIZE >= 6 729 template<typename JType, typename K, typename KHash> 730 class makeJoin<6, JType, tbb::flow::key_matching<K, KHash> > { 731 typedef typename JType::output_type TType; 732 typedef typename std::tuple_element<0, TType>::type T0; 733 typedef typename std::tuple_element<1, TType>::type T1; 734 typedef typename std::tuple_element<2, TType>::type T2; 735 typedef typename std::tuple_element<3, TType>::type T3; 736 typedef typename std::tuple_element<4, TType>::type T4; 737 typedef typename std::tuple_element<5, TType>::type T5; 738 public: 739 static JType *create(tbb::flow::graph& g) { 740 JType *temp = new JType(g, 741 my_struct_key<K, T0>(), 742 my_struct_key<K, T1>(), 743 my_struct_key<K, T2>(), 744 my_struct_key<K, T3>(), 745 my_struct_key<K, T4>(), 746 my_struct_key<K, T5>() 747 ); 748 return temp; 749 } 750 static void destroy(JType *p) { delete p; } 751 }; 752 753 template<typename JType> 754 class makeJoin<6, JType, tbb::flow::tag_matching> { 755 typedef typename JType::output_type TType; 756 typedef typename std::tuple_element<0, TType>::type T0; 757 typedef typename std::tuple_element<1, TType>::type T1; 758 typedef typename std::tuple_element<2, TType>::type T2; 759 typedef typename std::tuple_element<3, TType>::type T3; 760 typedef typename std::tuple_element<4, TType>::type T4; 761 typedef typename std::tuple_element<5, TType>::type T5; 762 public: 763 static JType *create(tbb::flow::graph& g) { 764 JType *temp = new JType(g, 765 tag_func<T0>(T0(2)), 766 tag_func<T1>(T1(3)), 767 tag_func<T2>(T2(4)), 768 tag_func<T3>(T3(5)), 769 tag_func<T4>(T4(6)), 770 tag_func<T5>(T5(7)) 771 ); 772 return temp; 773 } 774 static void destroy(JType *p) { delete p; } 775 }; 776 #endif 777 778 #if MAX_TUPLE_TEST_SIZE >= 7 779 template<typename JType, typename K, typename KHash> 780 class makeJoin<7, JType, tbb::flow::key_matching<K, KHash> > { 781 typedef typename JType::output_type TType; 782 typedef typename std::tuple_element<0, TType>::type T0; 783 typedef typename std::tuple_element<1, TType>::type T1; 784 typedef typename std::tuple_element<2, TType>::type T2; 785 typedef typename std::tuple_element<3, TType>::type T3; 786 typedef typename std::tuple_element<4, TType>::type T4; 787 typedef typename std::tuple_element<5, TType>::type T5; 788 typedef typename std::tuple_element<6, TType>::type T6; 789 public: 790 static JType *create(tbb::flow::graph& g) { 791 JType *temp = new JType(g, 792 my_struct_key<K, T0>(), 793 my_struct_key<K, T1>(), 794 my_struct_key<K, T2>(), 795 my_struct_key<K, T3>(), 796 my_struct_key<K, T4>(), 797 my_struct_key<K, T5>(), 798 my_struct_key<K, T6>() 799 ); 800 return temp; 801 } 802 static void destroy(JType *p) { delete p; } 803 }; 804 805 template<typename JType> 806 class makeJoin<7, JType, tbb::flow::tag_matching> { 807 typedef typename JType::output_type TType; 808 typedef typename std::tuple_element<0, TType>::type T0; 809 typedef typename std::tuple_element<1, TType>::type T1; 810 typedef typename std::tuple_element<2, TType>::type T2; 811 typedef typename std::tuple_element<3, TType>::type T3; 812 typedef typename std::tuple_element<4, TType>::type T4; 813 typedef typename std::tuple_element<5, TType>::type T5; 814 typedef typename std::tuple_element<6, TType>::type T6; 815 public: 816 static JType *create(tbb::flow::graph& g) { 817 JType *temp = new JType(g, 818 tag_func<T0>(T0(2)), 819 tag_func<T1>(T1(3)), 820 tag_func<T2>(T2(4)), 821 tag_func<T3>(T3(5)), 822 tag_func<T4>(T4(6)), 823 tag_func<T5>(T5(7)), 824 tag_func<T6>(T6(8)) 825 ); 826 return temp; 827 } 828 static void destroy(JType *p) { delete p; } 829 }; 830 #endif 831 832 #if MAX_TUPLE_TEST_SIZE >= 8 833 template<typename JType, typename K, typename KHash> 834 class makeJoin<8, JType, tbb::flow::key_matching<K, KHash> > { 835 typedef typename JType::output_type TType; 836 typedef typename std::tuple_element<0, TType>::type T0; 837 typedef typename std::tuple_element<1, TType>::type T1; 838 typedef typename std::tuple_element<2, TType>::type T2; 839 typedef typename std::tuple_element<3, TType>::type T3; 840 typedef typename std::tuple_element<4, TType>::type T4; 841 typedef typename std::tuple_element<5, TType>::type T5; 842 typedef typename std::tuple_element<6, TType>::type T6; 843 typedef typename std::tuple_element<7, TType>::type T7; 844 public: 845 static JType *create(tbb::flow::graph& g) { 846 JType *temp = new JType(g, 847 my_struct_key<K, T0>(), 848 my_struct_key<K, T1>(), 849 my_struct_key<K, T2>(), 850 my_struct_key<K, T3>(), 851 my_struct_key<K, T4>(), 852 my_struct_key<K, T5>(), 853 my_struct_key<K, T6>(), 854 my_struct_key<K, T7>() 855 ); 856 return temp; 857 } 858 static void destroy(JType *p) { delete p; } 859 }; 860 861 template<typename JType> 862 class makeJoin<8, JType, tbb::flow::tag_matching> { 863 typedef typename JType::output_type TType; 864 typedef typename std::tuple_element<0, TType>::type T0; 865 typedef typename std::tuple_element<1, TType>::type T1; 866 typedef typename std::tuple_element<2, TType>::type T2; 867 typedef typename std::tuple_element<3, TType>::type T3; 868 typedef typename std::tuple_element<4, TType>::type T4; 869 typedef typename std::tuple_element<5, TType>::type T5; 870 typedef typename std::tuple_element<6, TType>::type T6; 871 typedef typename std::tuple_element<7, TType>::type T7; 872 public: 873 static JType *create(tbb::flow::graph& g) { 874 JType *temp = new JType(g, 875 tag_func<T0>(T0(2)), 876 tag_func<T1>(T1(3)), 877 tag_func<T2>(T2(4)), 878 tag_func<T3>(T3(5)), 879 tag_func<T4>(T4(6)), 880 tag_func<T5>(T5(7)), 881 tag_func<T6>(T6(8)), 882 tag_func<T7>(T7(9)) 883 ); 884 return temp; 885 } 886 static void destroy(JType *p) { delete p; } 887 }; 888 #endif 889 890 #if MAX_TUPLE_TEST_SIZE >= 9 891 template<typename JType, typename K, typename KHash> 892 class makeJoin<9, JType, tbb::flow::key_matching<K, KHash> > { 893 typedef typename JType::output_type TType; 894 typedef typename std::tuple_element<0, TType>::type T0; 895 typedef typename std::tuple_element<1, TType>::type T1; 896 typedef typename std::tuple_element<2, TType>::type T2; 897 typedef typename std::tuple_element<3, TType>::type T3; 898 typedef typename std::tuple_element<4, TType>::type T4; 899 typedef typename std::tuple_element<5, TType>::type T5; 900 typedef typename std::tuple_element<6, TType>::type T6; 901 typedef typename std::tuple_element<7, TType>::type T7; 902 typedef typename std::tuple_element<8, TType>::type T8; 903 public: 904 static JType *create(tbb::flow::graph& g) { 905 JType *temp = new JType(g, 906 my_struct_key<K, T0>(), 907 my_struct_key<K, T1>(), 908 my_struct_key<K, T2>(), 909 my_struct_key<K, T3>(), 910 my_struct_key<K, T4>(), 911 my_struct_key<K, T5>(), 912 my_struct_key<K, T6>(), 913 my_struct_key<K, T7>(), 914 my_struct_key<K, T8>() 915 ); 916 return temp; 917 } 918 static void destroy(JType *p) { delete p; } 919 }; 920 921 template<typename JType> 922 class makeJoin<9, JType, tbb::flow::tag_matching> { 923 typedef typename JType::output_type TType; 924 typedef typename std::tuple_element<0, TType>::type T0; 925 typedef typename std::tuple_element<1, TType>::type T1; 926 typedef typename std::tuple_element<2, TType>::type T2; 927 typedef typename std::tuple_element<3, TType>::type T3; 928 typedef typename std::tuple_element<4, TType>::type T4; 929 typedef typename std::tuple_element<5, TType>::type T5; 930 typedef typename std::tuple_element<6, TType>::type T6; 931 typedef typename std::tuple_element<7, TType>::type T7; 932 typedef typename std::tuple_element<8, TType>::type T8; 933 public: 934 static JType *create(tbb::flow::graph& g) { 935 JType *temp = new JType(g, 936 tag_func<T0>(T0(2)), 937 tag_func<T1>(T1(3)), 938 tag_func<T2>(T2(4)), 939 tag_func<T3>(T3(5)), 940 tag_func<T4>(T4(6)), 941 tag_func<T5>(T5(7)), 942 tag_func<T6>(T6(8)), 943 tag_func<T7>(T7(9)), 944 tag_func<T8>(T8(10)) 945 ); 946 return temp; 947 } 948 static void destroy(JType *p) { delete p; } 949 }; 950 #endif 951 952 #if MAX_TUPLE_TEST_SIZE >= 10 953 template<typename JType, typename K, typename KHash> 954 class makeJoin<10, JType, tbb::flow::key_matching<K, KHash> > { 955 typedef typename JType::output_type TType; 956 typedef typename std::tuple_element<0, TType>::type T0; 957 typedef typename std::tuple_element<1, TType>::type T1; 958 typedef typename std::tuple_element<2, TType>::type T2; 959 typedef typename std::tuple_element<3, TType>::type T3; 960 typedef typename std::tuple_element<4, TType>::type T4; 961 typedef typename std::tuple_element<5, TType>::type T5; 962 typedef typename std::tuple_element<6, TType>::type T6; 963 typedef typename std::tuple_element<7, TType>::type T7; 964 typedef typename std::tuple_element<8, TType>::type T8; 965 typedef typename std::tuple_element<9, TType>::type T9; 966 public: 967 static JType *create(tbb::flow::graph& g) { 968 JType *temp = new JType(g, 969 my_struct_key<K, T0>(), 970 my_struct_key<K, T1>(), 971 my_struct_key<K, T2>(), 972 my_struct_key<K, T3>(), 973 my_struct_key<K, T4>(), 974 my_struct_key<K, T5>(), 975 my_struct_key<K, T6>(), 976 my_struct_key<K, T7>(), 977 my_struct_key<K, T8>(), 978 my_struct_key<K, T9>() 979 ); 980 return temp; 981 } 982 static void destroy(JType *p) { delete p; } 983 }; 984 985 template<typename JType> 986 class makeJoin<10, JType, tbb::flow::tag_matching> { 987 typedef typename JType::output_type TType; 988 typedef typename std::tuple_element<0, TType>::type T0; 989 typedef typename std::tuple_element<1, TType>::type T1; 990 typedef typename std::tuple_element<2, TType>::type T2; 991 typedef typename std::tuple_element<3, TType>::type T3; 992 typedef typename std::tuple_element<4, TType>::type T4; 993 typedef typename std::tuple_element<5, TType>::type T5; 994 typedef typename std::tuple_element<6, TType>::type T6; 995 typedef typename std::tuple_element<7, TType>::type T7; 996 typedef typename std::tuple_element<8, TType>::type T8; 997 typedef typename std::tuple_element<9, TType>::type T9; 998 public: 999 static JType *create(tbb::flow::graph& g) { 1000 JType *temp = new JType(g, 1001 tag_func<T0>(T0(2)), 1002 tag_func<T1>(T1(3)), 1003 tag_func<T2>(T2(4)), 1004 tag_func<T3>(T3(5)), 1005 tag_func<T4>(T4(6)), 1006 tag_func<T5>(T5(7)), 1007 tag_func<T6>(T6(8)), 1008 tag_func<T7>(T7(9)), 1009 tag_func<T8>(T8(10)), 1010 tag_func<T9>(T9(11)) 1011 ); 1012 return temp; 1013 } 1014 static void destroy(JType *p) { delete p; } 1015 }; 1016 #endif 1017 1018 // holder for input_node pointers for eventual deletion 1019 1020 static void* all_input_nodes[MaxPorts][MaxNInputs]; 1021 1022 template<int ELEM, typename JNT> 1023 class input_node_helper { 1024 public: 1025 typedef JNT join_node_type; 1026 typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type; 1027 typedef typename join_node_type::output_type TT; 1028 1029 typedef typename std::tuple_element<ELEM-1, TT>::type IT; 1030 typedef typename tbb::flow::input_node<IT> my_input_node_type; 1031 typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type; 1032 static void print_remark(const char * str) { 1033 input_node_helper<ELEM-1, JNT>::print_remark(str); 1034 INFO(", " << name_of<IT>::name()); 1035 } 1036 static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) { 1037 for(int i = 0; i < nInputs; ++i) { 1038 my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, ELEM>(i, nInputs)); 1039 tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join)); 1040 all_input_nodes[ELEM-1][i] = (void *)new_node; 1041 new_node->activate(); 1042 } 1043 // add the next input_node 1044 input_node_helper<ELEM-1, JNT>::add_input_nodes(my_join, g, nInputs); 1045 } 1046 1047 static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) { 1048 my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(ELEM+1))); 1049 tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join)); 1050 tbb::flow::make_edge(my_input, *new_node); 1051 all_input_nodes[ELEM-1][0] = (void *)new_node; 1052 input_node_helper<ELEM-1, JNT>::add_recirc_func_nodes(my_join, my_input, g); 1053 } 1054 1055 static void only_check_value(const int i, const TT &v) { 1056 CHECK_MESSAGE(std::get<ELEM-1>(v)==(IT)(i*(ELEM+1)), ""); 1057 input_node_helper<ELEM-1, JNT>::only_check_value(i, v); 1058 } 1059 1060 static void check_value(int i, TT &v, bool is_serial) { 1061 // the fetched value will match only if there is only one input_node. 1062 bool is_correct = !is_serial||std::get<ELEM-1>(v)==(IT)(i*(ELEM+1)); 1063 CHECK_MESSAGE(is_correct, ""); 1064 // tally the fetched value. 1065 int ival = (int)std::get<ELEM-1>(v); 1066 CHECK_MESSAGE(!(ival%(ELEM+1)), ""); 1067 ival /= (ELEM+1); 1068 CHECK_MESSAGE(!outputCheck[ELEM-1][ival], ""); 1069 outputCheck[ELEM-1][ival] = true; 1070 input_node_helper<ELEM-1, JNT>::check_value(i, v, is_serial); 1071 } 1072 static void remove_input_nodes(join_node_type& my_join, int nInputs) { 1073 for(int i = 0; i< nInputs; ++i) { 1074 my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[ELEM-1][i]); 1075 tbb::flow::remove_edge(*dp, tbb::flow::input_port<ELEM-1>(my_join)); 1076 delete dp; 1077 } 1078 input_node_helper<ELEM-1, JNT>::remove_input_nodes(my_join, nInputs); 1079 } 1080 1081 static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) { 1082 my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[ELEM-1][0]); 1083 tbb::flow::remove_edge(*fn, tbb::flow::input_port<ELEM-1>(my_join)); 1084 tbb::flow::remove_edge(my_input, *fn); 1085 delete fn; 1086 input_node_helper<ELEM-1, JNT>::remove_recirc_func_nodes(my_join, my_input); 1087 } 1088 }; 1089 1090 template<typename JNT> 1091 class input_node_helper<1, JNT> { 1092 typedef JNT join_node_type; 1093 typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type; 1094 typedef typename join_node_type::output_type TT; 1095 1096 typedef typename std::tuple_element<0, TT>::type IT; 1097 typedef typename tbb::flow::input_node<IT> my_input_node_type; 1098 typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type; 1099 public: 1100 static void print_remark(const char * str) { 1101 INFO(str << "< " << name_of<IT>::name()); 1102 } 1103 static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) { 1104 for(int i = 0; i < nInputs; ++i) { 1105 my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, 1>(i, nInputs)); 1106 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join)); 1107 all_input_nodes[0][i] = (void *)new_node; 1108 new_node->activate(); 1109 } 1110 } 1111 1112 static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) { 1113 my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(2))); 1114 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join)); 1115 tbb::flow::make_edge(my_input, *new_node); 1116 all_input_nodes[0][0] = (void *)new_node; 1117 } 1118 1119 static void only_check_value(const int i, const TT &v) { 1120 CHECK_MESSAGE(std::get<0>(v)==(IT)(i*2), ""); 1121 } 1122 1123 static void check_value(int i, TT &v, bool is_serial) { 1124 bool is_correct = !is_serial||std::get<0>(v)==(IT)(i*(2)); 1125 CHECK_MESSAGE(is_correct, ""); 1126 int ival = (int)std::get<0>(v); 1127 CHECK_MESSAGE(!(ival%2), ""); 1128 ival /= 2; 1129 CHECK_MESSAGE(!outputCheck[0][ival], ""); 1130 outputCheck[0][ival] = true; 1131 } 1132 static void remove_input_nodes(join_node_type& my_join, int nInputs) { 1133 for(int i = 0; i < nInputs; ++i) { 1134 my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[0][i]); 1135 tbb::flow::remove_edge(*dp, tbb::flow::input_port<0>(my_join)); 1136 delete dp; 1137 } 1138 } 1139 1140 static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) { 1141 my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[0][0]); 1142 tbb::flow::remove_edge(*fn, tbb::flow::input_port<0>(my_join)); 1143 tbb::flow::remove_edge(my_input, *fn); 1144 delete fn; 1145 } 1146 }; 1147 1148 #if _MSC_VER && !defined(__INTEL_COMPILER) 1149 // Suppress "conditional expression is constant" warning. 1150 #pragma warning( push ) 1151 #pragma warning( disable: 4127 ) 1152 #endif 1153 1154 template<typename JType, class JP> 1155 class parallel_test { 1156 public: 1157 typedef typename JType::output_type TType; 1158 typedef typename is_key_matching_join<JP>::key_type key_type; 1159 static void test() { 1160 const int TUPLE_SIZE = std::tuple_size<TType>::value; 1161 const bool is_key_matching = is_key_matching_join<JP>::value; 1162 1163 TType v; 1164 input_node_helper<TUPLE_SIZE, JType>::print_remark("Parallel test of join_node"); 1165 INFO(" > "); 1166 if(is_key_matching) { 1167 INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name()); 1168 if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) { 1169 INFO("&"); 1170 } 1171 } 1172 INFO("\n"); 1173 for(int i = 0; i < MaxPorts; ++i) { 1174 for(int j = 0; j < MaxNInputs; ++j) { 1175 all_input_nodes[i][j] = nullptr; 1176 } 1177 } 1178 for(int nInputs = 1; nInputs<=MaxNInputs; ++nInputs) { 1179 tbb::flow::graph g; 1180 bool not_out_of_order = (nInputs==1)&&(!is_key_matching); 1181 JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g); 1182 tbb::flow::queue_node<TType> outq1(g); 1183 tbb::flow::queue_node<TType> outq2(g); 1184 1185 tbb::flow::make_edge(*my_join, outq1); 1186 tbb::flow::make_edge(*my_join, outq2); 1187 1188 input_node_helper<TUPLE_SIZE, JType>::add_input_nodes((*my_join), g, nInputs); 1189 1190 g.wait_for_all(); 1191 1192 reset_outputCheck(TUPLE_SIZE, Count); 1193 for(int i = 0; i < Count; ++i) { 1194 CHECK_MESSAGE(outq1.try_get(v), ""); 1195 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order); 1196 } 1197 1198 check_outputCheck(TUPLE_SIZE, Count); 1199 reset_outputCheck(TUPLE_SIZE, Count); 1200 1201 for(int i = 0; i < Count; i++) { 1202 CHECK_MESSAGE(outq2.try_get(v), ""); 1203 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order); 1204 } 1205 check_outputCheck(TUPLE_SIZE, Count); 1206 1207 CHECK_MESSAGE(!outq1.try_get(v), ""); 1208 CHECK_MESSAGE(!outq2.try_get(v), ""); 1209 1210 input_node_helper<TUPLE_SIZE, JType>::remove_input_nodes((*my_join), nInputs); 1211 tbb::flow::remove_edge(*my_join, outq1); 1212 tbb::flow::remove_edge(*my_join, outq2); 1213 makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join); 1214 } 1215 } 1216 }; 1217 1218 1219 template<int ELEM, typename JType> 1220 class serial_queue_helper { 1221 public: 1222 typedef typename JType::output_type TT; 1223 typedef typename std::tuple_element<ELEM-1, TT>::type IT; 1224 typedef typename tbb::flow::queue_node<IT> my_queue_node_type; 1225 static void print_remark() { 1226 serial_queue_helper<ELEM-1, JType>::print_remark(); 1227 INFO(", " << name_of<IT>::name()); 1228 } 1229 static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) { 1230 serial_queue_helper<ELEM-1, JType>::add_queue_nodes(g, my_join); 1231 my_queue_node_type *new_node = new my_queue_node_type(g); 1232 tbb::flow::make_edge(*new_node, std::get<ELEM-1>(my_join.input_ports())); 1233 all_input_nodes[ELEM-1][0] = (void *)new_node; 1234 } 1235 1236 static void fill_one_queue(int maxVal) { 1237 // fill queue to "left" of me 1238 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]); 1239 serial_queue_helper<ELEM-1, JType>::fill_one_queue(maxVal); 1240 for(int i = 0; i < maxVal; ++i) { 1241 CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(i)), ""); 1242 } 1243 } 1244 1245 static void put_one_queue_val(int myVal) { 1246 // put this val to my "left". 1247 serial_queue_helper<ELEM-1, JType>::put_one_queue_val(myVal); 1248 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]); 1249 CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(myVal)), ""); 1250 } 1251 1252 static void check_queue_value(int i, TT &v) { 1253 serial_queue_helper<ELEM-1, JType>::check_queue_value(i, v); 1254 CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<ELEM-1>(v))==i * (ELEM+1), ""); 1255 } 1256 1257 static void remove_queue_nodes(JType &my_join) { 1258 my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]); 1259 tbb::flow::remove_edge(*vptr, std::get<ELEM-1>(my_join.input_ports())); 1260 serial_queue_helper<ELEM-1, JType>::remove_queue_nodes(my_join); 1261 delete vptr; 1262 } 1263 }; 1264 1265 template<typename JType> 1266 class serial_queue_helper<1, JType> { 1267 public: 1268 typedef typename JType::output_type TT; 1269 typedef typename std::tuple_element<0, TT>::type IT; 1270 typedef typename tbb::flow::queue_node<IT> my_queue_node_type; 1271 static void print_remark() { 1272 INFO("Serial test of join_node< " << name_of<IT>::name()); 1273 } 1274 1275 static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) { 1276 my_queue_node_type *new_node = new my_queue_node_type(g); 1277 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join)); 1278 all_input_nodes[0][0] = (void *)new_node; 1279 } 1280 1281 static void fill_one_queue(int maxVal) { 1282 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]); 1283 for(int i = 0; i < maxVal; ++i) { 1284 CHECK_MESSAGE(qptr->try_put(make_thingie<IT, 1>()(i)), ""); 1285 } 1286 } 1287 1288 static void put_one_queue_val(int myVal) { 1289 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]); 1290 IT my_val = make_thingie<IT, 1>()(myVal); 1291 CHECK_MESSAGE(qptr->try_put(my_val), ""); 1292 } 1293 1294 static void check_queue_value(int i, TT &v) { 1295 CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<0>(v))==i*2, ""); 1296 } 1297 1298 static void remove_queue_nodes(JType &my_join) { 1299 my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]); 1300 tbb::flow::remove_edge(*vptr, std::get<0>(my_join.input_ports())); 1301 delete vptr; 1302 } 1303 }; 1304 1305 // 1306 // Single reservable predecessor at each port, single accepting and rejecting successor 1307 // * put to buffer before port0, then put to buffer before port1, ... 1308 // * fill buffer before port0 then fill buffer before port1, ... 1309 1310 template<typename JType, class JP> 1311 void test_one_serial(JType &my_join, tbb::flow::graph &g) { 1312 typedef typename JType::output_type TType; 1313 static const int TUPLE_SIZE = std::tuple_size<TType>::value; 1314 bool is_key_matching = is_key_matching_join<JP>::value; 1315 std::vector<bool> flags; 1316 serial_queue_helper<TUPLE_SIZE, JType>::add_queue_nodes(g, my_join); 1317 typedef TType q3_input_type; 1318 tbb::flow::queue_node< q3_input_type > q3(g); 1319 1320 tbb::flow::make_edge(my_join, q3); 1321 1322 // fill each queue with its value one-at-a-time 1323 flags.clear(); 1324 for(int i = 0; i < Count; ++i) { 1325 serial_queue_helper<TUPLE_SIZE, JType>::put_one_queue_val(i); 1326 flags.push_back(false); 1327 } 1328 1329 g.wait_for_all(); 1330 for(int i = 0; i < Count; ++i) { 1331 q3_input_type v; 1332 g.wait_for_all(); 1333 CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()"); 1334 if(is_key_matching) { 1335 // because we look up tags in the hash table, the output may be out of order. 1336 int j = int(std::get<0>(v))/2; // figure what the index should be 1337 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v); 1338 flags[j] = true; 1339 } 1340 else { 1341 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v); 1342 } 1343 } 1344 1345 if(is_key_matching) { 1346 for(int i = 0; i < Count; ++i) { 1347 CHECK_MESSAGE(flags[i], ""); 1348 flags[i] = false; 1349 } 1350 } 1351 1352 tbb::flow::remove_edge(my_join, q3); 1353 tbb::flow::limiter_node<q3_input_type> limiter(g, Count / 2); 1354 tbb::flow::make_edge(my_join, limiter); 1355 tbb::flow::make_edge(limiter, q3); 1356 1357 // fill each queue completely before filling the next. 1358 serial_queue_helper<TUPLE_SIZE, JType>::fill_one_queue(Count); 1359 1360 g.wait_for_all(); 1361 for(int i = 0; i < Count / 2; ++i) { 1362 q3_input_type v; 1363 g.wait_for_all(); 1364 CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()"); 1365 if(is_key_matching) { 1366 int j = int(std::get<0>(v))/2; 1367 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v); 1368 flags[j] = true; 1369 } 1370 else { 1371 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v); 1372 } 1373 } 1374 1375 if(is_key_matching) { 1376 CHECK(std::count(flags.begin(), flags.end(), true) == Count / 2); 1377 } 1378 1379 serial_queue_helper<TUPLE_SIZE, JType>::remove_queue_nodes(my_join); 1380 } 1381 1382 template<typename JType, class JP> 1383 class serial_test { 1384 typedef typename JType::output_type TType; 1385 public: 1386 static void test() { 1387 tbb::flow::graph g; 1388 std::vector<bool> flags; 1389 bool is_key_matching = is_key_matching_join<JP>::value; 1390 flags.reserve(Count); 1391 1392 const int TUPLE_SIZE = std::tuple_size<TType>::value; 1393 static const int ELEMS = 3; 1394 1395 JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g); 1396 test_input_ports_return_ref(*my_join); 1397 serial_queue_helper<TUPLE_SIZE, JType>::print_remark(); INFO(" >"); 1398 if(is_key_matching) { 1399 INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name()); 1400 if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) { 1401 INFO("&"); 1402 } 1403 } 1404 INFO("\n"); 1405 1406 test_one_serial<JType, JP>(*my_join, g); 1407 // build the vector with copy construction from the used join node. 1408 std::vector<JType>join_vector(ELEMS, *my_join); 1409 // destroy the tired old join_node in case we're accidentally reusing pieces of it. 1410 makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join); 1411 1412 for(int e = 0; e < ELEMS; ++e) { // exercise each of the vector elements 1413 test_one_serial<JType, JP>(join_vector[e], g); 1414 } 1415 } 1416 1417 }; // serial_test 1418 1419 #if _MSC_VER && !defined(__INTEL_COMPILER) 1420 #pragma warning( pop ) 1421 #endif 1422 1423 template< 1424 template<typename, class > class TestType, // serial_test or parallel_test 1425 typename OutputTupleType, // type of the output of the join 1426 class J> // graph_buffer_policy (reserving, queueing, tag_matching or key_matching) 1427 class generate_test { 1428 public: 1429 typedef tbb::flow::join_node<OutputTupleType, typename filter_out_message_based_key_matching<J>::policy> join_node_type; 1430 static void do_test() { 1431 TestType<join_node_type, J>::test(); 1432 } 1433 }; 1434 1435 template<class JP> 1436 void test_input_port_policies(); 1437 1438 // join_node (reserving) does not consume inputs until an item is available at 1439 // every input. It tries to reserve each input, and if any fails it releases the 1440 // reservation. When it builds a tuple it broadcasts to all its successors and 1441 // consumes all the inputs. 1442 // 1443 // So our test will put an item at one input port, then attach another node to the 1444 // same node (a queue node in this case). The second successor should receive the 1445 // item in the queue, emptying it. 1446 // 1447 // We then place an item in the second input queue, and check the output queues; they 1448 // should still be empty. Then we place an item in the first queue; the output queues 1449 // should then receive a tuple. 1450 // 1451 // we then attach another function node to the second input. It should not receive 1452 // an item, verifying that the item in the queue is consumed. 1453 template<> 1454 void test_input_port_policies<tbb::flow::reserving>() { 1455 tbb::flow::graph g; 1456 typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::reserving > JType; // two-phase is the default policy 1457 // create join_node<type0,type1> jn 1458 JType jn(g); 1459 // create output_queue oq0, oq1 1460 typedef JType::output_type OQType; 1461 tbb::flow::queue_node<OQType> oq0(g); 1462 tbb::flow::queue_node<OQType> oq1(g); 1463 // create iq0, iq1 1464 typedef tbb::flow::queue_node<int> IQType; 1465 IQType iq0(g); 1466 IQType iq1(g); 1467 // create qnp, qnq 1468 IQType qnp(g); 1469 IQType qnq(g); 1470 INFO("Testing policies of join_node<reserving> input ports\n"); 1471 // attach jn to oq0, oq1 1472 tbb::flow::make_edge(jn, oq0); 1473 tbb::flow::make_edge(jn, oq1); 1474 1475 // attach iq0, iq1 to jn 1476 tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports())); 1477 tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports())); 1478 1479 for(int loop = 0; loop < 3; ++loop) { 1480 // place one item in iq0 1481 CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1"); 1482 // attach iq0 to qnp 1483 tbb::flow::make_edge(iq0, qnp); 1484 // qnp should have an item in it. 1485 g.wait_for_all(); 1486 { 1487 int i; 1488 CHECK_MESSAGE( (qnp.try_get(i)&&i==1), "Error in item fetched by qnp"); 1489 } 1490 // place item in iq1 1491 CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1"); 1492 // oq0, oq1 should be empty 1493 g.wait_for_all(); 1494 { 1495 OQType t1; 1496 CHECK_MESSAGE( (!oq0.try_get(t1)&&!oq1.try_get(t1)), "oq0 and oq1 not empty"); 1497 } 1498 // detach qnp from iq0 1499 tbb::flow::remove_edge(iq0, qnp); // if we don't remove qnp it will gobble any values we put in iq0 1500 // place item in iq0 1501 CHECK_MESSAGE( (iq0.try_put(3)), "Error on second put to iq0"); 1502 // oq0, oq1 should have items in them 1503 g.wait_for_all(); 1504 { 1505 OQType t0; 1506 OQType t1; 1507 CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==3&&std::get<1>(t0)==2), "Error in oq0 output"); 1508 CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==3&&std::get<1>(t1)==2), "Error in oq1 output"); 1509 } 1510 // attach qnp to iq0, qnq to iq1 1511 // qnp and qnq should be empty 1512 tbb::flow::make_edge(iq0, qnp); 1513 tbb::flow::make_edge(iq1, qnq); 1514 g.wait_for_all(); 1515 { 1516 int i; 1517 CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it"); 1518 CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it"); 1519 } 1520 tbb::flow::remove_edge(iq0, qnp); 1521 tbb::flow::remove_edge(iq1, qnq); 1522 } // for ( int loop ... 1523 } 1524 1525 // join_node (queueing) consumes inputs as soon as they are available at 1526 // any input. When it builds a tuple it broadcasts to all its successors and 1527 // discards the broadcast values. 1528 // 1529 // So our test will put an item at one input port, then attach another node to the 1530 // same node (a queue node in this case). The second successor should not receive 1531 // an item (because the join consumed it). 1532 // 1533 // We then place an item in the second input queue, and check the output queues; they 1534 // should each have a tuple. 1535 // 1536 // we then attach another function node to the second input. It should not receive 1537 // an item, verifying that the item in the queue is consumed. 1538 template<> 1539 void test_input_port_policies<tbb::flow::queueing>() { 1540 tbb::flow::graph g; 1541 typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::queueing > JType; 1542 // create join_node<type0,type1> jn 1543 JType jn(g); 1544 // create output_queue oq0, oq1 1545 typedef JType::output_type OQType; 1546 tbb::flow::queue_node<OQType> oq0(g); 1547 tbb::flow::queue_node<OQType> oq1(g); 1548 // create iq0, iq1 1549 typedef tbb::flow::queue_node<int> IQType; 1550 IQType iq0(g); 1551 IQType iq1(g); 1552 // create qnp, qnq 1553 IQType qnp(g); 1554 IQType qnq(g); 1555 INFO("Testing policies of join_node<queueing> input ports\n"); 1556 // attach jn to oq0, oq1 1557 tbb::flow::make_edge(jn, oq0); 1558 tbb::flow::make_edge(jn, oq1); 1559 1560 // attach iq0, iq1 to jn 1561 tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports())); 1562 tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports())); 1563 1564 for(int loop = 0; loop < 3; ++loop) { 1565 // place one item in iq0 1566 CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1"); 1567 // attach iq0 to qnp 1568 tbb::flow::make_edge(iq0, qnp); 1569 // qnp should have an item in it. 1570 g.wait_for_all(); 1571 { 1572 int i; 1573 CHECK_MESSAGE( (!qnp.try_get(i)), "Item was received by qnp"); 1574 } 1575 // place item in iq1 1576 CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1"); 1577 // oq0, oq1 should have items 1578 g.wait_for_all(); 1579 { 1580 OQType t0; 1581 OQType t1; 1582 CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==1&&std::get<1>(t0)==2), "Error in oq0 output"); 1583 CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==1&&std::get<1>(t1)==2), "Error in oq1 output"); 1584 } 1585 // attach qnq to iq1 1586 // qnp and qnq should be empty 1587 tbb::flow::make_edge(iq1, qnq); 1588 g.wait_for_all(); 1589 { 1590 int i; 1591 CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it"); 1592 CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it"); 1593 } 1594 tbb::flow::remove_edge(iq0, qnp); 1595 tbb::flow::remove_edge(iq1, qnq); 1596 } // for ( int loop ... 1597 } 1598 1599 template<typename T> 1600 struct myTagValue { 1601 tbb::flow::tag_value operator()(T i) { return tbb::flow::tag_value(i); } 1602 }; 1603 1604 template<> 1605 struct myTagValue<CheckType<int> > { 1606 tbb::flow::tag_value operator()(CheckType<int> i) { return tbb::flow::tag_value((int)i); } 1607 }; 1608 1609 // join_node (tag_matching) consumes inputs as soon as they are available at 1610 // any input. When it builds a tuple it broadcasts to all its successors and 1611 // discards the broadcast values. 1612 // 1613 // It chooses the tuple it broadcasts by matching the tag values returned by the 1614 // methods given the constructor of the join, in this case the method just casts 1615 // the value in each port to tag_value. 1616 // 1617 // So our test will put an item at one input port, then attach another node to the 1618 // same node (a queue node in this case). The second successor should not receive 1619 // an item (because the join consumed it). 1620 // 1621 // We then place an item in the second input queue, and check the output queues; they 1622 // should each have a tuple. 1623 // 1624 // we then attach another queue node to the second input. It should not receive 1625 // an item, verifying that the item in the queue is consumed. 1626 // 1627 // We will then exercise the join with a bunch of values, and the output order should 1628 // be determined by the order we insert items into the second queue. (Each tuple set 1629 // corresponding to a tag will be complete when the second item is inserted.) 1630 template<> 1631 void test_input_port_policies<tbb::flow::tag_matching>() { 1632 tbb::flow::graph g; 1633 typedef tbb::flow::join_node<std::tuple<int, CheckType<int> >, tbb::flow::tag_matching > JoinNodeType; 1634 typedef JoinNodeType::output_type CheckTupleType; 1635 JoinNodeType testJoinNode(g, myTagValue<int>(), myTagValue<CheckType<int> >()); 1636 tbb::flow::queue_node<CheckTupleType> checkTupleQueue0(g); 1637 tbb::flow::queue_node<CheckTupleType> checkTupleQueue1(g); 1638 { 1639 Checker<CheckType<int> > my_check; 1640 1641 1642 typedef tbb::flow::queue_node<int> IntQueueType; 1643 typedef tbb::flow::queue_node<CheckType<int> > CheckQueueType; 1644 IntQueueType intInputQueue(g); 1645 CheckQueueType checkInputQueue(g); 1646 IntQueueType intEmptyTestQueue(g); 1647 CheckQueueType checkEmptyTestQueue(g); 1648 INFO("Testing policies of join_node<tag_matching> input ports\n"); 1649 // attach testJoinNode to checkTupleQueue0, checkTupleQueue1 1650 tbb::flow::make_edge(testJoinNode, checkTupleQueue0); 1651 tbb::flow::make_edge(testJoinNode, checkTupleQueue1); 1652 1653 // attach intInputQueue, checkInputQueue to testJoinNode 1654 tbb::flow::make_edge(intInputQueue, tbb::flow::input_port<0>(testJoinNode)); 1655 tbb::flow::make_edge(checkInputQueue, tbb::flow::input_port<1>(testJoinNode)); 1656 1657 // we'll put four discrete values in the inputs to the join_node. Each 1658 // set of inputs should result in one output. 1659 for(int loop = 0; loop < 4; ++loop) { 1660 // place one item in intInputQueue 1661 CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue"); 1662 // attach intInputQueue to intEmptyTestQueue 1663 tbb::flow::make_edge(intInputQueue, intEmptyTestQueue); 1664 // intEmptyTestQueue should not have an item in it. (the join consumed it.) 1665 g.wait_for_all(); 1666 { 1667 int intVal0; 1668 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal0)), "Item was received by intEmptyTestQueue"); 1669 } 1670 // place item in checkInputQueue 1671 CheckType<int> checkVal0(loop); 1672 CHECK_MESSAGE( (checkInputQueue.try_put(checkVal0)), "Error putting to checkInputQueue"); 1673 // checkTupleQueue0, checkTupleQueue1 should have items 1674 g.wait_for_all(); 1675 { 1676 CheckTupleType t0; 1677 CheckTupleType t1; 1678 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==loop&&(int)std::get<1>(t0)==loop), "Error in checkTupleQueue0 output"); 1679 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==loop&&(int)std::get<1>(t1)==loop), "Error in checkTupleQueue1 output"); 1680 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0"); 1681 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1"); 1682 } 1683 // attach checkEmptyTestQueue to checkInputQueue 1684 // intEmptyTestQueue and checkEmptyTestQueue should be empty 1685 tbb::flow::make_edge(checkInputQueue, checkEmptyTestQueue); 1686 g.wait_for_all(); 1687 { 1688 int intVal1; 1689 CheckType<int> checkVal1; 1690 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal1)), "intInputQueue still had value in it"); 1691 CHECK_MESSAGE( (!checkEmptyTestQueue.try_get(checkVal1)), "checkInputQueue still had value in it"); 1692 } 1693 tbb::flow::remove_edge(intInputQueue, intEmptyTestQueue); 1694 tbb::flow::remove_edge(checkInputQueue, checkEmptyTestQueue); 1695 } // for ( int loop ... 1696 1697 // Now we'll put [4 .. nValues - 1] in intInputQueue, and then put [4 .. nValues - 1] in checkInputQueue in 1698 // a different order. We should see tuples in the output queues in the order we inserted 1699 // the integers into checkInputQueue. 1700 const int nValues = 100; 1701 const int nIncr = 31; // relatively prime to nValues 1702 1703 for(int loop = 4; loop < 4+nValues; ++loop) { 1704 // place one item in intInputQueue 1705 CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue"); 1706 g.wait_for_all(); 1707 { 1708 CheckTupleType t3; 1709 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t3)), "Object in output queue"); 1710 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t3)), "Object in output queue"); 1711 } 1712 } // for ( int loop ... 1713 1714 for(int loop = 1; loop<=nValues; ++loop) { 1715 int lp1 = 4+(loop * nIncr)%nValues; 1716 // place item in checkInputQueue 1717 CHECK_MESSAGE( (checkInputQueue.try_put(lp1)), "Error putting to checkInputQueue"); 1718 // checkTupleQueue0, checkTupleQueue1 should have items 1719 g.wait_for_all(); 1720 { 1721 CheckTupleType t0; 1722 CheckTupleType t1; 1723 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==lp1 && std::get<1>(t0)==lp1), "Error in checkTupleQueue0 output"); 1724 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==lp1 && std::get<1>(t1)==lp1), "Error in checkTupleQueue1 output"); 1725 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0"); 1726 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1"); 1727 } 1728 } // for ( int loop ... 1729 } // Check 1730 } 1731 1732 template<typename Policy> struct policy_name {}; 1733 1734 template<> struct policy_name<tbb::flow::queueing> { 1735 const char* msg_beg() { return "queueing\n";} 1736 const char* msg_end() { return "test queueing extract\n";} 1737 }; 1738 1739 template<> struct policy_name<tbb::flow::reserving> { 1740 const char* msg_beg() { return "reserving\n";} 1741 const char* msg_end() { return "test reserving extract\n";} 1742 }; 1743 1744 template<> struct policy_name<tbb::flow::tag_matching> { 1745 const char* msg_beg() { return "tag_matching\n";} 1746 const char* msg_end() { return "test tag_matching extract\n";} 1747 }; 1748 1749 template<typename Policy> 1750 void test_main() { 1751 test_input_port_policies<Policy>(); 1752 for(int p = 0; p < 2; ++p) { 1753 INFO(policy_name<Policy>().msg_beg()); 1754 generate_test<serial_test, std::tuple<threebyte, double>, Policy>::do_test(); 1755 #if MAX_TUPLE_TEST_SIZE >= 4 1756 { 1757 Checker<CheckType<int> > my_check; 1758 generate_test<serial_test, std::tuple<float, double, CheckType<int>, long>, Policy>::do_test(); 1759 } 1760 #endif 1761 #if MAX_TUPLE_TEST_SIZE >= 6 1762 generate_test<serial_test, std::tuple<double, double, int, long, int, short>, Policy>::do_test(); 1763 #endif 1764 #if MAX_TUPLE_TEST_SIZE >= 8 1765 generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long>, Policy>::do_test(); 1766 #endif 1767 #if MAX_TUPLE_TEST_SIZE >= 10 1768 generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long>, Policy>::do_test(); 1769 #endif 1770 { 1771 Checker<CheckType<int> > my_check1; 1772 generate_test<parallel_test, std::tuple<float, CheckType<int> >, Policy>::do_test(); 1773 } 1774 #if MAX_TUPLE_TEST_SIZE >= 3 1775 generate_test<parallel_test, std::tuple<float, int, long>, Policy>::do_test(); 1776 #endif 1777 #if MAX_TUPLE_TEST_SIZE >= 5 1778 generate_test<parallel_test, std::tuple<double, double, int, int, short>, Policy>::do_test(); 1779 #endif 1780 #if MAX_TUPLE_TEST_SIZE >= 7 1781 generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long>, Policy>::do_test(); 1782 #endif 1783 #if MAX_TUPLE_TEST_SIZE >= 9 1784 generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long>, Policy>::do_test(); 1785 #endif 1786 } 1787 } 1788 1789 #endif /* tbb_test_join_node_H */ 1790