/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

/** @file harness_graph.h
    This contains common helper classes and functions for testing graph nodes
**/

#ifndef __TBB_harness_graph_H
#define __TBB_harness_graph_H

#include "config.h"

#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/task.h"
#include "oneapi/tbb/null_rw_mutex.h"
#include "oneapi/tbb/concurrent_unordered_set.h"

#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>

#include "common/spin_barrier.h"

using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;

// Needed conversion to and from continue_msg, but didn't want to add
// conversion operators to the class, since we don't want it in general,
// only in these tests.
43 template<typename InputType, typename OutputType> 44 struct converter { convert_valueconverter45 static OutputType convert_value(const InputType &i) { 46 return OutputType(i); 47 } 48 }; 49 50 template<typename InputType> 51 struct converter<InputType,tbb::flow::continue_msg> { 52 static tbb::flow::continue_msg convert_value(const InputType &/*i*/) { 53 return tbb::flow::continue_msg(); 54 } 55 }; 56 57 template<typename OutputType> 58 struct converter<tbb::flow::continue_msg,OutputType> { 59 static OutputType convert_value(const tbb::flow::continue_msg &/*i*/) { 60 return OutputType(); 61 } 62 }; 63 64 // helper for multifunction_node tests. 65 template<size_t N> 66 struct mof_helper { 67 template<typename InputType, typename ports_type> 68 static inline void output_converted_value(const InputType &i, ports_type &p) { 69 (void)std::get<N-1>(p).try_put(converter<InputType,typename std::tuple_element<N-1,ports_type>::type::output_type>::convert_value(i)); 70 output_converted_value<N-1>(i, p); 71 } 72 }; 73 74 template<> 75 struct mof_helper<1> { 76 template<typename InputType, typename ports_type> 77 static inline void output_converted_value(const InputType &i, ports_type &p) { 78 // just emit a default-constructed object 79 (void)std::get<0>(p).try_put(converter<InputType,typename std::tuple_element<0,ports_type>::type::output_type>::convert_value(i)); 80 } 81 }; 82 83 template< typename InputType, typename OutputType > 84 struct harness_graph_default_functor { 85 static OutputType construct( InputType v ) { 86 return OutputType(v); 87 } 88 }; 89 90 template< typename OutputType > 91 struct harness_graph_default_functor< tbb::flow::continue_msg, OutputType > { 92 static OutputType construct( tbb::flow::continue_msg ) { 93 return OutputType(); 94 } 95 }; 96 97 template< typename InputType > 98 struct harness_graph_default_functor< InputType, tbb::flow::continue_msg > { 99 static tbb::flow::continue_msg construct( InputType ) { 100 return 
tbb::flow::continue_msg(); 101 } 102 }; 103 104 template< > 105 struct harness_graph_default_functor< tbb::flow::continue_msg, tbb::flow::continue_msg > { 106 static tbb::flow::continue_msg construct( tbb::flow::continue_msg ) { 107 return tbb::flow::continue_msg(); 108 } 109 }; 110 111 template<typename InputType, typename OutputSet> 112 struct harness_graph_default_multifunction_functor { 113 static const int N = std::tuple_size<OutputSet>::value; 114 typedef typename tbb::flow::multifunction_node<InputType,OutputSet>::output_ports_type ports_type; 115 static void construct(const InputType &i, ports_type &p) { 116 mof_helper<N>::output_converted_value(i, p); 117 } 118 }; 119 120 //! An executor that accepts InputType and generates OutputType 121 template< typename InputType, typename OutputType > 122 struct harness_graph_executor { 123 124 typedef OutputType (*function_ptr_type)( InputType v ); 125 126 template<typename RW> 127 struct mutex_holder { static RW mutex; }; 128 129 static function_ptr_type fptr; 130 static std::atomic<size_t> execute_count; 131 static std::atomic<size_t> current_executors; 132 static size_t max_executors; 133 134 static inline OutputType func( InputType v ) { 135 size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture 136 c = current_executors++; 137 if (max_executors != 0) { 138 CHECK(c <= max_executors); 139 } 140 ++execute_count; 141 OutputType v2 = (*fptr)(v); 142 --current_executors; 143 return v2; 144 } 145 146 template< typename RW > 147 static inline OutputType tfunc( InputType v ) { 148 // Invocations allowed to be concurrent, the lock is acquired in shared ("read") mode. 149 // A test can take it exclusively, thus creating a barrier for invocations. 
150 typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false ); 151 return func(v); 152 } 153 154 template< typename RW > 155 struct tfunctor { 156 std::atomic<size_t> my_execute_count; 157 tfunctor() { my_execute_count = 0; } 158 tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); } 159 OutputType operator()( InputType i ) { 160 typename RW::scoped_lock l( harness_graph_executor::mutex_holder<RW>::mutex, /*write=*/false ); 161 ++my_execute_count; 162 return harness_graph_executor::func(i); 163 } 164 }; 165 typedef tfunctor<tbb::null_rw_mutex> functor; 166 167 }; 168 169 //! A multifunction executor that accepts InputType and has only one Output of OutputType. 170 template< typename InputType, typename OutputTuple > 171 struct harness_graph_multifunction_executor { 172 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type ports_type; 173 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 174 175 typedef void (*mfunction_ptr_type)( const InputType& v, ports_type &p ); 176 177 template<typename RW> 178 struct mutex_holder { static RW mutex; }; 179 180 static mfunction_ptr_type fptr; 181 static std::atomic<size_t> execute_count; 182 static std::atomic<size_t> current_executors; 183 static size_t max_executors; 184 185 static inline void empty_func( const InputType&, ports_type& ) { 186 } 187 188 static inline void func( const InputType &v, ports_type &p ) { 189 size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture 190 c = current_executors++; 191 CHECK( (max_executors == 0 || c <= max_executors) ); 192 CHECK( (std::tuple_size<OutputTuple>::value == 1) ); 193 ++execute_count; 194 (*fptr)(v,p); 195 --current_executors; 196 } 197 198 template< typename RW > 199 static inline void tfunc( const InputType& v, ports_type &p ) { 200 // Shared lock in invocations, exclusive in a test; see a comment in harness_graph_executor. 
201 typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false ); 202 func(v,p); 203 } 204 205 template< typename RW > 206 struct tfunctor { 207 std::atomic<size_t> my_execute_count; 208 tfunctor() { my_execute_count = 0; } 209 tfunctor( const tfunctor &f ) { my_execute_count = static_cast<size_t>(f.my_execute_count); } 210 void operator()( const InputType &i, ports_type &p ) { 211 typename RW::scoped_lock l( harness_graph_multifunction_executor::mutex_holder<RW>::mutex, /*write=*/false ); 212 ++my_execute_count; 213 harness_graph_multifunction_executor::func(i,p); 214 } 215 }; 216 typedef tfunctor<tbb::null_rw_mutex> functor; 217 218 }; 219 220 // static vars for function_node tests 221 template< typename InputType, typename OutputType > 222 template< typename RW > 223 RW harness_graph_executor<InputType, OutputType>::mutex_holder<RW>::mutex; 224 225 template< typename InputType, typename OutputType > 226 std::atomic<size_t> harness_graph_executor<InputType, OutputType>::execute_count; 227 228 template< typename InputType, typename OutputType > 229 typename harness_graph_executor<InputType, OutputType>::function_ptr_type harness_graph_executor<InputType, OutputType>::fptr 230 = harness_graph_default_functor< InputType, OutputType >::construct; 231 232 template< typename InputType, typename OutputType > 233 std::atomic<size_t> harness_graph_executor<InputType, OutputType>::current_executors; 234 235 template< typename InputType, typename OutputType > 236 size_t harness_graph_executor<InputType, OutputType>::max_executors = 0; 237 238 // static vars for multifunction_node tests 239 template< typename InputType, typename OutputTuple > 240 template< typename RW > 241 RW harness_graph_multifunction_executor<InputType, OutputTuple>::mutex_holder<RW>::mutex; 242 243 template< typename InputType, typename OutputTuple > 244 std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count; 245 246 template< typename InputType, 
typename OutputTuple > 247 typename harness_graph_multifunction_executor<InputType, OutputTuple>::mfunction_ptr_type harness_graph_multifunction_executor<InputType, OutputTuple>::fptr 248 = harness_graph_default_multifunction_functor< InputType, OutputTuple >::construct; 249 250 template< typename InputType, typename OutputTuple > 251 std::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors; 252 253 template< typename InputType, typename OutputTuple > 254 size_t harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0; 255 256 //! Counts the number of puts received 257 template< typename T > 258 struct harness_counting_receiver : public tbb::flow::receiver<T> { 259 harness_counting_receiver& operator=(const harness_counting_receiver&) = delete; 260 261 std::atomic< size_t > my_count; 262 T max_value; 263 size_t num_copies; 264 tbb::flow::graph& my_graph; 265 266 harness_counting_receiver(tbb::flow::graph& g) : num_copies(1), my_graph(g) { 267 my_count = 0; 268 } 269 270 void initialize_map( const T& m, size_t c ) { 271 my_count = 0; 272 max_value = m; 273 num_copies = c; 274 } 275 276 tbb::flow::graph& graph_reference() const override { 277 return my_graph; 278 } 279 280 tbb::detail::d1::graph_task *try_put_task( const T & ) override { 281 ++my_count; 282 return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED); 283 } 284 285 void validate() { 286 size_t n = my_count; 287 CHECK( n == num_copies*max_value ); 288 } 289 }; 290 291 //! 
//! Counts the number of puts received
//! Like harness_counting_receiver, but can also record the distinct values it
//! receives in a concurrent multiset (after initialize_map) so validate() can
//! check each value in [0, max_value) arrived exactly num_copies times.
template< typename T >
struct harness_mapped_receiver : public tbb::flow::receiver<T> {
    harness_mapped_receiver(const harness_mapped_receiver&) = delete;
    harness_mapped_receiver& operator=(const harness_mapped_receiver&) = delete;

