1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #include "tbb/flow_graph.h" 20 #include "tbb/global_control.h" 21 22 #include "common/test.h" 23 #include "common/utils.h" 24 #include "common/utils_assert.h" 25 #include "common/test_follows_and_precedes_api.h" 26 #include "common/concepts_common.h" 27 28 #include <cstdio> 29 #include <atomic> 30 31 32 //! \file test_sequencer_node.cpp 33 //! \brief Test for [flow_graph.sequencer_node] specification 34 35 36 #define N 1000 37 #define C 10 38 39 template< typename T > 40 struct seq_inspector { 41 size_t operator()(const T &v) const { return size_t(v); } 42 }; 43 44 template< typename T > 45 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) { 46 g.wait_for_all(); 47 return q.try_get(value); 48 } 49 50 template< typename T > 51 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 52 while ( q.try_get(value) != true ) ; 53 } 54 55 template< typename T > 56 struct parallel_puts : utils::NoAssign { 57 58 tbb::flow::sequencer_node<T> &my_q; 59 int my_num_threads; 60 61 parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {} 62 63 void operator()(int tid) const { 64 for (int j = tid; j < N; j+=my_num_threads) { 65 bool msg = my_q.try_put( T(j) ); 66 CHECK_MESSAGE( msg == true, "" ); 67 } 68 } 69 70 }; 71 72 template< typename T > 73 struct touches { 74 75 bool **my_touches; 76 T *my_last_touch; 77 int my_num_threads; 78 79 touches( int num_threads ) : my_num_threads(num_threads) { 80 my_last_touch = new T[my_num_threads]; 81 my_touches = new bool* [my_num_threads]; 82 for ( int p = 0; p < my_num_threads; ++p) { 83 my_last_touch[p] = T(-1); 84 my_touches[p] = new bool[N]; 85 for ( int n = 0; n < N; ++n) 86 my_touches[p][n] = false; 87 } 88 } 89 90 ~touches() { 91 for ( int p = 0; p < my_num_threads; ++p) { 92 delete [] my_touches[p]; 93 } 94 delete [] my_touches; 95 delete [] my_last_touch; 96 } 97 98 bool check( int tid, T v ) { 99 if ( my_touches[tid][v] != false ) { 100 printf("Error: value seen twice by local thread\n"); 101 return false; 102 } 103 if ( v <= my_last_touch[tid] ) { 104 printf("Error: value seen in wrong order by local thread\n"); 105 return false; 106 } 107 my_last_touch[tid] = v; 108 my_touches[tid][v] = true; 109 return true; 110 } 111 112 bool validate_touches() { 113 bool *all_touches = new bool[N]; 114 for ( int n = 0; n < N; ++n) 115 all_touches[n] = false; 116 117 for ( int p = 0; p < my_num_threads; ++p) { 118 for ( int n = 0; n < N; ++n) { 119 if ( my_touches[p][n] == true ) { 120 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 121 all_touches[n] = true; 122 } 123 } 124 } 125 for ( int n = 0; n < N; ++n) { 126 if ( !all_touches[n] ) 127 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 128 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 129 } 130 delete [] all_touches; 131 return true; 132 } 133 134 }; 135 136 template< typename T > 137 struct parallel_gets : utils::NoAssign { 138 139 tbb::flow::sequencer_node<T> &my_q; 140 int my_num_threads; 141 touches<T> &my_touches; 142 143 parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {} 144 145 void operator()(int tid) const { 146 for (int j = tid; j < N; j+=my_num_threads) { 147 T v; 148 spin_try_get( my_q, v ); 149 my_touches.check( tid, v ); 150 } 151 } 152 153 }; 154 155 template< typename T > 156 struct parallel_put_get : utils::NoAssign { 157 158 tbb::flow::sequencer_node<T> &my_s1; 159 tbb::flow::sequencer_node<T> &my_s2; 160 int my_num_threads; 161 std::atomic< int > &my_counter; 162 touches<T> &my_touches; 163 164 parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads, 165 std::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {} 166 167 void operator()(int tid) const { 168 int i_start = 0; 169 170 while ( (i_start = my_counter.fetch_add(C)) < N ) { 171 int i_end = ( N < i_start + C ) ? N : i_start + C; 172 for (int i = i_start; i < i_end; ++i) { 173 bool msg = my_s1.try_put( T(i) ); 174 CHECK_MESSAGE( msg == true, "" ); 175 } 176 177 for (int i = i_start; i < i_end; ++i) { 178 T v; 179 spin_try_get( my_s2, v ); 180 my_touches.check( tid, v ); 181 } 182 } 183 } 184 185 }; 186 187 // 188 // Tests 189 // 190 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output 191 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output 192 // 193 194 template< typename T > 195 int test_parallel(int num_threads) { 196 tbb::flow::graph g; 197 198 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); 199 utils::NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) ); 200 { 201 touches<T> t( num_threads ); 202 utils::NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) ); 203 g.wait_for_all(); 204 CHECK_MESSAGE( t.validate_touches(), "" ); 205 } 206 T bogus_value(-1); 207 T j = bogus_value; 208 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 209 CHECK_MESSAGE( j == bogus_value, "" ); 210 g.wait_for_all(); 211 212 tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>()); 213 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); 214 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); 215 tbb::flow::make_edge( s1, s2 ); 216 tbb::flow::make_edge( s2, s3 ); 217 218 { 219 touches<T> t( num_threads ); 220 std::atomic<int> counter; 221 counter = 0; 222 utils::NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) ); 223 g.wait_for_all(); 224 t.validate_touches(); 225 } 226 g.wait_for_all(); 227 CHECK_MESSAGE( s1.try_get( j ) == false, "" ); 228 g.wait_for_all(); 229 CHECK_MESSAGE( s2.try_get( j ) == false, "" ); 230 g.wait_for_all(); 231 CHECK_MESSAGE( s3.try_get( j ) == false, "" ); 232 CHECK_MESSAGE( j == bogus_value, "" ); 233 234 // test copy constructor 235 tbb::flow::sequencer_node<T> s_copy(s); 236 utils::NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) ); 237 for (int i = 0; i < N; ++i) { 238 j = bogus_value; 239 spin_try_get( s_copy, j ); 240 CHECK_MESSAGE( i == j, "" ); 241 } 242 j = bogus_value; 243 g.wait_for_all(); 244 CHECK_MESSAGE( s_copy.try_get( j ) == false, "" ); 245 CHECK_MESSAGE( j == bogus_value, "" ); 246 247 return 0; 248 } 249 250 251 // 252 // Tests 253 // 254 // No predecessors can be registered 255 // Request from empty buffer fails 256 // In-order puts, single sender, single receiver, properly sequenced at output 257 // Reverse-order puts, single sender, single receiver, properly sequenced at output 258 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output 259 // 260 261 template< typename T > 262 int test_serial() { 263 tbb::flow::graph g; 264 T bogus_value(-1); 265 266 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); 267 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); 268 T j = bogus_value; 269 270 // 271 // Rejects attempts to add / remove predecessor 272 // Rejects request from empty Q 273 // 274 CHECK_MESSAGE( register_predecessor( s, s2 ) == false, "" ); 275 CHECK_MESSAGE( remove_predecessor( s, s2 ) == false, "" ); 276 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 277 CHECK_MESSAGE( j == bogus_value, "" ); 278 279 // 280 // In-order simple puts and gets 281 // 282 283 for (int i = 0; i < N; ++i) { 284 bool msg = s.try_put( T(i) ); 285 CHECK_MESSAGE( msg == true, "" ); 286 CHECK_MESSAGE(!s.try_put( T(i) ), ""); // second attempt to put should reject 287 } 288 289 290 for (int i = 0; i < N; ++i) { 291 j = bogus_value; 292 CHECK_MESSAGE(wait_try_get( g, s, j ) == true, ""); 293 CHECK_MESSAGE( i == j, "" ); 294 CHECK_MESSAGE(!s.try_put( T(i) ),"" ); // after retrieving value, subsequent put should fail 295 } 296 j = bogus_value; 297 g.wait_for_all(); 298 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 299 CHECK_MESSAGE( j == bogus_value, "" ); 300 301 // 302 // Reverse-order simple puts and gets 303 // 304 305 for (int i = N-1; i >= 0; --i) { 306 bool msg = s2.try_put( T(i) ); 307 CHECK_MESSAGE( msg == true, "" ); 308 } 309 310 for (int i = 0; i < N; ++i) { 311 j = bogus_value; 312 CHECK_MESSAGE(wait_try_get( g, s2, j ) == true, ""); 313 CHECK_MESSAGE( i == j, "" ); 314 } 315 j = bogus_value; 316 g.wait_for_all(); 317 CHECK_MESSAGE( s2.try_get( j ) == false, "" ); 318 CHECK_MESSAGE( j == bogus_value, "" ); 319 320 // 321 // Chained in-order simple puts and gets 322 // 323 324 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); 325 tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>()); 326 tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>()); 327 tbb::flow::make_edge( s3, s4 ); 328 tbb::flow::make_edge( s4, s5 ); 329 330 for (int i = 0; i < N; ++i) { 331 bool msg = s3.try_put( T(i) ); 332 CHECK_MESSAGE( msg == true, "" ); 333 } 334 335 for (int i = 0; i < N; ++i) { 336 j = bogus_value; 337 CHECK_MESSAGE(wait_try_get( g, s5, j ) == true, ""); 338 CHECK_MESSAGE( i == j, "" ); 339 } 340 j = bogus_value; 341 CHECK_MESSAGE( wait_try_get( g, s3, j ) == false, "" ); 342 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" ); 343 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" ); 344 CHECK_MESSAGE( j == bogus_value, "" ); 345 346 g.wait_for_all(); 347 tbb::flow::remove_edge( s3, s4 ); 348 CHECK_MESSAGE( s3.try_put( N ) == true, "" ); 349 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" ); 350 CHECK_MESSAGE( j == bogus_value, "" ); 351 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" ); 352 CHECK_MESSAGE( j == bogus_value, "" ); 353 CHECK_MESSAGE( wait_try_get( g, s3, j ) == true, "" ); 354 CHECK_MESSAGE( j == N, "" ); 355 356 // 357 // Chained reverse-order simple puts and gets 358 // 359 360 tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>()); 361 tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>()); 362 tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>()); 363 tbb::flow::make_edge( s6, s7 ); 364 tbb::flow::make_edge( s7, s8 ); 365 366 for (int i = N-1; i >= 0; --i) { 367 bool msg = s6.try_put( T(i) ); 368 CHECK_MESSAGE( msg == true, "" ); 369 } 370 371 for (int i = 0; i < N; ++i) { 372 j = bogus_value; 373 CHECK_MESSAGE( wait_try_get( g, s8, j ) == true, "" ); 374 CHECK_MESSAGE( i == j, "" ); 375 } 376 j = bogus_value; 377 CHECK_MESSAGE( wait_try_get( g, s6, j ) == false, "" ); 378 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" ); 379 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" ); 380 CHECK_MESSAGE( j == bogus_value, "" ); 381 382 g.wait_for_all(); 383 tbb::flow::remove_edge( s6, s7 ); 384 CHECK_MESSAGE( s6.try_put( N ) == true, "" ); 385 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" ); 386 CHECK_MESSAGE( j == bogus_value, "" ); 387 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" ); 388 CHECK_MESSAGE( j == bogus_value, "" ); 389 CHECK_MESSAGE( wait_try_get( g, s6, j ) == true, "" ); 390 CHECK_MESSAGE( j == N, "" ); 391 392 return 0; 393 } 394 395 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 396 #include <array> 397 #include <vector> 398 void test_follows_and_precedes_api() { 399 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 400 std::vector<int> messages_for_precedes = {0, 1, 2}; 401 402 follows_and_precedes_testing::test_follows 403 <int, tbb::flow::sequencer_node<int>> 404 (messages_for_follows, [](const int& i) -> std::size_t { return i; }); 405 406 follows_and_precedes_testing::test_precedes 407 <int, tbb::flow::sequencer_node<int>> 408 (messages_for_precedes, [](const int& i) -> std::size_t { return i; }); 409 } 410 #endif 411 412 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 413 template <typename Body> 414 void test_deduction_guides_common(Body body) { 415 using namespace tbb::flow; 416 graph g; 417 broadcast_node<int> br(g); 418 419 sequencer_node s1(g, body); 420 static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>); 421 422 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 423 sequencer_node s2(follows(br), body); 424 static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>); 425 #endif 426 427 sequencer_node s3(s1); 428 static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>); 429 } 430 431 std::size_t sequencer_body_f(const int&) { return 1; } 432 433 void test_deduction_guides() { 434 test_deduction_guides_common([](const int&)->std::size_t { return 1; }); 435 test_deduction_guides_common([](const int&) mutable ->std::size_t { return 1; }); 436 test_deduction_guides_common(sequencer_body_f); 437 } 438 #endif 439 440 //! Test sequencer with various request orders and parallelism levels 441 //! \brief \ref requirement \ref error_guessing 442 TEST_CASE("Serial and parallel test"){ 443 for (int p = 2; p <= 4; ++p) { 444 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p); 445 tbb::task_arena arena(p); 446 arena.execute( 447 [&]() { 448 test_serial<int>(); 449 test_parallel<int>(p); 450 } 451 ); 452 } 453 } 454 455 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 456 //! Test decution guides 457 //! \brief \ref requirement 458 TEST_CASE("Test follows and precedes API"){ 459 test_follows_and_precedes_api(); 460 } 461 #endif 462 463 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 464 //! Test decution guides 465 //! \brief \ref requirement 466 TEST_CASE("Test deduction guides"){ 467 test_deduction_guides(); 468 } 469 #endif 470 471 #if __TBB_CPP20_CONCEPTS_PRESENT 472 //! \brief \ref error_guessing 473 TEST_CASE("constraints for sequencer_node object") { 474 struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {}; 475 476 static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, Object>); 477 static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, int>); 478 static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyable>); 479 static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyAssignable>); 480 } 481 482 template <typename T, typename Sequencer> 483 concept can_call_sequencer_node_ctor = requires( tbb::flow::graph& graph, Sequencer seq, 484 tbb::flow::buffer_node<int>& f ) { 485 tbb::flow::sequencer_node<T>(graph, seq); 486 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 487 tbb::flow::sequencer_node<T>(tbb::flow::follows(f), seq); 488 #endif 489 }; 490 491 //! \brief \ref error_guessing 492 TEST_CASE("constraints for sequencer_node sequencer") { 493 using type = int; 494 using namespace test_concepts::sequencer; 495 496 static_assert(can_call_sequencer_node_ctor<type, Correct<type>>); 497 static_assert(!can_call_sequencer_node_ctor<type, NonCopyable<type>>); 498 static_assert(!can_call_sequencer_node_ctor<type, NonDestructible<type>>); 499 static_assert(!can_call_sequencer_node_ctor<type, NoOperatorRoundBrackets<type>>); 500 static_assert(!can_call_sequencer_node_ctor<type, WrongInputOperatorRoundBrackets<type>>); 501 static_assert(!can_call_sequencer_node_ctor<type, WrongReturnOperatorRoundBrackets<type>>); 502 } 503 #endif // __TBB_CPP20_CONCEPTS_PRESENT 504