1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #include "tbb/flow_graph.h" 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "common/graph_utils.h" 24 #include "common/test_follows_and_precedes_api.h" 25 26 27 //! \file test_buffer_node.cpp 28 //! \brief Test for [flow_graph.buffer_node] specification 29 30 31 #define N 1000 32 #define C 10 33 34 template< typename T > 35 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) { 36 while ( b.try_get(value) != true ) {} 37 } 38 39 template< typename T > 40 void check_item( T* count_value, T &value ) { 41 count_value[value / N] += value % N; 42 } 43 44 template< typename T > 45 struct parallel_puts : utils::NoAssign { 46 47 tbb::flow::buffer_node<T> &my_b; 48 49 parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {} 50 51 void operator()(int i) const { 52 for (int j = 0; j < N; ++j) { 53 bool msg = my_b.try_put( T(N*i + j) ); 54 CHECK_MESSAGE( msg == true, "" ); 55 } 56 } 57 }; 58 59 template< typename T > 60 struct touches { 61 62 bool **my_touches; 63 int my_num_threads; 64 65 touches( int num_threads ) : my_num_threads(num_threads) { 66 my_touches = new bool* [my_num_threads]; 67 for ( int p = 0; p < my_num_threads; ++p) { 68 my_touches[p] = new bool[N]; 69 for ( int n = 0; n < N; ++n) 70 my_touches[p][n] = false; 71 } 72 } 73 74 ~touches() { 75 for ( int p = 0; p < my_num_threads; ++p) { 76 delete [] my_touches[p]; 77 } 78 delete [] my_touches; 79 } 80 81 bool check( T v ) { 82 CHECK_MESSAGE( my_touches[v/N][v%N] == false, "" ); 83 my_touches[v/N][v%N] = true; 84 return true; 85 } 86 87 bool validate_touches() { 88 for ( int p = 0; p < my_num_threads; ++p) { 89 for ( int n = 0; n < N; ++n) { 90 CHECK_MESSAGE( my_touches[p][n] == true, "" ); 91 } 92 } 93 return true; 94 } 95 }; 96 97 template< typename T > 98 struct parallel_gets : utils::NoAssign { 99 100 tbb::flow::buffer_node<T> &my_b; 101 touches<T> &my_touches; 102 103 parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {} 104 105 void operator()(int) const { 106 for (int j = 0; j < N; ++j) { 107 T v; 108 spin_try_get( my_b, v ); 109 my_touches.check( v ); 110 } 111 } 112 113 }; 114 115 template< typename T > 116 struct parallel_put_get : utils::NoAssign { 117 118 tbb::flow::buffer_node<T> &my_b; 119 touches<T> &my_touches; 120 121 parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {} 122 123 void operator()(int tid) const { 124 125 for ( int i = 0; i < N; i+=C ) { 126 int j_end = ( N < i + C ) ? N : i + C; 127 // dump about C values into the buffer 128 for ( int j = i; j < j_end; ++j ) { 129 CHECK_MESSAGE( my_b.try_put( T (N*tid + j ) ) == true, "" ); 130 } 131 // receiver about C values from the buffer 132 for ( int j = i; j < j_end; ++j ) { 133 T v; 134 spin_try_get( my_b, v ); 135 my_touches.check( v ); 136 } 137 } 138 } 139 140 }; 141 142 // 143 // Tests 144 // 145 // Item can be reserved, released, consumed ( single serial receiver ) 146 // 147 template< typename T > 148 int test_reservation() { 149 tbb::flow::graph g; 150 T bogus_value(-1); 151 152 // Simple tests 153 tbb::flow::buffer_node<T> b(g); 154 155 b.try_put(T(1)); 156 b.try_put(T(2)); 157 b.try_put(T(3)); 158 159 T v, vsum; 160 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 161 CHECK_MESSAGE( b.try_release() == true, "" ); 162 v = bogus_value; 163 g.wait_for_all(); 164 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 165 CHECK_MESSAGE( b.try_consume() == true, "" ); 166 vsum += v; 167 v = bogus_value; 168 g.wait_for_all(); 169 170 CHECK_MESSAGE( b.try_get(v) == true, "" ); 171 vsum += v; 172 v = bogus_value; 173 g.wait_for_all(); 174 175 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 176 CHECK_MESSAGE( b.try_release() == true, "" ); 177 v = bogus_value; 178 g.wait_for_all(); 179 CHECK_MESSAGE( b.try_reserve(v) == true, "" ); 180 CHECK_MESSAGE( b.try_consume() == true, "" ); 181 vsum += v; 182 CHECK_MESSAGE( vsum == T(6), ""); 183 v = bogus_value; 184 g.wait_for_all(); 185 186 return 0; 187 } 188 189 // 190 // Tests 191 // 192 // multiple parallel senders, items in arbitrary order 193 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received 194 // * overlapped puts / gets 195 // * all puts finished before any getS 196 // 197 template< typename T > 198 int test_parallel(int num_threads) { 199 tbb::flow::graph g; 200 tbb::flow::buffer_node<T> b(g); 201 tbb::flow::buffer_node<T> b2(g); 202 tbb::flow::buffer_node<T> b3(g); 203 T bogus_value(-1); 204 T j = bogus_value; 205 206 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 207 208 T *next_value = new T[num_threads]; 209 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 210 211 for (int i = 0; i < num_threads * N; ++i ) { 212 spin_try_get( b, j ); 213 check_item( next_value, j ); 214 j = bogus_value; 215 } 216 for (int tid = 0; tid < num_threads; ++tid) { 217 CHECK_MESSAGE( next_value[tid] == T((N*(N-1))/2), "" ); 218 } 219 220 j = bogus_value; 221 g.wait_for_all(); 222 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 223 CHECK_MESSAGE( j == bogus_value, "" ); 224 225 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 226 227 { 228 touches< T > t( num_threads ); 229 NativeParallelFor( num_threads, parallel_gets<T>(b, t) ); 230 g.wait_for_all(); 231 CHECK_MESSAGE( t.validate_touches(), "" ); 232 } 233 j = bogus_value; 234 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 235 CHECK_MESSAGE( j == bogus_value, "" ); 236 237 g.wait_for_all(); 238 { 239 touches< T > t( num_threads ); 240 NativeParallelFor( num_threads, parallel_put_get<T>(b, t) ); 241 g.wait_for_all(); 242 CHECK_MESSAGE( t.validate_touches(), "" ); 243 } 244 j = bogus_value; 245 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 246 CHECK_MESSAGE( j == bogus_value, "" ); 247 248 tbb::flow::make_edge( b, b2 ); 249 tbb::flow::make_edge( b2, b3 ); 250 251 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 252 { 253 touches< T > t( num_threads ); 254 NativeParallelFor( num_threads, parallel_gets<T>(b3, t) ); 255 g.wait_for_all(); 256 CHECK_MESSAGE( t.validate_touches(), "" ); 257 } 258 j = bogus_value; 259 g.wait_for_all(); 260 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 261 g.wait_for_all(); 262 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 263 g.wait_for_all(); 264 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 265 CHECK_MESSAGE( j == bogus_value, "" ); 266 267 // test copy constructor 268 CHECK_MESSAGE( b.remove_successor( b2 ), "" ); 269 // fill up b: 270 NativeParallelFor( num_threads, parallel_puts<T>(b) ); 271 // copy b: 272 tbb::flow::buffer_node<T> b_copy(b); 273 274 // b_copy should be empty 275 j = bogus_value; 276 g.wait_for_all(); 277 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" ); 278 279 // hook them together: 280 CHECK_MESSAGE( b.register_successor(b_copy) == true, "" ); 281 // try to get content from b_copy 282 { 283 touches< T > t( num_threads ); 284 NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) ); 285 g.wait_for_all(); 286 CHECK_MESSAGE( t.validate_touches(), "" ); 287 } 288 // now both should be empty 289 j = bogus_value; 290 g.wait_for_all(); 291 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 292 g.wait_for_all(); 293 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" ); 294 CHECK_MESSAGE( j == bogus_value, "" ); 295 296 delete [] next_value; 297 return 0; 298 } 299 300 // 301 // Tests 302 // 303 // Predecessors cannot be registered 304 // Empty buffer rejects item requests 305 // Single serial sender, items in arbitrary order 306 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order 307 // 308 309 #define TBB_INTERNAL_NAMESPACE detail::d1 310 using tbb::TBB_INTERNAL_NAMESPACE::register_predecessor; 311 using tbb::TBB_INTERNAL_NAMESPACE::remove_predecessor; 312 313 template< typename T > 314 int test_serial() { 315 tbb::flow::graph g; 316 T bogus_value(-1); 317 318 tbb::flow::buffer_node<T> b(g); 319 tbb::flow::buffer_node<T> b2(g); 320 T j = bogus_value; 321 322 // 323 // Rejects attempts to add / remove predecessor 324 // Rejects request from empty buffer 325 // 326 CHECK_MESSAGE( register_predecessor<T>( b, b2 ) == false, "" ); 327 CHECK_MESSAGE( remove_predecessor<T>( b, b2 ) == false, "" ); 328 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 329 CHECK_MESSAGE( j == bogus_value, "" ); 330 331 // 332 // Simple puts and gets 333 // 334 335 for (int i = 0; i < N; ++i) { 336 bool msg = b.try_put( T(i) ); 337 CHECK_MESSAGE( msg == true, "" ); 338 } 339 340 T vsum = T(0); 341 for (int i = 0; i < N; ++i) { 342 j = bogus_value; 343 spin_try_get( b, j ); 344 vsum += j; 345 } 346 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 347 j = bogus_value; 348 g.wait_for_all(); 349 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 350 CHECK_MESSAGE( j == bogus_value, "" ); 351 352 tbb::flow::make_edge(b, b2); 353 354 vsum = T(0); 355 for (int i = 0; i < N; ++i) { 356 bool msg = b.try_put( T(i) ); 357 CHECK_MESSAGE( msg == true, "" ); 358 } 359 360 for (int i = 0; i < N; ++i) { 361 j = bogus_value; 362 spin_try_get( b2, j ); 363 vsum += j; 364 } 365 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 366 j = bogus_value; 367 g.wait_for_all(); 368 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 369 g.wait_for_all(); 370 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 371 CHECK_MESSAGE( j == bogus_value, "" ); 372 373 tbb::flow::remove_edge(b, b2); 374 CHECK_MESSAGE( b.try_put( 1 ) == true, "" ); 375 g.wait_for_all(); 376 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 377 CHECK_MESSAGE( j == bogus_value, "" ); 378 g.wait_for_all(); 379 CHECK_MESSAGE( b.try_get( j ) == true, "" ); 380 CHECK_MESSAGE( j == 1, "" ); 381 382 tbb::flow::buffer_node<T> b3(g); 383 tbb::flow::make_edge( b, b2 ); 384 tbb::flow::make_edge( b2, b3 ); 385 386 vsum = T(0); 387 for (int i = 0; i < N; ++i) { 388 bool msg = b.try_put( T(i) ); 389 CHECK_MESSAGE( msg == true, "" ); 390 } 391 392 for (int i = 0; i < N; ++i) { 393 j = bogus_value; 394 spin_try_get( b3, j ); 395 vsum += j; 396 } 397 CHECK_MESSAGE( vsum == (N*(N-1))/2, ""); 398 j = bogus_value; 399 g.wait_for_all(); 400 CHECK_MESSAGE( b.try_get( j ) == false, "" ); 401 g.wait_for_all(); 402 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 403 g.wait_for_all(); 404 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 405 CHECK_MESSAGE( j == bogus_value, "" ); 406 407 tbb::flow::remove_edge(b, b2); 408 CHECK_MESSAGE( b.try_put( 1 ) == true, "" ); 409 g.wait_for_all(); 410 CHECK_MESSAGE( b2.try_get( j ) == false, "" ); 411 CHECK_MESSAGE( j == bogus_value, "" ); 412 g.wait_for_all(); 413 CHECK_MESSAGE( b3.try_get( j ) == false, "" ); 414 CHECK_MESSAGE( j == bogus_value, "" ); 415 g.wait_for_all(); 416 CHECK_MESSAGE( b.try_get( j ) == true, "" ); 417 CHECK_MESSAGE( j == 1, "" ); 418 419 return 0; 420 } 421 422 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 423 #include <array> 424 #include <vector> 425 void test_follows_and_precedes_api() { 426 using msg_t = tbb::flow::continue_msg; 427 428 std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} }; 429 std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()}; 430 431 follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows); 432 follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes); 433 } 434 #endif 435 436 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 437 void test_deduction_guides() { 438 using namespace tbb::flow; 439 graph g; 440 broadcast_node<int> br(g); 441 buffer_node<int> b0(g); 442 443 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 444 buffer_node b1(follows(br)); 445 static_assert(std::is_same_v<decltype(b1), buffer_node<int>>); 446 447 buffer_node b2(precedes(br)); 448 static_assert(std::is_same_v<decltype(b2), buffer_node<int>>); 449 #endif 450 451 buffer_node b3(b0); 452 static_assert(std::is_same_v<decltype(b3), buffer_node<int>>); 453 g.wait_for_all(); 454 } 455 #endif 456 457 #include <iomanip> 458 459 //! Test buffer_node with parallel and serial neighbours 460 //! \brief \ref requirement \ref error_guessing 461 TEST_CASE("Serial and parallel test"){ 462 for (int p = 2; p <= 4; ++p) { 463 tbb::task_arena arena(p); 464 arena.execute( 465 [&]() { 466 test_serial<int>(); 467 test_parallel<int>(p); 468 } 469 ); 470 } 471 } 472 473 //! Test reset and cancellation behavior 474 //! \brief \ref error_guessing 475 TEST_CASE("Resets"){ 476 test_resets<int,tbb::flow::buffer_node<int> >(); 477 test_resets<float,tbb::flow::buffer_node<float> >(); 478 } 479 480 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 481 //! Test deprecated follows and preceedes API 482 //! \brief \ref error_guessing 483 TEST_CASE("Follows and precedes API"){ 484 test_follows_and_precedes_api(); 485 } 486 #endif 487 488 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 489 //! Test deduction guides 490 //! \brief requirement 491 TEST_CASE("Deduction guides"){ 492 test_deduction_guides(); 493 } 494 #endif 495