    std::atomic< size_t > my_count;    // used only when my_multiset == nullptr
    T max_value;
    size_t num_copies;
    typedef tbb::concurrent_unordered_multiset<T> multiset_type;
    multiset_type *my_multiset;        // owned; allocated by initialize_map/reset_receiver
    tbb::flow::graph& my_graph;

    harness_mapped_receiver(tbb::flow::graph& g) : my_multiset(nullptr), my_graph(g) {
        my_count = 0;
    }

#if __INTEL_COMPILER <= 2021
    // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
    // class while the parent class has the virtual keyword for the destructor.
    virtual
#endif
    ~harness_mapped_receiver() {
        delete my_multiset;
        my_multiset = nullptr;
    }

    // Switch to "mapped" mode: expect values [0, m), each received c times.
    void initialize_map( const T& m, size_t c ) {
        my_count = 0;
        max_value = m;
        num_copies = c;
        delete my_multiset;
        my_multiset = new multiset_type;
    }

    tbb::detail::d1::graph_task* try_put_task( const T &t ) override {
        if ( my_multiset ) {
            (*my_multiset).emplace( t );
        } else {
            ++my_count;
        }
        return const_cast<tbb::detail::d1::graph_task*>(SUCCESSFULLY_ENQUEUED);
    }

    tbb::flow::graph& graph_reference() const override {
        return my_graph;
    }

