151c0b2f7Stbbdev /* 2*b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 17*b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER 18*b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19*b15aabb3Stbbdev #endif 2051c0b2f7Stbbdev 2151c0b2f7Stbbdev #include "common/config.h" 2251c0b2f7Stbbdev 2351c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1 2451c0b2f7Stbbdev #include "tbb/flow_graph.h" 2551c0b2f7Stbbdev 2651c0b2f7Stbbdev #include "tbb/task.h" 2751c0b2f7Stbbdev #include "tbb/global_control.h" 2851c0b2f7Stbbdev 2951c0b2f7Stbbdev #include "common/test.h" 3051c0b2f7Stbbdev #include "common/utils.h" 3151c0b2f7Stbbdev #include "common/utils_assert.h" 3251c0b2f7Stbbdev #include "common/graph_utils.h" 3351c0b2f7Stbbdev #include "common/spin_barrier.h" 3451c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h" 3551c0b2f7Stbbdev 3651c0b2f7Stbbdev #include <string> 3751c0b2f7Stbbdev #include <thread> 3851c0b2f7Stbbdev #include <mutex> 3951c0b2f7Stbbdev 4051c0b2f7Stbbdev 4151c0b2f7Stbbdev //! \file test_async_node.cpp 4251c0b2f7Stbbdev //! \brief Test for [flow_graph.async_node] specification 4351c0b2f7Stbbdev 4451c0b2f7Stbbdev 4551c0b2f7Stbbdev class minimal_type { 4651c0b2f7Stbbdev template<typename T> 4751c0b2f7Stbbdev friend struct place_wrapper; 4851c0b2f7Stbbdev 4951c0b2f7Stbbdev int value; 5051c0b2f7Stbbdev 5151c0b2f7Stbbdev public: 5251c0b2f7Stbbdev minimal_type() : value(-1) {} 5351c0b2f7Stbbdev minimal_type(int v) : value(v) {} 5451c0b2f7Stbbdev minimal_type(const minimal_type &m) : value(m.value) { } 5551c0b2f7Stbbdev minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; } 5651c0b2f7Stbbdev }; 5751c0b2f7Stbbdev 5851c0b2f7Stbbdev template <typename T> 5951c0b2f7Stbbdev struct place_wrapper { 6051c0b2f7Stbbdev typedef T wrapped_type; 6151c0b2f7Stbbdev T value; 6251c0b2f7Stbbdev std::thread::id thread_id; 6351c0b2f7Stbbdev 6451c0b2f7Stbbdev place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {} 6551c0b2f7Stbbdev 6651c0b2f7Stbbdev template <typename Q> 6751c0b2f7Stbbdev place_wrapper(const place_wrapper<Q>& v) 6851c0b2f7Stbbdev : value(v.value), thread_id(v.thread_id) 6951c0b2f7Stbbdev {} 7051c0b2f7Stbbdev 7151c0b2f7Stbbdev template <typename Q> 7251c0b2f7Stbbdev place_wrapper<Q>& operator=(const place_wrapper<Q>& v) { 7351c0b2f7Stbbdev if (this != &v) { 7451c0b2f7Stbbdev value = v.value; 7551c0b2f7Stbbdev thread_id = v.thread_id; 7651c0b2f7Stbbdev } 7751c0b2f7Stbbdev return *this; 7851c0b2f7Stbbdev } 7951c0b2f7Stbbdev 8051c0b2f7Stbbdev }; 8151c0b2f7Stbbdev 8251c0b2f7Stbbdev template<typename T1, typename T2> 8351c0b2f7Stbbdev struct wrapper_helper { 8451c0b2f7Stbbdev static void check(const T1 &, const T2 &) { } 8551c0b2f7Stbbdev static void copy_value(const T1 &in, T2 &out) { out = in; } 8651c0b2f7Stbbdev }; 8751c0b2f7Stbbdev 8851c0b2f7Stbbdev template<typename T1, typename T2> 8951c0b2f7Stbbdev struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > { 9051c0b2f7Stbbdev static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) { 9151c0b2f7Stbbdev CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes"); 9251c0b2f7Stbbdev return; 9351c0b2f7Stbbdev } 9451c0b2f7Stbbdev static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) { 9551c0b2f7Stbbdev out.value = in.value; 9651c0b2f7Stbbdev } 9751c0b2f7Stbbdev }; 9851c0b2f7Stbbdev 9951c0b2f7Stbbdev const int NUMBER_OF_MSGS = 10; 10051c0b2f7Stbbdev const int UNKNOWN_NUMBER_OF_ITEMS = -1; 10151c0b2f7Stbbdev std::atomic<int> async_body_exec_count; 10251c0b2f7Stbbdev std::atomic<int> async_activity_processed_msg_count; 10351c0b2f7Stbbdev std::atomic<int> end_body_exec_count; 10451c0b2f7Stbbdev 10551c0b2f7Stbbdev // queueing required in test_reset for testing of cancellation 10651c0b2f7Stbbdev typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type; 10751c0b2f7Stbbdev typedef counting_async_node_type::gateway_type counting_gateway_type; 10851c0b2f7Stbbdev 10951c0b2f7Stbbdev struct counting_async_unlimited_body { 11051c0b2f7Stbbdev 11151c0b2f7Stbbdev counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {} 11251c0b2f7Stbbdev 11351c0b2f7Stbbdev void operator()( const int &input, counting_gateway_type& gateway) { 11451c0b2f7Stbbdev // TODO revamp: reconsider logging for the tests. It is known that frequent calls to 11551c0b2f7Stbbdev // doctest's INFO cause issues. 11651c0b2f7Stbbdev 11751c0b2f7Stbbdev // INFO( "Body execution with input == " << input << "\n"); 11851c0b2f7Stbbdev ++async_body_exec_count; 11951c0b2f7Stbbdev if ( input == -1 ) { 12051c0b2f7Stbbdev bool result = my_tgc.cancel_group_execution(); 12151c0b2f7Stbbdev // INFO( "Canceling graph execution\n" ); 12251c0b2f7Stbbdev CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" ); 12351c0b2f7Stbbdev utils::Sleep(50); 12451c0b2f7Stbbdev } 12551c0b2f7Stbbdev gateway.try_put(input); 12651c0b2f7Stbbdev } 12751c0b2f7Stbbdev private: 12851c0b2f7Stbbdev tbb::task_group_context& my_tgc; 12951c0b2f7Stbbdev }; 13051c0b2f7Stbbdev 13151c0b2f7Stbbdev struct counting_async_serial_body : counting_async_unlimited_body { 13251c0b2f7Stbbdev typedef counting_async_unlimited_body base_type; 13351c0b2f7Stbbdev int my_async_body_exec_count; 13451c0b2f7Stbbdev 13551c0b2f7Stbbdev counting_async_serial_body(tbb::task_group_context& tgc) 13651c0b2f7Stbbdev : base_type(tgc), my_async_body_exec_count( 0 ) { } 13751c0b2f7Stbbdev 13851c0b2f7Stbbdev void operator()( const int &input, counting_gateway_type& gateway ) { 13951c0b2f7Stbbdev ++my_async_body_exec_count; 14051c0b2f7Stbbdev base_type::operator()( input, gateway ); 14151c0b2f7Stbbdev } 14251c0b2f7Stbbdev }; 14351c0b2f7Stbbdev 14451c0b2f7Stbbdev void test_reset() { 14551c0b2f7Stbbdev const int N = NUMBER_OF_MSGS; 14651c0b2f7Stbbdev async_body_exec_count = 0; 14751c0b2f7Stbbdev 14851c0b2f7Stbbdev tbb::task_group_context graph_ctx; 14951c0b2f7Stbbdev tbb::flow::graph g(graph_ctx); 15051c0b2f7Stbbdev counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) ); 15151c0b2f7Stbbdev 15251c0b2f7Stbbdev const int R = 3; 15351c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<int>> > r; 15451c0b2f7Stbbdev for (size_t i = 0; i < R; ++i) { 15551c0b2f7Stbbdev r.push_back( std::make_shared<harness_counting_receiver<int>>(g) ); 15651c0b2f7Stbbdev } 15751c0b2f7Stbbdev 15851c0b2f7Stbbdev for (int i = 0; i < R; ++i) { 15951c0b2f7Stbbdev tbb::flow::make_edge(a, *r[i]); 16051c0b2f7Stbbdev } 16151c0b2f7Stbbdev 16251c0b2f7Stbbdev INFO( "One body execution\n" ); 16351c0b2f7Stbbdev a.try_put(-1); 16451c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 16551c0b2f7Stbbdev a.try_put(i); 16651c0b2f7Stbbdev } 16751c0b2f7Stbbdev g.wait_for_all(); 16851c0b2f7Stbbdev // should be canceled with only 1 item reaching the async_body and the counting receivers 16951c0b2f7Stbbdev // and N items left in the node's queue 17051c0b2f7Stbbdev CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" ); 17151c0b2f7Stbbdev 17251c0b2f7Stbbdev counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a); 17351c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" ); 17451c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1" ); 17551c0b2f7Stbbdev for (int i = 0; i < R; ++i) { 17651c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" ); 17751c0b2f7Stbbdev } 17851c0b2f7Stbbdev 17951c0b2f7Stbbdev // should clear the async_node queue, but retain its local count at 1 and keep all edges 18051c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_protocol); 18151c0b2f7Stbbdev 18251c0b2f7Stbbdev INFO( "N body executions\n" ); 18351c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 18451c0b2f7Stbbdev a.try_put(i); 18551c0b2f7Stbbdev } 18651c0b2f7Stbbdev g.wait_for_all(); 18751c0b2f7Stbbdev CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 18851c0b2f7Stbbdev 18951c0b2f7Stbbdev // a total of N+1 items should have passed through the node body 19051c0b2f7Stbbdev // the local body count should also be N+1 19151c0b2f7Stbbdev // and the counting receivers should all have a count of N+1 19251c0b2f7Stbbdev counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a); 19351c0b2f7Stbbdev CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count), 19451c0b2f7Stbbdev "local and global body execution counts are different" ); 19551c0b2f7Stbbdev INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" ); 19651c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1" ); 19751c0b2f7Stbbdev for (int i = 0; i < R; ++i) { 19851c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" ); 19951c0b2f7Stbbdev } 20051c0b2f7Stbbdev 20151c0b2f7Stbbdev INFO( "N body executions with new bodies\n" ); 20251c0b2f7Stbbdev // should clear the async_node queue and reset its local count to 0, but keep all edges 20351c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); 20451c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 20551c0b2f7Stbbdev a.try_put(i); 20651c0b2f7Stbbdev } 20751c0b2f7Stbbdev g.wait_for_all(); 20851c0b2f7Stbbdev CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 20951c0b2f7Stbbdev 21051c0b2f7Stbbdev // a total of 2N+1 items should have passed through the node body 21151c0b2f7Stbbdev // the local body count should be N 21251c0b2f7Stbbdev // and the counting receivers should all have a count of 2N+1 21351c0b2f7Stbbdev counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a); 21451c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1" ); 21551c0b2f7Stbbdev CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N" ); 21651c0b2f7Stbbdev for (int i = 0; i < R; ++i) { 21751c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 21851c0b2f7Stbbdev } 21951c0b2f7Stbbdev 22051c0b2f7Stbbdev // should clear the async_node queue and keep its local count at N and remove all edges 22151c0b2f7Stbbdev INFO( "N body executions with no edges\n" ); 22251c0b2f7Stbbdev g.reset(tbb::flow::rf_clear_edges); 22351c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 22451c0b2f7Stbbdev a.try_put(i); 22551c0b2f7Stbbdev } 22651c0b2f7Stbbdev g.wait_for_all(); 22751c0b2f7Stbbdev CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 22851c0b2f7Stbbdev 22951c0b2f7Stbbdev // a total of 3N+1 items should have passed through the node body 23051c0b2f7Stbbdev // the local body count should now be 2*N 23151c0b2f7Stbbdev // and the counting receivers should remain at a count of 2N+1 23251c0b2f7Stbbdev counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a); 23351c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1" ); 23451c0b2f7Stbbdev CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N" ); 23551c0b2f7Stbbdev for (int i = 0; i < R; ++i) { 23651c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 23751c0b2f7Stbbdev } 23851c0b2f7Stbbdev 23951c0b2f7Stbbdev // put back 1 edge to receiver 0 24051c0b2f7Stbbdev INFO( "N body executions with 1 edge\n" ); 24151c0b2f7Stbbdev tbb::flow::make_edge(a, *r[0]); 24251c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 24351c0b2f7Stbbdev a.try_put(i); 24451c0b2f7Stbbdev } 24551c0b2f7Stbbdev g.wait_for_all(); 24651c0b2f7Stbbdev CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 24751c0b2f7Stbbdev 24851c0b2f7Stbbdev // a total of 4N+1 items should have passed through the node body 24951c0b2f7Stbbdev // the local body count should now be 3*N 25051c0b2f7Stbbdev // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1 25151c0b2f7Stbbdev counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a); 25251c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1" ); 25351c0b2f7Stbbdev CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N" ); 25451c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" ); 25551c0b2f7Stbbdev for (int i = 1; i < R; ++i) { 25651c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 25751c0b2f7Stbbdev } 25851c0b2f7Stbbdev 25951c0b2f7Stbbdev // should clear the async_node queue and keep its local count at N and remove all edges 26051c0b2f7Stbbdev INFO( "N body executions with no edges and new body\n" ); 26151c0b2f7Stbbdev g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges)); 26251c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 26351c0b2f7Stbbdev a.try_put(i); 26451c0b2f7Stbbdev } 26551c0b2f7Stbbdev g.wait_for_all(); 26651c0b2f7Stbbdev CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 26751c0b2f7Stbbdev 26851c0b2f7Stbbdev // a total of 4N+1 items should have passed through the node body 26951c0b2f7Stbbdev // the local body count should now be 3*N 27051c0b2f7Stbbdev // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1 27151c0b2f7Stbbdev counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a); 27251c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1" ); 27351c0b2f7Stbbdev CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N" ); 27451c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" ); 27551c0b2f7Stbbdev for (int i = 1; i < R; ++i) { 27651c0b2f7Stbbdev CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 27751c0b2f7Stbbdev } 27851c0b2f7Stbbdev } 27951c0b2f7Stbbdev 28051c0b2f7Stbbdev 28151c0b2f7Stbbdev #include <mutex> 28251c0b2f7Stbbdev 28351c0b2f7Stbbdev template <typename T> 28451c0b2f7Stbbdev class async_activity_queue { 28551c0b2f7Stbbdev public: 28651c0b2f7Stbbdev void push( const T& item ) { 28751c0b2f7Stbbdev std::lock_guard<mutex_t> lock( m_mutex ); 28851c0b2f7Stbbdev m_queue.push( item ); 28951c0b2f7Stbbdev } 29051c0b2f7Stbbdev 29151c0b2f7Stbbdev bool try_pop( T& item ) { 29251c0b2f7Stbbdev std::lock_guard<mutex_t> lock( m_mutex ); 29351c0b2f7Stbbdev if( m_queue.empty() ) 29451c0b2f7Stbbdev return false; 29551c0b2f7Stbbdev item = m_queue.front(); 29651c0b2f7Stbbdev m_queue.pop(); 29751c0b2f7Stbbdev return true; 29851c0b2f7Stbbdev } 29951c0b2f7Stbbdev 30051c0b2f7Stbbdev bool empty() { 30151c0b2f7Stbbdev std::lock_guard<mutex_t> lock( m_mutex ); 30251c0b2f7Stbbdev return m_queue.empty(); 30351c0b2f7Stbbdev } 30451c0b2f7Stbbdev 30551c0b2f7Stbbdev private: 30651c0b2f7Stbbdev typedef std::mutex mutex_t; 30751c0b2f7Stbbdev mutex_t m_mutex; 30851c0b2f7Stbbdev std::queue<T> m_queue; 30951c0b2f7Stbbdev }; 31051c0b2f7Stbbdev 31151c0b2f7Stbbdev template< typename Input, typename Output > 31251c0b2f7Stbbdev class async_activity : utils::NoAssign { 31351c0b2f7Stbbdev public: 31451c0b2f7Stbbdev typedef Input input_type; 31551c0b2f7Stbbdev typedef Output output_type; 31651c0b2f7Stbbdev typedef tbb::flow::async_node< input_type, output_type > async_node_type; 31751c0b2f7Stbbdev typedef typename async_node_type::gateway_type gateway_type; 31851c0b2f7Stbbdev 31951c0b2f7Stbbdev struct work_type { 32051c0b2f7Stbbdev input_type input; 32151c0b2f7Stbbdev gateway_type* gateway; 32251c0b2f7Stbbdev }; 32351c0b2f7Stbbdev 32451c0b2f7Stbbdev class ServiceThreadBody { 32551c0b2f7Stbbdev public: 32651c0b2f7Stbbdev ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {} 32751c0b2f7Stbbdev void operator()() { my_activity->process(); } 32851c0b2f7Stbbdev private: 32951c0b2f7Stbbdev async_activity* my_activity; 33051c0b2f7Stbbdev }; 33151c0b2f7Stbbdev 33251c0b2f7Stbbdev async_activity(int expected_items, bool deferred = false, int sleep_time = 50) 33351c0b2f7Stbbdev : my_expected_items(expected_items), my_sleep_time(sleep_time) 33451c0b2f7Stbbdev { 33551c0b2f7Stbbdev is_active = !deferred; 33651c0b2f7Stbbdev my_quit = false; 33751c0b2f7Stbbdev std::thread( ServiceThreadBody( this ) ).swap( my_service_thread ); 33851c0b2f7Stbbdev } 33951c0b2f7Stbbdev 34051c0b2f7Stbbdev private: 34151c0b2f7Stbbdev 34251c0b2f7Stbbdev async_activity( const async_activity& ) 34351c0b2f7Stbbdev : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) 34451c0b2f7Stbbdev { 34551c0b2f7Stbbdev is_active = true; 34651c0b2f7Stbbdev } 34751c0b2f7Stbbdev 34851c0b2f7Stbbdev public: 34951c0b2f7Stbbdev ~async_activity() { 35051c0b2f7Stbbdev stop(); 35151c0b2f7Stbbdev my_service_thread.join(); 35251c0b2f7Stbbdev } 35351c0b2f7Stbbdev 35451c0b2f7Stbbdev void submit( const input_type &input, gateway_type& gateway ) { 35551c0b2f7Stbbdev work_type work = {input, &gateway}; 35651c0b2f7Stbbdev my_work_queue.push( work ); 35751c0b2f7Stbbdev } 35851c0b2f7Stbbdev 35951c0b2f7Stbbdev void process() { 36051c0b2f7Stbbdev do { 36151c0b2f7Stbbdev work_type work; 36251c0b2f7Stbbdev if( is_active && my_work_queue.try_pop( work ) ) { 36351c0b2f7Stbbdev utils::Sleep(my_sleep_time); 36451c0b2f7Stbbdev ++async_activity_processed_msg_count; 36551c0b2f7Stbbdev output_type output; 36651c0b2f7Stbbdev wrapper_helper<output_type, output_type>::copy_value(work.input, output); 36751c0b2f7Stbbdev wrapper_helper<output_type, output_type>::check(work.input, output); 36851c0b2f7Stbbdev work.gateway->try_put(output); 36951c0b2f7Stbbdev if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS || 37051c0b2f7Stbbdev int(async_activity_processed_msg_count) == my_expected_items ) { 37151c0b2f7Stbbdev work.gateway->release_wait(); 37251c0b2f7Stbbdev } 37351c0b2f7Stbbdev } 37451c0b2f7Stbbdev } while( my_quit == false || !my_work_queue.empty()); 37551c0b2f7Stbbdev } 37651c0b2f7Stbbdev 37751c0b2f7Stbbdev void stop() { 37851c0b2f7Stbbdev my_quit = true; 37951c0b2f7Stbbdev } 38051c0b2f7Stbbdev 38151c0b2f7Stbbdev void activate() { 38251c0b2f7Stbbdev is_active = true; 38351c0b2f7Stbbdev } 38451c0b2f7Stbbdev 38551c0b2f7Stbbdev bool should_reserve_each_time() { 38651c0b2f7Stbbdev if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ) 38751c0b2f7Stbbdev return true; 38851c0b2f7Stbbdev else 38951c0b2f7Stbbdev return false; 39051c0b2f7Stbbdev } 39151c0b2f7Stbbdev 39251c0b2f7Stbbdev private: 39351c0b2f7Stbbdev 39451c0b2f7Stbbdev const int my_expected_items; 39551c0b2f7Stbbdev const int my_sleep_time; 39651c0b2f7Stbbdev std::atomic< bool > is_active; 39751c0b2f7Stbbdev 39851c0b2f7Stbbdev async_activity_queue<work_type> my_work_queue; 39951c0b2f7Stbbdev 40051c0b2f7Stbbdev std::atomic< bool > my_quit; 40151c0b2f7Stbbdev 40251c0b2f7Stbbdev std::thread my_service_thread; 40351c0b2f7Stbbdev }; 40451c0b2f7Stbbdev 40551c0b2f7Stbbdev template<typename Input, typename Output> 40651c0b2f7Stbbdev struct basic_test { 40751c0b2f7Stbbdev typedef Input input_type; 40851c0b2f7Stbbdev typedef Output output_type; 40951c0b2f7Stbbdev typedef tbb::flow::async_node< input_type, output_type > async_node_type; 41051c0b2f7Stbbdev typedef typename async_node_type::gateway_type gateway_type; 41151c0b2f7Stbbdev 41251c0b2f7Stbbdev basic_test() {} 41351c0b2f7Stbbdev 41451c0b2f7Stbbdev static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { 41551c0b2f7Stbbdev async_activity<input_type, output_type> my_async_activity(async_expected_items); 41651c0b2f7Stbbdev 41751c0b2f7Stbbdev tbb::flow::graph g; 41851c0b2f7Stbbdev 41951c0b2f7Stbbdev tbb::flow::function_node< int, input_type > start_node( 42051c0b2f7Stbbdev g, tbb::flow::unlimited, [](int input) { return input_type(input); } 42151c0b2f7Stbbdev ); 42251c0b2f7Stbbdev async_node_type offload_node( 42351c0b2f7Stbbdev g, tbb::flow::unlimited, 42451c0b2f7Stbbdev [&] (const input_type &input, gateway_type& gateway) { 42551c0b2f7Stbbdev ++async_body_exec_count; 42651c0b2f7Stbbdev if(my_async_activity.should_reserve_each_time()) 42751c0b2f7Stbbdev gateway.reserve_wait(); 42851c0b2f7Stbbdev my_async_activity.submit(input, gateway); 42951c0b2f7Stbbdev } 43051c0b2f7Stbbdev ); 43151c0b2f7Stbbdev tbb::flow::function_node< output_type > end_node( 43251c0b2f7Stbbdev g, tbb::flow::unlimited, 43351c0b2f7Stbbdev [&](const output_type& input) { 43451c0b2f7Stbbdev ++end_body_exec_count; 43551c0b2f7Stbbdev output_type output; 43651c0b2f7Stbbdev wrapper_helper<output_type, output_type>::check(input, output); 43751c0b2f7Stbbdev } 43851c0b2f7Stbbdev ); 43951c0b2f7Stbbdev 44051c0b2f7Stbbdev tbb::flow::make_edge( start_node, offload_node ); 44151c0b2f7Stbbdev tbb::flow::make_edge( offload_node, end_node ); 44251c0b2f7Stbbdev 44351c0b2f7Stbbdev async_body_exec_count = 0; 44451c0b2f7Stbbdev async_activity_processed_msg_count = 0; 44551c0b2f7Stbbdev end_body_exec_count = 0; 44651c0b2f7Stbbdev 44751c0b2f7Stbbdev if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) { 44851c0b2f7Stbbdev offload_node.gateway().reserve_wait(); 44951c0b2f7Stbbdev } 45051c0b2f7Stbbdev for (int i = 0; i < NUMBER_OF_MSGS; ++i) { 45151c0b2f7Stbbdev start_node.try_put(i); 45251c0b2f7Stbbdev } 45351c0b2f7Stbbdev g.wait_for_all(); 45451c0b2f7Stbbdev CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 45551c0b2f7Stbbdev CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" ); 45651c0b2f7Stbbdev CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals"); 45751c0b2f7Stbbdev INFO( "async_body_exec_count == " << int(async_body_exec_count) << 45851c0b2f7Stbbdev " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) << 45951c0b2f7Stbbdev " == end_body_exec_count == " << int(end_body_exec_count) << "\n" 46051c0b2f7Stbbdev ); 46151c0b2f7Stbbdev return 0; 46251c0b2f7Stbbdev } 46351c0b2f7Stbbdev 46451c0b2f7Stbbdev }; 46551c0b2f7Stbbdev 46651c0b2f7Stbbdev int test_copy_ctor() { 46751c0b2f7Stbbdev const int N = NUMBER_OF_MSGS; 46851c0b2f7Stbbdev async_body_exec_count = 0; 46951c0b2f7Stbbdev 47051c0b2f7Stbbdev tbb::flow::graph g; 47151c0b2f7Stbbdev 47251c0b2f7Stbbdev harness_counting_receiver<int> r1(g); 47351c0b2f7Stbbdev harness_counting_receiver<int> r2(g); 47451c0b2f7Stbbdev 47551c0b2f7Stbbdev tbb::task_group_context graph_ctx; 47651c0b2f7Stbbdev counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) ); 47751c0b2f7Stbbdev counting_async_node_type b(a); 47851c0b2f7Stbbdev 47951c0b2f7Stbbdev tbb::flow::make_edge(a, r1); // C++11-style of making edges 48051c0b2f7Stbbdev tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2); // usual way of making edges 48151c0b2f7Stbbdev 48251c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 48351c0b2f7Stbbdev a.try_put(i); 48451c0b2f7Stbbdev } 48551c0b2f7Stbbdev g.wait_for_all(); 48651c0b2f7Stbbdev 48751c0b2f7Stbbdev INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" ); 48851c0b2f7Stbbdev INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" ); 48951c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 49051c0b2f7Stbbdev CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" ); 49151c0b2f7Stbbdev CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" ); 49251c0b2f7Stbbdev 49351c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 49451c0b2f7Stbbdev b.try_put(i); 49551c0b2f7Stbbdev } 49651c0b2f7Stbbdev g.wait_for_all(); 49751c0b2f7Stbbdev 49851c0b2f7Stbbdev INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" ); 49951c0b2f7Stbbdev INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" ); 50051c0b2f7Stbbdev CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 50151c0b2f7Stbbdev CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" ); 50251c0b2f7Stbbdev CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" ); 50351c0b2f7Stbbdev return 0; 50451c0b2f7Stbbdev } 50551c0b2f7Stbbdev 50651c0b2f7Stbbdev std::atomic<int> main_tid_count; 50751c0b2f7Stbbdev 50851c0b2f7Stbbdev template<typename Input, typename Output> 50951c0b2f7Stbbdev struct spin_test { 51051c0b2f7Stbbdev typedef Input input_type; 51151c0b2f7Stbbdev typedef Output output_type; 51251c0b2f7Stbbdev typedef tbb::flow::async_node< input_type, output_type > async_node_type; 51351c0b2f7Stbbdev typedef typename async_node_type::gateway_type gateway_type; 51451c0b2f7Stbbdev 51551c0b2f7Stbbdev class end_body_type { 51651c0b2f7Stbbdev typedef Output output_type; 51751c0b2f7Stbbdev std::thread::id my_main_tid; 51851c0b2f7Stbbdev utils::SpinBarrier *my_barrier; 51951c0b2f7Stbbdev public: 52051c0b2f7Stbbdev end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { } 52151c0b2f7Stbbdev 52251c0b2f7Stbbdev void operator()( const output_type & ) { 52351c0b2f7Stbbdev ++end_body_exec_count; 52451c0b2f7Stbbdev if (std::this_thread::get_id() == my_main_tid) { 52551c0b2f7Stbbdev ++main_tid_count; 52651c0b2f7Stbbdev } 527*b15aabb3Stbbdev my_barrier->wait(); 52851c0b2f7Stbbdev } 52951c0b2f7Stbbdev }; 53051c0b2f7Stbbdev 53151c0b2f7Stbbdev spin_test() {} 53251c0b2f7Stbbdev 53351c0b2f7Stbbdev static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { 53451c0b2f7Stbbdev async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0); 53551c0b2f7Stbbdev const int overall_message_count = nthreads * NUMBER_OF_MSGS; 53651c0b2f7Stbbdev utils::SpinBarrier spin_barrier(nthreads); 53751c0b2f7Stbbdev 53851c0b2f7Stbbdev tbb::flow::graph g; 53951c0b2f7Stbbdev tbb::flow::function_node<int, input_type> start_node( 54051c0b2f7Stbbdev g, tbb::flow::unlimited, [](int input) { return input_type(input); } 54151c0b2f7Stbbdev ); 54251c0b2f7Stbbdev async_node_type offload_node( 54351c0b2f7Stbbdev g, tbb::flow::unlimited, 54451c0b2f7Stbbdev [&](const input_type &input, gateway_type& gateway) { 54551c0b2f7Stbbdev ++async_body_exec_count; 54651c0b2f7Stbbdev if(my_async_activity.should_reserve_each_time()) 54751c0b2f7Stbbdev gateway.reserve_wait(); 54851c0b2f7Stbbdev my_async_activity.submit(input, gateway); 54951c0b2f7Stbbdev } 55051c0b2f7Stbbdev ); 55151c0b2f7Stbbdev tbb::flow::function_node<output_type> end_node( 55251c0b2f7Stbbdev g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier) 55351c0b2f7Stbbdev ); 55451c0b2f7Stbbdev 55551c0b2f7Stbbdev tbb::flow::make_edge( start_node, offload_node ); 55651c0b2f7Stbbdev tbb::flow::make_edge( offload_node, end_node ); 55751c0b2f7Stbbdev 55851c0b2f7Stbbdev async_body_exec_count = 0; 55951c0b2f7Stbbdev async_activity_processed_msg_count = 0; 56051c0b2f7Stbbdev end_body_exec_count = 0; 56151c0b2f7Stbbdev main_tid_count = 0; 56251c0b2f7Stbbdev 56351c0b2f7Stbbdev if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) { 56451c0b2f7Stbbdev offload_node.gateway().reserve_wait(); 56551c0b2f7Stbbdev } 56651c0b2f7Stbbdev for (int i = 0; i < overall_message_count; ++i) { 56751c0b2f7Stbbdev start_node.try_put(i); 56851c0b2f7Stbbdev } 56951c0b2f7Stbbdev g.wait_for_all(); 57051c0b2f7Stbbdev CHECK_MESSAGE( (async_body_exec_count == overall_message_count), 57151c0b2f7Stbbdev "AsyncBody processed wrong number of signals" ); 57251c0b2f7Stbbdev CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count), 57351c0b2f7Stbbdev "AsyncActivity processed wrong number of signals" ); 57451c0b2f7Stbbdev CHECK_MESSAGE( (end_body_exec_count == overall_message_count), 57551c0b2f7Stbbdev "EndBody processed wrong number of signals"); 57651c0b2f7Stbbdev 57751c0b2f7Stbbdev INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n"); 57851c0b2f7Stbbdev 57951c0b2f7Stbbdev INFO("async_body_exec_count == " << int(async_body_exec_count) << 58051c0b2f7Stbbdev " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) << 58151c0b2f7Stbbdev " == end_body_exec_count == " << int(end_body_exec_count) << "\n" 58251c0b2f7Stbbdev ); 58351c0b2f7Stbbdev return 0; 58451c0b2f7Stbbdev } 58551c0b2f7Stbbdev 58651c0b2f7Stbbdev }; 58751c0b2f7Stbbdev 58851c0b2f7Stbbdev void test_for_spin_avoidance() { 58951c0b2f7Stbbdev const int nthreads = 4; 59051c0b2f7Stbbdev tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads); 59151c0b2f7Stbbdev spin_test<int, int>::run(nthreads); 59251c0b2f7Stbbdev } 59351c0b2f7Stbbdev 59451c0b2f7Stbbdev template< typename Input, typename Output > 59551c0b2f7Stbbdev int run_tests() { 59651c0b2f7Stbbdev basic_test<Input, Output>::run(); 59751c0b2f7Stbbdev basic_test<Input, Output>::run(NUMBER_OF_MSGS); 59851c0b2f7Stbbdev basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(); 59951c0b2f7Stbbdev basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS); 60051c0b2f7Stbbdev return 0; 60151c0b2f7Stbbdev } 60251c0b2f7Stbbdev 60351c0b2f7Stbbdev #include "tbb/parallel_for.h" 60451c0b2f7Stbbdev template<typename Input, typename Output> 60551c0b2f7Stbbdev class equeueing_on_inner_level { 60651c0b2f7Stbbdev typedef Input input_type; 60751c0b2f7Stbbdev typedef Output output_type; 60851c0b2f7Stbbdev typedef async_activity<input_type, output_type> async_activity_type; 60951c0b2f7Stbbdev typedef tbb::flow::async_node<Input, Output> async_node_type; 61051c0b2f7Stbbdev typedef typename async_node_type::gateway_type gateway_type; 61151c0b2f7Stbbdev 61251c0b2f7Stbbdev class body_graph_with_async { 61351c0b2f7Stbbdev public: 61451c0b2f7Stbbdev body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity ) 61551c0b2f7Stbbdev : spin_barrier(&barrier), my_async_activity(&activity) {} 61651c0b2f7Stbbdev 61751c0b2f7Stbbdev void operator()(int) const { 61851c0b2f7Stbbdev tbb::flow::graph g; 61951c0b2f7Stbbdev tbb::flow::function_node< int, input_type > start_node( 62051c0b2f7Stbbdev g, tbb::flow::unlimited, [](int input) { return input_type(input); } 62151c0b2f7Stbbdev ); 62251c0b2f7Stbbdev async_node_type offload_node( 62351c0b2f7Stbbdev g, tbb::flow::unlimited, 62451c0b2f7Stbbdev [&](const input_type &input, gateway_type& gateway) { 62551c0b2f7Stbbdev gateway.reserve_wait(); 62651c0b2f7Stbbdev my_async_activity->submit( input, gateway ); 62751c0b2f7Stbbdev } 62851c0b2f7Stbbdev ); 62951c0b2f7Stbbdev tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} ); 63051c0b2f7Stbbdev 63151c0b2f7Stbbdev tbb::flow::make_edge( start_node, offload_node ); 63251c0b2f7Stbbdev tbb::flow::make_edge( offload_node, end_node ); 63351c0b2f7Stbbdev 63451c0b2f7Stbbdev start_node.try_put(1); 63551c0b2f7Stbbdev 63651c0b2f7Stbbdev spin_barrier->wait(); 63751c0b2f7Stbbdev 63851c0b2f7Stbbdev my_async_activity->activate(); 63951c0b2f7Stbbdev 64051c0b2f7Stbbdev g.wait_for_all(); 64151c0b2f7Stbbdev } 64251c0b2f7Stbbdev 64351c0b2f7Stbbdev private: 64451c0b2f7Stbbdev utils::SpinBarrier* spin_barrier; 64551c0b2f7Stbbdev async_activity_type* my_async_activity; 64651c0b2f7Stbbdev }; 64751c0b2f7Stbbdev 64851c0b2f7Stbbdev public: 64951c0b2f7Stbbdev static int run () 65051c0b2f7Stbbdev { 65151c0b2f7Stbbdev const int nthreads = tbb::this_task_arena::max_concurrency(); 65251c0b2f7Stbbdev utils::SpinBarrier spin_barrier( nthreads ); 65351c0b2f7Stbbdev 65451c0b2f7Stbbdev async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true ); 65551c0b2f7Stbbdev 65651c0b2f7Stbbdev tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) ); 65751c0b2f7Stbbdev return 0; 65851c0b2f7Stbbdev } 65951c0b2f7Stbbdev }; 66051c0b2f7Stbbdev 66151c0b2f7Stbbdev int run_test_equeueing_on_inner_level() { 66251c0b2f7Stbbdev equeueing_on_inner_level<int, int>::run(); 66351c0b2f7Stbbdev return 0; 66451c0b2f7Stbbdev } 66551c0b2f7Stbbdev 66651c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 66751c0b2f7Stbbdev #include <array> 66851c0b2f7Stbbdev 66951c0b2f7Stbbdev template<typename NodeType> 67051c0b2f7Stbbdev class AsyncActivity { 67151c0b2f7Stbbdev public: 67251c0b2f7Stbbdev using gateway_t = typename NodeType::gateway_type; 67351c0b2f7Stbbdev 67451c0b2f7Stbbdev struct work_type { 67551c0b2f7Stbbdev int input; 67651c0b2f7Stbbdev gateway_t* gateway; 67751c0b2f7Stbbdev }; 67851c0b2f7Stbbdev 679*b15aabb3Stbbdev AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() { 68051c0b2f7Stbbdev while(!end_of_work()) { 68151c0b2f7Stbbdev work_type w; 68251c0b2f7Stbbdev while( my_q.try_pop(w) ) { 68351c0b2f7Stbbdev int res = do_work(w.input); 68451c0b2f7Stbbdev w.gateway->try_put(res); 68551c0b2f7Stbbdev w.gateway->release_wait(); 68651c0b2f7Stbbdev ++c; 68751c0b2f7Stbbdev } 68851c0b2f7Stbbdev } 689*b15aabb3Stbbdev }) {} 69051c0b2f7Stbbdev 69151c0b2f7Stbbdev void submit(int i, gateway_t* gateway) { 69251c0b2f7Stbbdev work_type w = {i, gateway}; 69351c0b2f7Stbbdev gateway->reserve_wait(); 69451c0b2f7Stbbdev my_q.push(w); 69551c0b2f7Stbbdev } 69651c0b2f7Stbbdev 69751c0b2f7Stbbdev void wait_for_all() { thr.join(); } 69851c0b2f7Stbbdev 69951c0b2f7Stbbdev private: 70051c0b2f7Stbbdev bool end_of_work() { return c >= stop_limit; } 70151c0b2f7Stbbdev 70251c0b2f7Stbbdev int do_work(int& i) { return i + i; } 70351c0b2f7Stbbdev 70451c0b2f7Stbbdev async_activity_queue<work_type> my_q; 70551c0b2f7Stbbdev size_t stop_limit; 70651c0b2f7Stbbdev size_t c; 707*b15aabb3Stbbdev std::thread thr; 70851c0b2f7Stbbdev }; 70951c0b2f7Stbbdev 71051c0b2f7Stbbdev void test_follows() { 71151c0b2f7Stbbdev using namespace tbb::flow; 71251c0b2f7Stbbdev 71351c0b2f7Stbbdev using input_t = int; 71451c0b2f7Stbbdev using output_t = int; 71551c0b2f7Stbbdev using node_t = async_node<input_t, output_t>; 71651c0b2f7Stbbdev 71751c0b2f7Stbbdev graph g; 71851c0b2f7Stbbdev 71951c0b2f7Stbbdev AsyncActivity<node_t> async_activity(3); 72051c0b2f7Stbbdev 72151c0b2f7Stbbdev std::array<broadcast_node<input_t>, 3> preds = { 72251c0b2f7Stbbdev { 72351c0b2f7Stbbdev broadcast_node<input_t>(g), 72451c0b2f7Stbbdev broadcast_node<input_t>(g), 72551c0b2f7Stbbdev broadcast_node<input_t>(g) 72651c0b2f7Stbbdev } 72751c0b2f7Stbbdev }; 72851c0b2f7Stbbdev 72951c0b2f7Stbbdev node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) { 73051c0b2f7Stbbdev async_activity.submit(input, >w); 73151c0b2f7Stbbdev }, no_priority); 73251c0b2f7Stbbdev 73351c0b2f7Stbbdev buffer_node<output_t> buf(g); 73451c0b2f7Stbbdev make_edge(node, buf); 73551c0b2f7Stbbdev 73651c0b2f7Stbbdev for(auto& pred: preds) { 73751c0b2f7Stbbdev pred.try_put(1); 73851c0b2f7Stbbdev } 73951c0b2f7Stbbdev 74051c0b2f7Stbbdev g.wait_for_all(); 74151c0b2f7Stbbdev async_activity.wait_for_all(); 74251c0b2f7Stbbdev 74351c0b2f7Stbbdev output_t storage; 74451c0b2f7Stbbdev CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)), 74551c0b2f7Stbbdev "Not exact edge quantity was made"); 74651c0b2f7Stbbdev } 74751c0b2f7Stbbdev 74851c0b2f7Stbbdev void test_precedes() { 74951c0b2f7Stbbdev using namespace tbb::flow; 75051c0b2f7Stbbdev 75151c0b2f7Stbbdev using input_t = int; 75251c0b2f7Stbbdev using output_t = int; 75351c0b2f7Stbbdev using node_t = async_node<input_t, output_t>; 75451c0b2f7Stbbdev 75551c0b2f7Stbbdev graph g; 75651c0b2f7Stbbdev 75751c0b2f7Stbbdev AsyncActivity<node_t> async_activity(1); 75851c0b2f7Stbbdev 75951c0b2f7Stbbdev std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} }; 76051c0b2f7Stbbdev 76151c0b2f7Stbbdev broadcast_node<input_t> start(g); 76251c0b2f7Stbbdev 76351c0b2f7Stbbdev node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) { 76451c0b2f7Stbbdev async_activity.submit(input, >w); 76551c0b2f7Stbbdev }, no_priority); 76651c0b2f7Stbbdev 76751c0b2f7Stbbdev make_edge(start, node); 76851c0b2f7Stbbdev 76951c0b2f7Stbbdev start.try_put(1); 77051c0b2f7Stbbdev 77151c0b2f7Stbbdev g.wait_for_all(); 77251c0b2f7Stbbdev async_activity.wait_for_all(); 77351c0b2f7Stbbdev 77451c0b2f7Stbbdev for(auto& successor : successors) { 77551c0b2f7Stbbdev output_t storage; 77651c0b2f7Stbbdev CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 77751c0b2f7Stbbdev "Not exact edge quantity was made"); 77851c0b2f7Stbbdev } 77951c0b2f7Stbbdev } 78051c0b2f7Stbbdev 78151c0b2f7Stbbdev void test_follows_and_precedes_api() { 78251c0b2f7Stbbdev test_follows(); 78351c0b2f7Stbbdev test_precedes(); 78451c0b2f7Stbbdev } 78551c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 78651c0b2f7Stbbdev 78751c0b2f7Stbbdev //! Test async bodies processing 78851c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing 78951c0b2f7Stbbdev TEST_CASE("Basic tests"){ 79051c0b2f7Stbbdev tbb::task_arena arena(utils::MaxThread); 79151c0b2f7Stbbdev arena.execute( 79251c0b2f7Stbbdev [&]() { 79351c0b2f7Stbbdev run_tests<int, int>(); 79451c0b2f7Stbbdev run_tests<minimal_type, minimal_type>(); 79551c0b2f7Stbbdev run_tests<int, minimal_type>(); 79651c0b2f7Stbbdev } 79751c0b2f7Stbbdev ); 79851c0b2f7Stbbdev } 79951c0b2f7Stbbdev 80051c0b2f7Stbbdev //! NativeParallelFor test with various concurrency settings 80151c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing 80251c0b2f7Stbbdev TEST_CASE("Lightweight tests"){ 80351c0b2f7Stbbdev lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS); 80451c0b2f7Stbbdev } 80551c0b2f7Stbbdev 80651c0b2f7Stbbdev //! Test reset and cancellation 80751c0b2f7Stbbdev //! \brief \ref error_guessing 80851c0b2f7Stbbdev TEST_CASE("Reset test"){ 80951c0b2f7Stbbdev test_reset(); 81051c0b2f7Stbbdev } 81151c0b2f7Stbbdev 81251c0b2f7Stbbdev //! Test 81351c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing 81451c0b2f7Stbbdev TEST_CASE("Copy constructor test"){ 81551c0b2f7Stbbdev test_copy_ctor(); 81651c0b2f7Stbbdev } 81751c0b2f7Stbbdev 81851c0b2f7Stbbdev //! Test if main thread spins 81951c0b2f7Stbbdev //! \brief \ref stress 82051c0b2f7Stbbdev TEST_CASE("Spin avoidance test"){ 82151c0b2f7Stbbdev test_for_spin_avoidance(); 82251c0b2f7Stbbdev } 82351c0b2f7Stbbdev 82451c0b2f7Stbbdev //! Test nested enqueing 82551c0b2f7Stbbdev //! \brief \ref error_guessing 82651c0b2f7Stbbdev TEST_CASE("Inner enqueing test"){ 82751c0b2f7Stbbdev run_test_equeueing_on_inner_level(); 82851c0b2f7Stbbdev } 82951c0b2f7Stbbdev 83051c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 83151c0b2f7Stbbdev //! Test deprecated follows and preceedes API 83251c0b2f7Stbbdev //! \brief \ref error_guessing 83351c0b2f7Stbbdev TEST_CASE("Test follows and preceedes API"){ 83451c0b2f7Stbbdev test_follows_and_precedes_api(); 83551c0b2f7Stbbdev } 83651c0b2f7Stbbdev #endif 837