xref: /oneTBB/test/tbb/test_async_node.cpp (revision b15aabb3)
151c0b2f7Stbbdev /*
2*b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17*b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18*b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19*b15aabb3Stbbdev #endif
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1
2451c0b2f7Stbbdev #include "tbb/flow_graph.h"
2551c0b2f7Stbbdev 
2651c0b2f7Stbbdev #include "tbb/task.h"
2751c0b2f7Stbbdev #include "tbb/global_control.h"
2851c0b2f7Stbbdev 
2951c0b2f7Stbbdev #include "common/test.h"
3051c0b2f7Stbbdev #include "common/utils.h"
3151c0b2f7Stbbdev #include "common/utils_assert.h"
3251c0b2f7Stbbdev #include "common/graph_utils.h"
3351c0b2f7Stbbdev #include "common/spin_barrier.h"
3451c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev #include <string>
3751c0b2f7Stbbdev #include <thread>
3851c0b2f7Stbbdev #include <mutex>
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev //! \file test_async_node.cpp
4251c0b2f7Stbbdev //! \brief Test for [flow_graph.async_node] specification
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev 
//! Message type exposing only the operations the tests rely on.
/** It is default-constructible (payload -1), implicitly constructible from an int,
    copy-constructible and copy-assignable. The payload itself is private; only
    place_wrapper instantiations are granted access through the friend declaration. */
class minimal_type {
    template<typename T>
    friend struct place_wrapper;

    int value;

public:
    //! Creates an object holding the payload -1.
    minimal_type() : value(-1) {}

    //! Implicit conversion from int; the argument becomes the payload.
    minimal_type(int v) : value(v) {}

    //! Copies the payload of \p other.
    minimal_type(const minimal_type &other) : value(other.value) {}

    //! Overwrites the payload with that of \p other and returns this object.
    minimal_type &operator=(const minimal_type &other) {
        value = other.value;
        return *this;
    }
};
5751c0b2f7Stbbdev 
//! Wraps a value together with the id of the thread on which the wrapper was created.
/** A wrapper constructed from an int records std::this_thread::get_id() of the
    constructing thread. Converting construction and converting assignment from a
    place_wrapper of another wrapped type copy both the value and the recorded id. */
template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    //! Builds the wrapped value from \p v and stamps the wrapper with the current thread id.
    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    //! Converting constructor: takes the value and the recorded thread id from \p v.
    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    //! Converting assignment: takes the value and the recorded thread id from \p v.
    /** Returns a reference to this object. The return type used to be place_wrapper<Q>&,
        which cannot bind to *this when Q differs from T, so any use of the operator was
        ill-formed. */
    template <typename Q>
    place_wrapper& operator=(const place_wrapper<Q>& v) {
        // place_wrapper<T>* and place_wrapper<Q>* are unrelated pointer types,
        // so the self-assignment guard compares the raw addresses instead.
        if (static_cast<const void*>(this) != static_cast<const void*>(&v)) {
            value = v.value;
            thread_id = v.thread_id;
        }
        return *this;
    }

};
8151c0b2f7Stbbdev 
//! Generic helper used when the message types carry no thread information.
/** check() is a no-op for arbitrary types; copy_value() assigns the input to the output. */
template<typename T1, typename T2>
struct wrapper_helper {
    //! Nothing to verify for plain (unwrapped) types.
    static void check(const T1 &, const T2 &) {}

    //! Stores \p source into \p destination using T2's assignment from T1.
    static void copy_value(const T1 &source, T2 &destination) {
        destination = source;
    }
};
8751c0b2f7Stbbdev 
8851c0b2f7Stbbdev template<typename T1, typename T2>
8951c0b2f7Stbbdev struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
9051c0b2f7Stbbdev     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
9151c0b2f7Stbbdev        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
9251c0b2f7Stbbdev        return;
9351c0b2f7Stbbdev     }
9451c0b2f7Stbbdev     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
9551c0b2f7Stbbdev         out.value = in.value;
9651c0b2f7Stbbdev     }
9751c0b2f7Stbbdev };
9851c0b2f7Stbbdev 
//! Number of messages pushed through a graph in a single test pass.
const int NUMBER_OF_MSGS = 10;
//! Sentinel telling async_activity that the total number of items is not known up front.
const int UNKNOWN_NUMBER_OF_ITEMS = -1;