    // NOTE(review): the multiset branch casts values through int, so mapped
    // mode assumes T is an integral-convertible type -- confirm at call sites.
    void validate() {
        if ( my_multiset ) {
            for ( size_t i = 0; i < (size_t)max_value; ++i ) {
                auto it = (*my_multiset).find((int)i);
                CHECK_MESSAGE( it != my_multiset->end(), "Expected element in the map." );
                size_t n = (*my_multiset).count(int(i));
                CHECK( n == num_copies );
            }
        } else {
            size_t n = my_count;
            CHECK( n == num_copies*max_value );
        }
    }

    void reset_receiver(tbb::flow::reset_flags /*f*/) {
        my_count = 0;
        if(my_multiset) delete my_multiset;
        my_multiset = new multiset_type;
    }

};

//! Counts the number of puts received
//! Source-side counterpart: a sender that produces the sequence T(0), T(1), ...
//! up to my_limit and counts how many items were actually delivered/consumed.
template< typename T >
struct harness_counting_sender : public tbb::flow::sender<T> {
    harness_counting_sender(const harness_counting_sender&) = delete;
    harness_counting_sender& operator=(const harness_counting_sender&) = delete;

    typedef typename tbb::flow::sender<T>::successor_type successor_type;
    std::atomic< successor_type * > my_receiver;  // single registered successor (or nullptr)
    std::atomic< size_t > my_count;               // next value to produce
    std::atomic< size_t > my_received;            // values successfully delivered
    size_t my_limit;                              // production cap; ~size_t(0) means unbounded

    harness_counting_sender( ) : my_limit(~size_t(0)) {
        my_receiver = nullptr;
        my_count = 0;
        my_received = 0;
    }

