1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #include "tbb/flow_graph.h" 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "common/utils_assert.h" 24 #include "common/test_follows_and_precedes_api.h" 25 #include "common/concepts_common.h" 26 27 #include <cstdio> 28 #include <atomic> 29 30 31 //! \file test_sequencer_node.cpp 32 //! \brief Test for [flow_graph.sequencer_node] specification 33 34 35 #define N 1000 36 #define C 10 37 38 template< typename T > 39 struct seq_inspector { 40 size_t operator()(const T &v) const { return size_t(v); } 41 }; 42 43 template< typename T > 44 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) { 45 g.wait_for_all(); 46 return q.try_get(value); 47 } 48 49 template< typename T > 50 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 51 while ( q.try_get(value) != true ) ; 52 } 53 54 template< typename T > 55 struct parallel_puts : utils::NoAssign { 56 57 tbb::flow::sequencer_node<T> &my_q; 58 int my_num_threads; 59 60 parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {} 61 62 void operator()(int tid) const { 63 for (int j = tid; j < N; j+=my_num_threads) { 64 bool msg = my_q.try_put( T(j) ); 65 CHECK_MESSAGE( msg == true, "" ); 66 } 67 } 68 69 }; 70 71 template< typename T > 72 struct touches { 73 74 bool **my_touches; 75 T *my_last_touch; 76 int my_num_threads; 77 78 touches( int num_threads ) : my_num_threads(num_threads) { 79 my_last_touch = new T[my_num_threads]; 80 my_touches = new bool* [my_num_threads]; 81 for ( int p = 0; p < my_num_threads; ++p) { 82 my_last_touch[p] = T(-1); 83 my_touches[p] = new bool[N]; 84 for ( int n = 0; n < N; ++n) 85 my_touches[p][n] = false; 86 } 87 } 88 89 ~touches() { 90 for ( int p = 0; p < my_num_threads; ++p) { 91 delete [] my_touches[p]; 92 } 93 delete [] my_touches; 94 delete [] my_last_touch; 95 } 96 97 bool check( int tid, T v ) { 98 if ( my_touches[tid][v] != false ) { 99 printf("Error: value seen twice by local thread\n"); 100 return false; 101 } 102 if ( v <= my_last_touch[tid] ) { 103 printf("Error: value seen in wrong order by local thread\n"); 104 return false; 105 } 106 my_last_touch[tid] = v; 107 my_touches[tid][v] = true; 108 return true; 109 } 110 111 bool validate_touches() { 112 bool *all_touches = new bool[N]; 113 for ( int n = 0; n < N; ++n) 114 all_touches[n] = false; 115 116 for ( int p = 0; p < my_num_threads; ++p) { 117 for ( int n = 0; n < N; ++n) { 118 if ( my_touches[p][n] == true ) { 119 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 120 all_touches[n] = true; 121 } 122 } 123 } 124 for ( int n = 0; n < N; ++n) { 125 if ( !all_touches[n] ) 126 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 127 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 128 } 129 delete [] all_touches; 130 return true; 131 } 132 133 }; 134 135 template< typename T > 136 struct parallel_gets : utils::NoAssign { 137 138 tbb::flow::sequencer_node<T> &my_q; 139 int my_num_threads; 140 touches<T> &my_touches; 141 142 parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {} 143 144 void operator()(int tid) const { 145 for (int j = tid; j < N; j+=my_num_threads) { 146 T v; 147 spin_try_get( my_q, v ); 148 my_touches.check( tid, v ); 149 } 150 } 151 152 }; 153 154 template< typename T > 155 struct parallel_put_get : utils::NoAssign { 156 157 tbb::flow::sequencer_node<T> &my_s1; 158 tbb::flow::sequencer_node<T> &my_s2; 159 int my_num_threads; 160 std::atomic< int > &my_counter; 161 touches<T> &my_touches; 162 163 parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads, 164 std::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {} 165 166 void operator()(int tid) const { 167 int i_start = 0; 168 169 while ( (i_start = my_counter.fetch_add(C)) < N ) { 170 int i_end = ( N < i_start + C ) ? N : i_start + C; 171 for (int i = i_start; i < i_end; ++i) { 172 bool msg = my_s1.try_put( T(i) ); 173 CHECK_MESSAGE( msg == true, "" ); 174 } 175 176 for (int i = i_start; i < i_end; ++i) { 177 T v; 178 spin_try_get( my_s2, v ); 179 my_touches.check( tid, v ); 180 } 181 } 182 } 183 184 }; 185 186 // 187 // Tests 188 // 189 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output 190 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output 191 // 192 193 template< typename T > 194 int test_parallel(int num_threads) { 195 tbb::flow::graph g; 196 197 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); 198 utils::NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) ); 199 { 200 touches<T> t( num_threads ); 201 utils::NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) ); 202 g.wait_for_all(); 203 CHECK_MESSAGE( t.validate_touches(), "" ); 204 } 205 T bogus_value(-1); 206 T j = bogus_value; 207 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 208 CHECK_MESSAGE( j == bogus_value, "" ); 209 g.wait_for_all(); 210 211 tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>()); 212 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); 213 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); 214 tbb::flow::make_edge( s1, s2 ); 215 tbb::flow::make_edge( s2, s3 ); 216 217 { 218 touches<T> t( num_threads ); 219 std::atomic<int> counter; 220 counter = 0; 221 utils::NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) ); 222 g.wait_for_all(); 223 t.validate_touches(); 224 } 225 g.wait_for_all(); 226 CHECK_MESSAGE( s1.try_get( j ) == false, "" ); 227 g.wait_for_all(); 228 CHECK_MESSAGE( s2.try_get( j ) == false, "" ); 229 g.wait_for_all(); 230 CHECK_MESSAGE( s3.try_get( j ) == false, "" ); 231 CHECK_MESSAGE( j == bogus_value, "" ); 232 233 // test copy constructor 234 tbb::flow::sequencer_node<T> s_copy(s); 235 utils::NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) ); 236 for (int i = 0; i < N; ++i) { 237 j = bogus_value; 238 spin_try_get( s_copy, j ); 239 CHECK_MESSAGE( i == j, "" ); 240 } 241 j = bogus_value; 242 g.wait_for_all(); 243 CHECK_MESSAGE( s_copy.try_get( j ) == false, "" ); 244 CHECK_MESSAGE( j == bogus_value, "" ); 245 246 return 0; 247 } 248 249 250 // 251 // Tests 252 // 253 // No predecessors can be registered 254 // Request from empty buffer fails 255 // In-order puts, single sender, single receiver, properly sequenced at output 256 // Reverse-order puts, single sender, single receiver, properly sequenced at output 257 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output 258 // 259 260 template< typename T > 261 int test_serial() { 262 tbb::flow::graph g; 263 T bogus_value(-1); 264 265 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>()); 266 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>()); 267 T j = bogus_value; 268 269 // 270 // Rejects attempts to add / remove predecessor 271 // Rejects request from empty Q 272 // 273 CHECK_MESSAGE( register_predecessor( s, s2 ) == false, "" ); 274 CHECK_MESSAGE( remove_predecessor( s, s2 ) == false, "" ); 275 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 276 CHECK_MESSAGE( j == bogus_value, "" ); 277 278 // 279 // In-order simple puts and gets 280 // 281 282 for (int i = 0; i < N; ++i) { 283 bool msg = s.try_put( T(i) ); 284 CHECK_MESSAGE( msg == true, "" ); 285 CHECK_MESSAGE(!s.try_put( T(i) ), ""); // second attempt to put should reject 286 } 287 288 289 for (int i = 0; i < N; ++i) { 290 j = bogus_value; 291 CHECK_MESSAGE(wait_try_get( g, s, j ) == true, ""); 292 CHECK_MESSAGE( i == j, "" ); 293 CHECK_MESSAGE(!s.try_put( T(i) ),"" ); // after retrieving value, subsequent put should fail 294 } 295 j = bogus_value; 296 g.wait_for_all(); 297 CHECK_MESSAGE( s.try_get( j ) == false, "" ); 298 CHECK_MESSAGE( j == bogus_value, "" ); 299 300 // 301 // Reverse-order simple puts and gets 302 // 303 304 for (int i = N-1; i >= 0; --i) { 305 bool msg = s2.try_put( T(i) ); 306 CHECK_MESSAGE( msg == true, "" ); 307 } 308 309 for (int i = 0; i < N; ++i) { 310 j = bogus_value; 311 CHECK_MESSAGE(wait_try_get( g, s2, j ) == true, ""); 312 CHECK_MESSAGE( i == j, "" ); 313 } 314 j = bogus_value; 315 g.wait_for_all(); 316 CHECK_MESSAGE( s2.try_get( j ) == false, "" ); 317 CHECK_MESSAGE( j == bogus_value, "" ); 318 319 // 320 // Chained in-order simple puts and gets 321 // 322 323 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>()); 324 tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>()); 325 tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>()); 326 tbb::flow::make_edge( s3, s4 ); 327 tbb::flow::make_edge( s4, s5 ); 328 329 for (int i = 0; i < N; ++i) { 330 bool msg = s3.try_put( T(i) ); 331 CHECK_MESSAGE( msg == true, "" ); 332 } 333 334 for (int i = 0; i < N; ++i) { 335 j = bogus_value; 336 CHECK_MESSAGE(wait_try_get( g, s5, j ) == true, ""); 337 CHECK_MESSAGE( i == j, "" ); 338 } 339 j = bogus_value; 340 CHECK_MESSAGE( wait_try_get( g, s3, j ) == false, "" ); 341 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" ); 342 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" ); 343 CHECK_MESSAGE( j == bogus_value, "" ); 344 345 g.wait_for_all(); 346 tbb::flow::remove_edge( s3, s4 ); 347 CHECK_MESSAGE( s3.try_put( N ) == true, "" ); 348 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" ); 349 CHECK_MESSAGE( j == bogus_value, "" ); 350 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" ); 351 CHECK_MESSAGE( j == bogus_value, "" ); 352 CHECK_MESSAGE( wait_try_get( g, s3, j ) == true, "" ); 353 CHECK_MESSAGE( j == N, "" ); 354 355 // 356 // Chained reverse-order simple puts and gets 357 // 358 359 tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>()); 360 tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>()); 361 tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>()); 362 tbb::flow::make_edge( s6, s7 ); 363 tbb::flow::make_edge( s7, s8 ); 364 365 for (int i = N-1; i >= 0; --i) { 366 bool msg = s6.try_put( T(i) ); 367 CHECK_MESSAGE( msg == true, "" ); 368 } 369 370 for (int i = 0; i < N; ++i) { 371 j = bogus_value; 372 CHECK_MESSAGE( wait_try_get( g, s8, j ) == true, "" ); 373 CHECK_MESSAGE( i == j, "" ); 374 } 375 j = bogus_value; 376 CHECK_MESSAGE( wait_try_get( g, s6, j ) == false, "" ); 377 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" ); 378 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" ); 379 CHECK_MESSAGE( j == bogus_value, "" ); 380 381 g.wait_for_all(); 382 tbb::flow::remove_edge( s6, s7 ); 383 CHECK_MESSAGE( s6.try_put( N ) == true, "" ); 384 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" ); 385 CHECK_MESSAGE( j == bogus_value, "" ); 386 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" ); 387 CHECK_MESSAGE( j == bogus_value, "" ); 388 CHECK_MESSAGE( wait_try_get( g, s6, j ) == true, "" ); 389 CHECK_MESSAGE( j == N, "" ); 390 391 return 0; 392 } 393 394 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 395 #include <array> 396 #include <vector> 397 void test_follows_and_precedes_api() { 398 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 399 std::vector<int> messages_for_precedes = {0, 1, 2}; 400 401 follows_and_precedes_testing::test_follows 402 <int, tbb::flow::sequencer_node<int>> 403 (messages_for_follows, [](const int& i) -> std::size_t { return i; }); 404 405 follows_and_precedes_testing::test_precedes 406 <int, tbb::flow::sequencer_node<int>> 407 (messages_for_precedes, [](const int& i) -> std::size_t { return i; }); 408 } 409 #endif 410 411 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 412 template <typename Body> 413 void test_deduction_guides_common(Body body) { 414 using namespace tbb::flow; 415 graph g; 416 broadcast_node<int> br(g); 417 418 sequencer_node s1(g, body); 419 static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>); 420 421 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 422 sequencer_node s2(follows(br), body); 423 static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>); 424 #endif 425 426 sequencer_node s3(s1); 427 static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>); 428 } 429 430 std::size_t sequencer_body_f(const int&) { return 1; } 431 432 void test_deduction_guides() { 433 test_deduction_guides_common([](const int&)->std::size_t { return 1; }); 434 test_deduction_guides_common([](const int&) mutable ->std::size_t { return 1; }); 435 test_deduction_guides_common(sequencer_body_f); 436 } 437 #endif 438 439 //! Test sequencer with various request orders and parallelism levels 440 //! \brief \ref requirement \ref error_guessing 441 TEST_CASE("Serial and parallel test"){ 442 for (int p = 2; p <= 4; ++p) { 443 tbb::task_arena arena(p); 444 arena.execute( 445 [&]() { 446 test_serial<int>(); 447 test_parallel<int>(p); 448 } 449 ); 450 } 451 } 452 453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 454 //! Test decution guides 455 //! \brief \ref requirement 456 TEST_CASE("Test follows and precedes API"){ 457 test_follows_and_precedes_api(); 458 } 459 #endif 460 461 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 462 //! Test decution guides 463 //! \brief \ref requirement 464 TEST_CASE("Test deduction guides"){ 465 test_deduction_guides(); 466 } 467 #endif 468 469 #if __TBB_CPP20_CONCEPTS_PRESENT 470 //! \brief \ref error_guessing 471 TEST_CASE("constraints for sequencer_node object") { 472 struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {}; 473 474 static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, Object>); 475 static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, int>); 476 static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyable>); 477 static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyAssignable>); 478 } 479 480 template <typename T, typename Sequencer> 481 concept can_call_sequencer_node_ctor = requires( tbb::flow::graph& graph, Sequencer seq, 482 tbb::flow::buffer_node<int>& f ) { 483 tbb::flow::sequencer_node<T>(graph, seq); 484 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 485 tbb::flow::sequencer_node<T>(tbb::flow::follows(f), seq); 486 #endif 487 }; 488 489 //! \brief \ref error_guessing 490 TEST_CASE("constraints for sequencer_node sequencer") { 491 using type = int; 492 using namespace test_concepts::sequencer; 493 494 static_assert(can_call_sequencer_node_ctor<type, Correct<type>>); 495 static_assert(!can_call_sequencer_node_ctor<type, NonCopyable<type>>); 496 static_assert(!can_call_sequencer_node_ctor<type, NonDestructible<type>>); 497 static_assert(!can_call_sequencer_node_ctor<type, NoOperatorRoundBrackets<type>>); 498 static_assert(!can_call_sequencer_node_ctor<type, WrongInputOperatorRoundBrackets<type>>); 499 static_assert(!can_call_sequencer_node_ctor<type, WrongReturnOperatorRoundBrackets<type>>); 500 } 501 #endif // __TBB_CPP20_CONCEPTS_PRESENT 502