1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #include "tbb/flow_graph.h" 20 #include "tbb/global_control.h" 21 22 #include "common/test.h" 23 #include "common/utils.h" 24 #include "common/graph_utils.h" 25 #include "common/test_follows_and_precedes_api.h" 26 27 28 //! \file test_buffer_node.cpp 29 //! \brief Test for [flow_graph.buffer_node] specification 30 31 32 #define N 1000 33 #define C 10 34 35 template< typename T > 36 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) { 37 while ( b.try_get(value) != true ) {} 38 } 39 40 template< typename T > 41 void check_item( T* count_value, T &value ) { 42 count_value[value / N] += value % N; 43 } 44 45 template< typename T > 46 struct parallel_puts : utils::NoAssign { 47 48 tbb::flow::buffer_node<T> &my_b; 49 50 parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {} 51 52 void operator()(int i) const { 53 for (int j = 0; j < N; ++j) { 54 bool msg = my_b.try_put( T(N*i + j) ); 55 CHECK_MESSAGE( msg == true, "" ); 56 } 57 } 58 }; 59 60 template< typename T > 61 struct touches { 62 63 bool **my_touches; 64 int my_num_threads; 65 66 touches( int num_threads ) : my_num_threads(num_threads) { 67 my_touches = new bool* [my_num_threads]; 68 for ( int p = 0; p < my_num_threads; ++p) { 69 my_touches[p] = new bool[N]; 70 for ( int n = 0; n < N; ++n) 71 my_touches[p][n] = false; 72 } 73 } 74 75 ~touches() { 76 for ( int p = 0; p < my_num_threads; ++p) { 77 delete [] my_touches[p]; 78 } 79 delete [] my_touches; 80 } 81 82 bool check( T v ) { 83 CHECK_MESSAGE( my_touches[v/N][v%N] == false, "" ); 84 my_touches[v/N][v%N] = true; 85 return true; 86 } 87 88 bool validate_touches() { 89 for ( int p = 0; p < my_num_threads; ++p) { 90 for ( int n = 0; n < N; ++n) { 91 CHECK_MESSAGE( my_touches[p][n] == true, "" ); 92 } 93 } 94 return true; 95 } 96 }; 97 98 template< typename T > 99 struct parallel_gets : utils::NoAssign { 100 101 tbb::flow::buffer_node<T> &my_b; 102 touches<T> &my_touches; 103 104 parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {} 105 106 void operator()(int) const { 107 for (int j = 0; j < N; ++j) { 108 T v; 109 spin_try_get( my_b, v ); 110 my_touches.check( v ); 111 } 112 } 113 114 }; 115 116 template< typename T > 117 struct parallel_put_get : utils::NoAssign { 118 119 tbb::flow::buffer_node<T> &my_b; 120 touches<T> &my_touches; 121 122 parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {} 123 124 void operator()(int tid) const { 125 126 for ( int i = 0; i < N; i+=C ) { 127 int j_end = ( N < i + C ) ? N : i + C; 128 // dump about C values into the buffer 129 for ( int j = i; j < j_end; ++j ) { 130 CHECK_MESSAGE( my_b.try_put( T (N*tid + j ) ) == true, "" ); 131 } 132 // receiver about C values from the buffer 133 for ( int j = i; j < j_end; ++j ) { 134 T v; 135 spin_try_get( my_b, v ); 136 my_touches.check( v ); 137 } 138 } 139 } 140 141 }; 142 143 // 144 // Tests 145 // 146 // Item can be reserved, released, consumed ( single serial receiver ) 147 // 148 template< typename T > 149 int test_reservation() { 150 tbb::flow::graph g; 151 T bogus_value(-1); 152 153 // Simple tests 154 tbb::flow::buffer_node<T> b(g); 155 156 b.try_put(T(1)); 157 b.try_put(T(2)); 158 b.try_put(T(3)); 159 160 T v, vsum; 161 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 162 CHECK_MESSAGE( b.try_release() == true, "" ); 163 v = bogus_value; 164 g.wait_for_all(); 165 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 166 CHECK_MESSAGE( b.try_consume() == true, "" ); 167 vsum += v; 168 v = bogus_value; 169 g.wait_for_all(); 170 171 CHECK_MESSAGE( b.try_get(v) == true, "" ); 172 vsum += v; 173 v = bogus_value; 174 g.wait_for_all(); 175 176 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 177 CHECK_MESSAGE( b.try_release() == true, "" ); 178 v = bogus_value; 179 g.wait_for_all(); 180 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 181 CHECK_MESSAGE( b.try_consume() == true, "" ); 182 vsum += v; 183 CHECK_MESSAGE( vsum == T(6), ""); 184 v = bogus_value; 185 g.wait_for_all(); 186 187 return 0; 188 } 189 190 // 191 // Tests 192 // 193 // multiple parallel senders, items in arbitrary order 194 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received 195 // * overlapped puts / gets 196 // * all puts finished before any getS 197 // 198 template< typename T > 199 int test_parallel(int num_threads) { 200 tbb::flow::graph g; 201 tbb::flow::buffer_node<T> b(g); 202 tbb::flow::buffer_node<T> b2(g); 203 tbb::flow::buffer_node<T> b3(g); 204 T bogus_value(-1); 205 T j = bogus_value; 206 207 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 208 209 T *next_value = new T[num_threads]; 210 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 211 212 for (int i = 0; i < num_threads * N; ++i ) { 213 spin_try_get( b, j ); 214 check_item( next_value, j ); 215 j = bogus_value; 216 } 217 for (int tid = 0; tid < num_threads; ++tid) { 218 CHECK_MESSAGE( next_value[tid] == T((N*(N-1))/2), "" ); 219 } 220 221 j = bogus_value; 222 g.wait_for_all(); 223 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 224 CHECK_MESSAGE( j == bogus_value, "" ); 225 226 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 227 228 { 229 touches< T > t( num_threads ); 230 NativeParallelFor( num_threads, parallel_gets<T>(b, t) ); 231 g.wait_for_all(); 232 CHECK_MESSAGE( t.validate_touches(), "" ); 233 } 234 j = bogus_value; 235 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 236 CHECK_MESSAGE( j == bogus_value, "" ); 237 238 g.wait_for_all(); 239 { 240 touches< T > t( num_threads ); 241 NativeParallelFor( num_threads, parallel_put_get<T>(b, t) ); 242 g.wait_for_all(); 243 CHECK_MESSAGE( t.validate_touches(), "" ); 244 } 245 j = bogus_value; 246 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 247 CHECK_MESSAGE( j == bogus_value, "" ); 248 249 tbb::flow::make_edge( b, b2 ); 250 tbb::flow::make_edge( b2, b3 ); 251 252 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 253 { 254 touches< T > t( num_threads ); 255 NativeParallelFor( num_threads, parallel_gets<T>(b3, t) ); 256 g.wait_for_all(); 257 CHECK_MESSAGE( t.validate_touches(), "" ); 258 } 259 j = bogus_value; 260 g.wait_for_all(); 261 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 262 g.wait_for_all(); 263 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 264 g.wait_for_all(); 265 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 266 CHECK_MESSAGE( j == bogus_value, "" ); 267 268 // test copy constructor 269 CHECK_MESSAGE( b.remove_successor( b2 ), "" ); 270 // fill up b: 271 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 272 // copy b: 273 tbb::flow::buffer_node<T> b_copy(b); 274 275 // b_copy should be empty 276 j = bogus_value; 277 g.wait_for_all(); 278 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" ); 279 280 // hook them together: 281 CHECK_MESSAGE( b.register_successor(b_copy) == true, "" ); 282 // try to get content from b_copy 283 { 284 touches< T > t( num_threads ); 285 NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) ); 286 g.wait_for_all(); 287 CHECK_MESSAGE( t.validate_touches(), "" ); 288 } 289 // now both should be empty 290 j = bogus_value; 291 g.wait_for_all(); 292 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 293 g.wait_for_all(); 294 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" ); 295 CHECK_MESSAGE( j == bogus_value, "" ); 296 297 delete [] next_value; 298 return 0; 299 } 300 301 // 302 // Tests 303 // 304 // Predecessors cannot be registered 305 // Empty buffer rejects item requests 306 // Single serial sender, items in arbitrary order 307 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order 308 // 309 310 #define TBB_INTERNAL_NAMESPACE detail::d1 311 using tbb::TBB_INTERNAL_NAMESPACE::register_predecessor; 312 using tbb::TBB_INTERNAL_NAMESPACE::remove_predecessor; 313 314 template< typename T > 315 int test_serial() { 316 tbb::flow::graph g; 317 T bogus_value(-1); 318 319 tbb::flow::buffer_node<T> b(g); 320 tbb::flow::buffer_node<T> b2(g); 321 T j = bogus_value; 322 323 // 324 // Rejects attempts to add / remove predecessor 325 // Rejects request from empty buffer 326 // 327 CHECK_MESSAGE( register_predecessor<T>( b, b2 ) == false, "" ); 328 CHECK_MESSAGE( remove_predecessor<T>( b, b2 ) == false, "" ); 329 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 330 CHECK_MESSAGE( j == bogus_value, "" ); 331 332 // 333 // Simple puts and gets 334 // 335 336 for (int i = 0; i < N; ++i) { 337 bool msg = b.try_put( T(i) ); 338 CHECK_MESSAGE( msg == true, "" ); 339 } 340 341 T vsum = T(0); 342 for (int i = 0; i < N; ++i) { 343 j = bogus_value; 344 spin_try_get( b, j ); 345 vsum += j; 346 } 347 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 348 j = bogus_value; 349 g.wait_for_all(); 350 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 351 CHECK_MESSAGE( j == bogus_value, "" ); 352 353 tbb::flow::make_edge(b, b2); 354 355 vsum = T(0); 356 for (int i = 0; i < N; ++i) { 357 bool msg = b.try_put( T(i) ); 358 CHECK_MESSAGE( msg == true, "" ); 359 } 360 361 for (int i = 0; i < N; ++i) { 362 j = bogus_value; 363 spin_try_get( b2, j ); 364 vsum += j; 365 } 366 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 367 j = bogus_value; 368 g.wait_for_all(); 369 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 370 g.wait_for_all(); 371 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 372 CHECK_MESSAGE( j == bogus_value, "" ); 373 374 tbb::flow::remove_edge(b, b2); 375 CHECK_MESSAGE( b.try_put( 1 ) == true, "" ); 376 g.wait_for_all(); 377 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 378 CHECK_MESSAGE( j == bogus_value, "" ); 379 g.wait_for_all(); 380 CHECK_MESSAGE( b.try_get( j ) == true, "" ); 381 CHECK_MESSAGE( j == 1, "" ); 382 383 tbb::flow::buffer_node<T> b3(g); 384 tbb::flow::make_edge( b, b2 ); 385 tbb::flow::make_edge( b2, b3 ); 386 387 vsum = T(0); 388 for (int i = 0; i < N; ++i) { 389 bool msg = b.try_put( T(i) ); 390 CHECK_MESSAGE( msg == true, "" ); 391 } 392 393 for (int i = 0; i < N; ++i) { 394 j = bogus_value; 395 spin_try_get( b3, j ); 396 vsum += j; 397 } 398 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 399 j = bogus_value; 400 g.wait_for_all(); 401 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 402 g.wait_for_all(); 403 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 404 g.wait_for_all(); 405 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 406 CHECK_MESSAGE( j == bogus_value, "" ); 407 408 tbb::flow::remove_edge(b, b2); 409 CHECK_MESSAGE( b.try_put( 1 ) == true, "" ); 410 g.wait_for_all(); 411 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 412 CHECK_MESSAGE( j == bogus_value, "" ); 413 g.wait_for_all(); 414 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 415 CHECK_MESSAGE( j == bogus_value, "" ); 416 g.wait_for_all(); 417 CHECK_MESSAGE( b.try_get( j ) == true, "" ); 418 CHECK_MESSAGE( j == 1, "" ); 419 420 return 0; 421 } 422 423 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 424 #include <array> 425 #include <vector> 426 void test_follows_and_precedes_api() { 427 using msg_t = tbb::flow::continue_msg; 428 429 std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} }; 430 std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()}; 431 432 follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows); 433 follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes); 434 } 435 #endif 436 437 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 438 void test_deduction_guides() { 439 using namespace tbb::flow; 440 graph g; 441 broadcast_node<int> br(g); 442 buffer_node<int> b0(g); 443 444 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 445 buffer_node b1(follows(br)); 446 static_assert(std::is_same_v<decltype(b1), buffer_node<int>>); 447 448 buffer_node b2(precedes(br)); 449 static_assert(std::is_same_v<decltype(b2), buffer_node<int>>); 450 #endif 451 452 buffer_node b3(b0); 453 static_assert(std::is_same_v<decltype(b3), buffer_node<int>>); 454 g.wait_for_all(); 455 } 456 #endif 457 458 #include <iomanip> 459 460 //! Test buffer_node with parallel and serial neighbours 461 //! \brief \ref requirement \ref error_guessing 462 TEST_CASE("Serial and parallel test"){ 463 for (int p = 2; p <= 4; ++p) { 464 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p); 465 tbb::task_arena arena(p); 466 arena.execute( 467 [&]() { 468 test_serial<int>(); 469 test_parallel<int>(p); 470 } 471 ); 472 } 473 } 474 475 //! Test reset and cancellation behavior 476 //! \brief \ref error_guessing 477 TEST_CASE("Resets"){ 478 test_resets<int,tbb::flow::buffer_node<int> >(); 479 test_resets<float,tbb::flow::buffer_node<float> >(); 480 } 481 482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 483 //! Test deprecated follows and preceedes API 484 //! \brief \ref error_guessing 485 TEST_CASE("Follows and precedes API"){ 486 test_follows_and_precedes_api(); 487 } 488 #endif 489 490 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 491 //! Test deduction guides 492 //! \brief requirement 493 TEST_CASE("Deduction guides"){ 494 test_deduction_guides(); 495 } 496 #endif 497