    harness_counting_sender( size_t limit ) : my_limit(limit) {
        my_receiver = nullptr;
        my_count = 0;
        my_received = 0;
    }

    bool register_successor( successor_type &r ) override {
        my_receiver = &r;
        return true;
    }

    bool remove_successor( successor_type &r ) override {
        successor_type *s = my_receiver.exchange( nullptr );
        CHECK( s == &r );
        return true;
    }

    // Pull protocol: hand out the next value while under the limit.
    bool try_get( T & v ) override {
        size_t i = my_count++;
        if ( i < my_limit ) {
            v = T( i );
            ++my_received;
            return true;
        } else {
            return false;
        }
    }

    // Push protocol: attempt a single put of the next value.
    bool try_put_once() {
        successor_type *s = my_receiver;
        size_t i = my_count++;
        if ( s->try_put( T(i) ) ) {
            ++my_received;
            return true;
        } else {
            return false;
        }
    }

    // Keep pushing until the successor rejects a value.
    void try_put_until_false() {
        successor_type *s = my_receiver;
        size_t i = my_count++;

        while ( s->try_put( T(i) ) ) {
            ++my_received;
            i = my_count++;
        }
    }

    // Push exactly my_limit values; every put is expected to be accepted.
    void try_put_until_limit() {
        successor_type *s = my_receiver;

        for ( int i = 0; i < (int)my_limit; ++i ) {
            CHECK( s->try_put( T(i) ) );
            ++my_received;
        }
        CHECK( my_received == my_limit );
    }

};

//! Body for utils::NativeParallelFor: thread i drives senders[i] to its limit.
template< typename InputType >
struct parallel_put_until_limit {
    parallel_put_until_limit& operator=(const parallel_put_until_limit&) = delete;

    typedef std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders_t;

    senders_t& my_senders;

    parallel_put_until_limit( senders_t& senders ) : my_senders(senders) {}

    void operator()( int i ) const { my_senders[i]->try_put_until_limit(); }
};

// test for resets of buffer-type nodes.
// Handshake flags between test threads and serial node bodies:
// 0 = body not started, 1 = body running/held, other = body released.
// NOTE(review): non-inline globals in a header -- assumes this header is
// included by a single translation unit per test binary; verify.
std::atomic<int> serial_fn_state0;
std::atomic<int> serial_fn_state1;
std::atomic<int> serial_continue_state0;

//! function_node body that signals it has started (flag 0->1) and then spins
//! until the test releases it by writing any value other than 1 to the flag.
template<typename T>
struct serial_fn_body {
    std::atomic<int>& my_flag;
    serial_fn_body(std::atomic<int>& flag) : my_flag(flag) { }
    T operator()(const T& in) {
        if (my_flag == 0) {
            my_flag = 1;

            // wait until we are released
            utils::SpinWaitWhileEq(my_flag, 1);
        }
        return in;
    }
};

//! continue_node body with the same flag handshake; always returns T(1).
template<typename T>
struct serial_continue_body {
    std::atomic<int>& my_flag;
    serial_continue_body(std::atomic<int> &flag) : my_flag(flag) {}
    T operator()(const tbb::flow::continue_msg& /*in*/) {
        // signal we have received a value
        my_flag = 1;
        // wait until we are released
        utils::SpinWaitWhileEq(my_flag, 1);
        return (T)1;
    }
};