//! Counts executions of async_node bodies.
std::atomic<int> async_body_exec_count{0};
//! Counts work items handled by the async_activity service thread.
std::atomic<int> async_activity_processed_msg_count{0};
//! Counts executions of the terminal (end) node bodies.
std::atomic<int> end_body_exec_count{0};
10451c0b2f7Stbbdev 
10551c0b2f7Stbbdev // queueing required in test_reset for testing of cancellation
10651c0b2f7Stbbdev typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
10751c0b2f7Stbbdev typedef counting_async_node_type::gateway_type counting_gateway_type;
10851c0b2f7Stbbdev 
10951c0b2f7Stbbdev struct counting_async_unlimited_body {
11051c0b2f7Stbbdev 
11151c0b2f7Stbbdev     counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}
11251c0b2f7Stbbdev 
11351c0b2f7Stbbdev     void operator()( const int &input, counting_gateway_type& gateway) {
11451c0b2f7Stbbdev         // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
11551c0b2f7Stbbdev         // doctest's INFO cause issues.
11651c0b2f7Stbbdev 
11751c0b2f7Stbbdev         // INFO( "Body execution with input == " << input << "\n");
11851c0b2f7Stbbdev         ++async_body_exec_count;
11951c0b2f7Stbbdev         if ( input == -1 ) {
12051c0b2f7Stbbdev             bool result = my_tgc.cancel_group_execution();
12151c0b2f7Stbbdev             // INFO( "Canceling graph execution\n" );
12251c0b2f7Stbbdev             CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
12351c0b2f7Stbbdev             utils::Sleep(50);
12451c0b2f7Stbbdev         }
12551c0b2f7Stbbdev         gateway.try_put(input);
12651c0b2f7Stbbdev     }
12751c0b2f7Stbbdev private:
12851c0b2f7Stbbdev     tbb::task_group_context& my_tgc;
12951c0b2f7Stbbdev };
13051c0b2f7Stbbdev 
13151c0b2f7Stbbdev struct counting_async_serial_body : counting_async_unlimited_body {
13251c0b2f7Stbbdev     typedef counting_async_unlimited_body base_type;
13351c0b2f7Stbbdev     int my_async_body_exec_count;
13451c0b2f7Stbbdev 
13551c0b2f7Stbbdev     counting_async_serial_body(tbb::task_group_context& tgc)
13651c0b2f7Stbbdev         : base_type(tgc), my_async_body_exec_count( 0 ) { }
13751c0b2f7Stbbdev 
13851c0b2f7Stbbdev     void operator()( const int &input, counting_gateway_type& gateway ) {
13951c0b2f7Stbbdev         ++my_async_body_exec_count;
14051c0b2f7Stbbdev         base_type::operator()( input, gateway );
14151c0b2f7Stbbdev     }
14251c0b2f7Stbbdev };
14351c0b2f7Stbbdev 
14451c0b2f7Stbbdev void test_reset() {
14551c0b2f7Stbbdev     const int N = NUMBER_OF_MSGS;
14651c0b2f7Stbbdev     async_body_exec_count = 0;
14751c0b2f7Stbbdev 
14851c0b2f7Stbbdev     tbb::task_group_context graph_ctx;
14951c0b2f7Stbbdev     tbb::flow::graph g(graph_ctx);
15051c0b2f7Stbbdev     counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );
15151c0b2f7Stbbdev 
15251c0b2f7Stbbdev     const int R = 3;
15351c0b2f7Stbbdev     std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
15451c0b2f7Stbbdev     for (size_t i = 0; i < R; ++i) {
15551c0b2f7Stbbdev         r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
15651c0b2f7Stbbdev     }
15751c0b2f7Stbbdev 
15851c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
15951c0b2f7Stbbdev         tbb::flow::make_edge(a, *r[i]);
16051c0b2f7Stbbdev     }
16151c0b2f7Stbbdev 
16251c0b2f7Stbbdev     INFO( "One body execution\n" );
16351c0b2f7Stbbdev     a.try_put(-1);
16451c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
16551c0b2f7Stbbdev        a.try_put(i);
16651c0b2f7Stbbdev     }
16751c0b2f7Stbbdev     g.wait_for_all();
16851c0b2f7Stbbdev     // should be canceled with only 1 item reaching the async_body and the counting receivers
16951c0b2f7Stbbdev     // and N items left in the node's queue
17051c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );
17151c0b2f7Stbbdev 
17251c0b2f7Stbbdev     counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
17351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
17451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1"  );
17551c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
17651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
17751c0b2f7Stbbdev     }
17851c0b2f7Stbbdev 
17951c0b2f7Stbbdev     // should clear the async_node queue, but retain its local count at 1 and keep all edges
18051c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_protocol);
18151c0b2f7Stbbdev 
18251c0b2f7Stbbdev     INFO( "N body executions\n" );
18351c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
18451c0b2f7Stbbdev        a.try_put(i);
18551c0b2f7Stbbdev     }
18651c0b2f7Stbbdev     g.wait_for_all();
18751c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
18851c0b2f7Stbbdev 
18951c0b2f7Stbbdev     // a total of N+1 items should have passed through the node body
19051c0b2f7Stbbdev     // the local body count should also be N+1
19151c0b2f7Stbbdev     // and the counting receivers should all have a count of N+1
19251c0b2f7Stbbdev     counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
19351c0b2f7Stbbdev     CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
19451c0b2f7Stbbdev                    "local and global body execution counts are different" );
19551c0b2f7Stbbdev     INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
19651c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1"  );
19751c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
19851c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
19951c0b2f7Stbbdev     }
20051c0b2f7Stbbdev 
20151c0b2f7Stbbdev     INFO( "N body executions with new bodies\n" );
20251c0b2f7Stbbdev     // should clear the async_node queue and reset its local count to 0, but keep all edges
20351c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_bodies);
20451c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
20551c0b2f7Stbbdev        a.try_put(i);
20651c0b2f7Stbbdev     }
20751c0b2f7Stbbdev     g.wait_for_all();
20851c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
20951c0b2f7Stbbdev 
21051c0b2f7Stbbdev     // a total of 2N+1 items should have passed through the node body
21151c0b2f7Stbbdev     // the local body count should be N
21251c0b2f7Stbbdev     // and the counting receivers should all have a count of 2N+1
21351c0b2f7Stbbdev     counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
21451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1"  );
21551c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N"  );
21651c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
21751c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
21851c0b2f7Stbbdev     }
21951c0b2f7Stbbdev 
22051c0b2f7Stbbdev     // should clear the async_node queue and keep its local count at N and remove all edges
22151c0b2f7Stbbdev     INFO( "N body executions with no edges\n" );
22251c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
22351c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
22451c0b2f7Stbbdev        a.try_put(i);
22551c0b2f7Stbbdev     }
22651c0b2f7Stbbdev     g.wait_for_all();
22751c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
22851c0b2f7Stbbdev 
22951c0b2f7Stbbdev     // a total of 3N+1 items should have passed through the node body
23051c0b2f7Stbbdev     // the local body count should now be 2*N
23151c0b2f7Stbbdev     // and the counting receivers should remain at a count of 2N+1
23251c0b2f7Stbbdev     counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
23351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1"  );
23451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N"  );
23551c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
23651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
23751c0b2f7Stbbdev     }
23851c0b2f7Stbbdev 
23951c0b2f7Stbbdev     // put back 1 edge to receiver 0
24051c0b2f7Stbbdev     INFO( "N body executions with 1 edge\n" );
24151c0b2f7Stbbdev     tbb::flow::make_edge(a, *r[0]);
24251c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
24351c0b2f7Stbbdev        a.try_put(i);
24451c0b2f7Stbbdev     }
24551c0b2f7Stbbdev     g.wait_for_all();
24651c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
24751c0b2f7Stbbdev 
24851c0b2f7Stbbdev     // a total of 4N+1 items should have passed through the node body
24951c0b2f7Stbbdev     // the local body count should now be 3*N
25051c0b2f7Stbbdev     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
25151c0b2f7Stbbdev     counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
25251c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1"  );
25351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N"  );
25451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
25551c0b2f7Stbbdev     for (int i = 1; i < R; ++i) {
25651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
25751c0b2f7Stbbdev     }
25851c0b2f7Stbbdev 
25951c0b2f7Stbbdev     // should clear the async_node queue and keep its local count at N and remove all edges
26051c0b2f7Stbbdev     INFO( "N body executions with no edges and new body\n" );
26151c0b2f7Stbbdev     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
26251c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
26351c0b2f7Stbbdev        a.try_put(i);
26451c0b2f7Stbbdev     }
26551c0b2f7Stbbdev     g.wait_for_all();
26651c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
26751c0b2f7Stbbdev 
26851c0b2f7Stbbdev     // a total of 4N+1 items should have passed through the node body
26951c0b2f7Stbbdev     // the local body count should now be 3*N
27051c0b2f7Stbbdev     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
27151c0b2f7Stbbdev     counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
27251c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1"  );
27351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N"  );
27451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
27551c0b2f7Stbbdev     for (int i = 1; i < R; ++i) {
27651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
27751c0b2f7Stbbdev     }
27851c0b2f7Stbbdev }
27951c0b2f7Stbbdev 
28051c0b2f7Stbbdev 
28151c0b2f7Stbbdev #include <mutex>
28251c0b2f7Stbbdev 
//! FIFO queue whose every operation runs under a single std::mutex.
template <typename T>
class async_activity_queue {
    typedef std::mutex mutex_t;
    typedef std::lock_guard<mutex_t> guard_t;

public:
    //! Appends a copy of \p item to the back of the queue.
    void push( const T& item ) {
        guard_t guard( m_mutex );
        m_queue.push( item );
    }

