1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // TODO: Add overlapping put / receive tests 18 19 #include "common/config.h" 20 21 #include "tbb/flow_graph.h" 22 #include "tbb/global_control.h" 23 24 #include "common/test.h" 25 #include "common/utils.h" 26 #include "common/utils_assert.h" 27 #include "common/checktype.h" 28 #include "common/graph_utils.h" 29 #include "common/test_follows_and_precedes_api.h" 30 31 #include <cstdio> 32 33 34 //! \file test_queue_node.cpp 35 //! \brief Test for [flow_graph.queue_node] specification 36 37 38 #define N 1000 39 #define C 10 40 41 template< typename T > 42 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 43 int count = 0; 44 while ( q.try_get(value) != true ) { 45 if (count < 1000000) { 46 ++count; 47 } 48 if (count == 1000000) { 49 // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads. 50 tbb::task_arena a(tbb::task_arena::attach{}); 51 a.enqueue([]{}); 52 ++count; 53 } 54 } 55 } 56 57 template< typename T > 58 void check_item( T* next_value, T &value ) { 59 int tid = value / N; 60 int offset = value % N; 61 CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 62 ++next_value[tid]; 63 } 64 65 template< typename T > 66 struct parallel_puts : utils::NoAssign { 67 68 tbb::flow::queue_node<T> &my_q; 69 70 parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {} 71 72 void operator()(int i) const { 73 for (int j = 0; j < N; ++j) { 74 bool msg = my_q.try_put( T(N*i + j) ); 75 CHECK_MESSAGE( msg == true, "" ); 76 } 77 } 78 79 }; 80 81 template< typename T > 82 struct touches { 83 84 bool **my_touches; 85 T **my_last_touch; 86 int my_num_threads; 87 88 touches( int num_threads ) : my_num_threads(num_threads) { 89 my_last_touch = new T* [my_num_threads]; 90 my_touches = new bool* [my_num_threads]; 91 for ( int p = 0; p < my_num_threads; ++p) { 92 my_last_touch[p] = new T[my_num_threads]; 93 for ( int p2 = 0; p2 < my_num_threads; ++p2) 94 my_last_touch[p][p2] = -1; 95 96 my_touches[p] = new bool[N*my_num_threads]; 97 for ( int n = 0; n < N*my_num_threads; ++n) 98 my_touches[p][n] = false; 99 } 100 } 101 102 ~touches() { 103 for ( int p = 0; p < my_num_threads; ++p) { 104 delete [] my_touches[p]; 105 delete [] my_last_touch[p]; 106 } 107 delete [] my_touches; 108 delete [] my_last_touch; 109 } 110 111 bool check( int tid, T v ) { 112 int v_tid = v / N; 113 if ( my_touches[tid][v] != false ) { 114 printf("Error: value seen twice by local thread\n"); 115 return false; 116 } 117 if ( v <= my_last_touch[tid][v_tid] ) { 118 printf("Error: value seen in wrong order by local thread\n"); 119 return false; 120 } 121 my_last_touch[tid][v_tid] = v; 122 my_touches[tid][v] = true; 123 return true; 124 } 125 126 bool validate_touches() { 127 bool *all_touches = new bool[N*my_num_threads]; 128 for ( int n = 0; n < N*my_num_threads; ++n) 129 all_touches[n] = false; 130 131 for ( int p = 0; p < my_num_threads; ++p) { 132 for ( int n = 0; n < N*my_num_threads; ++n) { 133 if ( my_touches[p][n] == true ) { 134 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 135 all_touches[n] = true; 136 } 137 } 138 } 139 for ( int n = 0; n < N*my_num_threads; ++n) { 140 if ( !all_touches[n] ) 141 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 142 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 143 } 144 delete [] all_touches; 145 return true; 146 } 147 148 }; 149 150 template< typename T > 151 struct parallel_gets : utils::NoAssign { 152 153 tbb::flow::queue_node<T> &my_q; 154 touches<T> &my_touches; 155 156 parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {} 157 158 void operator()(int tid) const { 159 for (int j = 0; j < N; ++j) { 160 T v; 161 spin_try_get( my_q, v ); 162 my_touches.check( tid, v ); 163 } 164 } 165 166 }; 167 168 template< typename T > 169 struct parallel_put_get : utils::NoAssign { 170 171 tbb::flow::queue_node<T> &my_q; 172 touches<T> &my_touches; 173 174 parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {} 175 176 void operator()(int tid) const { 177 178 for ( int i = 0; i < N; i+=C ) { 179 int j_end = ( N < i + C ) ? N : i + C; 180 // dump about C values into the Q 181 for ( int j = i; j < j_end; ++j ) { 182 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 183 } 184 // receiver about C values from the Q 185 for ( int j = i; j < j_end; ++j ) { 186 T v; 187 spin_try_get( my_q, v ); 188 my_touches.check( tid, v ); 189 } 190 } 191 } 192 193 }; 194 195 // 196 // Tests 197 // 198 // Item can be reserved, released, consumed ( single serial receiver ) 199 // 200 template< typename T > 201 int test_reservation() { 202 tbb::flow::graph g; 203 T bogus_value(-1); 204 205 // Simple tests 206 tbb::flow::queue_node<T> q(g); 207 208 q.try_put(T(1)); 209 q.try_put(T(2)); 210 q.try_put(T(3)); 211 212 T v; 213 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 214 CHECK_MESSAGE( v == T(1), "" ); 215 CHECK_MESSAGE( q.release_reservation() == true, "" ); 216 v = bogus_value; 217 g.wait_for_all(); 218 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 219 CHECK_MESSAGE( v == T(1), "" ); 220 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 221 v = bogus_value; 222 g.wait_for_all(); 223 224 CHECK_MESSAGE( q.try_get(v) == true, "" ); 225 CHECK_MESSAGE( v == T(2), "" ); 226 v = bogus_value; 227 g.wait_for_all(); 228 229 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 230 CHECK_MESSAGE( v == T(3), "" ); 231 CHECK_MESSAGE( q.release_reservation() == true, "" ); 232 v = bogus_value; 233 g.wait_for_all(); 234 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 235 CHECK_MESSAGE( v == T(3), "" ); 236 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 237 v = bogus_value; 238 g.wait_for_all(); 239 240 return 0; 241 } 242 243 // 244 // Tests 245 // 246 // multiple parallel senders, items in FIFO (relatively to sender) order 247 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 248 // * overlapped puts / gets 249 // * all puts finished before any getS 250 // 251 template< typename T > 252 int test_parallel(int num_threads) { 253 tbb::flow::graph g; 254 tbb::flow::queue_node<T> q(g); 255 tbb::flow::queue_node<T> q2(g); 256 tbb::flow::queue_node<T> q3(g); 257 { 258 Checker< T > my_check; 259 T bogus_value(-1); 260 T j = bogus_value; 261 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 262 263 T *next_value = new T[num_threads]; 264 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 265 266 for (int i = 0; i < num_threads * N; ++i ) { 267 spin_try_get( q, j ); 268 check_item( next_value, j ); 269 j = bogus_value; 270 } 271 for (int tid = 0; tid < num_threads; ++tid) { 272 CHECK_MESSAGE( next_value[tid] == T(N), "" ); 273 } 274 delete[] next_value; 275 276 j = bogus_value; 277 g.wait_for_all(); 278 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 279 CHECK_MESSAGE( j == bogus_value, "" ); 280 281 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 282 283 { 284 touches< T > t( num_threads ); 285 utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) ); 286 g.wait_for_all(); 287 CHECK_MESSAGE( t.validate_touches(), "" ); 288 } 289 j = bogus_value; 290 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 291 CHECK_MESSAGE( j == bogus_value, "" ); 292 293 g.wait_for_all(); 294 { 295 touches< T > t2( num_threads ); 296 utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) ); 297 g.wait_for_all(); 298 CHECK_MESSAGE( t2.validate_touches(), "" ); 299 } 300 j = bogus_value; 301 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 302 CHECK_MESSAGE( j == bogus_value, "" ); 303 304 tbb::flow::make_edge( q, q2 ); 305 tbb::flow::make_edge( q2, q3 ); 306 307 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 308 { 309 touches< T > t3( num_threads ); 310 utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) ); 311 g.wait_for_all(); 312 CHECK_MESSAGE( t3.validate_touches(), "" ); 313 } 314 j = bogus_value; 315 g.wait_for_all(); 316 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 317 g.wait_for_all(); 318 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 319 g.wait_for_all(); 320 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 321 CHECK_MESSAGE( j == bogus_value, "" ); 322 323 // test copy constructor 324 CHECK_MESSAGE( remove_successor( q, q2 ), "" ); 325 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 326 tbb::flow::queue_node<T> q_copy(q); 327 j = bogus_value; 328 g.wait_for_all(); 329 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 330 CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" ); 331 { 332 touches< T > t( num_threads ); 333 utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) ); 334 g.wait_for_all(); 335 CHECK_MESSAGE( t.validate_touches(), "" ); 336 } 337 j = bogus_value; 338 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 339 CHECK_MESSAGE( j == bogus_value, "" ); 340 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 341 CHECK_MESSAGE( j == bogus_value, "" ); 342 } 343 344 return 0; 345 } 346 347 // 348 // Tests 349 // 350 // Predecessors cannot be registered 351 // Empty Q rejects item requests 352 // Single serial sender, items in FIFO order 353 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 354 // 355 356 template< typename T > 357 int test_serial() { 358 tbb::flow::graph g; 359 tbb::flow::queue_node<T> q(g); 360 tbb::flow::queue_node<T> q2(g); 361 { // destroy the graph after manipulating it, and see if all the items in the buffers 362 // have been destroyed before the graph 363 Checker<T> my_check; // if CheckType< U > count constructions and destructions 364 T bogus_value(-1); 365 T j = bogus_value; 366 367 // 368 // Rejects attempts to add / remove predecessor 369 // Rejects request from empty Q 370 // 371 CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" ); 372 CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" ); 373 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 374 CHECK_MESSAGE( j == bogus_value, "" ); 375 376 // 377 // Simple puts and gets 378 // 379 380 for (int i = 0; i < N; ++i) { 381 bool msg = q.try_put( T(i) ); 382 CHECK_MESSAGE( msg == true, "" ); 383 } 384 385 386 for (int i = 0; i < N; ++i) { 387 j = bogus_value; 388 spin_try_get( q, j ); 389 CHECK_MESSAGE( i == j, "" ); 390 } 391 j = bogus_value; 392 g.wait_for_all(); 393 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 394 CHECK_MESSAGE( j == bogus_value, "" ); 395 396 tbb::flow::make_edge( q, q2 ); 397 398 for (int i = 0; i < N; ++i) { 399 bool msg = q.try_put( T(i) ); 400 CHECK_MESSAGE( msg == true, "" ); 401 } 402 403 404 for (int i = 0; i < N; ++i) { 405 j = bogus_value; 406 spin_try_get( q2, j ); 407 CHECK_MESSAGE( i == j, "" ); 408 } 409 j = bogus_value; 410 g.wait_for_all(); 411 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 412 g.wait_for_all(); 413 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 414 CHECK_MESSAGE( j == bogus_value, "" ); 415 416 tbb::flow::remove_edge( q, q2 ); 417 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 418 g.wait_for_all(); 419 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 420 CHECK_MESSAGE( j == bogus_value, "" ); 421 g.wait_for_all(); 422 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 423 CHECK_MESSAGE( j == 1, "" ); 424 425 tbb::flow::queue_node<T> q3(g); 426 tbb::flow::make_edge( q, q2 ); 427 tbb::flow::make_edge( q2, q3 ); 428 429 for (int i = 0; i < N; ++i) { 430 bool msg = q.try_put( T(i) ); 431 CHECK_MESSAGE( msg == true, "" ); 432 } 433 434 for (int i = 0; i < N; ++i) { 435 j = bogus_value; 436 spin_try_get( q3, j ); 437 CHECK_MESSAGE( i == j, "" ); 438 } 439 j = bogus_value; 440 g.wait_for_all(); 441 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 442 g.wait_for_all(); 443 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 444 g.wait_for_all(); 445 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 446 CHECK_MESSAGE( j == bogus_value, "" ); 447 448 tbb::flow::remove_edge( q, q2 ); 449 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 450 g.wait_for_all(); 451 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 452 CHECK_MESSAGE( j == bogus_value, "" ); 453 g.wait_for_all(); 454 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 455 CHECK_MESSAGE( j == bogus_value, "" ); 456 g.wait_for_all(); 457 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 458 CHECK_MESSAGE( j == 1, "" ); 459 } 460 461 return 0; 462 } 463 464 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 465 #include <array> 466 #include <vector> 467 void test_follows_and_precedes_api() { 468 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 469 std::vector<int> messages_for_precedes = {0, 1, 2}; 470 471 follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows); 472 follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes); 473 } 474 #endif 475 476 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 477 void test_deduction_guides() { 478 using namespace tbb::flow; 479 graph g; 480 broadcast_node<int> br(g); 481 queue_node<int> q0(g); 482 483 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 484 queue_node q1(follows(br)); 485 static_assert(std::is_same_v<decltype(q1), queue_node<int>>); 486 487 queue_node q2(precedes(br)); 488 static_assert(std::is_same_v<decltype(q2), queue_node<int>>); 489 #endif 490 491 queue_node q3(q0); 492 static_assert(std::is_same_v<decltype(q3), queue_node<int>>); 493 g.wait_for_all(); 494 } 495 #endif 496 497 //! Test serial, parallel behavior and reservation under parallelism 498 //! \brief \ref requirement \ref error_guessing 499 TEST_CASE("Parallel, serial test"){ 500 for (int p = 2; p <= 4; ++p) { 501 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p); 502 tbb::task_arena arena(p); 503 arena.execute( 504 [&]() { 505 test_serial<int>(); 506 test_serial<CheckType<int> >(); 507 test_parallel<int>(p); 508 test_parallel<CheckType<int> >(p); 509 } 510 ); 511 } 512 } 513 514 //! Test reset and cancellation 515 //! \brief \ref error_guessing 516 TEST_CASE("Resets test"){ 517 INFO("Testing resets\n"); 518 test_resets<int, tbb::flow::queue_node<int> >(); 519 test_resets<float, tbb::flow::queue_node<float> >(); 520 } 521 522 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 523 //! Test follows and precedes API 524 //! \brief \ref error_guessing 525 TEST_CASE("Test follows and precedes API"){ 526 test_follows_and_precedes_api(); 527 } 528 #endif 529 530 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 531 //! Test decution guides 532 //! \brief \ref requirement 533 TEST_CASE("Deduction guides"){ 534 test_deduction_guides(); 535 } 536 #endif 537 538 //! Test operations on a reserved queue_node 539 //! \brief \ref error_guessing 540 TEST_CASE("queue_node with reservation"){ 541 tbb::flow::graph g; 542 543 tbb::flow::queue_node<int> q(g); 544 545 bool res = q.try_put(42); 546 CHECK_MESSAGE( res, "queue_node must accept input." ); 547 548 int val = 1; 549 res = q.try_reserve(val); 550 CHECK_MESSAGE( res, "queue_node must reserve as it has an item." ); 551 CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." ); 552 553 int out_arg = -1; 554 CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail."); 555 CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument."); 556 557 out_arg = -1; 558 CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail."); 559 CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument."); 560 g.wait_for_all(); 561 } 562