//! Exercises graph::reset() against a buffer-type node BufferType<T>:
//! 1) reset() empties the buffer, 2) reset() preserves edges, 3) reset()
//! restores a reversed edge (forced by cancelling while a serial rejecting
//! function_node holds the edge reversed), 4) reset(rf_clear_edges) removes
//! both forward and reversed edges.
template<typename T, typename BufferType>
void test_resets() {
    const int NN = 3;
    bool nFound[NN];
    tbb::task_arena arena{4};
    arena.execute(
        [&] {
            tbb::task_group_context tgc;
            tbb::flow::graph g(tgc);
            BufferType b0(g);
            tbb::flow::queue_node<T> q0(g);
            T j{};

            // reset empties buffer
            for(T i = 0; i < NN; ++i) {
                b0.try_put(i);
                nFound[(int)i] = false;
            }
            g.wait_for_all();
            g.reset();
            CHECK_MESSAGE(!b0.try_get(j), "reset did not empty buffer");

            // reset doesn't delete edge

            tbb::flow::make_edge(b0,q0);
            g.wait_for_all(); // TODO: investigate why make_edge to buffer_node always creates a forwarding task
            g.reset();
            for(T i = 0; i < NN; ++i) {
                b0.try_put(i);
            }

            g.wait_for_all();
            // All NN values must have been forwarded across the surviving edge,
            // each exactly once.
            for( T i = 0; i < NN; ++i) {
                CHECK_MESSAGE(q0.try_get(j), "Missing value from buffer");
                CHECK_MESSAGE(!nFound[(int)j], "Duplicate value found");
                nFound[(int)j] = true;
            }

            for(int ii = 0; ii < NN; ++ii) {
                CHECK_MESSAGE(nFound[ii], "missing value");
            }
            CHECK_MESSAGE(!q0.try_get(j), "Extra values in output");

            // reset reverses a reversed edge.
            // we will use a serial rejecting node to get the edge to reverse.
            tbb::flow::function_node<T, T, tbb::flow::rejecting> sfn(g, tbb::flow::serial, serial_fn_body<T>(serial_fn_state0));
            tbb::flow::queue_node<T> outq(g);
            tbb::flow::remove_edge(b0,q0);
            tbb::flow::make_edge(b0, sfn);
            tbb::flow::make_edge(sfn,outq);
            g.wait_for_all(); // wait for all the tasks started by building the graph are done.
            serial_fn_state0 = 0;

            // b0 ------> sfn ------> outq
            // Run the cancel-while-reversed scenario twice to check that
            // reset() after cancellation restores a usable forward edge.
            for(int icnt = 0; icnt < 2; ++icnt) {
                g.wait_for_all();
                serial_fn_state0 = 0;
                std::thread t([&] {
                    b0.try_put((T)0); // will start sfn
                    g.wait_for_all(); // wait for all the tasks to complete.
                });
                // wait until function_node starts
                utils::SpinWaitWhileEq(serial_fn_state0, 0);
                // now the function_node is executing.
                // this will start a task to forward the second item
                // to the serial function node
                b0.try_put((T)1); // first item will be consumed by task completing the execution
                b0.try_put((T)2); // second item will remain after cancellation
                // now wait for the task that attempts to forward the buffer item to
                // complete.
                // now cancel the graph.
                CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled");
                serial_fn_state0 = 0; // release the function_node.
                t.join();
                // check that at most one output reached the queue_node
                T outt;
                T outt2;
                bool got_item1 = outq.try_get(outt);
                bool got_item2 = outq.try_get(outt2);
                // either the output queue was empty (if the function_node tested for cancellation before putting the
                // result to the queue) or there was one element in the queue (the 0).
                bool is_successful_operation = got_item1 && (int)outt == 0 && !got_item2;
                CHECK_MESSAGE( is_successful_operation, "incorrect output from function_node");
                // the edge between the buffer and the function_node should be reversed, and the last
                // message we put in the buffer should still be there. We can't directly test for the
                // edge reversal.
                got_item1 = b0.try_get(outt);
                CHECK_MESSAGE(got_item1, " buffer lost a message");
                is_successful_operation = (2 == (int)outt || 1 == (int)outt);
                CHECK_MESSAGE(is_successful_operation, " buffer had incorrect message"); // the one not consumed by the node.
                CHECK_MESSAGE(g.is_cancelled(), "Graph was not cancelled");
                g.reset();
            } // icnt

            // reset with remove_edge removes edge. (icnt ==0 => forward edge, 1 => reversed edge
            for(int icnt = 0; icnt < 2; ++icnt) {
                if(icnt == 1) {
                    // set up reversed edge
                    tbb::flow::make_edge(b0, sfn);
                    tbb::flow::make_edge(sfn,outq);
                    serial_fn_state0 = 0;
                    std::thread t([&] {
                        b0.try_put((T)0); // starts up the function node
                        b0.try_put((T)1); // should reverse the edge
                        g.wait_for_all(); // wait for all the tasks to complete.
                    });
                    utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting for edge reversal
                    CHECK_MESSAGE(tgc.cancel_group_execution(), "task group already cancelled");
                    serial_fn_state0 = 0; // release the function_node.
                    t.join();
                }
                g.reset(tbb::flow::rf_clear_edges);
                // test that no one is a successor to the buffer now.
                serial_fn_state0 = 1; // let the function_node go if it gets an input message
                b0.try_put((T)23);
                g.wait_for_all();
                CHECK_MESSAGE((int)serial_fn_state0 == 1, "function_node executed when it shouldn't");
                T outt;
                bool is_successful_operation = b0.try_get(outt) && (T)23 == outt && !outq.try_get(outt);
                CHECK_MESSAGE(is_successful_operation, "node lost its input");
            }
        }
    ); // arena.execute()
}

