1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // TODO: Add overlapping put / receive tests 18 19 #include "common/config.h" 20 21 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 22 // parts in all of tests might make testing of the product, which is different from what is actually 23 // released. 24 #define __TBB_EXTRA_DEBUG 1 25 #include "tbb/flow_graph.h" 26 27 #include "common/test.h" 28 #include "common/utils.h" 29 #include "common/utils_assert.h" 30 #include "common/checktype.h" 31 #include "common/graph_utils.h" 32 #include "common/test_follows_and_precedes_api.h" 33 34 #include <cstdio> 35 36 37 //! \file test_queue_node.cpp 38 //! \brief Test for [flow_graph.queue_node] specification 39 40 41 #define N 1000 42 #define C 10 43 44 template< typename T > 45 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 46 while ( q.try_get(value) != true ) ; 47 } 48 49 template< typename T > 50 void check_item( T* next_value, T &value ) { 51 int tid = value / N; 52 int offset = value % N; 53 CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 54 ++next_value[tid]; 55 } 56 57 template< typename T > 58 struct parallel_puts : utils::NoAssign { 59 60 tbb::flow::queue_node<T> &my_q; 61 62 parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {} 63 64 void operator()(int i) const { 65 for (int j = 0; j < N; ++j) { 66 bool msg = my_q.try_put( T(N*i + j) ); 67 CHECK_MESSAGE( msg == true, "" ); 68 } 69 } 70 71 }; 72 73 74 75 template< typename T > 76 struct touches { 77 78 bool **my_touches; 79 T **my_last_touch; 80 int my_num_threads; 81 82 touches( int num_threads ) : my_num_threads(num_threads) { 83 my_last_touch = new T* [my_num_threads]; 84 my_touches = new bool* [my_num_threads]; 85 for ( int p = 0; p < my_num_threads; ++p) { 86 my_last_touch[p] = new T[my_num_threads]; 87 for ( int p2 = 0; p2 < my_num_threads; ++p2) 88 my_last_touch[p][p2] = -1; 89 90 my_touches[p] = new bool[N*my_num_threads]; 91 for ( int n = 0; n < N*my_num_threads; ++n) 92 my_touches[p][n] = false; 93 } 94 } 95 96 ~touches() { 97 for ( int p = 0; p < my_num_threads; ++p) { 98 delete [] my_touches[p]; 99 delete [] my_last_touch[p]; 100 } 101 delete [] my_touches; 102 delete [] my_last_touch; 103 } 104 105 bool check( int tid, T v ) { 106 int v_tid = v / N; 107 if ( my_touches[tid][v] != false ) { 108 printf("Error: value seen twice by local thread\n"); 109 return false; 110 } 111 if ( v <= my_last_touch[tid][v_tid] ) { 112 printf("Error: value seen in wrong order by local thread\n"); 113 return false; 114 } 115 my_last_touch[tid][v_tid] = v; 116 my_touches[tid][v] = true; 117 return true; 118 } 119 120 bool validate_touches() { 121 bool *all_touches = new bool[N*my_num_threads]; 122 for ( int n = 0; n < N*my_num_threads; ++n) 123 all_touches[n] = false; 124 125 for ( int p = 0; p < my_num_threads; ++p) { 126 for ( int n = 0; n < N*my_num_threads; ++n) { 127 if ( my_touches[p][n] == true ) { 128 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 129 all_touches[n] = true; 130 } 131 } 132 } 133 for ( int n = 0; n < N*my_num_threads; ++n) { 134 if ( !all_touches[n] ) 135 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 136 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 137 } 138 delete [] all_touches; 139 return true; 140 } 141 142 }; 143 144 template< typename T > 145 struct parallel_gets : utils::NoAssign { 146 147 tbb::flow::queue_node<T> &my_q; 148 touches<T> &my_touches; 149 150 parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {} 151 152 void operator()(int tid) const { 153 for (int j = 0; j < N; ++j) { 154 T v; 155 spin_try_get( my_q, v ); 156 my_touches.check( tid, v ); 157 } 158 } 159 160 }; 161 162 template< typename T > 163 struct parallel_put_get : utils::NoAssign { 164 165 tbb::flow::queue_node<T> &my_q; 166 touches<T> &my_touches; 167 168 parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {} 169 170 void operator()(int tid) const { 171 172 for ( int i = 0; i < N; i+=C ) { 173 int j_end = ( N < i + C ) ? N : i + C; 174 // dump about C values into the Q 175 for ( int j = i; j < j_end; ++j ) { 176 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 177 } 178 // receiver about C values from the Q 179 for ( int j = i; j < j_end; ++j ) { 180 T v; 181 spin_try_get( my_q, v ); 182 my_touches.check( tid, v ); 183 } 184 } 185 } 186 187 }; 188 189 // 190 // Tests 191 // 192 // Item can be reserved, released, consumed ( single serial receiver ) 193 // 194 template< typename T > 195 int test_reservation() { 196 tbb::flow::graph g; 197 T bogus_value(-1); 198 199 // Simple tests 200 tbb::flow::queue_node<T> q(g); 201 202 q.try_put(T(1)); 203 q.try_put(T(2)); 204 q.try_put(T(3)); 205 206 T v; 207 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 208 CHECK_MESSAGE( v == T(1), "" ); 209 CHECK_MESSAGE( q.release_reservation() == true, "" ); 210 v = bogus_value; 211 g.wait_for_all(); 212 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 213 CHECK_MESSAGE( v == T(1), "" ); 214 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 215 v = bogus_value; 216 g.wait_for_all(); 217 218 CHECK_MESSAGE( q.try_get(v) == true, "" ); 219 CHECK_MESSAGE( v == T(2), "" ); 220 v = bogus_value; 221 g.wait_for_all(); 222 223 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 224 CHECK_MESSAGE( v == T(3), "" ); 225 CHECK_MESSAGE( q.release_reservation() == true, "" ); 226 v = bogus_value; 227 g.wait_for_all(); 228 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 229 CHECK_MESSAGE( v == T(3), "" ); 230 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 231 v = bogus_value; 232 g.wait_for_all(); 233 234 return 0; 235 } 236 237 // 238 // Tests 239 // 240 // multiple parallel senders, items in FIFO (relatively to sender) order 241 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 242 // * overlapped puts / gets 243 // * all puts finished before any getS 244 // 245 template< typename T > 246 int test_parallel(int num_threads) { 247 tbb::flow::graph g; 248 tbb::flow::queue_node<T> q(g); 249 tbb::flow::queue_node<T> q2(g); 250 tbb::flow::queue_node<T> q3(g); 251 { 252 Checker< T > my_check; 253 T bogus_value(-1); 254 T j = bogus_value; 255 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 256 257 T *next_value = new T[num_threads]; 258 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 259 260 for (int i = 0; i < num_threads * N; ++i ) { 261 spin_try_get( q, j ); 262 check_item( next_value, j ); 263 j = bogus_value; 264 } 265 for (int tid = 0; tid < num_threads; ++tid) { 266 CHECK_MESSAGE( next_value[tid] == T(N), "" ); 267 } 268 delete[] next_value; 269 270 j = bogus_value; 271 g.wait_for_all(); 272 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 273 CHECK_MESSAGE( j == bogus_value, "" ); 274 275 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 276 277 { 278 touches< T > t( num_threads ); 279 utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) ); 280 g.wait_for_all(); 281 CHECK_MESSAGE( t.validate_touches(), "" ); 282 } 283 j = bogus_value; 284 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 285 CHECK_MESSAGE( j == bogus_value, "" ); 286 287 g.wait_for_all(); 288 { 289 touches< T > t2( num_threads ); 290 utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) ); 291 g.wait_for_all(); 292 CHECK_MESSAGE( t2.validate_touches(), "" ); 293 } 294 j = bogus_value; 295 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 296 CHECK_MESSAGE( j == bogus_value, "" ); 297 298 tbb::flow::make_edge( q, q2 ); 299 tbb::flow::make_edge( q2, q3 ); 300 301 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 302 { 303 touches< T > t3( num_threads ); 304 utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) ); 305 g.wait_for_all(); 306 CHECK_MESSAGE( t3.validate_touches(), "" ); 307 } 308 j = bogus_value; 309 g.wait_for_all(); 310 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 311 g.wait_for_all(); 312 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 313 g.wait_for_all(); 314 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 315 CHECK_MESSAGE( j == bogus_value, "" ); 316 317 // test copy constructor 318 CHECK_MESSAGE( remove_successor( q, q2 ), "" ); 319 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 320 tbb::flow::queue_node<T> q_copy(q); 321 j = bogus_value; 322 g.wait_for_all(); 323 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 324 CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" ); 325 { 326 touches< T > t( num_threads ); 327 utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) ); 328 g.wait_for_all(); 329 CHECK_MESSAGE( t.validate_touches(), "" ); 330 } 331 j = bogus_value; 332 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 333 CHECK_MESSAGE( j == bogus_value, "" ); 334 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 335 CHECK_MESSAGE( j == bogus_value, "" ); 336 } 337 338 return 0; 339 } 340 341 // 342 // Tests 343 // 344 // Predecessors cannot be registered 345 // Empty Q rejects item requests 346 // Single serial sender, items in FIFO order 347 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 348 // 349 350 template< typename T > 351 int test_serial() { 352 tbb::flow::graph g; 353 tbb::flow::queue_node<T> q(g); 354 tbb::flow::queue_node<T> q2(g); 355 { // destroy the graph after manipulating it, and see if all the items in the buffers 356 // have been destroyed before the graph 357 Checker<T> my_check; // if CheckType< U > count constructions and destructions 358 T bogus_value(-1); 359 T j = bogus_value; 360 361 // 362 // Rejects attempts to add / remove predecessor 363 // Rejects request from empty Q 364 // 365 CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" ); 366 CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" ); 367 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 368 CHECK_MESSAGE( j == bogus_value, "" ); 369 370 // 371 // Simple puts and gets 372 // 373 374 for (int i = 0; i < N; ++i) { 375 bool msg = q.try_put( T(i) ); 376 CHECK_MESSAGE( msg == true, "" ); 377 } 378 379 380 for (int i = 0; i < N; ++i) { 381 j = bogus_value; 382 spin_try_get( q, j ); 383 CHECK_MESSAGE( i == j, "" ); 384 } 385 j = bogus_value; 386 g.wait_for_all(); 387 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 388 CHECK_MESSAGE( j == bogus_value, "" ); 389 390 tbb::flow::make_edge( q, q2 ); 391 392 for (int i = 0; i < N; ++i) { 393 bool msg = q.try_put( T(i) ); 394 CHECK_MESSAGE( msg == true, "" ); 395 } 396 397 398 for (int i = 0; i < N; ++i) { 399 j = bogus_value; 400 spin_try_get( q2, j ); 401 CHECK_MESSAGE( i == j, "" ); 402 } 403 j = bogus_value; 404 g.wait_for_all(); 405 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 406 g.wait_for_all(); 407 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 408 CHECK_MESSAGE( j == bogus_value, "" ); 409 410 tbb::flow::remove_edge( q, q2 ); 411 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 412 g.wait_for_all(); 413 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 414 CHECK_MESSAGE( j == bogus_value, "" ); 415 g.wait_for_all(); 416 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 417 CHECK_MESSAGE( j == 1, "" ); 418 419 tbb::flow::queue_node<T> q3(g); 420 tbb::flow::make_edge( q, q2 ); 421 tbb::flow::make_edge( q2, q3 ); 422 423 for (int i = 0; i < N; ++i) { 424 bool msg = q.try_put( T(i) ); 425 CHECK_MESSAGE( msg == true, "" ); 426 } 427 428 for (int i = 0; i < N; ++i) { 429 j = bogus_value; 430 spin_try_get( q3, j ); 431 CHECK_MESSAGE( i == j, "" ); 432 } 433 j = bogus_value; 434 g.wait_for_all(); 435 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 436 g.wait_for_all(); 437 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 438 g.wait_for_all(); 439 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 440 CHECK_MESSAGE( j == bogus_value, "" ); 441 442 tbb::flow::remove_edge( q, q2 ); 443 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 444 g.wait_for_all(); 445 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 446 CHECK_MESSAGE( j == bogus_value, "" ); 447 g.wait_for_all(); 448 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 449 CHECK_MESSAGE( j == bogus_value, "" ); 450 g.wait_for_all(); 451 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 452 CHECK_MESSAGE( j == 1, "" ); 453 } 454 455 return 0; 456 } 457 458 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 459 #include <array> 460 #include <vector> 461 void test_follows_and_precedes_api() { 462 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 463 std::vector<int> messages_for_precedes = {0, 1, 2}; 464 465 follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows); 466 follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes); 467 } 468 #endif 469 470 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 471 void test_deduction_guides() { 472 using namespace tbb::flow; 473 graph g; 474 broadcast_node<int> br(g); 475 queue_node<int> q0(g); 476 477 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 478 queue_node q1(follows(br)); 479 static_assert(std::is_same_v<decltype(q1), queue_node<int>>); 480 481 queue_node q2(precedes(br)); 482 static_assert(std::is_same_v<decltype(q2), queue_node<int>>); 483 #endif 484 485 queue_node q3(q0); 486 static_assert(std::is_same_v<decltype(q3), queue_node<int>>); 487 g.wait_for_all(); 488 } 489 #endif 490 491 //! Test serial, parallel behavior and reservation under parallelism 492 //! \brief \ref requirement \ref error_guessing 493 TEST_CASE("Parallel, serial test"){ 494 for (int p = 2; p <= 4; ++p) { 495 tbb::task_arena arena(p); 496 arena.execute( 497 [&]() { 498 499 test_serial<int>(); 500 test_serial<CheckType<int> >(); 501 test_parallel<int>(p); 502 test_parallel<CheckType<int> >(p); 503 504 } 505 ); 506 } 507 } 508 509 //! Test reset and cancellation 510 //! \brief \ref error_guessing 511 TEST_CASE("Resets test"){ 512 INFO("Testing resets\n"); 513 test_resets<int, tbb::flow::queue_node<int> >(); 514 test_resets<float, tbb::flow::queue_node<float> >(); 515 } 516 517 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 518 //! Test follows and precedes API 519 //! \brief \ref error_guessing 520 TEST_CASE("Test follows and precedes API"){ 521 test_follows_and_precedes_api(); 522 } 523 #endif 524 525 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 526 //! Test decution guides 527 //! \brief \ref requirement 528 TEST_CASE("Deduction guides"){ 529 test_deduction_guides(); 530 } 531 #endif 532 533 //! Test operations on a reserved queue_node 534 //! \brief \ref error_guessing 535 TEST_CASE("queue_node with reservation"){ 536 tbb::flow::graph g; 537 538 tbb::flow::queue_node<int> q(g); 539 540 bool res = q.try_put(42); 541 CHECK_MESSAGE( res, "queue_node must accept input." ); 542 543 int val = 1; 544 res = q.try_reserve(val); 545 CHECK_MESSAGE( res, "queue_node must reserve as it has an item." ); 546 CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." ); 547 548 int out_arg = -1; 549 CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail."); 550 CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument."); 551 552 out_arg = -1; 553 CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail."); 554 CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument."); 555 g.wait_for_all(); 556 557 } 558