/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

// have to expose the reset_node method to be able to reset a function_body

#include "common/config.h"

#include "tbb/flow_graph.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/utils_assert.h"
#include "common/concepts_common.h"


//! \file test_input_node.cpp
//!
\brief Test for [flow_graph.input_node] specification 3151c0b2f7Stbbdev 3251c0b2f7Stbbdev 3351c0b2f7Stbbdev using tbb::detail::d1::graph_task; 3451c0b2f7Stbbdev using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 3551c0b2f7Stbbdev 3651c0b2f7Stbbdev const int N = 1000; 3751c0b2f7Stbbdev 3851c0b2f7Stbbdev template< typename T > 3951c0b2f7Stbbdev class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 4051c0b2f7Stbbdev 4151c0b2f7Stbbdev std::atomic<int> my_counters[N]; 4251c0b2f7Stbbdev tbb::flow::graph& my_graph; 4351c0b2f7Stbbdev 4451c0b2f7Stbbdev public: 4551c0b2f7Stbbdev 4651c0b2f7Stbbdev test_push_receiver(tbb::flow::graph& g) : my_graph(g) { 4751c0b2f7Stbbdev for (int i = 0; i < N; ++i ) 4851c0b2f7Stbbdev my_counters[i] = 0; 4951c0b2f7Stbbdev } 5051c0b2f7Stbbdev 5151c0b2f7Stbbdev int get_count( int i ) { 5251c0b2f7Stbbdev int v = my_counters[i]; 5351c0b2f7Stbbdev return v; 5451c0b2f7Stbbdev } 5551c0b2f7Stbbdev 5651c0b2f7Stbbdev typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type; 5751c0b2f7Stbbdev 5851c0b2f7Stbbdev graph_task* try_put_task( const T &v ) override { 5951c0b2f7Stbbdev int i = (int)v; 6051c0b2f7Stbbdev ++my_counters[i]; 6151c0b2f7Stbbdev return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 6251c0b2f7Stbbdev } 6351c0b2f7Stbbdev 6451c0b2f7Stbbdev tbb::flow::graph& graph_reference() const override { 6551c0b2f7Stbbdev return my_graph; 6651c0b2f7Stbbdev } 6751c0b2f7Stbbdev }; 6851c0b2f7Stbbdev 6951c0b2f7Stbbdev template< typename T > 7051c0b2f7Stbbdev class my_input_body { 7151c0b2f7Stbbdev 7251c0b2f7Stbbdev unsigned my_count; 7351c0b2f7Stbbdev int *ninvocations; 7451c0b2f7Stbbdev 7551c0b2f7Stbbdev public: 7651c0b2f7Stbbdev 77*57f524caSIlya Isaev my_input_body() : ninvocations(nullptr) { my_count = 0; } 7851c0b2f7Stbbdev my_input_body(int &_inv) : ninvocations(&_inv) { my_count = 0; } 7951c0b2f7Stbbdev 8051c0b2f7Stbbdev T operator()( tbb::flow_control& fc ) { 8151c0b2f7Stbbdev T v = (T)my_count++; 8251c0b2f7Stbbdev 
if(ninvocations) ++(*ninvocations); 8351c0b2f7Stbbdev if ( (int)v < N ){ 8451c0b2f7Stbbdev return v; 8551c0b2f7Stbbdev }else{ 8651c0b2f7Stbbdev fc.stop(); 8751c0b2f7Stbbdev return T(); 8851c0b2f7Stbbdev } 8951c0b2f7Stbbdev } 9051c0b2f7Stbbdev 9151c0b2f7Stbbdev }; 9251c0b2f7Stbbdev 9351c0b2f7Stbbdev template< typename T > 9451c0b2f7Stbbdev class function_body { 9551c0b2f7Stbbdev 9651c0b2f7Stbbdev std::atomic<int> *my_counters; 9751c0b2f7Stbbdev 9851c0b2f7Stbbdev public: 9951c0b2f7Stbbdev 10051c0b2f7Stbbdev function_body( std::atomic<int> *counters ) : my_counters(counters) { 10151c0b2f7Stbbdev for (int i = 0; i < N; ++i ) 10251c0b2f7Stbbdev my_counters[i] = 0; 10351c0b2f7Stbbdev } 10451c0b2f7Stbbdev 10551c0b2f7Stbbdev bool operator()( T v ) { 10651c0b2f7Stbbdev ++my_counters[(int)v]; 10751c0b2f7Stbbdev return true; 10851c0b2f7Stbbdev } 10951c0b2f7Stbbdev 11051c0b2f7Stbbdev }; 11151c0b2f7Stbbdev 11251c0b2f7Stbbdev template< typename T > 11351c0b2f7Stbbdev void test_single_dest() { 11451c0b2f7Stbbdev // push only 11551c0b2f7Stbbdev tbb::flow::graph g; 11651c0b2f7Stbbdev tbb::flow::input_node<T> src(g, my_input_body<T>() ); 11751c0b2f7Stbbdev test_push_receiver<T> dest(g); 11851c0b2f7Stbbdev tbb::flow::make_edge( src, dest ); 11951c0b2f7Stbbdev src.activate(); 12051c0b2f7Stbbdev g.wait_for_all(); 12151c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 12251c0b2f7Stbbdev CHECK_MESSAGE( dest.get_count(i) == 1, "" ); 12351c0b2f7Stbbdev } 12451c0b2f7Stbbdev 12551c0b2f7Stbbdev // push only 12651c0b2f7Stbbdev std::atomic<int> counters3[N]; 12751c0b2f7Stbbdev tbb::flow::input_node<T> src3(g, my_input_body<T>() ); 12851c0b2f7Stbbdev src3.activate(); 12951c0b2f7Stbbdev 13051c0b2f7Stbbdev function_body<T> b3( counters3 ); 13151c0b2f7Stbbdev tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 ); 13251c0b2f7Stbbdev tbb::flow::make_edge( src3, dest3 ); 13351c0b2f7Stbbdev g.wait_for_all(); 13451c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 13551c0b2f7Stbbdev int v = 
counters3[i]; 13651c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 13751c0b2f7Stbbdev } 13851c0b2f7Stbbdev 13951c0b2f7Stbbdev // push & pull 14051c0b2f7Stbbdev tbb::flow::input_node<T> src2(g, my_input_body<T>() ); 14151c0b2f7Stbbdev src2.activate(); 14251c0b2f7Stbbdev std::atomic<int> counters2[N]; 14351c0b2f7Stbbdev 14451c0b2f7Stbbdev function_body<T> b2( counters2 ); 14551c0b2f7Stbbdev tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 ); 14651c0b2f7Stbbdev tbb::flow::make_edge( src2, dest2 ); 14751c0b2f7Stbbdev g.wait_for_all(); 14851c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 14951c0b2f7Stbbdev int v = counters2[i]; 15051c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 15151c0b2f7Stbbdev } 15251c0b2f7Stbbdev 15351c0b2f7Stbbdev // test copy constructor 15451c0b2f7Stbbdev tbb::flow::input_node<T> src_copy(src); 15551c0b2f7Stbbdev src_copy.activate(); 15651c0b2f7Stbbdev test_push_receiver<T> dest_c(g); 15751c0b2f7Stbbdev CHECK_MESSAGE( src_copy.register_successor(dest_c), "" ); 15851c0b2f7Stbbdev g.wait_for_all(); 15951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 16051c0b2f7Stbbdev CHECK_MESSAGE( dest_c.get_count(i) == 1, "" ); 16151c0b2f7Stbbdev } 16251c0b2f7Stbbdev } 16351c0b2f7Stbbdev 16451c0b2f7Stbbdev void test_reset() { 16551c0b2f7Stbbdev // input_node -> function_node 16651c0b2f7Stbbdev tbb::flow::graph g; 16751c0b2f7Stbbdev std::atomic<int> counters3[N]; 16851c0b2f7Stbbdev tbb::flow::input_node<int> src3(g, my_input_body<int>()); 16951c0b2f7Stbbdev src3.activate(); 17051c0b2f7Stbbdev tbb::flow::input_node<int> src_inactive(g, my_input_body<int>()); 17151c0b2f7Stbbdev function_body<int> b3( counters3 ); 17251c0b2f7Stbbdev tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3); 17351c0b2f7Stbbdev tbb::flow::make_edge( src3, dest3 ); 17451c0b2f7Stbbdev // source_node already in active state. Let the graph run, 17551c0b2f7Stbbdev g.wait_for_all(); 17651c0b2f7Stbbdev // check the array for each value. 
17751c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 17851c0b2f7Stbbdev int v = counters3[i]; 17951c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 18051c0b2f7Stbbdev counters3[i] = 0; 18151c0b2f7Stbbdev } 18251c0b2f7Stbbdev 18351c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts. 18451c0b2f7Stbbdev // and spawns task to run input 18551c0b2f7Stbbdev src3.activate(); 18651c0b2f7Stbbdev 18751c0b2f7Stbbdev g.wait_for_all(); 18851c0b2f7Stbbdev // check output queue again. Should be the same contents. 18951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 19051c0b2f7Stbbdev int v = counters3[i]; 19151c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 19251c0b2f7Stbbdev counters3[i] = 0; 19351c0b2f7Stbbdev } 19451c0b2f7Stbbdev g.reset(); // doesn't reset the input_node_body to initial state, but does spawn a task 19551c0b2f7Stbbdev // to run the input_node. 19651c0b2f7Stbbdev 19751c0b2f7Stbbdev g.wait_for_all(); 19851c0b2f7Stbbdev // array should be all zero 19951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 20051c0b2f7Stbbdev int v = counters3[i]; 20151c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 20251c0b2f7Stbbdev } 20351c0b2f7Stbbdev 20451c0b2f7Stbbdev remove_edge(src3, dest3); 20551c0b2f7Stbbdev make_edge(src_inactive, dest3); 20651c0b2f7Stbbdev 20751c0b2f7Stbbdev // src_inactive doesn't run 20851c0b2f7Stbbdev g.wait_for_all(); 20951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 21051c0b2f7Stbbdev int v = counters3[i]; 21151c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 21251c0b2f7Stbbdev } 21351c0b2f7Stbbdev 21451c0b2f7Stbbdev // run graph 21551c0b2f7Stbbdev src_inactive.activate(); 21651c0b2f7Stbbdev g.wait_for_all(); 21751c0b2f7Stbbdev // check output 21851c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 21951c0b2f7Stbbdev int v = counters3[i]; 22051c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 22151c0b2f7Stbbdev counters3[i] = 0; 22251c0b2f7Stbbdev } 22351c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts 22451c0b2f7Stbbdev // 
src_inactive doesn't run 22551c0b2f7Stbbdev g.wait_for_all(); 22651c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 22751c0b2f7Stbbdev int v = counters3[i]; 22851c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 22951c0b2f7Stbbdev } 23051c0b2f7Stbbdev 23151c0b2f7Stbbdev // start it up 23251c0b2f7Stbbdev src_inactive.activate(); 23351c0b2f7Stbbdev g.wait_for_all(); 23451c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 23551c0b2f7Stbbdev int v = counters3[i]; 23651c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 23751c0b2f7Stbbdev counters3[i] = 0; 23851c0b2f7Stbbdev } 23951c0b2f7Stbbdev g.reset(); // doesn't reset the input_node_body to initial state, and doesn't 24051c0b2f7Stbbdev // spawn a task to run the input_node. 24151c0b2f7Stbbdev 24251c0b2f7Stbbdev g.wait_for_all(); 24351c0b2f7Stbbdev // array should be all zero 24451c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 24551c0b2f7Stbbdev int v = counters3[i]; 24651c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 24751c0b2f7Stbbdev } 24851c0b2f7Stbbdev src_inactive.activate(); 24951c0b2f7Stbbdev // input_node_body is already in final state, so input_node will not forward a message. 
25051c0b2f7Stbbdev g.wait_for_all(); 25151c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 25251c0b2f7Stbbdev int v = counters3[i]; 25351c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 25451c0b2f7Stbbdev } 25551c0b2f7Stbbdev } 25651c0b2f7Stbbdev 25751c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 25851c0b2f7Stbbdev #include <array> 25951c0b2f7Stbbdev void test_follows_and_precedes_api() { 26051c0b2f7Stbbdev using namespace tbb::flow; 26151c0b2f7Stbbdev 26251c0b2f7Stbbdev graph g; 26351c0b2f7Stbbdev 26451c0b2f7Stbbdev std::array<buffer_node<bool>, 3> successors {{ 26551c0b2f7Stbbdev buffer_node<bool>(g), 26651c0b2f7Stbbdev buffer_node<bool>(g), 26751c0b2f7Stbbdev buffer_node<bool>(g) 26851c0b2f7Stbbdev }}; 26951c0b2f7Stbbdev 27051c0b2f7Stbbdev bool do_try_put = true; 27151c0b2f7Stbbdev input_node<bool> src( 27251c0b2f7Stbbdev precedes(successors[0], successors[1], successors[2]), 27351c0b2f7Stbbdev [&](tbb::flow_control& fc) -> bool { 27451c0b2f7Stbbdev if(!do_try_put) 27551c0b2f7Stbbdev fc.stop(); 27651c0b2f7Stbbdev do_try_put = !do_try_put; 27751c0b2f7Stbbdev return true; 27851c0b2f7Stbbdev } 27951c0b2f7Stbbdev ); 28051c0b2f7Stbbdev 28151c0b2f7Stbbdev src.activate(); 28251c0b2f7Stbbdev g.wait_for_all(); 28351c0b2f7Stbbdev 28451c0b2f7Stbbdev bool storage; 28551c0b2f7Stbbdev for(auto& successor: successors) { 28651c0b2f7Stbbdev CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 28751c0b2f7Stbbdev "Not exact edge quantity was made"); 28851c0b2f7Stbbdev } 28951c0b2f7Stbbdev } 29051c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 29151c0b2f7Stbbdev 29251c0b2f7Stbbdev //! Test push, push-pull behavior and copy constructor 29351c0b2f7Stbbdev //! 
\brief \ref error_guessing \ref requirement 29451c0b2f7Stbbdev TEST_CASE("Single destination tests"){ 29551c0b2f7Stbbdev for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) { 29651c0b2f7Stbbdev tbb::task_arena arena(p); 29751c0b2f7Stbbdev arena.execute( 29851c0b2f7Stbbdev [&]() { 29951c0b2f7Stbbdev test_single_dest<int>(); 30051c0b2f7Stbbdev test_single_dest<float>(); 30151c0b2f7Stbbdev } 30251c0b2f7Stbbdev ); 30351c0b2f7Stbbdev } 30451c0b2f7Stbbdev } 30551c0b2f7Stbbdev 30651c0b2f7Stbbdev //! Test reset variants 30751c0b2f7Stbbdev //! \brief \ref error_guessing 30851c0b2f7Stbbdev TEST_CASE("Reset test"){ 30951c0b2f7Stbbdev test_reset(); 31051c0b2f7Stbbdev } 31151c0b2f7Stbbdev 31251c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 31351c0b2f7Stbbdev //! Test follows and precedes API 31451c0b2f7Stbbdev //! \brief \ref error_guessing 31551c0b2f7Stbbdev TEST_CASE("Follows and precedes API"){ 31651c0b2f7Stbbdev test_follows_and_precedes_api(); 31751c0b2f7Stbbdev } 31851c0b2f7Stbbdev #endif 31951c0b2f7Stbbdev 32051c0b2f7Stbbdev //! Test try_get before activation 32151c0b2f7Stbbdev //! \brief \ref error_guessing 32251c0b2f7Stbbdev TEST_CASE("try_get before activation"){ 32351c0b2f7Stbbdev tbb::flow::graph g; 324478de5b1Stbbdev tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;}); 32551c0b2f7Stbbdev 32651c0b2f7Stbbdev int tmp = -1; 32751c0b2f7Stbbdev CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed"); 32851c0b2f7Stbbdev } 329478de5b1Stbbdev 330478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT 331478de5b1Stbbdev //! 
\brief \ref error_guessing 332478de5b1Stbbdev TEST_CASE("constraints for input_node output") { 333478de5b1Stbbdev struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {}; 334478de5b1Stbbdev 335478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::input_node, Object>); 336478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::input_node, int>); 337478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyable>); 338478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyAssignable>); 339478de5b1Stbbdev } 340478de5b1Stbbdev 341478de5b1Stbbdev template <typename Output, typename Body> 342478de5b1Stbbdev concept can_call_input_node_ctor = requires( tbb::flow::graph& graph, Body body, tbb::flow::buffer_node<int> f ) { 343478de5b1Stbbdev tbb::flow::input_node<Output>(graph, body); 344478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 345478de5b1Stbbdev tbb::flow::input_node<Output>(tbb::flow::precedes(f), body); 346478de5b1Stbbdev #endif 347478de5b1Stbbdev }; 348478de5b1Stbbdev 349478de5b1Stbbdev //! 
\brief \ref error_guessing 350478de5b1Stbbdev TEST_CASE("constraints for input_node body") { 351478de5b1Stbbdev using output_type = int; 352478de5b1Stbbdev using namespace test_concepts::input_node_body; 353478de5b1Stbbdev 354478de5b1Stbbdev static_assert(can_call_input_node_ctor<output_type, Correct<output_type>>); 355478de5b1Stbbdev static_assert(!can_call_input_node_ctor<output_type, NonCopyable<output_type>>); 356478de5b1Stbbdev static_assert(!can_call_input_node_ctor<output_type, NonDestructible<output_type>>); 357478de5b1Stbbdev static_assert(!can_call_input_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>); 358478de5b1Stbbdev static_assert(!can_call_input_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>); 359478de5b1Stbbdev static_assert(!can_call_input_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>); 360478de5b1Stbbdev } 361478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT 362