//! input_ports() must return a stable reference, not a fresh copy.
template<typename NodeType>
void test_input_ports_return_ref(NodeType& mip_node) {
    typename NodeType::input_ports_type& input_ports1 = mip_node.input_ports();
    typename NodeType::input_ports_type& input_ports2 = mip_node.input_ports();
    CHECK_MESSAGE(&input_ports1 == &input_ports2, "input_ports() should return reference");
}

//! output_ports() must return a stable reference, not a fresh copy.
template<typename NodeType>
void test_output_ports_return_ref(NodeType& mop_node) {
    typename NodeType::output_ports_type& output_ports1 = mop_node.output_ports();
    typename NodeType::output_ports_type& output_ports2 = mop_node.output_ports();
    CHECK_MESSAGE(&output_ports1 == &output_ports2, "output_ports() should return reference");
}

//! NativeParallelFor body hammering a reserving node (optionally clear()ing it
//! mid-stream when DoClear) together with a companion buffer_node.
template< template <typename> class ReservingNodeType, typename DataType, bool DoClear >
class harness_reserving_body {
    harness_reserving_body& operator=(const harness_reserving_body&) = delete;
    ReservingNodeType<DataType> &my_reserving_node;
    tbb::flow::buffer_node<DataType> &my_buffer_node;
public:
    harness_reserving_body(ReservingNodeType<DataType> &reserving_node, tbb::flow::buffer_node<DataType> &bn) : my_reserving_node(reserving_node), my_buffer_node(bn) {}
    void operator()(DataType i) const {
        my_reserving_node.try_put(i);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if (DoClear) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            my_reserving_node.clear();
        }
        my_buffer_node.try_put(i);
        my_reserving_node.try_put(i);
    }
};

//! Stress-test: a reserving node and a buffer_node feed a reserving join;
//! runs once without and once with concurrent clear() calls, and must neither
//! lose counts nor hang.
template< template <typename> class ReservingNodeType, typename DataType >
void test_reserving_nodes() {
#if TBB_TEST_LOW_WORKLOAD
    const int N = 30;
#else
    const int N = 300;
#endif

    tbb::flow::graph g;

    ReservingNodeType<DataType> reserving_n(g);

    tbb::flow::buffer_node<DataType> buffering_n(g);
    tbb::flow::join_node< std::tuple<DataType, DataType>, tbb::flow::reserving > join_n(g);
    harness_counting_receiver< std::tuple<DataType, DataType> > end_receiver(g);

    tbb::flow::make_edge(reserving_n, tbb::flow::input_port<0>(join_n));
    tbb::flow::make_edge(buffering_n, tbb::flow::input_port<1>(join_n));
    tbb::flow::make_edge(join_n, end_receiver);

    utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, false>(reserving_n, buffering_n));
    g.wait_for_all();

    CHECK(end_receiver.my_count == N);

    // Should not hang
    utils::NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, true>(reserving_n, buffering_n));
    g.wait_for_all();

    CHECK(end_receiver.my_count == 2 * N);
}

// Tests for the lightweight execution policy: a lightweight body is expected
// to run synchronously on the thread that puts the message, rather than in a
// spawned oneTBB task.
namespace lightweight_testing {

typedef std::tuple<int, int> output_tuple_type;

//! NativeParallelFor body: each native thread puts its own thread id into the
//! node so the node body can verify it ran on the same thread.
template<typename NodeType>
class native_loop_body {
    native_loop_body& operator=(const native_loop_body&) = delete;
    NodeType& my_node;
public:
    native_loop_body(NodeType& node) : my_node(node) {}

    void operator()(int) const noexcept {
        std::thread::id this_id = std::this_thread::get_id();
        my_node.try_put(this_id);
    }
};

std::atomic<unsigned> g_body_count;

//! Node body asserting lightweight (same-thread) execution for every input.
class concurrency_checker_body {
public:
    concurrency_checker_body() { g_body_count = 0; }

    // multifunction_node flavor.
    template<typename gateway_type>
    void operator()(const std::thread::id& input, gateway_type&) noexcept { increase_and_check(input); }