    //! Moves the oldest element into \p item if there is one.
    /** \return true when an element was removed; false when the queue was empty,
        in which case \p item is not modified. */
    bool try_pop( T& item ) {
        guard_t guard( m_mutex );
        const bool has_item = !m_queue.empty();
        if( has_item ) {
            item = m_queue.front();
            m_queue.pop();
        }
        return has_item;
    }

    //! Tells whether the queue held no elements at the moment of the call.
    bool empty() {
        guard_t guard( m_mutex );
        return m_queue.empty();
    }

private:
    mutex_t m_mutex;
    std::queue<T> m_queue;
};
31051c0b2f7Stbbdev 
31151c0b2f7Stbbdev template< typename Input, typename Output >
31251c0b2f7Stbbdev class async_activity : utils::NoAssign {
31351c0b2f7Stbbdev public:
31451c0b2f7Stbbdev     typedef Input input_type;
31551c0b2f7Stbbdev     typedef Output output_type;
31651c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
31751c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
31851c0b2f7Stbbdev 
31951c0b2f7Stbbdev     struct work_type {
32051c0b2f7Stbbdev         input_type input;
32151c0b2f7Stbbdev         gateway_type* gateway;
32251c0b2f7Stbbdev     };
32351c0b2f7Stbbdev 
32451c0b2f7Stbbdev     class ServiceThreadBody {
32551c0b2f7Stbbdev     public:
32651c0b2f7Stbbdev         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
32751c0b2f7Stbbdev         void operator()() { my_activity->process(); }
32851c0b2f7Stbbdev     private:
32951c0b2f7Stbbdev         async_activity* my_activity;
33051c0b2f7Stbbdev     };
33151c0b2f7Stbbdev 
33251c0b2f7Stbbdev     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
33351c0b2f7Stbbdev         : my_expected_items(expected_items), my_sleep_time(sleep_time)
33451c0b2f7Stbbdev     {
33551c0b2f7Stbbdev         is_active = !deferred;
33651c0b2f7Stbbdev         my_quit = false;
33751c0b2f7Stbbdev         std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
33851c0b2f7Stbbdev     }
33951c0b2f7Stbbdev 
34051c0b2f7Stbbdev private:
34151c0b2f7Stbbdev 
34251c0b2f7Stbbdev     async_activity( const async_activity& )
34351c0b2f7Stbbdev         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
34451c0b2f7Stbbdev     {
34551c0b2f7Stbbdev         is_active = true;
34651c0b2f7Stbbdev     }
34751c0b2f7Stbbdev 
34851c0b2f7Stbbdev public:
34951c0b2f7Stbbdev     ~async_activity() {
35051c0b2f7Stbbdev         stop();
35151c0b2f7Stbbdev         my_service_thread.join();
35251c0b2f7Stbbdev     }
35351c0b2f7Stbbdev 
35451c0b2f7Stbbdev     void submit( const input_type &input, gateway_type& gateway ) {
35551c0b2f7Stbbdev         work_type work = {input, &gateway};
35651c0b2f7Stbbdev         my_work_queue.push( work );
35751c0b2f7Stbbdev     }
35851c0b2f7Stbbdev 
35951c0b2f7Stbbdev     void process() {
36051c0b2f7Stbbdev         do {
36151c0b2f7Stbbdev             work_type work;
36251c0b2f7Stbbdev             if( is_active && my_work_queue.try_pop( work ) ) {
36351c0b2f7Stbbdev                 utils::Sleep(my_sleep_time);
36451c0b2f7Stbbdev                 ++async_activity_processed_msg_count;
36551c0b2f7Stbbdev                 output_type output;
36651c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
36751c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::check(work.input, output);
36851c0b2f7Stbbdev                 work.gateway->try_put(output);
36951c0b2f7Stbbdev                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
37051c0b2f7Stbbdev                      int(async_activity_processed_msg_count) == my_expected_items ) {
37151c0b2f7Stbbdev                     work.gateway->release_wait();
37251c0b2f7Stbbdev                 }
37351c0b2f7Stbbdev             }
37451c0b2f7Stbbdev         } while( my_quit == false || !my_work_queue.empty());
37551c0b2f7Stbbdev     }
37651c0b2f7Stbbdev 
37751c0b2f7Stbbdev     void stop() {
37851c0b2f7Stbbdev         my_quit = true;
37951c0b2f7Stbbdev     }
38051c0b2f7Stbbdev 
38151c0b2f7Stbbdev     void activate() {
38251c0b2f7Stbbdev         is_active = true;
38351c0b2f7Stbbdev     }
38451c0b2f7Stbbdev 
38551c0b2f7Stbbdev     bool should_reserve_each_time() {
38651c0b2f7Stbbdev         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
38751c0b2f7Stbbdev             return true;
38851c0b2f7Stbbdev         else
38951c0b2f7Stbbdev             return false;
39051c0b2f7Stbbdev     }
39151c0b2f7Stbbdev 
39251c0b2f7Stbbdev private:
39351c0b2f7Stbbdev 
39451c0b2f7Stbbdev     const int my_expected_items;
39551c0b2f7Stbbdev     const int my_sleep_time;
39651c0b2f7Stbbdev     std::atomic< bool > is_active;
39751c0b2f7Stbbdev 
39851c0b2f7Stbbdev     async_activity_queue<work_type> my_work_queue;
39951c0b2f7Stbbdev 
40051c0b2f7Stbbdev     std::atomic< bool > my_quit;
40151c0b2f7Stbbdev 
40251c0b2f7Stbbdev     std::thread my_service_thread;
40351c0b2f7Stbbdev };
40451c0b2f7Stbbdev 
40551c0b2f7Stbbdev template<typename Input, typename Output>
40651c0b2f7Stbbdev struct basic_test {
40751c0b2f7Stbbdev     typedef Input input_type;
40851c0b2f7Stbbdev     typedef Output output_type;
40951c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
41051c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
41151c0b2f7Stbbdev 
41251c0b2f7Stbbdev     basic_test() {}
41351c0b2f7Stbbdev 
41451c0b2f7Stbbdev     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
41551c0b2f7Stbbdev         async_activity<input_type, output_type> my_async_activity(async_expected_items);
41651c0b2f7Stbbdev 
41751c0b2f7Stbbdev         tbb::flow::graph g;
41851c0b2f7Stbbdev 
41951c0b2f7Stbbdev         tbb::flow::function_node< int, input_type > start_node(
42051c0b2f7Stbbdev             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
42151c0b2f7Stbbdev         );
42251c0b2f7Stbbdev         async_node_type offload_node(
42351c0b2f7Stbbdev             g, tbb::flow::unlimited,
42451c0b2f7Stbbdev             [&] (const input_type &input, gateway_type& gateway) {
42551c0b2f7Stbbdev                 ++async_body_exec_count;
42651c0b2f7Stbbdev                 if(my_async_activity.should_reserve_each_time())
42751c0b2f7Stbbdev                     gateway.reserve_wait();
42851c0b2f7Stbbdev                 my_async_activity.submit(input, gateway);
42951c0b2f7Stbbdev             }
43051c0b2f7Stbbdev         );
43151c0b2f7Stbbdev         tbb::flow::function_node< output_type > end_node(
43251c0b2f7Stbbdev             g, tbb::flow::unlimited,
43351c0b2f7Stbbdev             [&](const output_type& input) {
43451c0b2f7Stbbdev                 ++end_body_exec_count;
43551c0b2f7Stbbdev                 output_type output;
43651c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::check(input, output);
43751c0b2f7Stbbdev             }
43851c0b2f7Stbbdev         );
43951c0b2f7Stbbdev 
44051c0b2f7Stbbdev         tbb::flow::make_edge( start_node, offload_node );
44151c0b2f7Stbbdev         tbb::flow::make_edge( offload_node, end_node );
44251c0b2f7Stbbdev 
44351c0b2f7Stbbdev         async_body_exec_count = 0;
44451c0b2f7Stbbdev         async_activity_processed_msg_count = 0;
44551c0b2f7Stbbdev         end_body_exec_count = 0;
44651c0b2f7Stbbdev 
44751c0b2f7Stbbdev         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
44851c0b2f7Stbbdev             offload_node.gateway().reserve_wait();
44951c0b2f7Stbbdev         }
45051c0b2f7Stbbdev         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
45151c0b2f7Stbbdev             start_node.try_put(i);
45251c0b2f7Stbbdev         }
45351c0b2f7Stbbdev         g.wait_for_all();
45451c0b2f7Stbbdev         CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
45551c0b2f7Stbbdev         CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
45651c0b2f7Stbbdev         CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
45751c0b2f7Stbbdev         INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
45851c0b2f7Stbbdev               " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
45951c0b2f7Stbbdev               " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
46051c0b2f7Stbbdev         );
46151c0b2f7Stbbdev         return 0;
46251c0b2f7Stbbdev     }
46351c0b2f7Stbbdev 
46451c0b2f7Stbbdev };
46551c0b2f7Stbbdev 
46651c0b2f7Stbbdev int test_copy_ctor() {
46751c0b2f7Stbbdev     const int N = NUMBER_OF_MSGS;
46851c0b2f7Stbbdev     async_body_exec_count = 0;
46951c0b2f7Stbbdev 
47051c0b2f7Stbbdev     tbb::flow::graph g;
47151c0b2f7Stbbdev 
47251c0b2f7Stbbdev     harness_counting_receiver<int> r1(g);
47351c0b2f7Stbbdev     harness_counting_receiver<int> r2(g);
47451c0b2f7Stbbdev 
47551c0b2f7Stbbdev     tbb::task_group_context graph_ctx;
47651c0b2f7Stbbdev     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
47751c0b2f7Stbbdev     counting_async_node_type b(a);
47851c0b2f7Stbbdev 
47951c0b2f7Stbbdev     tbb::flow::make_edge(a, r1);                             // C++11-style of making edges
48051c0b2f7Stbbdev     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // usual way of making edges
48151c0b2f7Stbbdev 
48251c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
48351c0b2f7Stbbdev        a.try_put(i);
48451c0b2f7Stbbdev     }
48551c0b2f7Stbbdev     g.wait_for_all();
48651c0b2f7Stbbdev 
48751c0b2f7Stbbdev     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
48851c0b2f7Stbbdev     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
48951c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
49051c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
49151c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" );
49251c0b2f7Stbbdev 
49351c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
49451c0b2f7Stbbdev        b.try_put(i);
49551c0b2f7Stbbdev     }
49651c0b2f7Stbbdev     g.wait_for_all();
49751c0b2f7Stbbdev 
49851c0b2f7Stbbdev     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
49951c0b2f7Stbbdev     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
50051c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
50151c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
50251c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
50351c0b2f7Stbbdev     return 0;
50451c0b2f7Stbbdev }
50551c0b2f7Stbbdev 
//! Counts end-body executions that happened on the thread which started the graph.
std::atomic<int> main_tid_count{0};
50751c0b2f7Stbbdev 
50851c0b2f7Stbbdev template<typename Input, typename Output>
50951c0b2f7Stbbdev struct spin_test {
51051c0b2f7Stbbdev     typedef Input input_type;
51151c0b2f7Stbbdev     typedef Output output_type;
51251c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
51351c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
51451c0b2f7Stbbdev 
51551c0b2f7Stbbdev     class end_body_type {
51651c0b2f7Stbbdev         typedef Output output_type;
51751c0b2f7Stbbdev         std::thread::id my_main_tid;
51851c0b2f7Stbbdev         utils::SpinBarrier *my_barrier;
51951c0b2f7Stbbdev     public:
52051c0b2f7Stbbdev         end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
52151c0b2f7Stbbdev 
52251c0b2f7Stbbdev         void operator()( const output_type & ) {
52351c0b2f7Stbbdev             ++end_body_exec_count;
52451c0b2f7Stbbdev             if (std::this_thread::get_id() == my_main_tid) {
52551c0b2f7Stbbdev                ++main_tid_count;
52651c0b2f7Stbbdev             }
527*b15aabb3Stbbdev             my_barrier->wait();
52851c0b2f7Stbbdev         }
52951c0b2f7Stbbdev     };
53051c0b2f7Stbbdev 
53151c0b2f7Stbbdev     spin_test() {}
53251c0b2f7Stbbdev 
53351c0b2f7Stbbdev     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
53451c0b2f7Stbbdev         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
53551c0b2f7Stbbdev         const int overall_message_count = nthreads * NUMBER_OF_MSGS;
53651c0b2f7Stbbdev         utils::SpinBarrier spin_barrier(nthreads);
53751c0b2f7Stbbdev 
53851c0b2f7Stbbdev         tbb::flow::graph g;
53951c0b2f7Stbbdev         tbb::flow::function_node<int, input_type> start_node(
54051c0b2f7Stbbdev             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
54151c0b2f7Stbbdev         );
54251c0b2f7Stbbdev         async_node_type offload_node(
54351c0b2f7Stbbdev             g, tbb::flow::unlimited,
54451c0b2f7Stbbdev             [&](const input_type &input, gateway_type& gateway) {
54551c0b2f7Stbbdev                 ++async_body_exec_count;
54651c0b2f7Stbbdev                 if(my_async_activity.should_reserve_each_time())
54751c0b2f7Stbbdev                     gateway.reserve_wait();
54851c0b2f7Stbbdev                 my_async_activity.submit(input, gateway);
54951c0b2f7Stbbdev             }
55051c0b2f7Stbbdev         );
55151c0b2f7Stbbdev         tbb::flow::function_node<output_type> end_node(
55251c0b2f7Stbbdev             g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
55351c0b2f7Stbbdev         );
55451c0b2f7Stbbdev 
55551c0b2f7Stbbdev         tbb::flow::make_edge( start_node, offload_node );
55651c0b2f7Stbbdev         tbb::flow::make_edge( offload_node, end_node );
55751c0b2f7Stbbdev 
55851c0b2f7Stbbdev         async_body_exec_count = 0;
55951c0b2f7Stbbdev         async_activity_processed_msg_count = 0;
56051c0b2f7Stbbdev         end_body_exec_count = 0;
56151c0b2f7Stbbdev         main_tid_count = 0;
56251c0b2f7Stbbdev 
56351c0b2f7Stbbdev         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
56451c0b2f7Stbbdev             offload_node.gateway().reserve_wait();
56551c0b2f7Stbbdev         }
56651c0b2f7Stbbdev         for (int i = 0; i < overall_message_count; ++i) {
56751c0b2f7Stbbdev             start_node.try_put(i);
56851c0b2f7Stbbdev         }
56951c0b2f7Stbbdev         g.wait_for_all();
57051c0b2f7Stbbdev         CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
57151c0b2f7Stbbdev                        "AsyncBody processed wrong number of signals" );
57251c0b2f7Stbbdev         CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
57351c0b2f7Stbbdev                        "AsyncActivity processed wrong number of signals" );
57451c0b2f7Stbbdev         CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
57551c0b2f7Stbbdev                        "EndBody processed wrong number of signals");
57651c0b2f7Stbbdev 
57751c0b2f7Stbbdev         INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");
57851c0b2f7Stbbdev 
57951c0b2f7Stbbdev         INFO("async_body_exec_count == " << int(async_body_exec_count) <<
58051c0b2f7Stbbdev              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
58151c0b2f7Stbbdev              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
58251c0b2f7Stbbdev         );
58351c0b2f7Stbbdev         return 0;
58451c0b2f7Stbbdev     }
58551c0b2f7Stbbdev 
58651c0b2f7Stbbdev };
58751c0b2f7Stbbdev 
58851c0b2f7Stbbdev void test_for_spin_avoidance() {
58951c0b2f7Stbbdev     const int nthreads = 4;
59051c0b2f7Stbbdev     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
59151c0b2f7Stbbdev     spin_test<int, int>::run(nthreads);
59251c0b2f7Stbbdev }
59351c0b2f7Stbbdev 
59451c0b2f7Stbbdev template< typename Input, typename Output >
59551c0b2f7Stbbdev int run_tests() {
59651c0b2f7Stbbdev     basic_test<Input, Output>::run();
59751c0b2f7Stbbdev     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
59851c0b2f7Stbbdev     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
59951c0b2f7Stbbdev     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
60051c0b2f7Stbbdev     return 0;
60151c0b2f7Stbbdev }
60251c0b2f7Stbbdev 
60351c0b2f7Stbbdev #include "tbb/parallel_for.h"
60451c0b2f7Stbbdev template<typename Input, typename Output>
60551c0b2f7Stbbdev class equeueing_on_inner_level {
60651c0b2f7Stbbdev     typedef Input input_type;
60751c0b2f7Stbbdev     typedef Output output_type;
60851c0b2f7Stbbdev     typedef async_activity<input_type, output_type> async_activity_type;
60951c0b2f7Stbbdev     typedef tbb::flow::async_node<Input, Output> async_node_type;
61051c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
61151c0b2f7Stbbdev 
61251c0b2f7Stbbdev     class body_graph_with_async {
61351c0b2f7Stbbdev     public:
61451c0b2f7Stbbdev         body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
61551c0b2f7Stbbdev             : spin_barrier(&barrier), my_async_activity(&activity) {}
61651c0b2f7Stbbdev 
61751c0b2f7Stbbdev         void operator()(int) const {
61851c0b2f7Stbbdev             tbb::flow::graph g;
61951c0b2f7Stbbdev             tbb::flow::function_node< int, input_type > start_node(
62051c0b2f7Stbbdev                 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
62151c0b2f7Stbbdev             );
62251c0b2f7Stbbdev             async_node_type offload_node(
62351c0b2f7Stbbdev                 g, tbb::flow::unlimited,
62451c0b2f7Stbbdev                 [&](const input_type &input, gateway_type& gateway) {
62551c0b2f7Stbbdev                     gateway.reserve_wait();
62651c0b2f7Stbbdev                     my_async_activity->submit( input, gateway );
62751c0b2f7Stbbdev                 }
62851c0b2f7Stbbdev             );
62951c0b2f7Stbbdev             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );
63051c0b2f7Stbbdev 
63151c0b2f7Stbbdev             tbb::flow::make_edge( start_node, offload_node );
63251c0b2f7Stbbdev             tbb::flow::make_edge( offload_node, end_node );
63351c0b2f7Stbbdev 
63451c0b2f7Stbbdev             start_node.try_put(1);
63551c0b2f7Stbbdev 
63651c0b2f7Stbbdev             spin_barrier->wait();
63751c0b2f7Stbbdev 
63851c0b2f7Stbbdev             my_async_activity->activate();
63951c0b2f7Stbbdev 
64051c0b2f7Stbbdev             g.wait_for_all();
64151c0b2f7Stbbdev         }
64251c0b2f7Stbbdev 
64351c0b2f7Stbbdev     private:
64451c0b2f7Stbbdev         utils::SpinBarrier* spin_barrier;
64551c0b2f7Stbbdev         async_activity_type* my_async_activity;
64651c0b2f7Stbbdev     };
64751c0b2f7Stbbdev 
64851c0b2f7Stbbdev public:
64951c0b2f7Stbbdev     static int run ()
65051c0b2f7Stbbdev     {
65151c0b2f7Stbbdev         const int nthreads = tbb::this_task_arena::max_concurrency();
65251c0b2f7Stbbdev         utils::SpinBarrier spin_barrier( nthreads );
65351c0b2f7Stbbdev 
65451c0b2f7Stbbdev         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
65551c0b2f7Stbbdev 
65651c0b2f7Stbbdev         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
65751c0b2f7Stbbdev         return 0;
65851c0b2f7Stbbdev     }
65951c0b2f7Stbbdev };
66051c0b2f7Stbbdev 
66151c0b2f7Stbbdev int run_test_equeueing_on_inner_level() {
66251c0b2f7Stbbdev     equeueing_on_inner_level<int, int>::run();
66351c0b2f7Stbbdev     return 0;
66451c0b2f7Stbbdev }
66551c0b2f7Stbbdev 
66651c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
66751c0b2f7Stbbdev #include <array>
66851c0b2f7Stbbdev 
66951c0b2f7Stbbdev template<typename NodeType>
67051c0b2f7Stbbdev class AsyncActivity {
67151c0b2f7Stbbdev public:
67251c0b2f7Stbbdev     using gateway_t = typename NodeType::gateway_type;
67351c0b2f7Stbbdev 
67451c0b2f7Stbbdev     struct work_type {
67551c0b2f7Stbbdev         int input;
67651c0b2f7Stbbdev         gateway_t* gateway;
67751c0b2f7Stbbdev     };
67851c0b2f7Stbbdev 
679*b15aabb3Stbbdev     AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() {
68051c0b2f7Stbbdev         while(!end_of_work()) {
68151c0b2f7Stbbdev             work_type w;
68251c0b2f7Stbbdev             while( my_q.try_pop(w) ) {
68351c0b2f7Stbbdev                 int res = do_work(w.input);
68451c0b2f7Stbbdev                 w.gateway->try_put(res);
68551c0b2f7Stbbdev                 w.gateway->release_wait();
68651c0b2f7Stbbdev                 ++c;
68751c0b2f7Stbbdev             }
68851c0b2f7Stbbdev         }
689*b15aabb3Stbbdev     }) {}
69051c0b2f7Stbbdev 
69151c0b2f7Stbbdev     void submit(int i, gateway_t* gateway) {
69251c0b2f7Stbbdev         work_type w = {i, gateway};
69351c0b2f7Stbbdev         gateway->reserve_wait();
69451c0b2f7Stbbdev         my_q.push(w);
69551c0b2f7Stbbdev     }
69651c0b2f7Stbbdev 
69751c0b2f7Stbbdev     void wait_for_all() { thr.join(); }
69851c0b2f7Stbbdev 
69951c0b2f7Stbbdev private:
70051c0b2f7Stbbdev     bool end_of_work() { return c >= stop_limit; }
70151c0b2f7Stbbdev 
70251c0b2f7Stbbdev     int do_work(int& i) { return i + i; }
70351c0b2f7Stbbdev 
70451c0b2f7Stbbdev     async_activity_queue<work_type> my_q;
70551c0b2f7Stbbdev     size_t stop_limit;
70651c0b2f7Stbbdev     size_t c;
707*b15aabb3Stbbdev     std::thread thr;
70851c0b2f7Stbbdev };
70951c0b2f7Stbbdev 
71051c0b2f7Stbbdev void test_follows() {
71151c0b2f7Stbbdev     using namespace tbb::flow;
71251c0b2f7Stbbdev 
71351c0b2f7Stbbdev     using input_t = int;
71451c0b2f7Stbbdev     using output_t = int;
71551c0b2f7Stbbdev     using node_t = async_node<input_t, output_t>;
71651c0b2f7Stbbdev 
71751c0b2f7Stbbdev     graph g;
71851c0b2f7Stbbdev 
71951c0b2f7Stbbdev     AsyncActivity<node_t> async_activity(3);
72051c0b2f7Stbbdev 
72151c0b2f7Stbbdev     std::array<broadcast_node<input_t>, 3> preds = {
72251c0b2f7Stbbdev       {
72351c0b2f7Stbbdev         broadcast_node<input_t>(g),
72451c0b2f7Stbbdev         broadcast_node<input_t>(g),
72551c0b2f7Stbbdev         broadcast_node<input_t>(g)
72651c0b2f7Stbbdev       }
72751c0b2f7Stbbdev     };
72851c0b2f7Stbbdev 
72951c0b2f7Stbbdev     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
73051c0b2f7Stbbdev         async_activity.submit(input, &gtw);
73151c0b2f7Stbbdev     }, no_priority);
73251c0b2f7Stbbdev 
73351c0b2f7Stbbdev     buffer_node<output_t> buf(g);
73451c0b2f7Stbbdev     make_edge(node, buf);
73551c0b2f7Stbbdev 
73651c0b2f7Stbbdev     for(auto& pred: preds) {
73751c0b2f7Stbbdev         pred.try_put(1);
73851c0b2f7Stbbdev     }
73951c0b2f7Stbbdev 
74051c0b2f7Stbbdev     g.wait_for_all();
74151c0b2f7Stbbdev     async_activity.wait_for_all();
74251c0b2f7Stbbdev 
74351c0b2f7Stbbdev     output_t storage;
74451c0b2f7Stbbdev     CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
74551c0b2f7Stbbdev                   "Not exact edge quantity was made");
74651c0b2f7Stbbdev }
74751c0b2f7Stbbdev 
74851c0b2f7Stbbdev void test_precedes() {
74951c0b2f7Stbbdev     using namespace tbb::flow;
75051c0b2f7Stbbdev 
75151c0b2f7Stbbdev     using input_t = int;
75251c0b2f7Stbbdev     using output_t = int;
75351c0b2f7Stbbdev     using node_t = async_node<input_t, output_t>;
75451c0b2f7Stbbdev 
75551c0b2f7Stbbdev     graph g;
75651c0b2f7Stbbdev 
75751c0b2f7Stbbdev     AsyncActivity<node_t> async_activity(1);
75851c0b2f7Stbbdev 
75951c0b2f7Stbbdev     std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
76051c0b2f7Stbbdev 
76151c0b2f7Stbbdev     broadcast_node<input_t> start(g);
76251c0b2f7Stbbdev 
76351c0b2f7Stbbdev     node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
76451c0b2f7Stbbdev         async_activity.submit(input, &gtw);
76551c0b2f7Stbbdev     }, no_priority);
76651c0b2f7Stbbdev 
76751c0b2f7Stbbdev     make_edge(start, node);
76851c0b2f7Stbbdev 
76951c0b2f7Stbbdev     start.try_put(1);
77051c0b2f7Stbbdev 
77151c0b2f7Stbbdev     g.wait_for_all();
77251c0b2f7Stbbdev     async_activity.wait_for_all();
77351c0b2f7Stbbdev 
77451c0b2f7Stbbdev     for(auto& successor : successors) {
77551c0b2f7Stbbdev         output_t storage;
77651c0b2f7Stbbdev         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
77751c0b2f7Stbbdev                       "Not exact edge quantity was made");
77851c0b2f7Stbbdev     }
77951c0b2f7Stbbdev }
78051c0b2f7Stbbdev 
78151c0b2f7Stbbdev void test_follows_and_precedes_api() {
78251c0b2f7Stbbdev     test_follows();
78351c0b2f7Stbbdev     test_precedes();
78451c0b2f7Stbbdev }
78551c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
78651c0b2f7Stbbdev 
78751c0b2f7Stbbdev //! Test async bodies processing
78851c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
78951c0b2f7Stbbdev TEST_CASE("Basic tests"){
79051c0b2f7Stbbdev     tbb::task_arena arena(utils::MaxThread);
79151c0b2f7Stbbdev     arena.execute(
79251c0b2f7Stbbdev         [&]() {
79351c0b2f7Stbbdev             run_tests<int, int>();
79451c0b2f7Stbbdev             run_tests<minimal_type, minimal_type>();
79551c0b2f7Stbbdev             run_tests<int, minimal_type>();
79651c0b2f7Stbbdev         }
79751c0b2f7Stbbdev     );
79851c0b2f7Stbbdev }
79951c0b2f7Stbbdev 
80051c0b2f7Stbbdev //! NativeParallelFor test with various concurrency settings
80151c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
80251c0b2f7Stbbdev TEST_CASE("Lightweight tests"){
80351c0b2f7Stbbdev     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
80451c0b2f7Stbbdev }
80551c0b2f7Stbbdev 
80651c0b2f7Stbbdev //! Test reset and cancellation
80751c0b2f7Stbbdev //! \brief \ref error_guessing
80851c0b2f7Stbbdev TEST_CASE("Reset test"){
80951c0b2f7Stbbdev     test_reset();
81051c0b2f7Stbbdev }
81151c0b2f7Stbbdev 
81251c0b2f7Stbbdev //! Test
81351c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
81451c0b2f7Stbbdev TEST_CASE("Copy constructor test"){
81551c0b2f7Stbbdev     test_copy_ctor();
81651c0b2f7Stbbdev }
81751c0b2f7Stbbdev 
81851c0b2f7Stbbdev //! Test if main thread spins
81951c0b2f7Stbbdev //! \brief \ref stress
82051c0b2f7Stbbdev TEST_CASE("Spin avoidance test"){
82151c0b2f7Stbbdev     test_for_spin_avoidance();
82251c0b2f7Stbbdev }
82351c0b2f7Stbbdev 
82451c0b2f7Stbbdev //! Test nested enqueing
82551c0b2f7Stbbdev //! \brief \ref error_guessing
82651c0b2f7Stbbdev TEST_CASE("Inner enqueing test"){
82751c0b2f7Stbbdev     run_test_equeueing_on_inner_level();
82851c0b2f7Stbbdev }
82951c0b2f7Stbbdev 
83051c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
83151c0b2f7Stbbdev //! Test deprecated follows and preceedes API
83251c0b2f7Stbbdev //! \brief \ref error_guessing
83351c0b2f7Stbbdev TEST_CASE("Test follows and preceedes API"){
83451c0b2f7Stbbdev     test_follows_and_precedes_api();
83551c0b2f7Stbbdev }
83651c0b2f7Stbbdev #endif
837