1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // TODO: Add overlapping put / receive tests 18 19 #include "common/config.h" 20 21 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 22 // parts in all of tests might make testing of the product, which is different from what is actually 23 // released. 24 #define __TBB_EXTRA_DEBUG 1 25 #include "tbb/flow_graph.h" 26 27 #include "common/test.h" 28 #include "common/utils.h" 29 #include "common/utils_assert.h" 30 #include "common/checktype.h" 31 #include "common/graph_utils.h" 32 #include "common/test_follows_and_precedes_api.h" 33 34 #include <cstdio> 35 36 37 //! \file test_queue_node.cpp 38 //! \brief Test for [flow_graph.queue_node] specification 39 40 41 #define N 1000 42 #define C 10 43 44 template< typename T > 45 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 46 int count = 0; 47 while ( q.try_get(value) != true ) { 48 if (count < 1000000) { 49 ++count; 50 } 51 if (count == 1000000) { 52 // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads. 53 tbb::task_arena a(tbb::task_arena::attach{}); 54 a.enqueue([]{}); 55 ++count; 56 } 57 } 58 } 59 60 template< typename T > 61 void check_item( T* next_value, T &value ) { 62 int tid = value / N; 63 int offset = value % N; 64 CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 65 ++next_value[tid]; 66 } 67 68 template< typename T > 69 struct parallel_puts : utils::NoAssign { 70 71 tbb::flow::queue_node<T> &my_q; 72 73 parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {} 74 75 void operator()(int i) const { 76 for (int j = 0; j < N; ++j) { 77 bool msg = my_q.try_put( T(N*i + j) ); 78 CHECK_MESSAGE( msg == true, "" ); 79 } 80 } 81 82 }; 83 84 template< typename T > 85 struct touches { 86 87 bool **my_touches; 88 T **my_last_touch; 89 int my_num_threads; 90 91 touches( int num_threads ) : my_num_threads(num_threads) { 92 my_last_touch = new T* [my_num_threads]; 93 my_touches = new bool* [my_num_threads]; 94 for ( int p = 0; p < my_num_threads; ++p) { 95 my_last_touch[p] = new T[my_num_threads]; 96 for ( int p2 = 0; p2 < my_num_threads; ++p2) 97 my_last_touch[p][p2] = -1; 98 99 my_touches[p] = new bool[N*my_num_threads]; 100 for ( int n = 0; n < N*my_num_threads; ++n) 101 my_touches[p][n] = false; 102 } 103 } 104 105 ~touches() { 106 for ( int p = 0; p < my_num_threads; ++p) { 107 delete [] my_touches[p]; 108 delete [] my_last_touch[p]; 109 } 110 delete [] my_touches; 111 delete [] my_last_touch; 112 } 113 114 bool check( int tid, T v ) { 115 int v_tid = v / N; 116 if ( my_touches[tid][v] != false ) { 117 printf("Error: value seen twice by local thread\n"); 118 return false; 119 } 120 if ( v <= my_last_touch[tid][v_tid] ) { 121 printf("Error: value seen in wrong order by local thread\n"); 122 return false; 123 } 124 my_last_touch[tid][v_tid] = v; 125 my_touches[tid][v] = true; 126 return true; 127 } 128 129 bool validate_touches() { 130 bool *all_touches = new bool[N*my_num_threads]; 131 for ( int n = 0; n < N*my_num_threads; ++n) 132 all_touches[n] = false; 133 134 for ( int p = 0; p < my_num_threads; ++p) { 135 for ( int n = 0; n < N*my_num_threads; ++n) { 136 if ( my_touches[p][n] == true ) { 137 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 138 all_touches[n] = true; 139 } 140 } 141 } 142 for ( int n = 0; n < N*my_num_threads; ++n) { 143 if ( !all_touches[n] ) 144 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 145 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 146 } 147 delete [] all_touches; 148 return true; 149 } 150 151 }; 152 153 template< typename T > 154 struct parallel_gets : utils::NoAssign { 155 156 tbb::flow::queue_node<T> &my_q; 157 touches<T> &my_touches; 158 159 parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {} 160 161 void operator()(int tid) const { 162 for (int j = 0; j < N; ++j) { 163 T v; 164 spin_try_get( my_q, v ); 165 my_touches.check( tid, v ); 166 } 167 } 168 169 }; 170 171 template< typename T > 172 struct parallel_put_get : utils::NoAssign { 173 174 tbb::flow::queue_node<T> &my_q; 175 touches<T> &my_touches; 176 177 parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {} 178 179 void operator()(int tid) const { 180 181 for ( int i = 0; i < N; i+=C ) { 182 int j_end = ( N < i + C ) ? N : i + C; 183 // dump about C values into the Q 184 for ( int j = i; j < j_end; ++j ) { 185 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 186 } 187 // receiver about C values from the Q 188 for ( int j = i; j < j_end; ++j ) { 189 T v; 190 spin_try_get( my_q, v ); 191 my_touches.check( tid, v ); 192 } 193 } 194 } 195 196 }; 197 198 // 199 // Tests 200 // 201 // Item can be reserved, released, consumed ( single serial receiver ) 202 // 203 template< typename T > 204 int test_reservation() { 205 tbb::flow::graph g; 206 T bogus_value(-1); 207 208 // Simple tests 209 tbb::flow::queue_node<T> q(g); 210 211 q.try_put(T(1)); 212 q.try_put(T(2)); 213 q.try_put(T(3)); 214 215 T v; 216 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 217 CHECK_MESSAGE( v == T(1), "" ); 218 CHECK_MESSAGE( q.release_reservation() == true, "" ); 219 v = bogus_value; 220 g.wait_for_all(); 221 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 222 CHECK_MESSAGE( v == T(1), "" ); 223 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 224 v = bogus_value; 225 g.wait_for_all(); 226 227 CHECK_MESSAGE( q.try_get(v) == true, "" ); 228 CHECK_MESSAGE( v == T(2), "" ); 229 v = bogus_value; 230 g.wait_for_all(); 231 232 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 233 CHECK_MESSAGE( v == T(3), "" ); 234 CHECK_MESSAGE( q.release_reservation() == true, "" ); 235 v = bogus_value; 236 g.wait_for_all(); 237 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 238 CHECK_MESSAGE( v == T(3), "" ); 239 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 240 v = bogus_value; 241 g.wait_for_all(); 242 243 return 0; 244 } 245 246 // 247 // Tests 248 // 249 // multiple parallel senders, items in FIFO (relatively to sender) order 250 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 251 // * overlapped puts / gets 252 // * all puts finished before any getS 253 // 254 template< typename T > 255 int test_parallel(int num_threads) { 256 tbb::flow::graph g; 257 tbb::flow::queue_node<T> q(g); 258 tbb::flow::queue_node<T> q2(g); 259 tbb::flow::queue_node<T> q3(g); 260 { 261 Checker< T > my_check; 262 T bogus_value(-1); 263 T j = bogus_value; 264 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 265 266 T *next_value = new T[num_threads]; 267 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 268 269 for (int i = 0; i < num_threads * N; ++i ) { 270 spin_try_get( q, j ); 271 check_item( next_value, j ); 272 j = bogus_value; 273 } 274 for (int tid = 0; tid < num_threads; ++tid) { 275 CHECK_MESSAGE( next_value[tid] == T(N), "" ); 276 } 277 delete[] next_value; 278 279 j = bogus_value; 280 g.wait_for_all(); 281 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 282 CHECK_MESSAGE( j == bogus_value, "" ); 283 284 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 285 286 { 287 touches< T > t( num_threads ); 288 utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) ); 289 g.wait_for_all(); 290 CHECK_MESSAGE( t.validate_touches(), "" ); 291 } 292 j = bogus_value; 293 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 294 CHECK_MESSAGE( j == bogus_value, "" ); 295 296 g.wait_for_all(); 297 { 298 touches< T > t2( num_threads ); 299 utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) ); 300 g.wait_for_all(); 301 CHECK_MESSAGE( t2.validate_touches(), "" ); 302 } 303 j = bogus_value; 304 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 305 CHECK_MESSAGE( j == bogus_value, "" ); 306 307 tbb::flow::make_edge( q, q2 ); 308 tbb::flow::make_edge( q2, q3 ); 309 310 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 311 { 312 touches< T > t3( num_threads ); 313 utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) ); 314 g.wait_for_all(); 315 CHECK_MESSAGE( t3.validate_touches(), "" ); 316 } 317 j = bogus_value; 318 g.wait_for_all(); 319 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 320 g.wait_for_all(); 321 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 322 g.wait_for_all(); 323 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 324 CHECK_MESSAGE( j == bogus_value, "" ); 325 326 // test copy constructor 327 CHECK_MESSAGE( remove_successor( q, q2 ), "" ); 328 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 329 tbb::flow::queue_node<T> q_copy(q); 330 j = bogus_value; 331 g.wait_for_all(); 332 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 333 CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" ); 334 { 335 touches< T > t( num_threads ); 336 utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) ); 337 g.wait_for_all(); 338 CHECK_MESSAGE( t.validate_touches(), "" ); 339 } 340 j = bogus_value; 341 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 342 CHECK_MESSAGE( j == bogus_value, "" ); 343 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 344 CHECK_MESSAGE( j == bogus_value, "" ); 345 } 346 347 return 0; 348 } 349 350 // 351 // Tests 352 // 353 // Predecessors cannot be registered 354 // Empty Q rejects item requests 355 // Single serial sender, items in FIFO order 356 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 357 // 358 359 template< typename T > 360 int test_serial() { 361 tbb::flow::graph g; 362 tbb::flow::queue_node<T> q(g); 363 tbb::flow::queue_node<T> q2(g); 364 { // destroy the graph after manipulating it, and see if all the items in the buffers 365 // have been destroyed before the graph 366 Checker<T> my_check; // if CheckType< U > count constructions and destructions 367 T bogus_value(-1); 368 T j = bogus_value; 369 370 // 371 // Rejects attempts to add / remove predecessor 372 // Rejects request from empty Q 373 // 374 CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" ); 375 CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" ); 376 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 377 CHECK_MESSAGE( j == bogus_value, "" ); 378 379 // 380 // Simple puts and gets 381 // 382 383 for (int i = 0; i < N; ++i) { 384 bool msg = q.try_put( T(i) ); 385 CHECK_MESSAGE( msg == true, "" ); 386 } 387 388 389 for (int i = 0; i < N; ++i) { 390 j = bogus_value; 391 spin_try_get( q, j ); 392 CHECK_MESSAGE( i == j, "" ); 393 } 394 j = bogus_value; 395 g.wait_for_all(); 396 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 397 CHECK_MESSAGE( j == bogus_value, "" ); 398 399 tbb::flow::make_edge( q, q2 ); 400 401 for (int i = 0; i < N; ++i) { 402 bool msg = q.try_put( T(i) ); 403 CHECK_MESSAGE( msg == true, "" ); 404 } 405 406 407 for (int i = 0; i < N; ++i) { 408 j = bogus_value; 409 spin_try_get( q2, j ); 410 CHECK_MESSAGE( i == j, "" ); 411 } 412 j = bogus_value; 413 g.wait_for_all(); 414 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 415 g.wait_for_all(); 416 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 417 CHECK_MESSAGE( j == bogus_value, "" ); 418 419 tbb::flow::remove_edge( q, q2 ); 420 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 421 g.wait_for_all(); 422 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 423 CHECK_MESSAGE( j == bogus_value, "" ); 424 g.wait_for_all(); 425 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 426 CHECK_MESSAGE( j == 1, "" ); 427 428 tbb::flow::queue_node<T> q3(g); 429 tbb::flow::make_edge( q, q2 ); 430 tbb::flow::make_edge( q2, q3 ); 431 432 for (int i = 0; i < N; ++i) { 433 bool msg = q.try_put( T(i) ); 434 CHECK_MESSAGE( msg == true, "" ); 435 } 436 437 for (int i = 0; i < N; ++i) { 438 j = bogus_value; 439 spin_try_get( q3, j ); 440 CHECK_MESSAGE( i == j, "" ); 441 } 442 j = bogus_value; 443 g.wait_for_all(); 444 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 445 g.wait_for_all(); 446 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 447 g.wait_for_all(); 448 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 449 CHECK_MESSAGE( j == bogus_value, "" ); 450 451 tbb::flow::remove_edge( q, q2 ); 452 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 453 g.wait_for_all(); 454 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 455 CHECK_MESSAGE( j == bogus_value, "" ); 456 g.wait_for_all(); 457 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 458 CHECK_MESSAGE( j == bogus_value, "" ); 459 g.wait_for_all(); 460 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 461 CHECK_MESSAGE( j == 1, "" ); 462 } 463 464 return 0; 465 } 466 467 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 468 #include <array> 469 #include <vector> 470 void test_follows_and_precedes_api() { 471 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 472 std::vector<int> messages_for_precedes = {0, 1, 2}; 473 474 follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows); 475 follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes); 476 } 477 #endif 478 479 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 480 void test_deduction_guides() { 481 using namespace tbb::flow; 482 graph g; 483 broadcast_node<int> br(g); 484 queue_node<int> q0(g); 485 486 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 487 queue_node q1(follows(br)); 488 static_assert(std::is_same_v<decltype(q1), queue_node<int>>); 489 490 queue_node q2(precedes(br)); 491 static_assert(std::is_same_v<decltype(q2), queue_node<int>>); 492 #endif 493 494 queue_node q3(q0); 495 static_assert(std::is_same_v<decltype(q3), queue_node<int>>); 496 g.wait_for_all(); 497 } 498 #endif 499 500 //! Test serial, parallel behavior and reservation under parallelism 501 //! \brief \ref requirement \ref error_guessing 502 TEST_CASE("Parallel, serial test"){ 503 for (int p = 2; p <= 4; ++p) { 504 tbb::task_arena arena(p); 505 arena.execute( 506 [&]() { 507 test_serial<int>(); 508 test_serial<CheckType<int> >(); 509 test_parallel<int>(p); 510 test_parallel<CheckType<int> >(p); 511 } 512 ); 513 } 514 } 515 516 //! Test reset and cancellation 517 //! \brief \ref error_guessing 518 TEST_CASE("Resets test"){ 519 INFO("Testing resets\n"); 520 test_resets<int, tbb::flow::queue_node<int> >(); 521 test_resets<float, tbb::flow::queue_node<float> >(); 522 } 523 524 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 525 //! Test follows and precedes API 526 //! \brief \ref error_guessing 527 TEST_CASE("Test follows and precedes API"){ 528 test_follows_and_precedes_api(); 529 } 530 #endif 531 532 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 533 //! Test decution guides 534 //! \brief \ref requirement 535 TEST_CASE("Deduction guides"){ 536 test_deduction_guides(); 537 } 538 #endif 539 540 //! Test operations on a reserved queue_node 541 //! \brief \ref error_guessing 542 TEST_CASE("queue_node with reservation"){ 543 tbb::flow::graph g; 544 545 tbb::flow::queue_node<int> q(g); 546 547 bool res = q.try_put(42); 548 CHECK_MESSAGE( res, "queue_node must accept input." ); 549 550 int val = 1; 551 res = q.try_reserve(val); 552 CHECK_MESSAGE( res, "queue_node must reserve as it has an item." ); 553 CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." ); 554 555 int out_arg = -1; 556 CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail."); 557 CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument."); 558 559 out_arg = -1; 560 CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail."); 561 CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument."); 562 g.wait_for_all(); 563 } 564