    // function_node flavor.
    output_tuple_type operator()(const std::thread::id& input) noexcept {
        increase_and_check(input);
        return output_tuple_type();
    }

private:
    void increase_and_check(const std::thread::id& input) {
        ++g_body_count;
        std::thread::id body_thread_id = std::this_thread::get_id();
        CHECK_MESSAGE(input == body_thread_id, "Body executed as not lightweight");
    }
};

//! With unlimited concurrency every put must execute the body lightweight,
//! on the putting thread, exactly N times in total.
template<typename NodeType>
void test_unlimited_lightweight_execution(unsigned N) {
    tbb::flow::graph g;
    NodeType node(g, tbb::flow::unlimited, concurrency_checker_body());

    utils::NativeParallelFor(N, native_loop_body<NodeType>(node));
    g.wait_for_all();

    CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
}

// Shared state for the limited-concurrency tests: the lightweight body blocks
// on the condition variable until all other threads have submitted their work.
std::mutex m;
std::condition_variable lightweight_condition;
std::atomic<bool> work_submitted;
std::atomic<bool> lightweight_work_processed;

//! Like native_loop_body, but rendezvouses on a barrier and then signals the
//! blocked lightweight body once all remaining work has been submitted.
template<typename NodeType>
class native_loop_limited_body {
    native_loop_limited_body& operator=(const native_loop_limited_body&) = delete;
    NodeType& my_node;
    utils::SpinBarrier& my_barrier;
public:
    native_loop_limited_body(NodeType& node, utils::SpinBarrier& barrier):
        my_node(node), my_barrier(barrier) {}
    void operator()(int) const noexcept {
        std::thread::id this_id = std::this_thread::get_id();
        my_node.try_put(this_id);
        if(!lightweight_work_processed) {
            my_barrier.wait();
            work_submitted = true;
            lightweight_condition.notify_all();
        }
    }
};

struct condition_predicate {
    bool operator()() {
        return work_submitted;
    }
};

std::atomic<unsigned> g_lightweight_count;
std::atomic<unsigned> g_task_count;

//! Body that classifies each execution as lightweight (outside a oneTBB task)
//! or task-based, counting both. NoExcept controls the noexcept specification,
//! since only noexcept bodies are eligible for lightweight execution.
template <bool NoExcept>
class limited_lightweight_checker_body {
public:
    limited_lightweight_checker_body() {
        g_body_count = 0;
        g_lightweight_count = 0;
        g_task_count = 0;
    }
private:
    void increase_and_check(const std::thread::id& /*input*/) {
        ++g_body_count;

        // Lightweight executions happen outside any oneTBB task context.
        bool is_inside_task = oneapi::tbb::task::current_context() != nullptr;

        if(is_inside_task) {
            ++g_task_count;
        } else {
            // Block until all other threads have submitted their inputs, so the
            // remaining executions are forced into tasks.
            std::unique_lock<std::mutex> lock(m);
            lightweight_condition.wait(lock, condition_predicate());
            ++g_lightweight_count;
            lightweight_work_processed = true;
        }
    }
public:
    template<typename gateway_type>
    void operator()(const std::thread::id& input, gateway_type&) noexcept(NoExcept) {
        increase_and_check(input);
    }
    output_tuple_type operator()(const std::thread::id& input) noexcept(NoExcept) {
        increase_and_check(input);
        return output_tuple_type();
    }
};

//! With limited concurrency, exactly `concurrency` executions should be
//! lightweight and the other N - concurrency should run as oneTBB tasks.
template<typename NodeType>
void test_limited_lightweight_execution(unsigned N, unsigned concurrency) {
    CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
        "Test for limited concurrency cannot be called with unlimited concurrency argument");
    tbb::flow::graph g;
    NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/true>());
    // Execute first body as lightweight, then wait for all other threads to fill internal buffer.
    // Then unblock the lightweight thread and check if other body executions are inside oneTBB task.
    utils::SpinBarrier barrier(N - concurrency);
    utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
    g.wait_for_all();
    CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
    // NOTE(review): the message texts below say "once" / "N - 1 times" but the
    // conditions check `concurrency` / `N - concurrency` -- the conditions look
    // intentional; consider updating the message wording.
    CHECK_MESSAGE(g_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
    CHECK_MESSAGE(g_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
    work_submitted = false;
    lightweight_work_processed = false;
}

//! A body that is not noexcept must never run lightweight: all N executions
//! must be dispatched as oneTBB tasks.
template<typename NodeType>
void test_limited_lightweight_execution_with_throwing_body(unsigned N, unsigned concurrency) {
    CHECK_MESSAGE(concurrency != tbb::flow::unlimited,
        "Test for limited concurrency cannot be called with unlimited concurrency argument");
    tbb::flow::graph g;
    NodeType node(g, concurrency, limited_lightweight_checker_body</*NoExcept*/false>());
    // Body is not noexcept, in this case it must be executed as tasks, instead of lightweight execution
    utils::SpinBarrier barrier(N);
    utils::NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
    g.wait_for_all();
    CHECK_MESSAGE(g_body_count == N, "Body needs to be executed N times");
    CHECK_MESSAGE(g_lightweight_count == 0, "Body needs to be executed with queueing policy");
    CHECK_MESSAGE(g_task_count == N, "Body needs to be executed as task N times");
    work_submitted = false;
    lightweight_work_processed = false;
}

//! Body that throws (an int) on the Threshold-th invocation; used to verify
//! exception propagation out of lightweight nodes.
template <int Threshold>
struct throwing_body{
    std::atomic<int>& my_counter;

    throwing_body(std::atomic<int>& counter) : my_counter(counter) {}

    // multifunction_node flavor.
    template<typename input_type, typename gateway_type>
    void operator()(const input_type&, gateway_type&) {
        ++my_counter;
        if(my_counter == Threshold)
            throw Threshold;
    }

    // function_node flavor.
    template<typename input_type>
    output_tuple_type operator()(const input_type&) {
        ++my_counter;
        if(my_counter == Threshold)
            throw Threshold;
        return output_tuple_type();
    }
};

#if TBB_USE_EXCEPTIONS
//! Test exception thrown in a node with lightweight policy is rethrown by graph
template<template<typename, typename, typename> class NodeType>
void test_exception_ligthweight_policy(){
    std::atomic<int> counter {0};
    constexpr int threshold = 10;

    using IndexerNodeType = oneapi::tbb::flow::indexer_node<int, int>;
    using FuncNodeType = NodeType<IndexerNodeType::output_type, output_tuple_type, tbb::flow::lightweight>;
    oneapi::tbb::flow::graph g;

    IndexerNodeType indexer(g);
    FuncNodeType tested_node(g, oneapi::tbb::flow::serial, throwing_body<threshold>(counter));
    oneapi::tbb::flow::make_edge(indexer, tested_node);

    // Feed both indexer ports from native threads; the serial tested_node body
    // throws on its threshold-th execution.
    utils::NativeParallelFor( threshold * 2, [&](int i){
        if(i % 2)
            std::get<1>(indexer.input_ports()).try_put(1);
        else
            std::get<0>(indexer.input_ports()).try_put(0);
    });

    bool catchException = false;
    try
    {
        g.wait_for_all();
    }
    catch (const int& exc)
    {
        catchException = true;
        CHECK_MESSAGE( exc == threshold, "graph.wait_for_all() rethrow current exception" );
    }
    CHECK_MESSAGE( catchException, "The exception must be thrown from graph.wait_for_all()" );
    CHECK_MESSAGE( counter == threshold, "Graph must cancel all tasks after exception" );
}
#endif /* TBB_USE_EXCEPTIONS */

//! Aggregate driver for all lightweight-policy tests of a given node type.
template<typename NodeType>
void test_lightweight(unsigned N) {
    test_unlimited_lightweight_execution<NodeType>(N);
    test_limited_lightweight_execution<NodeType>(N, tbb::flow::serial);
    test_limited_lightweight_execution<NodeType>(N, (std::min)(std::thread::hardware_concurrency() / 2, N/2));

    test_limited_lightweight_execution_with_throwing_body<NodeType>(N, tbb::flow::serial);
}

//! Entry point: instantiates the node template with queueing_lightweight policy
//! and runs the full lightweight suite (plus the exception test when enabled).
template<template<typename, typename, typename> class NodeType>
void test(unsigned N) {
    typedef std::thread::id input_type;
    typedef NodeType<input_type, output_tuple_type, tbb::flow::queueing_lightweight> node_type;
    test_lightweight<node_type>(N);

#if TBB_USE_EXCEPTIONS
    test_exception_ligthweight_policy<NodeType>();
#endif /* TBB_USE_EXCEPTIONS */
}

} // namespace lightweight_testing

#endif // __TBB_harness_graph_H