1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 /** @file harness_graph.cpp 18 This contains common helper classes and functions for testing graph nodes 19 **/ 20 21 #ifndef __TBB_harness_graph_H 22 #define __TBB_harness_graph_H 23 24 #include "config.h" 25 26 #include "oneapi/tbb/flow_graph.h" 27 #include "oneapi/tbb/null_rw_mutex.h" 28 #include "oneapi/tbb/concurrent_unordered_set.h" 29 30 #include <atomic> 31 #include <thread> 32 #include <mutex> 33 #include <condition_variable> 34 35 #include "common/spin_barrier.h" 36 37 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 38 39 // Needed conversion to and from continue_msg, but didn't want to add 40 // conversion operators to the class, since we don't want it in general, 41 // only in these tests. 42 template<typename InputType, typename OutputType> 43 struct converter { 44 static OutputType convert_value(const InputType &i) { 45 return OutputType(i); 46 } 47 }; 48 49 template<typename InputType> 50 struct converter<InputType,tbb::flow::continue_msg> { 51 static tbb::flow::continue_msg convert_value(const InputType &/*i*/) { 52 return tbb::flow::continue_msg(); 53 } 54 }; 55 56 template<typename OutputType> 57 struct converter<tbb::flow::continue_msg,OutputType> { 58 static OutputType convert_value(const tbb::flow::continue_msg &/*i*/) { 59 return OutputType(); 60 } 61 }; 62 63 // helper for multifunction_node tests. 64 template<size_t N> 65 struct mof_helper { 66 template<typename InputType, typename ports_type> 67 static inline void output_converted_value(const InputType &i, ports_type &p) { 68 (void)std::get<N-1>(p).try_put(converter<InputType,typename std::tuple_element<N-1,ports_type>::type::output_type>::convert_value(i)); 69 output_converted_value<N-1>(i, p); 70 } 71 }; 72 73 template<> 74 struct mof_helper<1> { 75 template<typename InputType, typename ports_type> 76 static inline void output_converted_value(const InputType &i, ports_type &p) { 77 // just emit a default-constructed object 78 (void)std::get<0>(p).try_put(converter<InputType,typename std::tuple_element<0,ports_type>::type::output_type>::convert_value(i)); 79 } 80 }; 81 82 template< typename InputType, typename OutputType > 83 struct harness_graph_default_functor { 84 static OutputType construct( InputType v ) { 85 return OutputType(v); 86 } 87 }; 88 89 template< typename OutputType > 90 struct harness_graph_default_functor< tbb::flow::continue_msg, OutputType > { 91 static OutputType construct( tbb::flow::continue_msg ) { 92 return OutputType(); 93 } 94 }; 95 96 template< typename InputType > 97 struct harness_graph_default_functor< InputType, tbb::flow::continue_msg > { 98 static tbb::flow::continue_msg construct( InputType ) { 99 return tbb::flow::continue_msg(); 100 } 101 }; 102 103 template< > 104 struct harness_graph_default_functor< tbb::flow::continue_msg, tbb::flow::continue_msg > { 105 static tbb::flow::continue_msg construct( tbb::flow::continue_msg ) { 106 return tbb::flow::continue_msg(); 107 } 108 }; 109 110 template<typename InputType, typename OutputSet> 111 struct harness_graph_default_multifunction_functor { 112 static const int N = std::tuple_size<OutputSet>::value; 113 typedef typename tbb::flow::multifunction_node<InputType,OutputSet>::output_ports_type ports_type; 114 static void construct(const InputType &i, ports_type &p) { 115 mof_helper<N>::output_converted_value(i, p); 116 } 117 }; 118 119 //! An executor that accepts InputType and generates OutputType 120 template< typename InputType, typename OutputType > 121 struct harness_graph_executor { 122 123 typedef OutputType (*function_ptr_type)( InputType v ); 124 125 template<typename RW> 126 struct mutex_holder { static RW mutex; }; 127 128 static function_ptr_type fptr; 129 static std::atomic<size_t> execute_count; 130 static std::atomic<size_t> current_executors; 131 static size_t max_executors; 132 133 static inline OutputType func( InputType v ) { 134 size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture 135 c = current_executors++; 136 if (max_executors != 0) { 137 CHECK(c <= max_executors); 138 } 139 ++execute_count; 140 OutputType v2 = (*fptr)(v); 141 --current_executors; 142 return v2; 143 } 144 145 template< typename RW > 146 static inline OutputType tfunc( InputType v ) { 147 // Invocations allowed to be concurrent, the lock is acquired in shared ("read") mode. 148 // A test can take it exclusively, thus creating a barrier for invocations. 149 typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false ); 150 return func(v); 151 } 152 153 template< typename RW > 154 struct tfunctor { 155 std::atomic<size_t> my_execute_count; 156 tfunctor() { my_execute_count = 0; } 157 tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); } 158 OutputType operator()( InputType i ) { 159 typename RW::scoped_lock l( harness_graph_executor::mutex_holder<RW>::mutex, /*write=*/false ); 160 ++my_execute_count; 161 return harness_graph_executor::func(i); 162 } 163 }; 164 typedef tfunctor<tbb::null_rw_mutex> functor; 165 166 }; 167 168 //! A multifunction executor that accepts InputType and has only one Output of OutputType. 169 template< typename InputType, typename OutputTuple > 170 struct harness_graph_multifunction_executor { 171 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type ports_type; 172 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 173 174 typedef void (*mfunction_ptr_type)( const InputType& v, ports_type &p ); 175 176 template<typename RW> 177 struct mutex_holder { static RW mutex; }; 178 179 static mfunction_ptr_type fptr; 180 static std::atomic<size_t> execute_count; 181 static std::atomic<size_t> current_executors; 182 static size_t max_executors; 183 184 static inline void empty_func( const InputType&, ports_type& ) { 185 } 186 187 static inline void func( const InputType &v, ports_type &p ) { 188 size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture 189 c = current_executors++; 190 CHECK( (max_executors == 0 || c <= max_executors) ); 191 CHECK( (std::tuple_size<OutputTuple>::value == 1) ); 192 ++execute_count; 193 (*fptr)(v,p); 194 --current_executors; 195 } 196 197 template< typename RW > 198 static inline void tfunc( const InputType& v, ports_type &p ) { 199 // Shared lock in invocations, exclusive in a test; see a comment in harness_graph_executor. 200 typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false ); 201 func(v,p); 202 } 203 204 template< typename RW > 205 struct tfunctor { 206 std::atomic<size_t> my_execute_count; 207 tfunctor() { my_execute_count = 0; } 208 tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); } 209 void operator()( const InputType &i, ports_type &p ) { 210 typename RW::scoped_lock l( harness_graph_multifunction_executor::mutex_holder<RW>::mutex, /*write=*/false ); 211 ++my_execute_count; 212 harness_graph_multifunction_executor::func(i,p); 213 } 214 }; 215 typedef tfunctor<tbb::null_rw_mutex> functor; 216 217 }; 218 219 // static vars for function_node tests 220 template< typename InputType, typename OutputType > 221 template< typename RW > 222 RW harness_graph_executor<InputType, OutputType>::mutex_holder<RW>::mutex; 223 224 template< typename InputType, typename OutputType > 225 std::atomic<size_t> harness_graph_executor<InputType, OutputType>::execute_count; 226 227 template< typename InputType, typename OutputType > 228 typename harness_graph_executor<InputType, OutputType>::function_ptr_type harness_graph_executor<InputType, OutputType>::fptr 229 = harness_graph_default_functor< InputType, OutputType >::construct; 230 231 template< typename InputType, typename OutputType > 232 std::atomic<size_t> harness_graph_executor<InputType, OutputType>::current_executors; 233 234 template< typename InputType, typename OutputType > 235 size_t harness_graph_executor<InputType, OutputType>::max_executors = 0; 236 237 // static vars for multifunction_node tests 238 template< typename InputType, typename OutputTuple > 239 template< typename RW > 240 RW harness_graph_multifunction_executor<InputType, OutputTuple>::mutex_holder<RW>::mutex; 241 242 template< typename InputType, typename OutputTuple > 243 std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count; 244 245 template< typename InputType, typename OutputTuple > 246 typename harness_graph_multifunction_executor<InputType, OutputTuple>::mfunction_ptr_type harness_graph_multifunction_executor<InputType, OutputTuple>::fptr 247 = harness_graph_default_multifunction_functor< InputType, OutputTuple >::construct; 248 249 template< typename InputType, typename OutputTuple > 250 std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors; 251 252 template< typename InputType, typename OutputTuple > 253 size_t harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0; 254 255 //! Counts the number of puts received 256 template< typename T > 257 struct harness_counting_receiver : public tbb::flow::receiver<T> { 258 harness_counting_receiver& operator=(const harness_counting_receiver&) = delete; 259 260 std::atomic< size_t > my_count; 261 T max_value; 262 size_t num_copies; 263 tbb::flow::graph& my_graph; 264 265 harness_counting_receiver(tbb::flow::graph& g) : num_copies(1), my_graph(g) { 266 my_count = 0; 267 } 268 269 void initialize_map( const T& m, size_t c ) { 270 my_count = 0; 271 max_value = m; 272 num_copies = c; 273 } 274 275 tbb::flow::graph& graph_reference() const override { 276 return my_graph; 277 } 278 279 tbb::detail::d1::graph_task *try_put_task( const T & ) override { 280 ++my_count; 281 return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED); 282 } 283 284 void validate() { 285 size_t n = my_count; 286 CHECK( n == num_copies*max_value ); 287 } 288 }; 289 290 //! Counts the number of puts received 291 template< typename T > 292 struct harness_mapped_receiver : public tbb::flow::receiver<T> { 293 harness_mapped_receiver(const harness_mapped_receiver&) = delete; 294 harness_mapped_receiver& operator=(const harness_mapped_receiver&) = delete; 295 296 std::atomic< size_t > my_count; 297 T max_value; 298 size_t num_copies; 299 typedef tbb::concurrent_unordered_multiset<T> multiset_type; 300 multiset_type *my_multiset; 301 tbb::flow::graph& my_graph; 302 303 harness_mapped_receiver(tbb::flow::graph& g) : my_multiset(NULL), my_graph(g) { 304 my_count = 0; 305 } 306 307 #if __INTEL_COMPILER <= 2021 308 // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited 309 // class while the parent class has the virtual keyword for the destrocutor. 310 virtual 311 #endif 312 ~harness_mapped_receiver() { 313 if ( my_multiset ) delete my_multiset; 314 } 315 316 void initialize_map( const T& m, size_t c ) { 317 my_count = 0; 318 max_value = m; 319 num_copies = c; 320 if ( my_multiset ) delete my_multiset; 321 my_multiset = new multiset_type; 322 } 323 324 tbb::detail::d1::graph_task* try_put_task( const T &t ) override { 325 if ( my_multiset ) { 326 (*my_multiset).emplace( t ); 327 } else { 328 ++my_count; 329 } 330 return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED); 331 } 332 333 tbb::flow::graph& graph_reference() const override { 334 return my_graph; 335 } 336 337 void validate() { 338 if ( my_multiset ) { 339 for ( size_t i = 0; i < (size_t)max_value; ++i ) { 340 auto it = (*my_multiset).find((int)i); 341 CHECK_MESSAGE( it != my_multiset->end(), "Expected element in the map." ); 342 size_t n = (*my_multiset).count(int(i)); 343 CHECK( n == num_copies ); 344 } 345 } else { 346 size_t n = my_count; 347 CHECK( n == num_copies*max_value ); 348 } 349 } 350 351 void reset_receiver(tbb::flow::reset_flags /*f*/) { 352 my_count = 0; 353 if(my_multiset) delete my_multiset; 354 my_multiset = new multiset_type; 355 } 356 357 }; 358 359 //! Counts the number of puts received 360 template< typename T > 361 struct harness_counting_sender : public tbb::flow::sender<T> { 362 harness_counting_sender(const harness_counting_sender&) = delete; 363 harness_counting_sender& operator=(const harness_counting_sender&) = delete; 364 365 typedef typename tbb::flow::sender<T>::successor_type successor_type; 366 std::atomic< successor_type * > my_receiver; 367 std::atomic< size_t > my_count; 368 std::atomic< size_t > my_received; 369 size_t my_limit; 370 371 harness_counting_sender( ) : my_limit(~size_t(0)) { 372 my_receiver = NULL; 373 my_count = 0; 374 my_received = 0; 375 } 376 377 harness_counting_sender( size_t limit ) : my_limit(limit) { 378 my_receiver = NULL; 379 my_count = 0; 380 my_received = 0; 381 } 382 383 bool register_successor( successor_type &r ) override { 384 my_receiver = &r; 385 return true; 386 } 387 388 bool remove_successor( successor_type &r ) override { 389 successor_type *s = my_receiver.exchange( NULL ); 390 CHECK( s == &r ); 391 return true; 392 } 393 394 bool try_get( T & v ) override { 395 size_t i = my_count++; 396 if ( i < my_limit ) { 397 v = T( i ); 398 ++my_received; 399 return true; 400 } else { 401 return false; 402 } 403 } 404 405 bool try_put_once() { 406 successor_type *s = my_receiver; 407 size_t i = my_count++; 408 if ( s->try_put( T(i) ) ) { 409 ++my_received; 410 return true; 411 } else { 412 return false; 413 } 414 } 415 416 void try_put_until_false() { 417 successor_type *s = my_receiver; 418 size_t i = my_count++; 419 420 while ( s->try_put( T(i) ) ) { 421 ++my_received; 422 i = my_count++; 423 } 424 } 425 426 void try_put_until_limit() { 427 successor_type *s = my_receiver; 428 429 for ( int i = 0; i < (int)my_limit; ++i ) { 430 CHECK( s->try_put( T(i) ) ); 431 ++my_received; 432 } 433 CHECK( my_received == my_limit ); 434 } 435 436 }; 437 438 template< typename InputType > 439 struct parallel_put_until_limit { 440 parallel_put_until_limit& operator=(const parallel_put_until_limit&) = delete; 441 442 typedef std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders_t; 443 444 senders_t& my_senders; 445 446 parallel_put_until_limit( senders_t& senders ) : my_senders(senders) {} 447 448 void operator()( int i ) const { my_senders[i]->try_put_until_limit(); } 449 }; 450 451 // test for resets of buffer-type nodes. 452 std::atomic<int> serial_fn_state0; 453 std::atomic<int> serial_fn_state1; 454 std::atomic<int> serial_continue_state0; 455 456 template<typename T> 457 struct serial_fn_body { 458 std::atomic<int>& my_flag; 459 serial_fn_body(std::atomic<int>& flag) : my_flag(flag) { } 460 T operator()(const T& in) { 461 if (my_flag == 0) { 462 my_flag = 1; 463 464 // wait until we are released 465 utils::SpinWaitWhileEq(my_flag, 1); 466 } 467 return in; 468 } 469 }; 470 471 template<typename T> 472 struct serial_continue_body { 473 std::atomic<int>& my_flag; 474 serial_continue_body(std::atomic<int> &flag) : my_flag(flag) {} 475 T operator()(const tbb::flow::continue_msg& /*in*/) { 476 // signal we have received a value 477 my_flag = 1; 478 // wait until we are released 479 utils::SpinWaitWhileEq(my_flag, 1); 480 return (T)1; 481 } 482 }; 483 484 template<typename T, typename BufferType> 485 void test_resets() { 486 const int NN = 3; 487 bool nFound[NN]; 488 tbb::task_arena arena{4}; 489 arena.execute( 490 [&] { 491 tbb::task_group_context tgc; 492 tbb::flow::graph g(tgc); 493 BufferType b0(g); 494 tbb::flow::queue_node<T> q0(g); 495 T j{}; 496 497 // reset empties buffer 498 for(T i = 0; i < NN; ++i) { 499 b0.try_put(i); 500 nFound[(int)i] = false; 501 } 502 g.wait_for_all(); 503 g.reset(); 504 CHECK_MESSAGE(!b0.try_get(j), "reset did not empty buffer"); 505 506 // reset doesn't delete edge 507 508 tbb::flow::make_edge(b0,q0); 509 g.wait_for_all(); // TODO: invesigate why make_edge to buffer_node always creates a forwarding task 510 g.reset(); 511 for(T i = 0; i < NN; ++i) { 512 b0.try_put(i); 513 } 514 515 g.wait_for_all(); 516 for( T i = 0; i < NN; ++i) { 517 CHECK_MESSAGE(q0.try_get(j), "Missing value from buffer"); 518 CHECK_MESSAGE(!nFound[(int)j], "Duplicate value found"); 519 nFound[(int)j] = true; 520 } 521 522 for(int ii = 0; ii < NN; ++ii) { 523 CHECK_MESSAGE(nFound[ii], "missing value"); 524 } 525 CHECK_MESSAGE(!q0.try_get(j), "Extra values in output"); 526 527 // reset reverses a reversed edge. 528 // we will use a serial rejecting node to get the edge to reverse. 529 tbb::flow::function_node<T, T, tbb::flow::rejecting> sfn(g, tbb::flow::serial, serial_fn_body<T>(serial_fn_state0)); 530 tbb::flow::queue_node<T> outq(g); 531 tbb::flow::remove_edge(b0,q0); 532 tbb::flow::make_edge(b0, sfn); 533 tbb::flow::make_edge(sfn,outq); 534 g.wait_for_all(); // wait for all the tasks started by building the graph are done. 535 serial_fn_state0 = 0; 536 537 // b0 ------> sfn ------> outq 538 for(int icnt = 0; icnt < 2; ++icnt) { 539 g.wait_for_all(); 540 serial_fn_state0 = 0; 541 std::thread t([&] { 542 b0.try_put((T)0); // will start sfn 543 g.wait_for_all(); // wait for all the tasks to complete. 544 }); 545 // wait until function_node starts 546 utils::SpinWaitWhileEq(serial_fn_state0, 0); 547 // now the function_node is executing. 548 // this will start a task to forward the second item 549 // to the serial function node 550 b0.try_put((T)1); // first item will be consumed by task completing the execution 551 b0.try_put((T)2); // second item will remain after cancellation 552 // now wait for the task that attempts to forward the buffer item to 553 // complete. 554 // now cancel the graph. 555 CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled"); 556 serial_fn_state0 = 0; // release the function_node. 557 t.join(); 558 // check that at most one output reached the queue_node 559 T outt; 560 T outt2; 561 bool got_item1 = outq.try_get(outt); 562 bool got_item2 = outq.try_get(outt2); 563 // either the output queue was empty (if the function_node tested for cancellation before putting the 564 // result to the queue) or there was one element in the queue (the 0). 565 bool is_successful_operation = got_item1 && (int)outt == 0 && !got_item2; 566 CHECK_MESSAGE( is_successful_operation, "incorrect output from function_node"); 567 // the edge between the buffer and the function_node should be reversed, and the last 568 // message we put in the buffer should still be there. We can't directly test for the 569 // edge reversal. 570 got_item1 = b0.try_get(outt); 571 CHECK_MESSAGE(got_item1, " buffer lost a message"); 572 is_successful_operation = (2 == (int)outt || 1 == (int)outt); 573 CHECK_MESSAGE(is_successful_operation, " buffer had incorrect message"); // the one not consumed by the node. 574 CHECK_MESSAGE(g.is_cancelled(), "Graph was not cancelled"); 575 g.reset(); 576 } // icnt 577 578 // reset with remove_edge removes edge. (icnt ==0 => forward edge, 1 => reversed edge 579 for(int icnt = 0; icnt < 2; ++icnt) { 580 if(icnt == 1) { 581 // set up reversed edge 582 tbb::flow::make_edge(b0, sfn); 583 tbb::flow::make_edge(sfn,outq); 584 serial_fn_state0 = 0; 585 std::thread t([&] { 586 b0.try_put((T)0); // starts up the function node 587 b0.try_put((T)1); // should reverse the edge 588 g.wait_for_all(); // wait for all the tasks to complete. 589 }); 590 utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for edge reversal 591 CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled"); 592 serial_fn_state0 = 0; // release the function_node. 593 t.join(); 594 } 595 g.reset(tbb::flow::rf_clear_edges); 596 // test that no one is a successor to the buffer now. 597 serial_fn_state0 = 1; // let the function_node go if it gets an input message 598 b0.try_put((T)23); 599 g.wait_for_all(); 600 CHECK_MESSAGE((int)serial_fn_state0 == 1, "function_node executed when it shouldn't"); 601 T outt; 602 bool is_successful_operation = b0.try_get(outt) && (T)23 == outt && !outq.try_get(outt); 603 CHECK_MESSAGE(is_successful_operation, "node lost its input"); 604 } 605 } 606 ); // arena.execute() 607 } 608 609 template<typename NodeType> 610 void test_input_ports_return_ref(NodeType& mip_node) { 611 typename NodeType::input_ports_type& input_ports1 = mip_node.input_ports(); 612 typename NodeType::input_ports_type& input_ports2 = mip_node.input_ports(); 613 CHECK_MESSAGE(&input_ports1 == &input_ports2, "input_ports() should return reference"); 614 } 615 616 template<typename NodeType> 617 void test_output_ports_return_ref(NodeType& mop_node) { 618 typename NodeType::output_ports_type& output_ports1 = mop_node.output_ports(); 619 typename NodeType::output_ports_type& output_ports2 = mop_node.output_ports(); 620 CHECK_MESSAGE(&output_ports1 == &output_ports2, "output_ports() should return reference"); 621 } 622 623 template< template <typename> class ReservingNodeType, typename DataType, bool DoClear > 624 class harness_reserving_body { 625 harness_reserving_body& operator=(const harness_reserving_body&) = delete; 626 ReservingNodeType<DataType> &my_reserving_node; 627 tbb::flow::buffer_node<DataType> &my_buffer_node; 628 public: 629 harness_reserving_body(ReservingNodeType<DataType> &reserving_node, tbb::flow::buffer_node<DataType> &bn) : my_reserving_node(reserving_node), my_buffer_node(bn) {} 630 void operator()(DataType i) const { 631 my_reserving_node.try_put(i); 632 #if _MSC_VER && !__INTEL_COMPILER 633 #pragma warning (push) 634 #pragma warning (disable: 4127) /* suppress conditional expression is constant */ 635 #endif 636 if (DoClear) { 637 #if _MSC_VER && !__INTEL_COMPILER 638 #pragma warning (pop) 639 #endif 640 my_reserving_node.clear(); 641 } 642 my_buffer_node.try_put(i); 643 my_reserving_node.try_put(i); 644 } 645 }; 646 647 template< template <typename> class ReservingNodeType, typename DataType > 648 void test_reserving_nodes() { 649 #if TBB_TEST_LOW_WORKLOAD 650 const int N = 30; 651 #else 652 const int N = 300; 653 #endif 654 655 tbb::flow::graph g; 656 657 ReservingNodeType<DataType> reserving_n(g); 658 659 tbb::flow::buffer_node<DataType> buffering_n(g); 660 tbb::flow::join_node< std::tuple<DataType, DataType>, tbb::flow::reserving > join_n(g); 661 harness_counting_receiver< std::tuple<DataType, DataType> > end_receiver(g); 662 663 tbb::flow::make_edge(reserving_n, tbb::flow::input_port<0>(join_n)); 664 tbb::flow::make_edge(buffering_n, tbb::flow::input_port<1>(join_n)); 665 tbb::flow::make_edge(join_n, end_receiver); 666 667 utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, false>(reserving_n, buffering_n)); 668 g.wait_for_all(); 669 670 CHECK(end_receiver.my_count == N); 671 672 // Should not hang 673 utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, true>(reserving_n, buffering_n)); 674 g.wait_for_all(); 675 676 CHECK(end_receiver.my_count == 2 * N); 677 } 678 679 namespace lightweight_testing { 680 681 typedef std::tuple<int, int> output_tuple_type; 682 683 template<typename NodeType> 684 class native_loop_body { 685 native_loop_body& operator=(const native_loop_body&) = delete; 686 NodeType& my_node; 687 public: 688 native_loop_body(NodeType& node) : my_node(node) {} 689 690 void operator()(int) const { 691 std::thread::id this_id = std::this_thread::get_id(); 692 my_node.try_put(this_id); 693 } 694 }; 695 696 std::atomic<unsigned> g_body_count; 697 698 class concurrency_checker_body { 699 public: 700 concurrency_checker_body() { g_body_count = 0; } 701 702 template<typename gateway_type> 703 void operator()(const std::thread::id& input, gateway_type&) { increase_and_check(input); } 704 705 output_tuple_type operator()(const std::thread::id& input) { 706 increase_and_check(input); 707 return output_tuple_type(); 708 } 709 710 private: 711 void increase_and_check(const std::thread::id& input) { 712 ++g_body_count; 713 std::thread::id body_thread_id = std::this_thread::get_id(); 714 CHECK_MESSAGE(input == body_thread_id, "Body executed as not lightweight"); 715 } 716 }; 717 718 template<typename NodeType> 719 void test_unlimited_lightweight_execution(unsigned N) { 720 tbb::flow::graph g; 721 NodeType node(g, tbb::flow::unlimited, concurrency_checker_body()); 722 723 utils::NativeParallelFor(N, native_loop_body<NodeType>(node)); 724 g.wait_for_all(); 725 726 CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times"); 727 } 728 729 std::mutex m; 730 std::condition_variable lightweight_condition; 731 std::atomic<bool> work_submitted; 732 std::atomic<bool> lightweight_work_processed; 733 734 template<typename NodeType> 735 class native_loop_limited_body { 736 native_loop_limited_body& operator=(const native_loop_limited_body&) = delete; 737 NodeType& my_node; 738 utils::SpinBarrier& my_barrier; 739 public: 740 native_loop_limited_body(NodeType& node, utils::SpinBarrier& barrier): 741 my_node(node), my_barrier(barrier) {} 742 void operator()(int) const { 743 std::thread::id this_id = std::this_thread::get_id(); 744 my_node.try_put(this_id); 745 if(!lightweight_work_processed) { 746 my_barrier.wait(); 747 work_submitted = true; 748 lightweight_condition.notify_all(); 749 } 750 } 751 }; 752 753 struct condition_predicate { 754 bool operator()() { 755 return work_submitted; 756 } 757 }; 758 759 std::atomic<unsigned> g_lightweight_count; 760 std::atomic<unsigned> g_task_count; 761 762 class limited_lightweight_checker_body { 763 public: 764 limited_lightweight_checker_body() { 765 g_body_count = 0; 766 g_lightweight_count = 0; 767 g_task_count = 0; 768 } 769 private: 770 void increase_and_check(const std::thread::id& /*input*/) { 771 ++g_body_count; 772 // TODO revamp: in order not to rely on scheduler functionality anymore add 773 // __TBB_EXTRA_DEBUG for counting the number of tasks actually created by the flow graph, 774 // hence consider moving lightweight testing into whitebox test for the flow graph. 775 bool is_inside_task = false;/*tbb::task::self().state() == tbb::task::executing;*/ 776 if(is_inside_task) { 777 ++g_task_count; 778 } else { 779 std::unique_lock<std::mutex> lock(m); 780 lightweight_condition.wait(lock, condition_predicate()); 781 ++g_lightweight_count; 782 lightweight_work_processed = true; 783 } 784 } 785 public: 786 template<typename gateway_type> 787 void operator()(const std::thread::id& input, gateway_type&) { 788 increase_and_check(input); 789 } 790 output_tuple_type operator()(const std::thread::id& input) { 791 increase_and_check(input); 792 return output_tuple_type(); 793 } 794 }; 795 796 template<typename NodeType> 797 void test_limited_lightweight_execution(unsigned N, unsigned concurrency) { 798 CHECK_MESSAGE(concurrency != tbb::flow::unlimited, 799 "Test for limited concurrency cannot be called with unlimited concurrency argument"); 800 tbb::flow::graph g; 801 NodeType node(g, concurrency, limited_lightweight_checker_body()); 802 // Execute first body as lightweight, then wait for all other threads to fill internal buffer. 803 // Then unblock the lightweight thread and check if other body executions are inside oneTBB task. 804 utils::SpinBarrier barrier(N - concurrency); 805 utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier)); 806 g.wait_for_all(); 807 CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times"); 808 // TODO revamp: enable the following checks once whitebox flow graph testing is ready for it. 809 // CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once"); 810 // CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times"); 811 work_submitted = false; 812 lightweight_work_processed = false; 813 } 814 815 template<typename NodeType> 816 void test_lightweight(unsigned N) { 817 test_unlimited_lightweight_execution<NodeType>(N); 818 test_limited_lightweight_execution<NodeType>(N, tbb::flow::serial); 819 test_limited_lightweight_execution<NodeType>(N, (std::min)(std::thread::hardware_concurrency() / 2, N/2)); 820 } 821 822 template<template<typename, typename, typename> class NodeType> 823 void test(unsigned N) { 824 typedef std::thread::id input_type; 825 typedef NodeType<input_type, output_tuple_type, tbb::flow::queueing_lightweight> node_type; 826 test_lightweight<node_type>(N); 827 } 828 829 } // namespace lightweight_testing 830 831 #endif // __TBB_harness_graph_H 832