1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // TODO: Add overlapping put / receive tests 18 19 #include "common/config.h" 20 21 #include "tbb/flow_graph.h" 22 23 #include "common/test.h" 24 #include "common/utils.h" 25 #include "common/utils_assert.h" 26 #include "common/checktype.h" 27 #include "common/graph_utils.h" 28 #include "common/test_follows_and_precedes_api.h" 29 30 #include <cstdio> 31 32 33 //! \file test_queue_node.cpp 34 //! \brief Test for [flow_graph.queue_node] specification 35 36 37 #define N 1000 38 #define C 10 39 40 template< typename T > 41 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 42 int count = 0; 43 while ( q.try_get(value) != true ) { 44 if (count < 1000000) { 45 ++count; 46 } 47 if (count == 1000000) { 48 // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads. 49 tbb::task_arena a(tbb::task_arena::attach{}); 50 a.enqueue([]{}); 51 ++count; 52 } 53 } 54 } 55 56 template< typename T > 57 void check_item( T* next_value, T &value ) { 58 int tid = value / N; 59 int offset = value % N; 60 CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 61 ++next_value[tid]; 62 } 63 64 template< typename T > 65 struct parallel_puts : utils::NoAssign { 66 67 tbb::flow::queue_node<T> &my_q; 68 69 parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {} 70 71 void operator()(int i) const { 72 for (int j = 0; j < N; ++j) { 73 bool msg = my_q.try_put( T(N*i + j) ); 74 CHECK_MESSAGE( msg == true, "" ); 75 } 76 } 77 78 }; 79 80 template< typename T > 81 struct touches { 82 83 bool **my_touches; 84 T **my_last_touch; 85 int my_num_threads; 86 87 touches( int num_threads ) : my_num_threads(num_threads) { 88 my_last_touch = new T* [my_num_threads]; 89 my_touches = new bool* [my_num_threads]; 90 for ( int p = 0; p < my_num_threads; ++p) { 91 my_last_touch[p] = new T[my_num_threads]; 92 for ( int p2 = 0; p2 < my_num_threads; ++p2) 93 my_last_touch[p][p2] = -1; 94 95 my_touches[p] = new bool[N*my_num_threads]; 96 for ( int n = 0; n < N*my_num_threads; ++n) 97 my_touches[p][n] = false; 98 } 99 } 100 101 ~touches() { 102 for ( int p = 0; p < my_num_threads; ++p) { 103 delete [] my_touches[p]; 104 delete [] my_last_touch[p]; 105 } 106 delete [] my_touches; 107 delete [] my_last_touch; 108 } 109 110 bool check( int tid, T v ) { 111 int v_tid = v / N; 112 if ( my_touches[tid][v] != false ) { 113 printf("Error: value seen twice by local thread\n"); 114 return false; 115 } 116 if ( v <= my_last_touch[tid][v_tid] ) { 117 printf("Error: value seen in wrong order by local thread\n"); 118 return false; 119 } 120 my_last_touch[tid][v_tid] = v; 121 my_touches[tid][v] = true; 122 return true; 123 } 124 125 bool validate_touches() { 126 bool *all_touches = new bool[N*my_num_threads]; 127 for ( int n = 0; n < N*my_num_threads; ++n) 128 all_touches[n] = false; 129 130 for ( int p = 0; p < my_num_threads; ++p) { 131 for ( int n = 0; n < N*my_num_threads; ++n) { 132 if ( my_touches[p][n] == true ) { 133 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 134 all_touches[n] = true; 135 } 136 } 137 } 138 for ( int n = 0; n < N*my_num_threads; ++n) { 139 if ( !all_touches[n] ) 140 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 141 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 142 } 143 delete [] all_touches; 144 return true; 145 } 146 147 }; 148 149 template< typename T > 150 struct parallel_gets : utils::NoAssign { 151 152 tbb::flow::queue_node<T> &my_q; 153 touches<T> &my_touches; 154 155 parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {} 156 157 void operator()(int tid) const { 158 for (int j = 0; j < N; ++j) { 159 T v; 160 spin_try_get( my_q, v ); 161 my_touches.check( tid, v ); 162 } 163 } 164 165 }; 166 167 template< typename T > 168 struct parallel_put_get : utils::NoAssign { 169 170 tbb::flow::queue_node<T> &my_q; 171 touches<T> &my_touches; 172 173 parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {} 174 175 void operator()(int tid) const { 176 177 for ( int i = 0; i < N; i+=C ) { 178 int j_end = ( N < i + C ) ? N : i + C; 179 // dump about C values into the Q 180 for ( int j = i; j < j_end; ++j ) { 181 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 182 } 183 // receiver about C values from the Q 184 for ( int j = i; j < j_end; ++j ) { 185 T v; 186 spin_try_get( my_q, v ); 187 my_touches.check( tid, v ); 188 } 189 } 190 } 191 192 }; 193 194 // 195 // Tests 196 // 197 // Item can be reserved, released, consumed ( single serial receiver ) 198 // 199 template< typename T > 200 int test_reservation() { 201 tbb::flow::graph g; 202 T bogus_value(-1); 203 204 // Simple tests 205 tbb::flow::queue_node<T> q(g); 206 207 q.try_put(T(1)); 208 q.try_put(T(2)); 209 q.try_put(T(3)); 210 211 T v; 212 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 213 CHECK_MESSAGE( v == T(1), "" ); 214 CHECK_MESSAGE( q.release_reservation() == true, "" ); 215 v = bogus_value; 216 g.wait_for_all(); 217 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 218 CHECK_MESSAGE( v == T(1), "" ); 219 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 220 v = bogus_value; 221 g.wait_for_all(); 222 223 CHECK_MESSAGE( q.try_get(v) == true, "" ); 224 CHECK_MESSAGE( v == T(2), "" ); 225 v = bogus_value; 226 g.wait_for_all(); 227 228 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 229 CHECK_MESSAGE( v == T(3), "" ); 230 CHECK_MESSAGE( q.release_reservation() == true, "" ); 231 v = bogus_value; 232 g.wait_for_all(); 233 CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 234 CHECK_MESSAGE( v == T(3), "" ); 235 CHECK_MESSAGE( q.consume_reservation() == true, "" ); 236 v = bogus_value; 237 g.wait_for_all(); 238 239 return 0; 240 } 241 242 // 243 // Tests 244 // 245 // multiple parallel senders, items in FIFO (relatively to sender) order 246 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 247 // * overlapped puts / gets 248 // * all puts finished before any getS 249 // 250 template< typename T > 251 int test_parallel(int num_threads) { 252 tbb::flow::graph g; 253 tbb::flow::queue_node<T> q(g); 254 tbb::flow::queue_node<T> q2(g); 255 tbb::flow::queue_node<T> q3(g); 256 { 257 Checker< T > my_check; 258 T bogus_value(-1); 259 T j = bogus_value; 260 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 261 262 T *next_value = new T[num_threads]; 263 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 264 265 for (int i = 0; i < num_threads * N; ++i ) { 266 spin_try_get( q, j ); 267 check_item( next_value, j ); 268 j = bogus_value; 269 } 270 for (int tid = 0; tid < num_threads; ++tid) { 271 CHECK_MESSAGE( next_value[tid] == T(N), "" ); 272 } 273 delete[] next_value; 274 275 j = bogus_value; 276 g.wait_for_all(); 277 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 278 CHECK_MESSAGE( j == bogus_value, "" ); 279 280 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 281 282 { 283 touches< T > t( num_threads ); 284 utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) ); 285 g.wait_for_all(); 286 CHECK_MESSAGE( t.validate_touches(), "" ); 287 } 288 j = bogus_value; 289 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 290 CHECK_MESSAGE( j == bogus_value, "" ); 291 292 g.wait_for_all(); 293 { 294 touches< T > t2( num_threads ); 295 utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) ); 296 g.wait_for_all(); 297 CHECK_MESSAGE( t2.validate_touches(), "" ); 298 } 299 j = bogus_value; 300 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 301 CHECK_MESSAGE( j == bogus_value, "" ); 302 303 tbb::flow::make_edge( q, q2 ); 304 tbb::flow::make_edge( q2, q3 ); 305 306 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 307 { 308 touches< T > t3( num_threads ); 309 utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) ); 310 g.wait_for_all(); 311 CHECK_MESSAGE( t3.validate_touches(), "" ); 312 } 313 j = bogus_value; 314 g.wait_for_all(); 315 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 316 g.wait_for_all(); 317 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 318 g.wait_for_all(); 319 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 320 CHECK_MESSAGE( j == bogus_value, "" ); 321 322 // test copy constructor 323 CHECK_MESSAGE( remove_successor( q, q2 ), "" ); 324 utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 325 tbb::flow::queue_node<T> q_copy(q); 326 j = bogus_value; 327 g.wait_for_all(); 328 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 329 CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" ); 330 { 331 touches< T > t( num_threads ); 332 utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) ); 333 g.wait_for_all(); 334 CHECK_MESSAGE( t.validate_touches(), "" ); 335 } 336 j = bogus_value; 337 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 338 CHECK_MESSAGE( j == bogus_value, "" ); 339 CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 340 CHECK_MESSAGE( j == bogus_value, "" ); 341 } 342 343 return 0; 344 } 345 346 // 347 // Tests 348 // 349 // Predecessors cannot be registered 350 // Empty Q rejects item requests 351 // Single serial sender, items in FIFO order 352 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 353 // 354 355 template< typename T > 356 int test_serial() { 357 tbb::flow::graph g; 358 tbb::flow::queue_node<T> q(g); 359 tbb::flow::queue_node<T> q2(g); 360 { // destroy the graph after manipulating it, and see if all the items in the buffers 361 // have been destroyed before the graph 362 Checker<T> my_check; // if CheckType< U > count constructions and destructions 363 T bogus_value(-1); 364 T j = bogus_value; 365 366 // 367 // Rejects attempts to add / remove predecessor 368 // Rejects request from empty Q 369 // 370 CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" ); 371 CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" ); 372 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 373 CHECK_MESSAGE( j == bogus_value, "" ); 374 375 // 376 // Simple puts and gets 377 // 378 379 for (int i = 0; i < N; ++i) { 380 bool msg = q.try_put( T(i) ); 381 CHECK_MESSAGE( msg == true, "" ); 382 } 383 384 385 for (int i = 0; i < N; ++i) { 386 j = bogus_value; 387 spin_try_get( q, j ); 388 CHECK_MESSAGE( i == j, "" ); 389 } 390 j = bogus_value; 391 g.wait_for_all(); 392 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 393 CHECK_MESSAGE( j == bogus_value, "" ); 394 395 tbb::flow::make_edge( q, q2 ); 396 397 for (int i = 0; i < N; ++i) { 398 bool msg = q.try_put( T(i) ); 399 CHECK_MESSAGE( msg == true, "" ); 400 } 401 402 403 for (int i = 0; i < N; ++i) { 404 j = bogus_value; 405 spin_try_get( q2, j ); 406 CHECK_MESSAGE( i == j, "" ); 407 } 408 j = bogus_value; 409 g.wait_for_all(); 410 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 411 g.wait_for_all(); 412 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 413 CHECK_MESSAGE( j == bogus_value, "" ); 414 415 tbb::flow::remove_edge( q, q2 ); 416 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 417 g.wait_for_all(); 418 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 419 CHECK_MESSAGE( j == bogus_value, "" ); 420 g.wait_for_all(); 421 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 422 CHECK_MESSAGE( j == 1, "" ); 423 424 tbb::flow::queue_node<T> q3(g); 425 tbb::flow::make_edge( q, q2 ); 426 tbb::flow::make_edge( q2, q3 ); 427 428 for (int i = 0; i < N; ++i) { 429 bool msg = q.try_put( T(i) ); 430 CHECK_MESSAGE( msg == true, "" ); 431 } 432 433 for (int i = 0; i < N; ++i) { 434 j = bogus_value; 435 spin_try_get( q3, j ); 436 CHECK_MESSAGE( i == j, "" ); 437 } 438 j = bogus_value; 439 g.wait_for_all(); 440 CHECK_MESSAGE( q.try_get( j ) == false, "" ); 441 g.wait_for_all(); 442 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 443 g.wait_for_all(); 444 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 445 CHECK_MESSAGE( j == bogus_value, "" ); 446 447 tbb::flow::remove_edge( q, q2 ); 448 CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 449 g.wait_for_all(); 450 CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 451 CHECK_MESSAGE( j == bogus_value, "" ); 452 g.wait_for_all(); 453 CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 454 CHECK_MESSAGE( j == bogus_value, "" ); 455 g.wait_for_all(); 456 CHECK_MESSAGE( q.try_get( j ) == true, "" ); 457 CHECK_MESSAGE( j == 1, "" ); 458 } 459 460 return 0; 461 } 462 463 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 464 #include <array> 465 #include <vector> 466 void test_follows_and_precedes_api() { 467 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 468 std::vector<int> messages_for_precedes = {0, 1, 2}; 469 470 follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows); 471 follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes); 472 } 473 #endif 474 475 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 476 void test_deduction_guides() { 477 using namespace tbb::flow; 478 graph g; 479 broadcast_node<int> br(g); 480 queue_node<int> q0(g); 481 482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 483 queue_node q1(follows(br)); 484 static_assert(std::is_same_v<decltype(q1), queue_node<int>>); 485 486 queue_node q2(precedes(br)); 487 static_assert(std::is_same_v<decltype(q2), queue_node<int>>); 488 #endif 489 490 queue_node q3(q0); 491 static_assert(std::is_same_v<decltype(q3), queue_node<int>>); 492 g.wait_for_all(); 493 } 494 #endif 495 496 //! Test serial, parallel behavior and reservation under parallelism 497 //! \brief \ref requirement \ref error_guessing 498 TEST_CASE("Parallel, serial test"){ 499 for (int p = 2; p <= 4; ++p) { 500 tbb::task_arena arena(p); 501 arena.execute( 502 [&]() { 503 test_serial<int>(); 504 test_serial<CheckType<int> >(); 505 test_parallel<int>(p); 506 test_parallel<CheckType<int> >(p); 507 } 508 ); 509 } 510 } 511 512 //! Test reset and cancellation 513 //! \brief \ref error_guessing 514 TEST_CASE("Resets test"){ 515 INFO("Testing resets\n"); 516 test_resets<int, tbb::flow::queue_node<int> >(); 517 test_resets<float, tbb::flow::queue_node<float> >(); 518 } 519 520 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 521 //! Test follows and precedes API 522 //! \brief \ref error_guessing 523 TEST_CASE("Test follows and precedes API"){ 524 test_follows_and_precedes_api(); 525 } 526 #endif 527 528 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 529 //! Test decution guides 530 //! \brief \ref requirement 531 TEST_CASE("Deduction guides"){ 532 test_deduction_guides(); 533 } 534 #endif 535 536 //! Test operations on a reserved queue_node 537 //! \brief \ref error_guessing 538 TEST_CASE("queue_node with reservation"){ 539 tbb::flow::graph g; 540 541 tbb::flow::queue_node<int> q(g); 542 543 bool res = q.try_put(42); 544 CHECK_MESSAGE( res, "queue_node must accept input." ); 545 546 int val = 1; 547 res = q.try_reserve(val); 548 CHECK_MESSAGE( res, "queue_node must reserve as it has an item." ); 549 CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." ); 550 551 int out_arg = -1; 552 CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail."); 553 CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument."); 554 555 out_arg = -1; 556 CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail."); 557 CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument."); 558 g.wait_for_all(); 559 } 560