1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef tbb_test_join_node_H 18 #define tbb_test_join_node_H 19 20 #if _MSC_VER 21 // Suppress "decorated name length exceeded, name was truncated" warning 22 #if __INTEL_COMPILER 23 #pragma warning( disable: 2586 ) 24 #else 25 #pragma warning( disable: 4503 ) 26 #endif 27 #endif 28 29 #include "tbb/flow_graph.h" 30 31 #include "common/test.h" 32 #include "common/utils.h" 33 #include "common/checktype.h" 34 #include "common/graph_utils.h" 35 #include "common/test_follows_and_precedes_api.h" 36 37 #include <type_traits> 38 39 const char *names[] = { 40 "Adam", "Bruce", "Charles", "Daniel", "Evan", "Frederich", "George", "Hiram", "Ichabod", 41 "John", "Kevin", "Leonard", "Michael", "Ned", "Olin", "Paul", "Quentin", "Ralph", "Steven", 42 "Thomas", "Ulysses", "Victor", "Walter", "Xerxes", "Yitzhak", "Zebediah", "Anne", "Bethany", 43 "Clarisse", "Dorothy", "Erin", "Fatima", "Gabrielle", "Helen", "Irene", "Jacqueline", 44 "Katherine", "Lana", "Marilyn", "Noelle", "Okiilani", "Pauline", "Querida", "Rose", "Sybil", 45 "Tatiana", "Umiko", "Victoria", "Wilma", "Xena", "Yolanda", "Zoe", "Algernon", "Benjamin", 46 "Caleb", "Dylan", "Ezra", "Felix", "Gabriel", "Henry", "Issac", "Jasper", "Keifer", 47 "Lincoln", "Milo", "Nathaniel", "Owen", "Peter", "Quincy", "Ronan", "Silas", "Theodore", 48 "Uriah", "Vincent", "Wilbur", "Xavier", "Yoda", "Zachary", "Amelia", "Brielle", "Charlotte", 49 "Daphne", "Emma", "Fiona", "Grace", "Hazel", "Isla", "Juliet", "Keira", "Lily", "Mia", 50 "Nora", "Olivia", "Penelope", "Quintana", "Ruby", "Sophia", "Tessa", "Ursula", "Violet", 51 "Willow", "Xanthe", "Yvonne", "ZsaZsa", "Asher", "Bennett", "Connor", "Dominic", "Ethan", 52 "Finn", "Grayson", "Hudson", "Ian", "Jackson", "Kent", "Liam", "Matthew", "Noah", "Oliver", 53 "Parker", "Quinn", "Rhys", "Sebastian", "Taylor", "Umberto", "Vito", "William", "Xanto", 54 "Yogi", "Zane", "Ava", "Brenda", "Chloe", "Delilah", "Ella", "Felicity", "Genevieve", 55 "Hannah", "Isabella", "Josephine", "Kacie", "Lucy", "Madeline", "Natalie", "Octavia", 56 "Piper", "Qismah", "Rosalie", "Scarlett", "Tanya", "Uta", "Vivian", "Wendy", "Xola", 57 "Yaritza", "Zanthe"}; 58 59 static const int NameCnt = sizeof(names)/sizeof(char *); 60 61 template<typename K> 62 struct index_to_key { 63 K operator()(const int indx) { 64 return (K)(3*indx+1); 65 } 66 }; 67 68 template<> 69 struct index_to_key<std::string> { 70 std::string operator()(const int indx) { 71 return std::string(names[indx % NameCnt]); 72 } 73 }; 74 75 template<typename K> 76 struct K_deref { 77 typedef K type; 78 }; 79 80 template<typename K> 81 struct K_deref<K&> { 82 typedef K type; 83 }; 84 85 template<typename K, typename V> 86 struct MyKeyFirst { 87 K my_key; 88 V my_value; 89 MyKeyFirst(int i = 0, int v = 0): my_key(index_to_key<K>()(i)), my_value((V)v) { 90 } 91 void print_val() const { 92 INFO("MyKeyFirst{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}"); 93 } 94 operator int() const { return (int)my_value; } 95 }; 96 97 template<typename K, typename V> 98 struct MyKeySecond { 99 V my_value; 100 K my_key; 101 MyKeySecond(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) { 102 } 103 void print_val() const { 104 INFO("MyKeySecond{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}"); 105 } 106 operator int() const { return (int)my_value; } 107 }; 108 109 template<typename K, typename V> 110 struct MyMessageKeyWithoutKey { 111 V my_value; 112 K my_message_key; 113 MyMessageKeyWithoutKey(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) { 114 } 115 void print_val() const { 116 INFO("MyMessageKeyWithoutKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}"); 117 } 118 operator int() const { return (int)my_value; } 119 const K& key() const { 120 return my_message_key; 121 } 122 }; 123 124 template<typename K, typename V> 125 struct MyMessageKeyWithBrokenKey { 126 V my_value; 127 K my_key; 128 K my_message_key; 129 MyMessageKeyWithBrokenKey(int i = 0, int v = 0): my_value((V)v), my_key(), my_message_key(index_to_key<K>()(i)) { 130 } 131 void print_val() const { 132 INFO("MyMessageKeyWithBrokenKey{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}"); 133 } 134 operator int() const { return (int)my_value; } 135 const K& key() const { 136 return my_message_key; 137 } 138 139 }; 140 141 template<typename K, typename V> 142 struct MyKeyWithBrokenMessageKey { 143 V my_value; 144 K my_key; 145 MyKeyWithBrokenMessageKey(int i = 0, int v = 0): my_value((V)v), my_key(index_to_key<K>()(i)) { 146 } 147 void print_val() const { 148 INFO("MyKeyWithBrokenMessageKey{"); print_my_value(my_key); INFO(","); print_my_value(my_value); INFO("}"); 149 } 150 operator int() const { return (int)my_value; } 151 K key() const { 152 CHECK_MESSAGE( (false), "The method should never be called"); 153 return K(); 154 } 155 }; 156 157 template<typename K, typename V> 158 struct MyMessageKeyWithoutKeyMethod { 159 V my_value; 160 K my_message_key; 161 MyMessageKeyWithoutKeyMethod(int i = 0, int v = 0): my_value((V)v), my_message_key(index_to_key<K>()(i)) { 162 } 163 void print_val() const { 164 INFO("MyMessageKeyWithoutKeyMethod{"); print_my_value(my_message_key); INFO(","); print_my_value(my_value); INFO("}"); 165 } 166 operator int() const { return (int)my_value; } 167 //K key() const; // Do not define 168 }; 169 170 // Overload for MyMessageKeyWithoutKeyMethod 171 template <typename K, typename V> 172 K key_from_message(const MyMessageKeyWithoutKeyMethod<typename std::decay<K>::type, V> &m) { 173 return m.my_message_key; 174 } 175 176 177 // pattern for creating values in the tag_matching and key_matching, given an integer and the index in the tuple 178 template<typename TT, size_t INDEX> 179 struct make_thingie { 180 TT operator()(int const &i) { 181 return TT(i * (INDEX+1)); 182 } 183 }; 184 185 template<template <typename, typename> class T, typename K, typename V, size_t INDEX> 186 struct make_thingie<T<K, V>, INDEX> { 187 T<K, V> operator()(int const &i) { 188 return T<K, V>(i, i*(INDEX+1)); 189 } 190 }; 191 192 // cast_from<T>::my_int_val(i); 193 template<typename T> 194 struct cast_from { 195 static int my_int_val(T const &i) { return (int)i; } 196 }; 197 198 template<typename K, typename V> 199 struct cast_from<MyKeyFirst<K, V> > { 200 static int my_int_val(MyKeyFirst<K, V> const &i) { return (int)(i.my_value); } 201 }; 202 203 template<typename K, typename V> 204 struct cast_from<MyKeySecond<K, V> > { 205 static int my_int_val(MyKeySecond<K, V> const &i) { return (int)(i.my_value); } 206 }; 207 208 template<typename T> 209 void print_my_value(T const &i) { 210 INFO(" " << cast_from<T>::my_int_val(i) << " " ); 211 } 212 213 template<typename K, typename V> 214 void print_my_value(MyKeyFirst<K, V> const &i) { 215 i.print_val(); 216 } 217 218 template<typename K, typename V> 219 void print_my_value(MyKeySecond<K, V> const &i) { 220 i.print_val(); 221 } 222 223 template<> 224 void print_my_value(std::string const &i) { 225 INFO("\"" << i.c_str() << "\"" ); 226 } 227 228 // 229 // Tests 230 // 231 232 //! 233 // my_struct_key == given a type V with a field named my_key of type K, will return a copy of my_key 234 template<class K, typename V> 235 struct my_struct_key { 236 K operator()(const V& mv) { 237 return mv.my_key; 238 } 239 }; 240 241 // specialization returning reference to my_key. 242 template<class K, typename V> 243 struct my_struct_key<K&, V> { 244 K& operator()(const V& mv) { 245 return const_cast<K&>(mv.my_key); 246 } 247 }; 248 249 using tbb::detail::d1::type_to_key_function_body; 250 using tbb::detail::d1::hash_buffer; 251 using tbb::detail::d1::tbb_hash_compare; 252 using tbb::detail::d1::type_to_key_function_body_leaf; 253 254 template<class K, class V> struct VtoKFB { 255 typedef type_to_key_function_body<V, K> type; 256 }; 257 258 template<typename K> struct make_hash_compare { typedef tbb_hash_compare<K> type; }; 259 260 template<typename K, class V> 261 void hash_buffer_test(const char *sname) { 262 typedef typename K_deref<K>::type KnoR; 263 hash_buffer< 264 K, 265 V, 266 typename VtoKFB<K, V>::type, 267 tbb_hash_compare<KnoR> 268 > my_hash_buffer; 269 const bool k_is_ref = std::is_reference<K>::value; 270 typedef type_to_key_function_body_leaf< 271 V, K, my_struct_key<K, V> > my_func_body_type; 272 typename VtoKFB<K, V>::type *kp = new my_func_body_type(my_struct_key<K, V>()); 273 my_hash_buffer.set_key_func(kp); 274 INFO("Running hash_buffer test on " << sname << "; is ref == " << (k_is_ref ? "true" : "false") << "\n" ); 275 V mv1, mv0; 276 bool res; 277 for(int cnt = 0; cnt < 2; ++cnt) { 278 // insert 50 items after checking they are not already in the table 279 for(int i = 0; i < 50; ++i) { 280 KnoR kk = index_to_key<KnoR>()(i); 281 mv1.my_key = kk; 282 mv1.my_value = 0.5*i; 283 res = my_hash_buffer.find_with_key(kk, mv0); 284 CHECK_MESSAGE( (!res), "Found non-inserted item"); 285 res = my_hash_buffer.insert_with_key(mv1); 286 CHECK_MESSAGE( (res), "insert failed"); 287 res = my_hash_buffer.find_with_key(kk, mv0); 288 CHECK_MESSAGE( (res), "not found after insert"); 289 CHECK_MESSAGE( (mv0.my_value==mv1.my_value), "result not correct"); 290 } 291 // go backwards checking they are still there. 292 for(int i = 49; i>=0; --i) { 293 KnoR kk = index_to_key<KnoR>()(i); 294 double value = 0.5*i; 295 res = my_hash_buffer.find_with_key(kk, mv0); 296 CHECK_MESSAGE( (res), "find failed"); 297 CHECK_MESSAGE( (mv0.my_value==value), "result not correct"); 298 } 299 // delete every third item, check they are gone 300 for(int i = 0; i < 50; i += 3) { 301 KnoR kk = index_to_key<KnoR>()(i); 302 my_hash_buffer.delete_with_key(kk); 303 res = my_hash_buffer.find_with_key(kk, mv0); 304 CHECK_MESSAGE( (!res), "Found deleted item"); 305 } 306 // check the deleted items are gone, the non-deleted items are there. 307 for(int i = 0; i < 50; ++i) { 308 KnoR kk = index_to_key<KnoR>()(i); 309 double value = 0.5*i; 310 if(i%3==0) { 311 res = my_hash_buffer.find_with_key(kk, mv0); 312 CHECK_MESSAGE( (!res), "found an item that was previously deleted"); 313 } 314 else { 315 res = my_hash_buffer.find_with_key(kk, mv0); 316 CHECK_MESSAGE( (res), "find failed"); 317 CHECK_MESSAGE( (mv0.my_value==value), "result not correct"); 318 } 319 } 320 // insert new items, check the deleted items return true, the non-deleted items return false. 321 for(int i = 0; i < 50; ++i) { 322 KnoR kk = index_to_key<KnoR>()(i); 323 double value = 1.5*i; 324 mv1.my_key = kk; 325 mv1.my_value = value; 326 res = my_hash_buffer.insert_with_key(mv1); 327 if(i%3==0) { 328 CHECK_MESSAGE( (res), "didn't insert in empty slot"); 329 } 330 else { 331 CHECK_MESSAGE( (!res), "slot was empty on insert"); 332 } 333 } 334 // delete all items 335 for(int i = 0; i < 50; ++i) { 336 KnoR kk = index_to_key<KnoR>()(i); 337 my_hash_buffer.delete_with_key(kk); 338 res = my_hash_buffer.find_with_key(kk, mv0); 339 CHECK_MESSAGE( (!res), "Found deleted item"); 340 } 341 } // perform tasks twice 342 } 343 344 void 345 TestTaggedBuffers() { 346 hash_buffer_test<int, MyKeyFirst<int, double> >("MyKeyFirst<int,double>"); 347 hash_buffer_test<int&, MyKeyFirst<int, double> >("MyKeyFirst<int,double> with int&"); 348 hash_buffer_test<int, MyKeySecond<int, double> >("MyKeySecond<int,double>"); 349 350 hash_buffer_test<std::string, MyKeyFirst<std::string, double> >("MyKeyFirst<std::string,double>"); 351 hash_buffer_test<std::string&, MyKeySecond<std::string, double> >("MyKeySecond<std::string,double> with std::string&"); 352 } 353 354 struct threebyte { 355 unsigned char b1; 356 unsigned char b2; 357 unsigned char b3; 358 threebyte(int i = 0) { 359 b1 = (unsigned char)(i&0xFF); 360 b2 = (unsigned char)((i>>8)&0xFF); 361 b3 = (unsigned char)((i>>16)&0xFF); 362 } 363 operator int() const { return (int)(b1+(b2<<8)+(b3<<16)); } 364 }; 365 366 const int Count = 150; 367 368 const int Recirc_count = 1000; // number of tuples to be generated 369 const int MaxPorts = 10; 370 const int MaxNInputs = 5; // max # of input_nodes to register for each join_node input in parallel test 371 bool outputCheck[MaxPorts][Count]; // for checking output 372 373 void 374 check_outputCheck(int nUsed, int maxCnt) { 375 for(int i = 0; i < nUsed; ++i) { 376 for(int j = 0; j < maxCnt; ++j) { 377 CHECK_MESSAGE(outputCheck[i][j], ""); 378 } 379 } 380 } 381 382 void 383 reset_outputCheck(int nUsed, int maxCnt) { 384 for(int i = 0; i < nUsed; ++i) { 385 for(int j = 0; j < maxCnt; ++j) { 386 outputCheck[i][j] = false; 387 } 388 } 389 } 390 391 template<typename T> 392 class name_of { 393 public: 394 static const char* name() { return "Unknown"; } 395 }; 396 template<typename T> 397 class name_of<CheckType<T> > { 398 public: 399 static const char* name() { return "checktype"; } 400 }; 401 template<> 402 class name_of<int> { 403 public: 404 static const char* name() { return "int"; } 405 }; 406 template<> 407 class name_of<float> { 408 public: 409 static const char* name() { return "float"; } 410 }; 411 template<> 412 class name_of<double> { 413 public: 414 static const char* name() { return "double"; } 415 }; 416 template<> 417 class name_of<long> { 418 public: 419 static const char* name() { return "long"; } 420 }; 421 template<> 422 class name_of<short> { 423 public: 424 static const char* name() { return "short"; } 425 }; 426 template<> 427 class name_of<threebyte> { 428 public: 429 static const char* name() { return "threebyte"; } 430 }; 431 template<> 432 class name_of<std::string> { 433 public: 434 static const char* name() { return "std::string"; } 435 }; 436 template<typename K, typename V> 437 class name_of<MyKeyFirst<K, V> > { 438 public: 439 static const char* name() { return "MyKeyFirst<K,V>"; } 440 }; 441 template<typename K, typename V> 442 class name_of<MyKeySecond<K, V> > { 443 public: 444 static const char* name() { return "MyKeySecond<K,V>"; } 445 }; 446 447 // The additional policy to differ message based key matching from usual key matching. 448 // It only makes sense for the test because join_node is created with the key_matching policy for the both cases. 449 template <typename K, typename KHash = tbb_hash_compare<typename std::decay<K>::type > > 450 struct message_based_key_matching {}; 451 452 // test for key_matching 453 template<class JP> 454 struct is_key_matching_join { 455 static const bool value; 456 typedef int key_type; // have to define it to something 457 }; 458 459 template<class JP> 460 const bool is_key_matching_join<JP>::value = false; 461 462 template<class K, class KHash> 463 struct is_key_matching_join<tbb::flow::key_matching<K, KHash> > { 464 static const bool value; 465 typedef K key_type; 466 }; 467 468 template<class K, class KHash> 469 const bool is_key_matching_join<tbb::flow::key_matching<K, KHash> >::value = true; 470 471 template<class K, class KHash> 472 struct is_key_matching_join<message_based_key_matching<K, KHash> > { 473 static const bool value; 474 typedef K key_type; 475 }; 476 477 template<class K, class KHash> 478 const bool is_key_matching_join<message_based_key_matching<K, KHash> >::value = true; 479 480 // for recirculating tags, input is tuple<index,continue_msg> 481 // output is index*my_mult cast to the right type 482 template<typename TT> 483 class recirc_func_body { 484 TT my_mult; 485 public: 486 typedef std::tuple<int, tbb::flow::continue_msg> input_type; 487 recirc_func_body(TT multiplier): my_mult(multiplier) {} 488 recirc_func_body(const recirc_func_body &other): my_mult(other.my_mult) { } 489 void operator=(const recirc_func_body &other) { my_mult = other.my_mult; } 490 TT operator()(const input_type &v) { 491 return TT(std::get<0>(v)) * my_mult; 492 } 493 }; 494 495 static int input_count; // input_nodes are serial 496 497 // emit input_count continue_msg 498 class recirc_input_node_body { 499 public: 500 tbb::flow::continue_msg operator()(tbb::flow_control &fc) { 501 if( --input_count < 0 ){ 502 fc.stop(); 503 } 504 return tbb::flow::continue_msg(); 505 } 506 }; 507 508 // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10, 509 // so the max number generated right now is 1500 or so.) Input will generate a series of TT with value 510 // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body. We are attaching addend 511 // input nodes to a join_port, and each will generate part of the numerical series the port is expecting 512 // to receive. If there is only one input node, the series order will be maintained; if more than one, 513 // this is not guaranteed. 514 template<typename TT, size_t INDEX> 515 class my_input_body { 516 int my_count; 517 int addend; 518 public: 519 my_input_body(int init_val, int addto): my_count(init_val), addend(addto) { } 520 TT operator()(tbb::flow_control& fc) { 521 int lc = my_count; 522 TT ret = make_thingie<TT, INDEX>()(my_count); 523 my_count += addend; 524 if ( lc < Count){ 525 return ret; 526 }else{ 527 fc.stop(); 528 return TT(); 529 } 530 } 531 }; 532 533 template<typename TT> 534 class tag_func { 535 TT my_mult; 536 public: 537 tag_func(TT multiplier): my_mult(multiplier) { } 538 // operator() will return [0 .. Count) 539 tbb::flow::tag_value operator()(TT v) { 540 tbb::flow::tag_value t = tbb::flow::tag_value(v/my_mult); 541 return t; 542 } 543 }; 544 545 template <class JP> 546 struct filter_out_message_based_key_matching { 547 typedef JP policy; 548 }; 549 550 template <typename K, typename KHash> 551 struct filter_out_message_based_key_matching<message_based_key_matching<K, KHash> > { 552 // To have message based key matching in join_node, the key_matchig policy should be specified. 553 typedef tbb::flow::key_matching<K, KHash> policy; 554 }; 555 556 // allocator for join_node. This is specialized for tag_matching and key_matching joins because they require a variable number 557 // of tag_value methods passed to the constructor 558 559 template<int N, typename JType, class JP> 560 class makeJoin { 561 public: 562 static JType *create(tbb::flow::graph& g) { 563 JType *temp = new JType(g); 564 return temp; 565 } 566 static void destroy(JType *p) { delete p; } 567 }; 568 569 // for general key_matching case, each type in the tuple is a class that has the my_key field and the my_value field. 570 // 571 template<typename JType, typename K, typename KHash> 572 class makeJoin<2, JType, tbb::flow::key_matching<K, KHash> > { 573 typedef typename JType::output_type TType; 574 typedef typename std::tuple_element<0, TType>::type T0; 575 typedef typename std::tuple_element<1, TType>::type T1; 576 public: 577 static JType *create(tbb::flow::graph& g) { 578 JType *temp = new JType(g, 579 my_struct_key<K, T0>(), 580 my_struct_key<K, T1>() 581 ); 582 return temp; 583 } 584 static void destroy(JType *p) { delete p; } 585 }; 586 587 template<typename JType> 588 class makeJoin<2, JType, tbb::flow::tag_matching> { 589 typedef typename JType::output_type TType; 590 typedef typename std::tuple_element<0, TType>::type T0; 591 typedef typename std::tuple_element<1, TType>::type T1; 592 public: 593 static JType *create(tbb::flow::graph& g) { 594 JType *temp = new JType(g, 595 tag_func<T0>(T0(2)), 596 tag_func<T1>(T1(3)) 597 ); 598 return temp; 599 } 600 static void destroy(JType *p) { delete p; } 601 }; 602 603 #if MAX_TUPLE_TEST_SIZE >= 3 604 template<typename JType, typename K, typename KHash> 605 class makeJoin<3, JType, tbb::flow::key_matching<K, KHash> > { 606 typedef typename JType::output_type TType; 607 typedef typename std::tuple_element<0, TType>::type T0; 608 typedef typename std::tuple_element<1, TType>::type T1; 609 typedef typename std::tuple_element<2, TType>::type T2; 610 public: 611 static JType *create(tbb::flow::graph& g) { 612 JType *temp = new JType(g, 613 my_struct_key<K, T0>(), 614 my_struct_key<K, T1>(), 615 my_struct_key<K, T2>() 616 ); 617 return temp; 618 } 619 static void destroy(JType *p) { delete p; } 620 }; 621 622 template<typename JType> 623 class makeJoin<3, JType, tbb::flow::tag_matching> { 624 typedef typename JType::output_type TType; 625 typedef typename std::tuple_element<0, TType>::type T0; 626 typedef typename std::tuple_element<1, TType>::type T1; 627 typedef typename std::tuple_element<2, TType>::type T2; 628 public: 629 static JType *create(tbb::flow::graph& g) { 630 JType *temp = new JType(g, 631 tag_func<T0>(T0(2)), 632 tag_func<T1>(T1(3)), 633 tag_func<T2>(T2(4)) 634 ); 635 return temp; 636 } 637 static void destroy(JType *p) { delete p; } 638 }; 639 640 #endif 641 #if MAX_TUPLE_TEST_SIZE >= 4 642 643 template<typename JType, typename K, typename KHash> 644 class makeJoin<4, JType, tbb::flow::key_matching<K, KHash> > { 645 typedef typename JType::output_type TType; 646 typedef typename std::tuple_element<0, TType>::type T0; 647 typedef typename std::tuple_element<1, TType>::type T1; 648 typedef typename std::tuple_element<2, TType>::type T2; 649 typedef typename std::tuple_element<3, TType>::type T3; 650 public: 651 static JType *create(tbb::flow::graph& g) { 652 JType *temp = new JType(g, 653 my_struct_key<K, T0>(), 654 my_struct_key<K, T1>(), 655 my_struct_key<K, T2>(), 656 my_struct_key<K, T3>() 657 ); 658 return temp; 659 } 660 static void destroy(JType *p) { delete p; } 661 }; 662 663 template<typename JType> 664 class makeJoin<4, JType, tbb::flow::tag_matching> { 665 typedef typename JType::output_type TType; 666 typedef typename std::tuple_element<0, TType>::type T0; 667 typedef typename std::tuple_element<1, TType>::type T1; 668 typedef typename std::tuple_element<2, TType>::type T2; 669 typedef typename std::tuple_element<3, TType>::type T3; 670 public: 671 static JType *create(tbb::flow::graph& g) { 672 JType *temp = new JType(g, 673 tag_func<T0>(T0(2)), 674 tag_func<T1>(T1(3)), 675 tag_func<T2>(T2(4)), 676 tag_func<T3>(T3(5)) 677 ); 678 return temp; 679 } 680 static void destroy(JType *p) { delete p; } 681 }; 682 683 #endif 684 #if MAX_TUPLE_TEST_SIZE >= 5 685 template<typename JType, typename K, typename KHash> 686 class makeJoin<5, JType, tbb::flow::key_matching<K, KHash> > { 687 typedef typename JType::output_type TType; 688 typedef typename std::tuple_element<0, TType>::type T0; 689 typedef typename std::tuple_element<1, TType>::type T1; 690 typedef typename std::tuple_element<2, TType>::type T2; 691 typedef typename std::tuple_element<3, TType>::type T3; 692 typedef typename std::tuple_element<4, TType>::type T4; 693 public: 694 static JType *create(tbb::flow::graph& g) { 695 JType *temp = new JType(g, 696 my_struct_key<K, T0>(), 697 my_struct_key<K, T1>(), 698 my_struct_key<K, T2>(), 699 my_struct_key<K, T3>(), 700 my_struct_key<K, T4>() 701 ); 702 return temp; 703 } 704 static void destroy(JType *p) { delete p; } 705 }; 706 707 template<typename JType> 708 class makeJoin<5, JType, tbb::flow::tag_matching> { 709 typedef typename JType::output_type TType; 710 typedef typename std::tuple_element<0, TType>::type T0; 711 typedef typename std::tuple_element<1, TType>::type T1; 712 typedef typename std::tuple_element<2, TType>::type T2; 713 typedef typename std::tuple_element<3, TType>::type T3; 714 typedef typename std::tuple_element<4, TType>::type T4; 715 public: 716 static JType *create(tbb::flow::graph& g) { 717 JType *temp = new JType(g, 718 tag_func<T0>(T0(2)), 719 tag_func<T1>(T1(3)), 720 tag_func<T2>(T2(4)), 721 tag_func<T3>(T3(5)), 722 tag_func<T4>(T4(6)) 723 ); 724 return temp; 725 } 726 static void destroy(JType *p) { delete p; } 727 }; 728 #endif 729 #if MAX_TUPLE_TEST_SIZE >= 6 730 template<typename JType, typename K, typename KHash> 731 class makeJoin<6, JType, tbb::flow::key_matching<K, KHash> > { 732 typedef typename JType::output_type TType; 733 typedef typename std::tuple_element<0, TType>::type T0; 734 typedef typename std::tuple_element<1, TType>::type T1; 735 typedef typename std::tuple_element<2, TType>::type T2; 736 typedef typename std::tuple_element<3, TType>::type T3; 737 typedef typename std::tuple_element<4, TType>::type T4; 738 typedef typename std::tuple_element<5, TType>::type T5; 739 public: 740 static JType *create(tbb::flow::graph& g) { 741 JType *temp = new JType(g, 742 my_struct_key<K, T0>(), 743 my_struct_key<K, T1>(), 744 my_struct_key<K, T2>(), 745 my_struct_key<K, T3>(), 746 my_struct_key<K, T4>(), 747 my_struct_key<K, T5>() 748 ); 749 return temp; 750 } 751 static void destroy(JType *p) { delete p; } 752 }; 753 754 template<typename JType> 755 class makeJoin<6, JType, tbb::flow::tag_matching> { 756 typedef typename JType::output_type TType; 757 typedef typename std::tuple_element<0, TType>::type T0; 758 typedef typename std::tuple_element<1, TType>::type T1; 759 typedef typename std::tuple_element<2, TType>::type T2; 760 typedef typename std::tuple_element<3, TType>::type T3; 761 typedef typename std::tuple_element<4, TType>::type T4; 762 typedef typename std::tuple_element<5, TType>::type T5; 763 public: 764 static JType *create(tbb::flow::graph& g) { 765 JType *temp = new JType(g, 766 tag_func<T0>(T0(2)), 767 tag_func<T1>(T1(3)), 768 tag_func<T2>(T2(4)), 769 tag_func<T3>(T3(5)), 770 tag_func<T4>(T4(6)), 771 tag_func<T5>(T5(7)) 772 ); 773 return temp; 774 } 775 static void destroy(JType *p) { delete p; } 776 }; 777 #endif 778 779 #if MAX_TUPLE_TEST_SIZE >= 7 780 template<typename JType, typename K, typename KHash> 781 class makeJoin<7, JType, tbb::flow::key_matching<K, KHash> > { 782 typedef typename JType::output_type TType; 783 typedef typename std::tuple_element<0, TType>::type T0; 784 typedef typename std::tuple_element<1, TType>::type T1; 785 typedef typename std::tuple_element<2, TType>::type T2; 786 typedef typename std::tuple_element<3, TType>::type T3; 787 typedef typename std::tuple_element<4, TType>::type T4; 788 typedef typename std::tuple_element<5, TType>::type T5; 789 typedef typename std::tuple_element<6, TType>::type T6; 790 public: 791 static JType *create(tbb::flow::graph& g) { 792 JType *temp = new JType(g, 793 my_struct_key<K, T0>(), 794 my_struct_key<K, T1>(), 795 my_struct_key<K, T2>(), 796 my_struct_key<K, T3>(), 797 my_struct_key<K, T4>(), 798 my_struct_key<K, T5>(), 799 my_struct_key<K, T6>() 800 ); 801 return temp; 802 } 803 static void destroy(JType *p) { delete p; } 804 }; 805 806 template<typename JType> 807 class makeJoin<7, JType, tbb::flow::tag_matching> { 808 typedef typename JType::output_type TType; 809 typedef typename std::tuple_element<0, TType>::type T0; 810 typedef typename std::tuple_element<1, TType>::type T1; 811 typedef typename std::tuple_element<2, TType>::type T2; 812 typedef typename std::tuple_element<3, TType>::type T3; 813 typedef typename std::tuple_element<4, TType>::type T4; 814 typedef typename std::tuple_element<5, TType>::type T5; 815 typedef typename std::tuple_element<6, TType>::type T6; 816 public: 817 static JType *create(tbb::flow::graph& g) { 818 JType *temp = new JType(g, 819 tag_func<T0>(T0(2)), 820 tag_func<T1>(T1(3)), 821 tag_func<T2>(T2(4)), 822 tag_func<T3>(T3(5)), 823 tag_func<T4>(T4(6)), 824 tag_func<T5>(T5(7)), 825 tag_func<T6>(T6(8)) 826 ); 827 return temp; 828 } 829 static void destroy(JType *p) { delete p; } 830 }; 831 #endif 832 833 #if MAX_TUPLE_TEST_SIZE >= 8 834 template<typename JType, typename K, typename KHash> 835 class makeJoin<8, JType, tbb::flow::key_matching<K, KHash> > { 836 typedef typename JType::output_type TType; 837 typedef typename std::tuple_element<0, TType>::type T0; 838 typedef typename std::tuple_element<1, TType>::type T1; 839 typedef typename std::tuple_element<2, TType>::type T2; 840 typedef typename std::tuple_element<3, TType>::type T3; 841 typedef typename std::tuple_element<4, TType>::type T4; 842 typedef typename std::tuple_element<5, TType>::type T5; 843 typedef typename std::tuple_element<6, TType>::type T6; 844 typedef typename std::tuple_element<7, TType>::type T7; 845 public: 846 static JType *create(tbb::flow::graph& g) { 847 JType *temp = new JType(g, 848 my_struct_key<K, T0>(), 849 my_struct_key<K, T1>(), 850 my_struct_key<K, T2>(), 851 my_struct_key<K, T3>(), 852 my_struct_key<K, T4>(), 853 my_struct_key<K, T5>(), 854 my_struct_key<K, T6>(), 855 my_struct_key<K, T7>() 856 ); 857 return temp; 858 } 859 static void destroy(JType *p) { delete p; } 860 }; 861 862 template<typename JType> 863 class makeJoin<8, JType, tbb::flow::tag_matching> { 864 typedef typename JType::output_type TType; 865 typedef typename std::tuple_element<0, TType>::type T0; 866 typedef typename std::tuple_element<1, TType>::type T1; 867 typedef typename std::tuple_element<2, TType>::type T2; 868 typedef typename std::tuple_element<3, TType>::type T3; 869 typedef typename std::tuple_element<4, TType>::type T4; 870 typedef typename std::tuple_element<5, TType>::type T5; 871 typedef typename std::tuple_element<6, TType>::type T6; 872 typedef typename std::tuple_element<7, TType>::type T7; 873 public: 874 static JType *create(tbb::flow::graph& g) { 875 JType *temp = new JType(g, 876 tag_func<T0>(T0(2)), 877 tag_func<T1>(T1(3)), 878 tag_func<T2>(T2(4)), 879 tag_func<T3>(T3(5)), 880 tag_func<T4>(T4(6)), 881 tag_func<T5>(T5(7)), 882 tag_func<T6>(T6(8)), 883 tag_func<T7>(T7(9)) 884 ); 885 return temp; 886 } 887 static void destroy(JType *p) { delete p; } 888 }; 889 #endif 890 891 #if MAX_TUPLE_TEST_SIZE >= 9 892 template<typename JType, typename K, typename KHash> 893 class makeJoin<9, JType, tbb::flow::key_matching<K, KHash> > { 894 typedef typename JType::output_type TType; 895 typedef typename std::tuple_element<0, TType>::type T0; 896 typedef typename std::tuple_element<1, TType>::type T1; 897 typedef typename std::tuple_element<2, TType>::type T2; 898 typedef typename std::tuple_element<3, TType>::type T3; 899 typedef typename std::tuple_element<4, TType>::type T4; 900 typedef typename std::tuple_element<5, TType>::type T5; 901 typedef typename std::tuple_element<6, TType>::type T6; 902 typedef typename std::tuple_element<7, TType>::type T7; 903 typedef typename std::tuple_element<8, TType>::type T8; 904 public: 905 static JType *create(tbb::flow::graph& g) { 906 JType *temp = new JType(g, 907 my_struct_key<K, T0>(), 908 my_struct_key<K, T1>(), 909 my_struct_key<K, T2>(), 910 my_struct_key<K, T3>(), 911 my_struct_key<K, T4>(), 912 my_struct_key<K, T5>(), 913 my_struct_key<K, T6>(), 914 my_struct_key<K, T7>(), 915 my_struct_key<K, T8>() 916 ); 917 return temp; 918 } 919 static void destroy(JType *p) { delete p; } 920 }; 921 922 template<typename JType> 923 class makeJoin<9, JType, tbb::flow::tag_matching> { 924 typedef typename JType::output_type TType; 925 typedef typename std::tuple_element<0, TType>::type T0; 926 typedef typename std::tuple_element<1, TType>::type T1; 927 typedef typename std::tuple_element<2, TType>::type T2; 928 typedef typename std::tuple_element<3, TType>::type T3; 929 typedef typename std::tuple_element<4, TType>::type T4; 930 typedef typename std::tuple_element<5, TType>::type T5; 931 typedef typename std::tuple_element<6, TType>::type T6; 932 typedef typename std::tuple_element<7, TType>::type T7; 933 typedef typename std::tuple_element<8, TType>::type T8; 934 public: 935 static JType *create(tbb::flow::graph& g) { 936 JType *temp = new JType(g, 937 tag_func<T0>(T0(2)), 938 tag_func<T1>(T1(3)), 939 tag_func<T2>(T2(4)), 940 tag_func<T3>(T3(5)), 941 tag_func<T4>(T4(6)), 942 tag_func<T5>(T5(7)), 943 tag_func<T6>(T6(8)), 944 tag_func<T7>(T7(9)), 945 tag_func<T8>(T8(10)) 946 ); 947 return temp; 948 } 949 static void destroy(JType *p) { delete p; } 950 }; 951 #endif 952 953 #if MAX_TUPLE_TEST_SIZE >= 10 954 template<typename JType, typename K, typename KHash> 955 class makeJoin<10, JType, tbb::flow::key_matching<K, KHash> > { 956 typedef typename JType::output_type TType; 957 typedef typename std::tuple_element<0, TType>::type T0; 958 typedef typename std::tuple_element<1, TType>::type T1; 959 typedef typename std::tuple_element<2, TType>::type T2; 960 typedef typename std::tuple_element<3, TType>::type T3; 961 typedef typename std::tuple_element<4, TType>::type T4; 962 typedef typename std::tuple_element<5, TType>::type T5; 963 typedef typename std::tuple_element<6, TType>::type T6; 964 typedef typename std::tuple_element<7, TType>::type T7; 965 typedef typename std::tuple_element<8, TType>::type T8; 966 typedef typename std::tuple_element<9, TType>::type T9; 967 public: 968 static JType *create(tbb::flow::graph& g) { 969 JType *temp = new JType(g, 970 my_struct_key<K, T0>(), 971 my_struct_key<K, T1>(), 972 my_struct_key<K, T2>(), 973 my_struct_key<K, T3>(), 974 my_struct_key<K, T4>(), 975 my_struct_key<K, T5>(), 976 my_struct_key<K, T6>(), 977 my_struct_key<K, T7>(), 978 my_struct_key<K, T8>(), 979 my_struct_key<K, T9>() 980 ); 981 return temp; 982 } 983 static void destroy(JType *p) { delete p; } 984 }; 985 986 template<typename JType> 987 class makeJoin<10, JType, tbb::flow::tag_matching> { 988 typedef typename JType::output_type TType; 989 typedef typename std::tuple_element<0, TType>::type T0; 990 typedef typename std::tuple_element<1, TType>::type T1; 991 typedef typename std::tuple_element<2, TType>::type T2; 992 typedef typename std::tuple_element<3, TType>::type T3; 993 typedef typename std::tuple_element<4, TType>::type T4; 994 typedef typename std::tuple_element<5, TType>::type T5; 995 typedef typename std::tuple_element<6, TType>::type T6; 996 typedef typename std::tuple_element<7, TType>::type T7; 997 typedef typename std::tuple_element<8, TType>::type T8; 998 typedef typename std::tuple_element<9, TType>::type T9; 999 public: 1000 static JType *create(tbb::flow::graph& g) { 1001 JType *temp = new JType(g, 1002 tag_func<T0>(T0(2)), 1003 tag_func<T1>(T1(3)), 1004 tag_func<T2>(T2(4)), 1005 tag_func<T3>(T3(5)), 1006 tag_func<T4>(T4(6)), 1007 tag_func<T5>(T5(7)), 1008 tag_func<T6>(T6(8)), 1009 tag_func<T7>(T7(9)), 1010 tag_func<T8>(T8(10)), 1011 tag_func<T9>(T9(11)) 1012 ); 1013 return temp; 1014 } 1015 static void destroy(JType *p) { delete p; } 1016 }; 1017 #endif 1018 1019 // holder for input_node pointers for eventual deletion 1020 1021 static void* all_input_nodes[MaxPorts][MaxNInputs]; 1022 1023 template<int ELEM, typename JNT> 1024 class input_node_helper { 1025 public: 1026 typedef JNT join_node_type; 1027 typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type; 1028 typedef typename join_node_type::output_type TT; 1029 1030 typedef typename std::tuple_element<ELEM-1, TT>::type IT; 1031 typedef typename tbb::flow::input_node<IT> my_input_node_type; 1032 typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type; 1033 static void print_remark(const char * str) { 1034 input_node_helper<ELEM-1, JNT>::print_remark(str); 1035 INFO(", " << name_of<IT>::name()); 1036 } 1037 static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) { 1038 for(int i = 0; i < nInputs; ++i) { 1039 my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, ELEM>(i, nInputs)); 1040 tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join)); 1041 all_input_nodes[ELEM-1][i] = (void *)new_node; 1042 new_node->activate(); 1043 } 1044 // add the next input_node 1045 input_node_helper<ELEM-1, JNT>::add_input_nodes(my_join, g, nInputs); 1046 } 1047 1048 static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) { 1049 my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(ELEM+1))); 1050 tbb::flow::make_edge(*new_node, tbb::flow::input_port<ELEM-1>(my_join)); 1051 tbb::flow::make_edge(my_input, *new_node); 1052 all_input_nodes[ELEM-1][0] = (void *)new_node; 1053 input_node_helper<ELEM-1, JNT>::add_recirc_func_nodes(my_join, my_input, g); 1054 } 1055 1056 static void only_check_value(const int i, const TT &v) { 1057 CHECK_MESSAGE(std::get<ELEM-1>(v)==(IT)(i*(ELEM+1)), ""); 1058 input_node_helper<ELEM-1, JNT>::only_check_value(i, v); 1059 } 1060 1061 static void check_value(int i, TT &v, bool is_serial) { 1062 // the fetched value will match only if there is only one input_node. 1063 bool is_correct = !is_serial||std::get<ELEM-1>(v)==(IT)(i*(ELEM+1)); 1064 CHECK_MESSAGE(is_correct, ""); 1065 // tally the fetched value. 1066 int ival = (int)std::get<ELEM-1>(v); 1067 CHECK_MESSAGE(!(ival%(ELEM+1)), ""); 1068 ival /= (ELEM+1); 1069 CHECK_MESSAGE(!outputCheck[ELEM-1][ival], ""); 1070 outputCheck[ELEM-1][ival] = true; 1071 input_node_helper<ELEM-1, JNT>::check_value(i, v, is_serial); 1072 } 1073 static void remove_input_nodes(join_node_type& my_join, int nInputs) { 1074 for(int i = 0; i< nInputs; ++i) { 1075 my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[ELEM-1][i]); 1076 tbb::flow::remove_edge(*dp, tbb::flow::input_port<ELEM-1>(my_join)); 1077 delete dp; 1078 } 1079 input_node_helper<ELEM-1, JNT>::remove_input_nodes(my_join, nInputs); 1080 } 1081 1082 static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) { 1083 my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[ELEM-1][0]); 1084 tbb::flow::remove_edge(*fn, tbb::flow::input_port<ELEM-1>(my_join)); 1085 tbb::flow::remove_edge(my_input, *fn); 1086 delete fn; 1087 input_node_helper<ELEM-1, JNT>::remove_recirc_func_nodes(my_join, my_input); 1088 } 1089 }; 1090 1091 template<typename JNT> 1092 class input_node_helper<1, JNT> { 1093 typedef JNT join_node_type; 1094 typedef tbb::flow::join_node<std::tuple<int, tbb::flow::continue_msg>, tbb::flow::reserving> input_join_type; 1095 typedef typename join_node_type::output_type TT; 1096 1097 typedef typename std::tuple_element<0, TT>::type IT; 1098 typedef typename tbb::flow::input_node<IT> my_input_node_type; 1099 typedef typename tbb::flow::function_node<std::tuple<int, tbb::flow::continue_msg>, IT> my_recirc_function_type; 1100 public: 1101 static void print_remark(const char * str) { 1102 INFO(str << "< " << name_of<IT>::name()); 1103 } 1104 static void add_input_nodes(join_node_type &my_join, tbb::flow::graph &g, int nInputs) { 1105 for(int i = 0; i < nInputs; ++i) { 1106 my_input_node_type *new_node = new my_input_node_type(g, my_input_body<IT, 1>(i, nInputs)); 1107 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join)); 1108 all_input_nodes[0][i] = (void *)new_node; 1109 new_node->activate(); 1110 } 1111 } 1112 1113 static void add_recirc_func_nodes(join_node_type &my_join, input_join_type &my_input, tbb::flow::graph &g) { 1114 my_recirc_function_type *new_node = new my_recirc_function_type(g, tbb::flow::unlimited, recirc_func_body<IT>((IT)(2))); 1115 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join)); 1116 tbb::flow::make_edge(my_input, *new_node); 1117 all_input_nodes[0][0] = (void *)new_node; 1118 } 1119 1120 static void only_check_value(const int i, const TT &v) { 1121 CHECK_MESSAGE(std::get<0>(v)==(IT)(i*2), ""); 1122 } 1123 1124 static void check_value(int i, TT &v, bool is_serial) { 1125 bool is_correct = !is_serial||std::get<0>(v)==(IT)(i*(2)); 1126 CHECK_MESSAGE(is_correct, ""); 1127 int ival = (int)std::get<0>(v); 1128 CHECK_MESSAGE(!(ival%2), ""); 1129 ival /= 2; 1130 CHECK_MESSAGE(!outputCheck[0][ival], ""); 1131 outputCheck[0][ival] = true; 1132 } 1133 static void remove_input_nodes(join_node_type& my_join, int nInputs) { 1134 for(int i = 0; i < nInputs; ++i) { 1135 my_input_node_type *dp = reinterpret_cast<my_input_node_type *>(all_input_nodes[0][i]); 1136 tbb::flow::remove_edge(*dp, tbb::flow::input_port<0>(my_join)); 1137 delete dp; 1138 } 1139 } 1140 1141 static void remove_recirc_func_nodes(join_node_type& my_join, input_join_type &my_input) { 1142 my_recirc_function_type *fn = reinterpret_cast<my_recirc_function_type *>(all_input_nodes[0][0]); 1143 tbb::flow::remove_edge(*fn, tbb::flow::input_port<0>(my_join)); 1144 tbb::flow::remove_edge(my_input, *fn); 1145 delete fn; 1146 } 1147 }; 1148 1149 #if _MSC_VER && !defined(__INTEL_COMPILER) 1150 // Suppress "conditional expression is constant" warning. 1151 #pragma warning( push ) 1152 #pragma warning( disable: 4127 ) 1153 #endif 1154 1155 template<typename JType, class JP> 1156 class parallel_test { 1157 public: 1158 typedef typename JType::output_type TType; 1159 typedef typename is_key_matching_join<JP>::key_type key_type; 1160 static void test() { 1161 const int TUPLE_SIZE = std::tuple_size<TType>::value; 1162 const bool is_key_matching = is_key_matching_join<JP>::value; 1163 1164 TType v; 1165 input_node_helper<TUPLE_SIZE, JType>::print_remark("Parallel test of join_node"); 1166 INFO(" > "); 1167 if(is_key_matching) { 1168 INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name()); 1169 if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) { 1170 INFO("&"); 1171 } 1172 } 1173 INFO("\n"); 1174 for(int i = 0; i < MaxPorts; ++i) { 1175 for(int j = 0; j < MaxNInputs; ++j) { 1176 all_input_nodes[i][j] = NULL; 1177 } 1178 } 1179 for(int nInputs = 1; nInputs<=MaxNInputs; ++nInputs) { 1180 tbb::flow::graph g; 1181 bool not_out_of_order = (nInputs==1)&&(!is_key_matching); 1182 JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g); 1183 tbb::flow::queue_node<TType> outq1(g); 1184 tbb::flow::queue_node<TType> outq2(g); 1185 1186 tbb::flow::make_edge(*my_join, outq1); 1187 tbb::flow::make_edge(*my_join, outq2); 1188 1189 input_node_helper<TUPLE_SIZE, JType>::add_input_nodes((*my_join), g, nInputs); 1190 1191 g.wait_for_all(); 1192 1193 reset_outputCheck(TUPLE_SIZE, Count); 1194 for(int i = 0; i < Count; ++i) { 1195 CHECK_MESSAGE(outq1.try_get(v), ""); 1196 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order); 1197 } 1198 1199 check_outputCheck(TUPLE_SIZE, Count); 1200 reset_outputCheck(TUPLE_SIZE, Count); 1201 1202 for(int i = 0; i < Count; i++) { 1203 CHECK_MESSAGE(outq2.try_get(v), "");; 1204 input_node_helper<TUPLE_SIZE, JType>::check_value(i, v, not_out_of_order); 1205 } 1206 check_outputCheck(TUPLE_SIZE, Count); 1207 1208 CHECK_MESSAGE(!outq1.try_get(v), ""); 1209 CHECK_MESSAGE(!outq2.try_get(v), ""); 1210 1211 input_node_helper<TUPLE_SIZE, JType>::remove_input_nodes((*my_join), nInputs); 1212 tbb::flow::remove_edge(*my_join, outq1); 1213 tbb::flow::remove_edge(*my_join, outq2); 1214 makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join); 1215 } 1216 } 1217 }; 1218 1219 1220 template<int ELEM, typename JType> 1221 class serial_queue_helper { 1222 public: 1223 typedef typename JType::output_type TT; 1224 typedef typename std::tuple_element<ELEM-1, TT>::type IT; 1225 typedef typename tbb::flow::queue_node<IT> my_queue_node_type; 1226 static void print_remark() { 1227 serial_queue_helper<ELEM-1, JType>::print_remark(); 1228 INFO(", " << name_of<IT>::name()); 1229 } 1230 static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) { 1231 serial_queue_helper<ELEM-1, JType>::add_queue_nodes(g, my_join); 1232 my_queue_node_type *new_node = new my_queue_node_type(g); 1233 tbb::flow::make_edge(*new_node, std::get<ELEM-1>(my_join.input_ports())); 1234 all_input_nodes[ELEM-1][0] = (void *)new_node; 1235 } 1236 1237 static void fill_one_queue(int maxVal) { 1238 // fill queue to "left" of me 1239 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]); 1240 serial_queue_helper<ELEM-1, JType>::fill_one_queue(maxVal); 1241 for(int i = 0; i < maxVal; ++i) { 1242 CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(i)), ""); 1243 } 1244 } 1245 1246 static void put_one_queue_val(int myVal) { 1247 // put this val to my "left". 1248 serial_queue_helper<ELEM-1, JType>::put_one_queue_val(myVal); 1249 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]); 1250 CHECK_MESSAGE(qptr->try_put(make_thingie<IT, ELEM>()(myVal)), ""); 1251 } 1252 1253 static void check_queue_value(int i, TT &v) { 1254 serial_queue_helper<ELEM-1, JType>::check_queue_value(i, v); 1255 CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<ELEM-1>(v))==i * (ELEM+1), ""); 1256 } 1257 1258 static void remove_queue_nodes(JType &my_join) { 1259 my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[ELEM-1][0]); 1260 tbb::flow::remove_edge(*vptr, std::get<ELEM-1>(my_join.input_ports())); 1261 serial_queue_helper<ELEM-1, JType>::remove_queue_nodes(my_join); 1262 delete vptr; 1263 } 1264 }; 1265 1266 template<typename JType> 1267 class serial_queue_helper<1, JType> { 1268 public: 1269 typedef typename JType::output_type TT; 1270 typedef typename std::tuple_element<0, TT>::type IT; 1271 typedef typename tbb::flow::queue_node<IT> my_queue_node_type; 1272 static void print_remark() { 1273 INFO("Serial test of join_node< " << name_of<IT>::name()); 1274 } 1275 1276 static void add_queue_nodes(tbb::flow::graph &g, JType &my_join) { 1277 my_queue_node_type *new_node = new my_queue_node_type(g); 1278 tbb::flow::make_edge(*new_node, tbb::flow::input_port<0>(my_join)); 1279 all_input_nodes[0][0] = (void *)new_node; 1280 } 1281 1282 static void fill_one_queue(int maxVal) { 1283 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]); 1284 for(int i = 0; i < maxVal; ++i) { 1285 CHECK_MESSAGE(qptr->try_put(make_thingie<IT, 1>()(i)), ""); 1286 } 1287 } 1288 1289 static void put_one_queue_val(int myVal) { 1290 my_queue_node_type *qptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]); 1291 IT my_val = make_thingie<IT, 1>()(myVal); 1292 CHECK_MESSAGE(qptr->try_put(my_val), ""); 1293 } 1294 1295 static void check_queue_value(int i, TT &v) { 1296 CHECK_MESSAGE(cast_from<IT>::my_int_val(std::get<0>(v))==i*2, ""); 1297 } 1298 1299 static void remove_queue_nodes(JType &my_join) { 1300 my_queue_node_type *vptr = reinterpret_cast<my_queue_node_type *>(all_input_nodes[0][0]); 1301 tbb::flow::remove_edge(*vptr, std::get<0>(my_join.input_ports())); 1302 delete vptr; 1303 } 1304 }; 1305 1306 // 1307 // Single reservable predecessor at each port, single accepting and rejecting successor 1308 // * put to buffer before port0, then put to buffer before port1, ... 1309 // * fill buffer before port0 then fill buffer before port1, ... 1310 1311 template<typename JType, class JP> 1312 void test_one_serial(JType &my_join, tbb::flow::graph &g) { 1313 typedef typename JType::output_type TType; 1314 static const int TUPLE_SIZE = std::tuple_size<TType>::value; 1315 bool is_key_matching = is_key_matching_join<JP>::value; 1316 std::vector<bool> flags; 1317 serial_queue_helper<TUPLE_SIZE, JType>::add_queue_nodes(g, my_join); 1318 typedef TType q3_input_type; 1319 tbb::flow::queue_node< q3_input_type > q3(g); 1320 1321 tbb::flow::make_edge(my_join, q3); 1322 1323 // fill each queue with its value one-at-a-time 1324 flags.clear(); 1325 for(int i = 0; i < Count; ++i) { 1326 serial_queue_helper<TUPLE_SIZE, JType>::put_one_queue_val(i); 1327 flags.push_back(false); 1328 } 1329 1330 g.wait_for_all(); 1331 for(int i = 0; i < Count; ++i) { 1332 q3_input_type v; 1333 g.wait_for_all(); 1334 CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()"); 1335 if(is_key_matching) { 1336 // because we look up tags in the hash table, the output may be out of order. 1337 int j = int(std::get<0>(v))/2; // figure what the index should be 1338 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v); 1339 flags[j] = true; 1340 } 1341 else { 1342 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v); 1343 } 1344 } 1345 1346 if(is_key_matching) { 1347 for(int i = 0; i < Count; ++i) { 1348 CHECK_MESSAGE(flags[i], ""); 1349 flags[i] = false; 1350 } 1351 } 1352 1353 tbb::flow::remove_edge(my_join, q3); 1354 tbb::flow::limiter_node<q3_input_type> limiter(g, Count / 2); 1355 tbb::flow::make_edge(my_join, limiter); 1356 tbb::flow::make_edge(limiter, q3); 1357 1358 // fill each queue completely before filling the next. 1359 serial_queue_helper<TUPLE_SIZE, JType>::fill_one_queue(Count); 1360 1361 g.wait_for_all(); 1362 for(int i = 0; i < Count / 2; ++i) { 1363 q3_input_type v; 1364 g.wait_for_all(); 1365 CHECK_MESSAGE( (q3.try_get(v)), "Error in try_get()"); 1366 if(is_key_matching) { 1367 int j = int(std::get<0>(v))/2; 1368 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(j, v); 1369 flags[j] = true; 1370 } 1371 else { 1372 serial_queue_helper<TUPLE_SIZE, JType>::check_queue_value(i, v); 1373 } 1374 } 1375 1376 if(is_key_matching) { 1377 CHECK(std::count(flags.begin(), flags.end(), true) == Count / 2); 1378 } 1379 1380 serial_queue_helper<TUPLE_SIZE, JType>::remove_queue_nodes(my_join); 1381 } 1382 1383 template<typename JType, class JP> 1384 class serial_test { 1385 typedef typename JType::output_type TType; 1386 public: 1387 static void test() { 1388 tbb::flow::graph g; 1389 std::vector<bool> flags; 1390 bool is_key_matching = is_key_matching_join<JP>::value; 1391 flags.reserve(Count); 1392 1393 const int TUPLE_SIZE = std::tuple_size<TType>::value; 1394 static const int ELEMS = 3; 1395 1396 JType* my_join = makeJoin<TUPLE_SIZE, JType, JP>::create(g); 1397 test_input_ports_return_ref(*my_join); 1398 serial_queue_helper<TUPLE_SIZE, JType>::print_remark(); INFO(" >"); 1399 if(is_key_matching) { 1400 INFO("with K == " << name_of<typename K_deref<typename is_key_matching_join<JP>::key_type>::type >::name()); 1401 if(std::is_reference<typename is_key_matching_join<JP>::key_type>::value) { 1402 INFO("&"); 1403 } 1404 } 1405 INFO("\n"); 1406 1407 test_one_serial<JType, JP>(*my_join, g); 1408 // build the vector with copy construction from the used join node. 1409 std::vector<JType>join_vector(ELEMS, *my_join); 1410 // destroy the tired old join_node in case we're accidentally reusing pieces of it. 1411 makeJoin<TUPLE_SIZE, JType, JP>::destroy(my_join); 1412 1413 for(int e = 0; e < ELEMS; ++e) { // exercise each of the vector elements 1414 test_one_serial<JType, JP>(join_vector[e], g); 1415 } 1416 } 1417 1418 }; // serial_test 1419 1420 #if _MSC_VER && !defined(__INTEL_COMPILER) 1421 #pragma warning( pop ) 1422 #endif 1423 1424 template< 1425 template<typename, class > class TestType, // serial_test or parallel_test 1426 typename OutputTupleType, // type of the output of the join 1427 class J> // graph_buffer_policy (reserving, queueing, tag_matching or key_matching) 1428 class generate_test { 1429 public: 1430 typedef tbb::flow::join_node<OutputTupleType, typename filter_out_message_based_key_matching<J>::policy> join_node_type; 1431 static void do_test() { 1432 TestType<join_node_type, J>::test(); 1433 } 1434 }; 1435 1436 template<class JP> 1437 void test_input_port_policies(); 1438 1439 // join_node (reserving) does not consume inputs until an item is available at 1440 // every input. It tries to reserve each input, and if any fails it releases the 1441 // reservation. When it builds a tuple it broadcasts to all its successors and 1442 // consumes all the inputs. 1443 // 1444 // So our test will put an item at one input port, then attach another node to the 1445 // same node (a queue node in this case). The second successor should receive the 1446 // item in the queue, emptying it. 1447 // 1448 // We then place an item in the second input queue, and check the output queues; they 1449 // should still be empty. Then we place an item in the first queue; the output queues 1450 // should then receive a tuple. 1451 // 1452 // we then attach another function node to the second input. It should not receive 1453 // an item, verifying that the item in the queue is consumed. 1454 template<> 1455 void test_input_port_policies<tbb::flow::reserving>() { 1456 tbb::flow::graph g; 1457 typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::reserving > JType; // two-phase is the default policy 1458 // create join_node<type0,type1> jn 1459 JType jn(g); 1460 // create output_queue oq0, oq1 1461 typedef JType::output_type OQType; 1462 tbb::flow::queue_node<OQType> oq0(g); 1463 tbb::flow::queue_node<OQType> oq1(g); 1464 // create iq0, iq1 1465 typedef tbb::flow::queue_node<int> IQType; 1466 IQType iq0(g); 1467 IQType iq1(g); 1468 // create qnp, qnq 1469 IQType qnp(g); 1470 IQType qnq(g); 1471 INFO("Testing policies of join_node<reserving> input ports\n"); 1472 // attach jn to oq0, oq1 1473 tbb::flow::make_edge(jn, oq0); 1474 tbb::flow::make_edge(jn, oq1); 1475 1476 // attach iq0, iq1 to jn 1477 tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports())); 1478 tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports())); 1479 1480 for(int loop = 0; loop < 3; ++loop) { 1481 // place one item in iq0 1482 CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1"); 1483 // attach iq0 to qnp 1484 tbb::flow::make_edge(iq0, qnp); 1485 // qnp should have an item in it. 1486 g.wait_for_all(); 1487 { 1488 int i; 1489 CHECK_MESSAGE( (qnp.try_get(i)&&i==1), "Error in item fetched by qnp"); 1490 } 1491 // place item in iq1 1492 CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1"); 1493 // oq0, oq1 should be empty 1494 g.wait_for_all(); 1495 { 1496 OQType t1; 1497 CHECK_MESSAGE( (!oq0.try_get(t1)&&!oq1.try_get(t1)), "oq0 and oq1 not empty"); 1498 } 1499 // detach qnp from iq0 1500 tbb::flow::remove_edge(iq0, qnp); // if we don't remove qnp it will gobble any values we put in iq0 1501 // place item in iq0 1502 CHECK_MESSAGE( (iq0.try_put(3)), "Error on second put to iq0"); 1503 // oq0, oq1 should have items in them 1504 g.wait_for_all(); 1505 { 1506 OQType t0; 1507 OQType t1; 1508 CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==3&&std::get<1>(t0)==2), "Error in oq0 output"); 1509 CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==3&&std::get<1>(t1)==2), "Error in oq1 output"); 1510 } 1511 // attach qnp to iq0, qnq to iq1 1512 // qnp and qnq should be empty 1513 tbb::flow::make_edge(iq0, qnp); 1514 tbb::flow::make_edge(iq1, qnq); 1515 g.wait_for_all(); 1516 { 1517 int i; 1518 CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it"); 1519 CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it"); 1520 } 1521 tbb::flow::remove_edge(iq0, qnp); 1522 tbb::flow::remove_edge(iq1, qnq); 1523 } // for ( int loop ... 1524 } 1525 1526 // join_node (queueing) consumes inputs as soon as they are available at 1527 // any input. When it builds a tuple it broadcasts to all its successors and 1528 // discards the broadcast values. 1529 // 1530 // So our test will put an item at one input port, then attach another node to the 1531 // same node (a queue node in this case). The second successor should not receive 1532 // an item (because the join consumed it). 1533 // 1534 // We then place an item in the second input queue, and check the output queues; they 1535 // should each have a tuple. 1536 // 1537 // we then attach another function node to the second input. It should not receive 1538 // an item, verifying that the item in the queue is consumed. 1539 template<> 1540 void test_input_port_policies<tbb::flow::queueing>() { 1541 tbb::flow::graph g; 1542 typedef tbb::flow::join_node<std::tuple<int, int>, tbb::flow::queueing > JType; 1543 // create join_node<type0,type1> jn 1544 JType jn(g); 1545 // create output_queue oq0, oq1 1546 typedef JType::output_type OQType; 1547 tbb::flow::queue_node<OQType> oq0(g); 1548 tbb::flow::queue_node<OQType> oq1(g); 1549 // create iq0, iq1 1550 typedef tbb::flow::queue_node<int> IQType; 1551 IQType iq0(g); 1552 IQType iq1(g); 1553 // create qnp, qnq 1554 IQType qnp(g); 1555 IQType qnq(g); 1556 INFO("Testing policies of join_node<queueing> input ports\n"); 1557 // attach jn to oq0, oq1 1558 tbb::flow::make_edge(jn, oq0); 1559 tbb::flow::make_edge(jn, oq1); 1560 1561 // attach iq0, iq1 to jn 1562 tbb::flow::make_edge(iq0, std::get<0>(jn.input_ports())); 1563 tbb::flow::make_edge(iq1, std::get<1>(jn.input_ports())); 1564 1565 for(int loop = 0; loop < 3; ++loop) { 1566 // place one item in iq0 1567 CHECK_MESSAGE( (iq0.try_put(1)), "Error putting to iq1"); 1568 // attach iq0 to qnp 1569 tbb::flow::make_edge(iq0, qnp); 1570 // qnp should have an item in it. 1571 g.wait_for_all(); 1572 { 1573 int i; 1574 CHECK_MESSAGE( (!qnp.try_get(i)), "Item was received by qnp"); 1575 } 1576 // place item in iq1 1577 CHECK_MESSAGE( (iq1.try_put(2)), "Error putting to iq1"); 1578 // oq0, oq1 should have items 1579 g.wait_for_all(); 1580 { 1581 OQType t0; 1582 OQType t1; 1583 CHECK_MESSAGE( (oq0.try_get(t0)&&std::get<0>(t0)==1&&std::get<1>(t0)==2), "Error in oq0 output"); 1584 CHECK_MESSAGE( (oq1.try_get(t1)&&std::get<0>(t1)==1&&std::get<1>(t1)==2), "Error in oq1 output"); 1585 } 1586 // attach qnq to iq1 1587 // qnp and qnq should be empty 1588 tbb::flow::make_edge(iq1, qnq); 1589 g.wait_for_all(); 1590 { 1591 int i; 1592 CHECK_MESSAGE( (!qnp.try_get(i)), "iq0 still had value in it"); 1593 CHECK_MESSAGE( (!qnq.try_get(i)), "iq1 still had value in it"); 1594 } 1595 tbb::flow::remove_edge(iq0, qnp); 1596 tbb::flow::remove_edge(iq1, qnq); 1597 } // for ( int loop ... 1598 } 1599 1600 template<typename T> 1601 struct myTagValue { 1602 tbb::flow::tag_value operator()(T i) { return tbb::flow::tag_value(i); } 1603 }; 1604 1605 template<> 1606 struct myTagValue<CheckType<int> > { 1607 tbb::flow::tag_value operator()(CheckType<int> i) { return tbb::flow::tag_value((int)i); } 1608 }; 1609 1610 // join_node (tag_matching) consumes inputs as soon as they are available at 1611 // any input. When it builds a tuple it broadcasts to all its successors and 1612 // discards the broadcast values. 1613 // 1614 // It chooses the tuple it broadcasts by matching the tag values returned by the 1615 // methods given the constructor of the join, in this case the method just casts 1616 // the value in each port to tag_value. 1617 // 1618 // So our test will put an item at one input port, then attach another node to the 1619 // same node (a queue node in this case). The second successor should not receive 1620 // an item (because the join consumed it). 1621 // 1622 // We then place an item in the second input queue, and check the output queues; they 1623 // should each have a tuple. 1624 // 1625 // we then attach another queue node to the second input. It should not receive 1626 // an item, verifying that the item in the queue is consumed. 1627 // 1628 // We will then exercise the join with a bunch of values, and the output order should 1629 // be determined by the order we insert items into the second queue. (Each tuple set 1630 // corresponding to a tag will be complete when the second item is inserted.) 1631 template<> 1632 void test_input_port_policies<tbb::flow::tag_matching>() { 1633 tbb::flow::graph g; 1634 typedef tbb::flow::join_node<std::tuple<int, CheckType<int> >, tbb::flow::tag_matching > JoinNodeType; 1635 typedef JoinNodeType::output_type CheckTupleType; 1636 JoinNodeType testJoinNode(g, myTagValue<int>(), myTagValue<CheckType<int> >()); 1637 tbb::flow::queue_node<CheckTupleType> checkTupleQueue0(g); 1638 tbb::flow::queue_node<CheckTupleType> checkTupleQueue1(g); 1639 { 1640 Checker<CheckType<int> > my_check; 1641 1642 1643 typedef tbb::flow::queue_node<int> IntQueueType; 1644 typedef tbb::flow::queue_node<CheckType<int> > CheckQueueType; 1645 IntQueueType intInputQueue(g); 1646 CheckQueueType checkInputQueue(g); 1647 IntQueueType intEmptyTestQueue(g); 1648 CheckQueueType checkEmptyTestQueue(g); 1649 INFO("Testing policies of join_node<tag_matching> input ports\n"); 1650 // attach testJoinNode to checkTupleQueue0, checkTupleQueue1 1651 tbb::flow::make_edge(testJoinNode, checkTupleQueue0); 1652 tbb::flow::make_edge(testJoinNode, checkTupleQueue1); 1653 1654 // attach intInputQueue, checkInputQueue to testJoinNode 1655 tbb::flow::make_edge(intInputQueue, tbb::flow::input_port<0>(testJoinNode)); 1656 tbb::flow::make_edge(checkInputQueue, tbb::flow::input_port<1>(testJoinNode)); 1657 1658 // we'll put four discrete values in the inputs to the join_node. Each 1659 // set of inputs should result in one output. 1660 for(int loop = 0; loop < 4; ++loop) { 1661 // place one item in intInputQueue 1662 CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue"); 1663 // attach intInputQueue to intEmptyTestQueue 1664 tbb::flow::make_edge(intInputQueue, intEmptyTestQueue); 1665 // intEmptyTestQueue should not have an item in it. (the join consumed it.) 1666 g.wait_for_all(); 1667 { 1668 int intVal0; 1669 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal0)), "Item was received by intEmptyTestQueue"); 1670 } 1671 // place item in checkInputQueue 1672 CheckType<int> checkVal0(loop); 1673 CHECK_MESSAGE( (checkInputQueue.try_put(checkVal0)), "Error putting to checkInputQueue"); 1674 // checkTupleQueue0, checkTupleQueue1 should have items 1675 g.wait_for_all(); 1676 { 1677 CheckTupleType t0; 1678 CheckTupleType t1; 1679 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==loop&&(int)std::get<1>(t0)==loop), "Error in checkTupleQueue0 output"); 1680 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==loop&&(int)std::get<1>(t1)==loop), "Error in checkTupleQueue1 output"); 1681 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0"); 1682 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1"); 1683 } 1684 // attach checkEmptyTestQueue to checkInputQueue 1685 // intEmptyTestQueue and checkEmptyTestQueue should be empty 1686 tbb::flow::make_edge(checkInputQueue, checkEmptyTestQueue); 1687 g.wait_for_all(); 1688 { 1689 int intVal1; 1690 CheckType<int> checkVal1; 1691 CHECK_MESSAGE( (!intEmptyTestQueue.try_get(intVal1)), "intInputQueue still had value in it"); 1692 CHECK_MESSAGE( (!checkEmptyTestQueue.try_get(checkVal1)), "checkInputQueue still had value in it"); 1693 } 1694 tbb::flow::remove_edge(intInputQueue, intEmptyTestQueue); 1695 tbb::flow::remove_edge(checkInputQueue, checkEmptyTestQueue); 1696 } // for ( int loop ... 1697 1698 // Now we'll put [4 .. nValues - 1] in intInputQueue, and then put [4 .. nValues - 1] in checkInputQueue in 1699 // a different order. We should see tuples in the output queues in the order we inserted 1700 // the integers into checkInputQueue. 1701 const int nValues = 100; 1702 const int nIncr = 31; // relatively prime to nValues 1703 1704 for(int loop = 4; loop < 4+nValues; ++loop) { 1705 // place one item in intInputQueue 1706 CHECK_MESSAGE( (intInputQueue.try_put(loop)), "Error putting to intInputQueue"); 1707 g.wait_for_all(); 1708 { 1709 CheckTupleType t3; 1710 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t3)), "Object in output queue"); 1711 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t3)), "Object in output queue"); 1712 } 1713 } // for ( int loop ... 1714 1715 for(int loop = 1; loop<=nValues; ++loop) { 1716 int lp1 = 4+(loop * nIncr)%nValues; 1717 // place item in checkInputQueue 1718 CHECK_MESSAGE( (checkInputQueue.try_put(lp1)), "Error putting to checkInputQueue"); 1719 // checkTupleQueue0, checkTupleQueue1 should have items 1720 g.wait_for_all(); 1721 { 1722 CheckTupleType t0; 1723 CheckTupleType t1; 1724 CHECK_MESSAGE( (checkTupleQueue0.try_get(t0)&&std::get<0>(t0)==lp1 && std::get<1>(t0)==lp1), "Error in checkTupleQueue0 output"); 1725 CHECK_MESSAGE( (checkTupleQueue1.try_get(t1)&&std::get<0>(t1)==lp1 && std::get<1>(t1)==lp1), "Error in checkTupleQueue1 output"); 1726 CHECK_MESSAGE( (!checkTupleQueue0.try_get(t0)), "extra object in output queue checkTupleQueue0"); 1727 CHECK_MESSAGE( (!checkTupleQueue1.try_get(t0)), "extra object in output queue checkTupleQueue1"); 1728 } 1729 } // for ( int loop ... 1730 } // Check 1731 } 1732 1733 template<typename Policy> struct policy_name {}; 1734 1735 template<> struct policy_name<tbb::flow::queueing> { 1736 const char* msg_beg() { return "queueing\n";} 1737 const char* msg_end() { return "test queueing extract\n";} 1738 }; 1739 1740 template<> struct policy_name<tbb::flow::reserving> { 1741 const char* msg_beg() { return "reserving\n";} 1742 const char* msg_end() { return "test reserving extract\n";} 1743 }; 1744 1745 template<> struct policy_name<tbb::flow::tag_matching> { 1746 const char* msg_beg() { return "tag_matching\n";} 1747 const char* msg_end() { return "test tag_matching extract\n";} 1748 }; 1749 1750 template<typename Policy> 1751 void test_main() { 1752 test_input_port_policies<Policy>(); 1753 for(int p = 0; p < 2; ++p) { 1754 INFO(policy_name<Policy>().msg_beg()); 1755 generate_test<serial_test, std::tuple<threebyte, double>, Policy>::do_test(); 1756 #if MAX_TUPLE_TEST_SIZE >= 4 1757 { 1758 Checker<CheckType<int> > my_check; 1759 generate_test<serial_test, std::tuple<float, double, CheckType<int>, long>, Policy>::do_test(); 1760 } 1761 #endif 1762 #if MAX_TUPLE_TEST_SIZE >= 6 1763 generate_test<serial_test, std::tuple<double, double, int, long, int, short>, Policy>::do_test(); 1764 #endif 1765 #if MAX_TUPLE_TEST_SIZE >= 8 1766 generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long>, Policy>::do_test(); 1767 #endif 1768 #if MAX_TUPLE_TEST_SIZE >= 10 1769 generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long>, Policy>::do_test(); 1770 #endif 1771 { 1772 Checker<CheckType<int> > my_check1; 1773 generate_test<parallel_test, std::tuple<float, CheckType<int> >, Policy>::do_test(); 1774 } 1775 #if MAX_TUPLE_TEST_SIZE >= 3 1776 generate_test<parallel_test, std::tuple<float, int, long>, Policy>::do_test(); 1777 #endif 1778 #if MAX_TUPLE_TEST_SIZE >= 5 1779 generate_test<parallel_test, std::tuple<double, double, int, int, short>, Policy>::do_test(); 1780 #endif 1781 #if MAX_TUPLE_TEST_SIZE >= 7 1782 generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long>, Policy>::do_test(); 1783 #endif 1784 #if MAX_TUPLE_TEST_SIZE >= 9 1785 generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long>, Policy>::do_test(); 1786 #endif 1787 } 1788 } 1789 1790 #endif /* tbb_test_join_node_H */ 1791