1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #include "tbb/flow_graph.h" 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "common/utils_assert.h" 24 #include "common/test_follows_and_precedes_api.h" 25 26 #include <cstdio> 27 #include <atomic> 28 29 30 //! \file test_sequencer_node.cpp 31 //! \brief Test for [flow_graph.sequencer_node] specification 32 33 34 #define N 1000 35 #define C 10 36 37 template< typename T > 38 struct seq_inspector { 39 size_t operator()(const T &v) const { return size_t(v); } 40 }; 41 42 template< typename T > 43 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) { 44 g.wait_for_all(); 45 return q.try_get(value); 46 } 47 48 template< typename T > 49 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 50 while ( q.try_get(value) != true ) ; 51 } 52 53 template< typename T > 54 struct parallel_puts : utils::NoAssign { 55 56 tbb::flow::sequencer_node<T> &my_q; 57 int my_num_threads; 58 59 parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {} 60 61 void operator()(int tid) const { 62 for (int j = tid; j < N; j+=my_num_threads) { 63 bool msg = my_q.try_put( T(j) ); 64 CHECK_MESSAGE( msg == true, "" ); 65 } 66 } 67 68 }; 69 70 template< typename T > 71 struct touches { 72 73 bool **my_touches; 74 T *my_last_touch; 75 int my_num_threads; 76 77 touches( int num_threads ) : my_num_threads(num_threads) { 78 my_last_touch = new T[my_num_threads]; 79 my_touches = new bool* [my_num_threads]; 80 for ( int p = 0; p < my_num_threads; ++p) { 81 my_last_touch[p] = T(-1); 82 my_touches[p] = new bool[N]; 83 for ( int n = 0; n < N; ++n) 84 my_touches[p][n] = false; 85 } 86 } 87 88 ~touches() { 89 for ( int p = 0; p < my_num_threads; ++p) { 90 delete [] my_touches[p]; 91 } 92 delete [] my_touches; 93 delete [] my_last_touch; 94 } 95 96 bool check( int tid, T v ) { 97 if ( my_touches[tid][v] != false ) { 98 printf("Error: value seen twice by local thread\n"); 99 return false; 100 } 101 if ( v <= my_last_touch[tid] ) { 102 printf("Error: value seen in wrong order by local thread\n"); 103 return false; 104 } 105 my_last_touch[tid] = v; 106 my_touches[tid][v] = true; 107 return true; 108 } 109 110 bool validate_touches() { 111 bool *all_touches = new bool[N]; 112 for ( int n = 0; n < N; ++n) 113 all_touches[n] = false; 114 115 for ( int p = 0; p < my_num_threads; ++p) { 116 for ( int n = 0; n < N; ++n) { 117 if ( my_touches[p][n] == true ) { 118 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 119 all_touches[n] = true; 120 } 121 } 122 } 123 for ( int n = 0; n < N; ++n) { 124 if ( !all_touches[n] ) 125 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 126 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 127 } 128 delete [] all_touches; 129 return true; 130 } 131 132 }; 133 134 template< typename T > 135 struct parallel_gets : utils::NoAssign { 136 137 tbb::flow::sequencer_node<T> &my_q; 138 int my_num_threads; 139 touches<T> &my_touches; 140 141 parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {} 142 143 void operator()(int tid) const { 144 for (int j = tid; j < N; j+=my_num_threads) { 145 T v; 146 spin_try_get( my_q, v ); 147 my_touches.check( tid, v ); 148 } 149 } 150 151 }; 152 153 template< typename T > 154 struct parallel_put_get : utils::NoAssign { 155 156 tbb::flow::sequencer_node<T> &my_s1; 157 tbb::flow::sequencer_node<T> &my_s2; 158 int my_num_threads; 159 std::atomic< int > &my_counter; 160 touches<T> &my_touches; 161 162 parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads, 163 std::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {} 164 165 void operator()(int tid) const { 166 int i_start = 0; 167 168 while ( (i_start = my_counter.fetch_add(C)) < N ) { 169 int i_end = ( N < i_start + C ) ? N : i_start + C; 170 for (int i = i_start; i < i_end; ++i) { 171 bool msg = my_s1.try_put( T(i) ); 172 CHECK_MESSAGE( msg == true, "" ); 173 } 174 175 for (int i = i_start; i < i_end; ++i) { 176 T v; 177 spin_try_get( my_s2, v ); 178 my_touches.check( tid, v ); 179 } 180 } 181 } 182 183 }; 184 185 // 186 // Tests 187 // 188 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output 189 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output 190 // 191 192 template< typename T > 193 int test_parallel(int num_threads) { 194 tbb::flow::graph g; 195 196 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); 197 utils::NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) ); 198 { 199 touches<T> t( num_threads ); 200 utils::NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) ); 201 g.wait_for_all(); 202 CHECK_MESSAGE( t.validate_touches(), "" ); 203 } 204 T bogus_value(-1); 205 T j = bogus_value; 206 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 207 CHECK_MESSAGE( j == bogus_value, "" ); 208 g.wait_for_all(); 209 210 tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>()); 211 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); 212 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); 213 tbb::flow::make_edge( s1, s2 ); 214 tbb::flow::make_edge( s2, s3 ); 215 216 { 217 touches<T> t( num_threads ); 218 std::atomic<int> counter; 219 counter = 0; 220 utils::NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) ); 221 g.wait_for_all(); 222 t.validate_touches(); 223 } 224 g.wait_for_all(); 225 CHECK_MESSAGE( s1.try_get( j ) == false, "" ); 226 g.wait_for_all(); 227 CHECK_MESSAGE( s2.try_get( j ) == false, "" ); 228 g.wait_for_all(); 229 CHECK_MESSAGE( s3.try_get( j ) == false, "" ); 230 CHECK_MESSAGE( j == bogus_value, "" ); 231 232 // test copy constructor 233 tbb::flow::sequencer_node<T> s_copy(s); 234 utils::NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) ); 235 for (int i = 0; i < N; ++i) { 236 j = bogus_value; 237 spin_try_get( s_copy, j ); 238 CHECK_MESSAGE( i == j, "" ); 239 } 240 j = bogus_value; 241 g.wait_for_all(); 242 CHECK_MESSAGE( s_copy.try_get( j ) == false, "" ); 243 CHECK_MESSAGE( j == bogus_value, "" ); 244 245 return 0; 246 } 247 248 249 // 250 // Tests 251 // 252 // No predecessors can be registered 253 // Request from empty buffer fails 254 // In-order puts, single sender, single receiver, properly sequenced at output 255 // Reverse-order puts, single sender, single receiver, properly sequenced at output 256 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output 257 // 258 259 template< typename T > 260 int test_serial() { 261 tbb::flow::graph g; 262 T bogus_value(-1); 263 264 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); 265 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); 266 T j = bogus_value; 267 268 // 269 // Rejects attempts to add / remove predecessor 270 // Rejects request from empty Q 271 // 272 CHECK_MESSAGE( register_predecessor( s, s2 ) == false, "" ); 273 CHECK_MESSAGE( remove_predecessor( s, s2 ) == false, "" ); 274 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 275 CHECK_MESSAGE( j == bogus_value, "" ); 276 277 // 278 // In-order simple puts and gets 279 // 280 281 for (int i = 0; i < N; ++i) { 282 bool msg = s.try_put( T(i) ); 283 CHECK_MESSAGE( msg == true, "" ); 284 CHECK_MESSAGE(!s.try_put( T(i) ), ""); // second attempt to put should reject 285 } 286 287 288 for (int i = 0; i < N; ++i) { 289 j = bogus_value; 290 CHECK_MESSAGE(wait_try_get( g, s, j ) == true, ""); 291 CHECK_MESSAGE( i == j, "" ); 292 CHECK_MESSAGE(!s.try_put( T(i) ),"" ); // after retrieving value, subsequent put should fail 293 } 294 j = bogus_value; 295 g.wait_for_all(); 296 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 297 CHECK_MESSAGE( j == bogus_value, "" ); 298 299 // 300 // Reverse-order simple puts and gets 301 // 302 303 for (int i = N-1; i >= 0; --i) { 304 bool msg = s2.try_put( T(i) ); 305 CHECK_MESSAGE( msg == true, "" ); 306 } 307 308 for (int i = 0; i < N; ++i) { 309 j = bogus_value; 310 CHECK_MESSAGE(wait_try_get( g, s2, j ) == true, ""); 311 CHECK_MESSAGE( i == j, "" ); 312 } 313 j = bogus_value; 314 g.wait_for_all(); 315 CHECK_MESSAGE( s2.try_get( j ) == false, "" ); 316 CHECK_MESSAGE( j == bogus_value, "" ); 317 318 // 319 // Chained in-order simple puts and gets 320 // 321 322 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); 323 tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>()); 324 tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>()); 325 tbb::flow::make_edge( s3, s4 ); 326 tbb::flow::make_edge( s4, s5 ); 327 328 for (int i = 0; i < N; ++i) { 329 bool msg = s3.try_put( T(i) ); 330 CHECK_MESSAGE( msg == true, "" ); 331 } 332 333 for (int i = 0; i < N; ++i) { 334 j = bogus_value; 335 CHECK_MESSAGE(wait_try_get( g, s5, j ) == true, ""); 336 CHECK_MESSAGE( i == j, "" ); 337 } 338 j = bogus_value; 339 CHECK_MESSAGE( wait_try_get( g, s3, j ) == false, "" ); 340 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" ); 341 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" ); 342 CHECK_MESSAGE( j == bogus_value, "" ); 343 344 g.wait_for_all(); 345 tbb::flow::remove_edge( s3, s4 ); 346 CHECK_MESSAGE( s3.try_put( N ) == true, "" ); 347 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" ); 348 CHECK_MESSAGE( j == bogus_value, "" ); 349 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" ); 350 CHECK_MESSAGE( j == bogus_value, "" ); 351 CHECK_MESSAGE( wait_try_get( g, s3, j ) == true, "" ); 352 CHECK_MESSAGE( j == N, "" ); 353 354 // 355 // Chained reverse-order simple puts and gets 356 // 357 358 tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>()); 359 tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>()); 360 tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>()); 361 tbb::flow::make_edge( s6, s7 ); 362 tbb::flow::make_edge( s7, s8 ); 363 364 for (int i = N-1; i >= 0; --i) { 365 bool msg = s6.try_put( T(i) ); 366 CHECK_MESSAGE( msg == true, "" ); 367 } 368 369 for (int i = 0; i < N; ++i) { 370 j = bogus_value; 371 CHECK_MESSAGE( wait_try_get( g, s8, j ) == true, "" ); 372 CHECK_MESSAGE( i == j, "" ); 373 } 374 j = bogus_value; 375 CHECK_MESSAGE( wait_try_get( g, s6, j ) == false, "" ); 376 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" ); 377 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" ); 378 CHECK_MESSAGE( j == bogus_value, "" ); 379 380 g.wait_for_all(); 381 tbb::flow::remove_edge( s6, s7 ); 382 CHECK_MESSAGE( s6.try_put( N ) == true, "" ); 383 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" ); 384 CHECK_MESSAGE( j == bogus_value, "" ); 385 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" ); 386 CHECK_MESSAGE( j == bogus_value, "" ); 387 CHECK_MESSAGE( wait_try_get( g, s6, j ) == true, "" ); 388 CHECK_MESSAGE( j == N, "" ); 389 390 return 0; 391 } 392 393 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 394 #include <array> 395 #include <vector> 396 void test_follows_and_precedes_api() { 397 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 398 std::vector<int> messages_for_precedes = {0, 1, 2}; 399 400 follows_and_precedes_testing::test_follows 401 <int, tbb::flow::sequencer_node<int>> 402 (messages_for_follows, [](const int& i) { return i; }); 403 404 follows_and_precedes_testing::test_precedes 405 <int, tbb::flow::sequencer_node<int>> 406 (messages_for_precedes, [](const int& i) { return i; }); 407 } 408 #endif 409 410 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 411 template <typename Body> 412 void test_deduction_guides_common(Body body) { 413 using namespace tbb::flow; 414 graph g; 415 broadcast_node<int> br(g); 416 417 sequencer_node s1(g, body); 418 static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>); 419 420 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 421 sequencer_node s2(follows(br), body); 422 static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>); 423 #endif 424 425 sequencer_node s3(s1); 426 static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>); 427 } 428 429 int sequencer_body_f(const int&) { return 1; } 430 431 void test_deduction_guides() { 432 test_deduction_guides_common([](const int&)->int { return 1; }); 433 test_deduction_guides_common([](const int&) mutable ->int { return 1; }); 434 test_deduction_guides_common(sequencer_body_f); 435 } 436 #endif 437 438 //! Test sequencer with various request orders and parallelism levels 439 //! \brief \ref requirement \ref error_guessing 440 TEST_CASE("Serial and parallel test"){ 441 for (int p = 2; p <= 4; ++p) { 442 tbb::task_arena arena(p); 443 arena.execute( 444 [&]() { 445 446 test_serial<int>(); 447 test_parallel<int>(p); 448 449 } 450 ); 451 } 452 } 453 454 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 455 //! Test decution guides 456 //! \brief \ref requirement 457 TEST_CASE("Test follows and precedes API"){ 458 test_follows_and_precedes_api(); 459 } 460 #endif 461 462 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 463 //! Test decution guides 464 //! \brief \ref requirement 465 TEST_CASE("Test deduction guides"){ 466 test_deduction_guides(); 467 } 468 #endif 469