1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 20 // parts in all of tests might make testing of the product, which is different from what is actually 21 // released. 22 #define __TBB_EXTRA_DEBUG 1 23 #include "tbb/flow_graph.h" 24 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/graph_utils.h" 28 #include "common/test_follows_and_precedes_api.h" 29 30 31 //! \file test_buffer_node.cpp 32 //! \brief Test for [flow_graph.buffer_node] specification 33 34 35 #define N 1000 36 #define C 10 37 38 template< typename T > 39 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) { 40 while ( b.try_get(value) != true ) {} 41 } 42 43 template< typename T > 44 void check_item( T* count_value, T &value ) { 45 count_value[value / N] += value % N; 46 } 47 48 template< typename T > 49 struct parallel_puts : utils::NoAssign { 50 51 tbb::flow::buffer_node<T> &my_b; 52 53 parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {} 54 55 void operator()(int i) const { 56 for (int j = 0; j < N; ++j) { 57 bool msg = my_b.try_put( T(N*i + j) ); 58 CHECK_MESSAGE( msg == true, "" ); 59 } 60 } 61 }; 62 63 template< typename T > 64 struct touches { 65 66 bool **my_touches; 67 int my_num_threads; 68 69 touches( int num_threads ) : my_num_threads(num_threads) { 70 my_touches = new bool* [my_num_threads]; 71 for ( int p = 0; p < my_num_threads; ++p) { 72 my_touches[p] = new bool[N]; 73 for ( int n = 0; n < N; ++n) 74 my_touches[p][n] = false; 75 } 76 } 77 78 ~touches() { 79 for ( int p = 0; p < my_num_threads; ++p) { 80 delete [] my_touches[p]; 81 } 82 delete [] my_touches; 83 } 84 85 bool check( T v ) { 86 CHECK_MESSAGE( my_touches[v/N][v%N] == false, "" ); 87 my_touches[v/N][v%N] = true; 88 return true; 89 } 90 91 bool validate_touches() { 92 for ( int p = 0; p < my_num_threads; ++p) { 93 for ( int n = 0; n < N; ++n) { 94 CHECK_MESSAGE( my_touches[p][n] == true, "" ); 95 } 96 } 97 return true; 98 } 99 }; 100 101 template< typename T > 102 struct parallel_gets : utils::NoAssign { 103 104 tbb::flow::buffer_node<T> &my_b; 105 touches<T> &my_touches; 106 107 parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {} 108 109 void operator()(int) const { 110 for (int j = 0; j < N; ++j) { 111 T v; 112 spin_try_get( my_b, v ); 113 my_touches.check( v ); 114 } 115 } 116 117 }; 118 119 template< typename T > 120 struct parallel_put_get : utils::NoAssign { 121 122 tbb::flow::buffer_node<T> &my_b; 123 touches<T> &my_touches; 124 125 parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {} 126 127 void operator()(int tid) const { 128 129 for ( int i = 0; i < N; i+=C ) { 130 int j_end = ( N < i + C ) ? N : i + C; 131 // dump about C values into the buffer 132 for ( int j = i; j < j_end; ++j ) { 133 CHECK_MESSAGE( my_b.try_put( T (N*tid + j ) ) == true, "" ); 134 } 135 // receiver about C values from the buffer 136 for ( int j = i; j < j_end; ++j ) { 137 T v; 138 spin_try_get( my_b, v ); 139 my_touches.check( v ); 140 } 141 } 142 } 143 144 }; 145 146 // 147 // Tests 148 // 149 // Item can be reserved, released, consumed ( single serial receiver ) 150 // 151 template< typename T > 152 int test_reservation() { 153 tbb::flow::graph g; 154 T bogus_value(-1); 155 156 // Simple tests 157 tbb::flow::buffer_node<T> b(g); 158 159 b.try_put(T(1)); 160 b.try_put(T(2)); 161 b.try_put(T(3)); 162 163 T v, vsum; 164 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 165 CHECK_MESSAGE( b.try_release() == true, "" ); 166 v = bogus_value; 167 g.wait_for_all(); 168 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 169 CHECK_MESSAGE( b.try_consume() == true, "" ); 170 vsum += v; 171 v = bogus_value; 172 g.wait_for_all(); 173 174 CHECK_MESSAGE( b.try_get(v) == true, "" ); 175 vsum += v; 176 v = bogus_value; 177 g.wait_for_all(); 178 179 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 180 CHECK_MESSAGE( b.try_release() == true, "" ); 181 v = bogus_value; 182 g.wait_for_all(); 183 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 184 CHECK_MESSAGE( b.try_consume() == true, "" ); 185 vsum += v; 186 CHECK_MESSAGE( vsum == T(6), ""); 187 v = bogus_value; 188 g.wait_for_all(); 189 190 return 0; 191 } 192 193 // 194 // Tests 195 // 196 // multiple parallel senders, items in arbitrary order 197 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received 198 // * overlapped puts / gets 199 // * all puts finished before any getS 200 // 201 template< typename T > 202 int test_parallel(int num_threads) { 203 tbb::flow::graph g; 204 tbb::flow::buffer_node<T> b(g); 205 tbb::flow::buffer_node<T> b2(g); 206 tbb::flow::buffer_node<T> b3(g); 207 T bogus_value(-1); 208 T j = bogus_value; 209 210 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 211 212 T *next_value = new T[num_threads]; 213 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 214 215 for (int i = 0; i < num_threads * N; ++i ) { 216 spin_try_get( b, j ); 217 check_item( next_value, j ); 218 j = bogus_value; 219 } 220 for (int tid = 0; tid < num_threads; ++tid) { 221 CHECK_MESSAGE( next_value[tid] == T((N*(N-1))/2), "" ); 222 } 223 224 j = bogus_value; 225 g.wait_for_all(); 226 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 227 CHECK_MESSAGE( j == bogus_value, "" ); 228 229 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 230 231 { 232 touches< T > t( num_threads ); 233 NativeParallelFor( num_threads, parallel_gets<T>(b, t) ); 234 g.wait_for_all(); 235 CHECK_MESSAGE( t.validate_touches(), "" ); 236 } 237 j = bogus_value; 238 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 239 CHECK_MESSAGE( j == bogus_value, "" ); 240 241 g.wait_for_all(); 242 { 243 touches< T > t( num_threads ); 244 NativeParallelFor( num_threads, parallel_put_get<T>(b, t) ); 245 g.wait_for_all(); 246 CHECK_MESSAGE( t.validate_touches(), "" ); 247 } 248 j = bogus_value; 249 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 250 CHECK_MESSAGE( j == bogus_value, "" ); 251 252 tbb::flow::make_edge( b, b2 ); 253 tbb::flow::make_edge( b2, b3 ); 254 255 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 256 { 257 touches< T > t( num_threads ); 258 NativeParallelFor( num_threads, parallel_gets<T>(b3, t) ); 259 g.wait_for_all(); 260 CHECK_MESSAGE( t.validate_touches(), "" ); 261 } 262 j = bogus_value; 263 g.wait_for_all(); 264 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 265 g.wait_for_all(); 266 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 267 g.wait_for_all(); 268 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 269 CHECK_MESSAGE( j == bogus_value, "" ); 270 271 // test copy constructor 272 CHECK_MESSAGE( b.remove_successor( b2 ), "" ); 273 // fill up b: 274 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 275 // copy b: 276 tbb::flow::buffer_node<T> b_copy(b); 277 278 // b_copy should be empty 279 j = bogus_value; 280 g.wait_for_all(); 281 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" ); 282 283 // hook them together: 284 CHECK_MESSAGE( b.register_successor(b_copy) == true, "" ); 285 // try to get content from b_copy 286 { 287 touches< T > t( num_threads ); 288 NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) ); 289 g.wait_for_all(); 290 CHECK_MESSAGE( t.validate_touches(), "" ); 291 } 292 // now both should be empty 293 j = bogus_value; 294 g.wait_for_all(); 295 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 296 g.wait_for_all(); 297 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" ); 298 CHECK_MESSAGE( j == bogus_value, "" ); 299 300 delete [] next_value; 301 return 0; 302 } 303 304 // 305 // Tests 306 // 307 // Predecessors cannot be registered 308 // Empty buffer rejects item requests 309 // Single serial sender, items in arbitrary order 310 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order 311 // 312 313 #define TBB_INTERNAL_NAMESPACE detail::d1 314 using tbb::TBB_INTERNAL_NAMESPACE::register_predecessor; 315 using tbb::TBB_INTERNAL_NAMESPACE::remove_predecessor; 316 317 template< typename T > 318 int test_serial() { 319 tbb::flow::graph g; 320 T bogus_value(-1); 321 322 tbb::flow::buffer_node<T> b(g); 323 tbb::flow::buffer_node<T> b2(g); 324 T j = bogus_value; 325 326 // 327 // Rejects attempts to add / remove predecessor 328 // Rejects request from empty buffer 329 // 330 CHECK_MESSAGE( register_predecessor<T>( b, b2 ) == false, "" ); 331 CHECK_MESSAGE( remove_predecessor<T>( b, b2 ) == false, "" ); 332 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 333 CHECK_MESSAGE( j == bogus_value, "" ); 334 335 // 336 // Simple puts and gets 337 // 338 339 for (int i = 0; i < N; ++i) { 340 bool msg = b.try_put( T(i) ); 341 CHECK_MESSAGE( msg == true, "" ); 342 } 343 344 T vsum = T(0); 345 for (int i = 0; i < N; ++i) { 346 j = bogus_value; 347 spin_try_get( b, j ); 348 vsum += j; 349 } 350 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 351 j = bogus_value; 352 g.wait_for_all(); 353 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 354 CHECK_MESSAGE( j == bogus_value, "" ); 355 356 tbb::flow::make_edge(b, b2); 357 358 vsum = T(0); 359 for (int i = 0; i < N; ++i) { 360 bool msg = b.try_put( T(i) ); 361 CHECK_MESSAGE( msg == true, "" ); 362 } 363 364 for (int i = 0; i < N; ++i) { 365 j = bogus_value; 366 spin_try_get( b2, j ); 367 vsum += j; 368 } 369 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 370 j = bogus_value; 371 g.wait_for_all(); 372 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 373 g.wait_for_all(); 374 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 375 CHECK_MESSAGE( j == bogus_value, "" ); 376 377 tbb::flow::remove_edge(b, b2); 378 CHECK_MESSAGE( b.try_put( 1 ) == true, "" ); 379 g.wait_for_all(); 380 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 381 CHECK_MESSAGE( j == bogus_value, "" ); 382 g.wait_for_all(); 383 CHECK_MESSAGE( b.try_get( j ) == true, "" ); 384 CHECK_MESSAGE( j == 1, "" ); 385 386 tbb::flow::buffer_node<T> b3(g); 387 tbb::flow::make_edge( b, b2 ); 388 tbb::flow::make_edge( b2, b3 ); 389 390 vsum = T(0); 391 for (int i = 0; i < N; ++i) { 392 bool msg = b.try_put( T(i) ); 393 CHECK_MESSAGE( msg == true, "" ); 394 } 395 396 for (int i = 0; i < N; ++i) { 397 j = bogus_value; 398 spin_try_get( b3, j ); 399 vsum += j; 400 } 401 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 402 j = bogus_value; 403 g.wait_for_all(); 404 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 405 g.wait_for_all(); 406 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 407 g.wait_for_all(); 408 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 409 CHECK_MESSAGE( j == bogus_value, "" ); 410 411 tbb::flow::remove_edge(b, b2); 412 CHECK_MESSAGE( b.try_put( 1 ) == true, "" ); 413 g.wait_for_all(); 414 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 415 CHECK_MESSAGE( j == bogus_value, "" ); 416 g.wait_for_all(); 417 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 418 CHECK_MESSAGE( j == bogus_value, "" ); 419 g.wait_for_all(); 420 CHECK_MESSAGE( b.try_get( j ) == true, "" ); 421 CHECK_MESSAGE( j == 1, "" ); 422 423 return 0; 424 } 425 426 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 427 #include <array> 428 #include <vector> 429 void test_follows_and_precedes_api() { 430 using msg_t = tbb::flow::continue_msg; 431 432 std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} }; 433 std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()}; 434 435 follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows); 436 follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes); 437 } 438 #endif 439 440 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 441 void test_deduction_guides() { 442 using namespace tbb::flow; 443 graph g; 444 broadcast_node<int> br(g); 445 buffer_node<int> b0(g); 446 447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 448 buffer_node b1(follows(br)); 449 static_assert(std::is_same_v<decltype(b1), buffer_node<int>>); 450 451 buffer_node b2(precedes(br)); 452 static_assert(std::is_same_v<decltype(b2), buffer_node<int>>); 453 #endif 454 455 buffer_node b3(b0); 456 static_assert(std::is_same_v<decltype(b3), buffer_node<int>>); 457 g.wait_for_all(); 458 } 459 #endif 460 461 #include <iomanip> 462 463 //! Test buffer_node with parallel and serial neighbours 464 //! \brief \ref requirement \ref error_guessing 465 TEST_CASE("Serial and parallel test"){ 466 for (int p = 2; p <= 4; ++p) { 467 tbb::task_arena arena(p); 468 arena.execute( 469 [&]() { 470 test_serial<int>(); 471 test_parallel<int>(p); 472 } 473 ); 474 } 475 } 476 477 //! Test reset and cancellation behavior 478 //! \brief \ref error_guessing 479 TEST_CASE("Resets"){ 480 test_resets<int,tbb::flow::buffer_node<int> >(); 481 test_resets<float,tbb::flow::buffer_node<float> >(); 482 } 483 484 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 485 //! Test deprecated follows and preceedes API 486 //! \brief \ref error_guessing 487 TEST_CASE("Follows and precedes API"){ 488 test_follows_and_precedes_api(); 489 } 490 #endif 491 492 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 493 //! Test deduction guides 494 //! \brief requirement 495 TEST_CASE("Deduction guides"){ 496 test_deduction_guides(); 497 } 498 #endif 499