/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/


#include "common/config.h"

#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"

#include "tbb/task.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/utils_assert.h"
#include "common/graph_utils.h"
#include "common/spin_barrier.h"
#include "common/test_follows_and_precedes_api.h"

#include <atomic>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>


//! \file test_async_node.cpp
//! \brief Test for [flow_graph.async_node] specification


class minimal_type {
    template<typename T>
    friend struct place_wrapper;

    int value;

public:
    minimal_type() : value(-1) {}
    minimal_type(int v) : value(v) {}
    minimal_type(const minimal_type &m) : value(m.value) { }
    minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; }
};

template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    template <typename Q>
    place_wrapper& operator=(const place_wrapper<Q>& v) {
        // Compare addresses through void* so self-assignment is detected even when Q differs from T.
        if (static_cast<const void*>(this) != static_cast<const void*>(&v)) {
            value = v.value;
            thread_id = v.thread_id;
        }
        return *this;
    }
};

template<typename T1, typename T2>
struct wrapper_helper {
    static void check(const T1 &, const T2 &) { }
    static void copy_value(const T1 &in, T2 &out) { out = in; }
};

template<typename T1, typename T2>
struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
    static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
        return;
    }
    static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
        out.value = in.value;
    }
};

const int NUMBER_OF_MSGS = 10;
const int UNKNOWN_NUMBER_OF_ITEMS = -1;
std::atomic<int> async_body_exec_count;
std::atomic<int> async_activity_processed_msg_count;
std::atomic<int> end_body_exec_count;

// queueing required in test_reset for testing of cancellation
typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
typedef counting_async_node_type::gateway_type counting_gateway_type;

struct counting_async_unlimited_body {

    counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}

    void operator()( const int &input, counting_gateway_type& gateway) {
        // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
        // doctest's INFO cause issues.

        // INFO( "Body execution with input == " << input << "\n");
        ++async_body_exec_count;
        if ( input == -1 ) {
            bool result = my_tgc.cancel_group_execution();
            // INFO( "Canceling graph execution\n" );
            CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
            utils::Sleep(50);
        }
        gateway.try_put(input);
    }
private:
    tbb::task_group_context& my_tgc;
};

struct counting_async_serial_body : counting_async_unlimited_body {
    typedef counting_async_unlimited_body base_type;
    int my_async_body_exec_count;

    counting_async_serial_body(tbb::task_group_context& tgc)
        : base_type(tgc), my_async_body_exec_count( 0 ) { }

    void operator()( const int &input, counting_gateway_type& gateway ) {
        ++my_async_body_exec_count;
        base_type::operator()( input, gateway );
    }
};
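// A minimal, self-contained sketch (not called by the tests) of the cancellation
// pattern the counting bodies above rely on: a graph bound to an explicit
// tbb::task_group_context can be cancelled from inside a node body, after which
// wait_for_all() returns without running the remaining buffered messages. The
// function and node names here are illustrative only; the tbb APIs are real.
inline void example_cancel_graph_via_context() {
    tbb::task_group_context ctx;
    tbb::flow::graph g(ctx);
    tbb::flow::function_node<int, int> n(g, tbb::flow::serial, [&](int v) {
        if (v < 0)
            ctx.cancel_group_execution();   // stop processing further messages
        return v;
    });
    n.try_put(-1);   // triggers cancellation
    n.try_put(1);    // may be dropped, since the graph is cancelled
    g.wait_for_all();
    // At this point g.is_cancelled() is expected to be true.
}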
void test_reset() {
    const int N = NUMBER_OF_MSGS;
    async_body_exec_count = 0;

    tbb::task_group_context graph_ctx;
    tbb::flow::graph g(graph_ctx);
    counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );

    const int R = 3;
    std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
    for (size_t i = 0; i < R; ++i) {
        r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
    }

    for (int i = 0; i < R; ++i) {
        tbb::flow::make_edge(a, *r[i]);
    }

    INFO( "One body execution\n" );
    a.try_put(-1);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    // should be canceled with only 1 item reaching the async_body and the counting receivers
    // and N items left in the node's queue
    CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );

    counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
    }

    // should clear the async_node queue, but retain its local count at 1 and keep all edges
    g.reset(tbb::flow::rf_reset_protocol);

    INFO( "N body executions\n" );
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of N+1 items should have passed through the node body
    // the local body count should also be N+1
    // and the counting receivers should all have a count of N+1
    counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
                   "local and global body execution counts are different" );
    INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
    }

    INFO( "N body executions with new bodies\n" );
    // should clear the async_node queue and reset its local count to 0, but keep all edges
    g.reset(tbb::flow::rf_reset_bodies);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 2N+1 items should have passed through the node body
    // the local body count should be N
    // and the counting receivers should all have a count of 2N+1
    counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1" );
    CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }

    // should clear the async_node queue, keep its local count at N, and remove all edges
    INFO( "N body executions with no edges\n" );
    g.reset(tbb::flow::rf_clear_edges);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 3N+1 items should have passed through the node body
    // the local body count should now be 2*N
    // and the counting receivers should remain at a count of 2N+1
    counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1" );
    CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }

    // put back 1 edge to receiver 0
    INFO( "N body executions with 1 edge\n" );
    tbb::flow::make_edge(a, *r[0]);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 4N+1 items should have passed through the node body
    // the local body count should now be 3*N
    // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
    counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1" );
    CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N" );
    CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
    for (int i = 1; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }

    // should clear the async_node queue, reset its local count to 0, and remove all edges
    INFO( "N body executions with no edges and new body\n" );
    g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 5N+1 items should have passed through the node body
    // the new local body count should be N
    // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
    counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1" );
    CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N" );
    CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
    for (int i = 1; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }
}
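// A condensed note on the graph::reset() flags exercised by test_reset() above. The
// flag names are the real tbb::flow::reset_flags values; the summary reflects the
// behavior the checks in test_reset() rely on:
//   rf_reset_protocol - the default: drop buffered messages, keep node bodies and edges;
//   rf_reset_bodies   - additionally restore each node's body from the original copy,
//                       so per-body state such as my_async_body_exec_count starts over;
//   rf_clear_edges    - additionally remove all edges, so successors receive nothing
//                       until make_edge() is called again.
// Flags combine bitwise, e.g.
//   g.reset(static_cast<tbb::flow::reset_flags>(
//               tbb::flow::rf_reset_bodies | tbb::flow::rf_clear_edges));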

#include <mutex>

template <typename T>
class async_activity_queue {
public:
    void push( const T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        m_queue.push( item );
    }

    bool try_pop( T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        if( m_queue.empty() )
            return false;
        item = m_queue.front();
        m_queue.pop();
        return true;
    }

    bool empty() {
        std::lock_guard<mutex_t> lock( m_mutex );
        return m_queue.empty();
    }

private:
    typedef std::mutex mutex_t;
    mutex_t m_mutex;
    std::queue<T> m_queue;
};

template< typename Input, typename Output >
class async_activity : utils::NoAssign {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    struct work_type {
        input_type input;
        gateway_type* gateway;
    };

    class ServiceThreadBody {
    public:
        ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
        void operator()() { my_activity->process(); }
    private:
        async_activity* my_activity;
    };

    async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
        : my_expected_items(expected_items), my_sleep_time(sleep_time)
    {
        is_active = !deferred;
        my_quit = false;
        std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
    }

private:

    async_activity( const async_activity& )
        : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
    {
        is_active = true;
    }

public:
    ~async_activity() {
        stop();
        my_service_thread.join();
    }

    void submit( const input_type &input, gateway_type& gateway ) {
        work_type work = {input, &gateway};
        my_work_queue.push( work );
    }

    void process() {
        do {
            work_type work;
            if( is_active && my_work_queue.try_pop( work ) ) {
                utils::Sleep(my_sleep_time);
                ++async_activity_processed_msg_count;
                output_type output;
                wrapper_helper<output_type, output_type>::copy_value(work.input, output);
                wrapper_helper<output_type, output_type>::check(work.input, output);
                work.gateway->try_put(output);
                if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
                     int(async_activity_processed_msg_count) == my_expected_items ) {
                    work.gateway->release_wait();
                }
            }
        } while( my_quit == false || !my_work_queue.empty());
    }

    void stop() {
        my_quit = true;
    }

    void activate() {
        is_active = true;
    }

    bool should_reserve_each_time() {
        if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
            return true;
        else
            return false;
    }

private:

    const int my_expected_items;
    const int my_sleep_time;
    std::atomic< bool > is_active;

    async_activity_queue<work_type> my_work_queue;

    std::atomic< bool > my_quit;

    std::thread my_service_thread;
};
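// A minimal, self-contained sketch (not used by the tests) of the async_node gateway
// handshake that async_activity implements above: reserve_wait() tells the graph to
// stay "not done" while work is offloaded, the external thread hands the result back
// with try_put(), and release_wait() lets wait_for_all() complete. The function and
// variable names are illustrative only; the tbb::flow calls are the real API.
inline void example_gateway_handshake() {
    using async_t = tbb::flow::async_node<int, int>;
    tbb::flow::graph g;
    std::thread worker;
    async_t n(g, tbb::flow::serial, [&](const int& v, async_t::gateway_type& gw) {
        gw.reserve_wait();                        // keep the graph alive past this body
        async_t::gateway_type* gwp = &gw;         // the gateway outlives the body call
        worker = std::thread([gwp, v]() {
            gwp->try_put(v * 2);                  // hand the result back to the graph
            gwp->release_wait();                  // balance the reserve_wait() above
        });
    });
    n.try_put(21);
    g.wait_for_all();                             // returns only after release_wait()
    worker.join();
}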
template<typename Input, typename Output>
struct basic_test {
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    basic_test() {}

    static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
        async_activity<input_type, output_type> my_async_activity(async_expected_items);

        tbb::flow::graph g;

        tbb::flow::function_node< int, input_type > start_node(
            g, tbb::flow::unlimited, [](int input) { return input_type(input); }
        );
        async_node_type offload_node(
            g, tbb::flow::unlimited,
            [&] (const input_type &input, gateway_type& gateway) {
                ++async_body_exec_count;
                if(my_async_activity.should_reserve_each_time())
                    gateway.reserve_wait();
                my_async_activity.submit(input, gateway);
            }
        );
        tbb::flow::function_node< output_type > end_node(
            g, tbb::flow::unlimited,
            [&](const output_type& input) {
                ++end_body_exec_count;
                output_type output;
                wrapper_helper<output_type, output_type>::check(input, output);
            }
        );

        tbb::flow::make_edge( start_node, offload_node );
        tbb::flow::make_edge( offload_node, end_node );

        async_body_exec_count = 0;
        async_activity_processed_msg_count = 0;
        end_body_exec_count = 0;

        if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
            offload_node.gateway().reserve_wait();
        }
        for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
            start_node.try_put(i);
        }
        g.wait_for_all();
        CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
        CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
        CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
        INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
        );
        return 0;
    }

};

int test_copy_ctor() {
    const int N = NUMBER_OF_MSGS;
    async_body_exec_count = 0;

    tbb::flow::graph g;

    harness_counting_receiver<int> r1(g);
    harness_counting_receiver<int> r2(g);

    tbb::task_group_context graph_ctx;
    counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
    counting_async_node_type b(a);

    tbb::flow::make_edge(a, r1);                             // edge made directly from the node
    tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // edge made from an explicitly named output port

    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();

    INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
    INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
    CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
    CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 should not have received any items" );

    for (int i = 0; i < N; ++i) {
        b.try_put(i);
    }
    g.wait_for_all();

    INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
    INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
    CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
    CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
    return 0;
}
std::atomic<int> main_tid_count;

// spin_test stalls every end_node body on a spin barrier and records in main_tid_count
// how many of those bodies run on the thread that later calls wait_for_all(); the
// count is reported via INFO rather than asserted.
template<typename Input, typename Output>
struct spin_test {
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    class end_body_type {
        typedef Output output_type;
        std::thread::id my_main_tid;
        utils::SpinBarrier *my_barrier;
    public:
        end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }

        void operator()( const output_type & ) {
            ++end_body_exec_count;
            if (std::this_thread::get_id() == my_main_tid) {
                ++main_tid_count;
            }
            my_barrier->timedWaitNoError(10);
        }
    };

    spin_test() {}

    static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
        async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
        const int overall_message_count = nthreads * NUMBER_OF_MSGS;
        utils::SpinBarrier spin_barrier(nthreads);

        tbb::flow::graph g;
        tbb::flow::function_node<int, input_type> start_node(
            g, tbb::flow::unlimited, [](int input) { return input_type(input); }
        );
        async_node_type offload_node(
            g, tbb::flow::unlimited,
            [&](const input_type &input, gateway_type& gateway) {
                ++async_body_exec_count;
                if(my_async_activity.should_reserve_each_time())
                    gateway.reserve_wait();
                my_async_activity.submit(input, gateway);
            }
        );
        tbb::flow::function_node<output_type> end_node(
            g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
        );

        tbb::flow::make_edge( start_node, offload_node );
        tbb::flow::make_edge( offload_node, end_node );

        async_body_exec_count = 0;
        async_activity_processed_msg_count = 0;
        end_body_exec_count = 0;
        main_tid_count = 0;

        if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
            offload_node.gateway().reserve_wait();
        }
        for (int i = 0; i < overall_message_count; ++i) {
            start_node.try_put(i);
        }
        g.wait_for_all();
        CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
                       "AsyncBody processed wrong number of signals" );
        CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
                       "AsyncActivity processed wrong number of signals" );
        CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
                       "EndBody processed wrong number of signals");

        INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");

        INFO("async_body_exec_count == " << int(async_body_exec_count) <<
             " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
             " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
        );
        return 0;
    }

};

void test_for_spin_avoidance() {
    const int nthreads = 4;
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
    spin_test<int, int>::run(nthreads);
}

template< typename Input, typename Output >
int run_tests() {
    basic_test<Input, Output>::run();
    basic_test<Input, Output>::run(NUMBER_OF_MSGS);
    basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
    basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
    return 0;
}

#include "tbb/parallel_for.h"
template<typename Input, typename Output>
class equeueing_on_inner_level {
    typedef Input input_type;
    typedef Output output_type;
    typedef async_activity<input_type, output_type> async_activity_type;
    typedef tbb::flow::async_node<Input, Output> async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    class body_graph_with_async {
    public:
        body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
            : spin_barrier(&barrier), my_async_activity(&activity) {}

        void operator()(int) const {
            tbb::flow::graph g;
            tbb::flow::function_node< int, input_type > start_node(
                g, tbb::flow::unlimited, [](int input) { return input_type(input); }
            );
            async_node_type offload_node(
                g, tbb::flow::unlimited,
                [&](const input_type &input, gateway_type& gateway) {
                    gateway.reserve_wait();
                    my_async_activity->submit( input, gateway );
                }
            );
            tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );

            tbb::flow::make_edge( start_node, offload_node );
            tbb::flow::make_edge( offload_node, end_node );

            start_node.try_put(1);

            spin_barrier->wait();

            my_async_activity->activate();

            g.wait_for_all();
        }

    private:
        utils::SpinBarrier* spin_barrier;
        async_activity_type* my_async_activity;
    };

public:
    static int run ()
    {
        const int nthreads = tbb::this_task_arena::max_concurrency();
        utils::SpinBarrier spin_barrier( nthreads );

        async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );

        tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
        return 0;
    }
};

int run_test_equeueing_on_inner_level() {
    equeueing_on_inner_level<int, int>::run();
    return 0;
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>

template<typename NodeType>
class AsyncActivity {
public:
    using gateway_t = typename NodeType::gateway_type;

    struct work_type {
        int input;
        gateway_t* gateway;
    };

    AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() {
        while(!end_of_work()) {
            work_type w;
            while( my_q.try_pop(w) ) {
                int res = do_work(w.input);
                w.gateway->try_put(res);
                w.gateway->release_wait();
                ++c;
            }
        }
    }) {}

    void submit(int i, gateway_t* gateway) {
        work_type w = {i, gateway};
        gateway->reserve_wait();
        my_q.push(w);
    }

    void wait_for_all() { thr.join(); }

private:
    bool end_of_work() { return c >= stop_limit; }

    int do_work(int& i) { return i + i; }

    async_activity_queue<work_type> my_q;
    // stop_limit and c are declared (and thus initialized) before thr: the worker
    // thread started by thr's constructor reads them in end_of_work().
    size_t stop_limit;
    size_t c;
    std::thread thr;
};
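// follows() and precedes() used below come from the preview node-set API guarded by
// __TBB_PREVIEW_FLOW_GRAPH_NODE_SET: passing follows(p0, p1, ...) or precedes(s0, ...)
// to a node constructor creates the corresponding edges at construction time. The
// tests check the edge count indirectly, by counting the messages that reach the
// downstream buffers.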

void test_follows() {
    using namespace tbb::flow;

    using input_t = int;
    using output_t = int;
    using node_t = async_node<input_t, output_t>;

    graph g;

    AsyncActivity<node_t> async_activity(3);

    std::array<broadcast_node<input_t>, 3> preds = {
        {
            broadcast_node<input_t>(g),
            broadcast_node<input_t>(g),
            broadcast_node<input_t>(g)
        }
    };

    node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
        async_activity.submit(input, &gtw);
    }, no_priority);

    buffer_node<output_t> buf(g);
    make_edge(node, buf);

    for(auto& pred: preds) {
        pred.try_put(1);
    }

    g.wait_for_all();
    async_activity.wait_for_all();

    output_t storage;
    CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
                  "follows() did not make the expected number of edges");
}

void test_precedes() {
    using namespace tbb::flow;

    using input_t = int;
    using output_t = int;
    using node_t = async_node<input_t, output_t>;

    graph g;

    AsyncActivity<node_t> async_activity(1);

    std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };

    broadcast_node<input_t> start(g);

    node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
        async_activity.submit(input, &gtw);
    }, no_priority);

    make_edge(start, node);

    start.try_put(1);

    g.wait_for_all();
    async_activity.wait_for_all();

    for(auto& successor : successors) {
        output_t storage;
        CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
                      "precedes() did not make the expected number of edges");
    }
}

void test_follows_and_precedes_api() {
    test_follows();
    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test async bodies processing
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Basic tests"){
    tbb::task_arena arena(utils::MaxThread);
    arena.execute(
        [&]() {
            run_tests<int, int>();
            run_tests<minimal_type, minimal_type>();
            run_tests<int, minimal_type>();
        }
    );
}

//! NativeParallelFor test with various concurrency settings
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Lightweight tests"){
    lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
}

//! Test reset and cancellation
//! \brief \ref error_guessing
TEST_CASE("Reset test"){
    test_reset();
}

//! Test copy constructor
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Copy constructor test"){
    test_copy_ctor();
}

//! Test if main thread spins
//! \brief \ref stress
TEST_CASE("Spin avoidance test"){
    test_for_spin_avoidance();
}

//! Test nested enqueuing
//! \brief \ref error_guessing
TEST_CASE("Inner enqueuing test"){
    run_test_equeueing_on_inner_level();
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test deprecated follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows and precedes API"){
    test_follows_and_precedes_api();
}
#endif