xref: /oneTBB/test/tbb/test_async_node.cpp (revision c4a799df)
151c0b2f7Stbbdev /*
2*c4a799dfSJhaShweta1     Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev 
2551c0b2f7Stbbdev #include "tbb/task.h"
2651c0b2f7Stbbdev #include "tbb/global_control.h"
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev #include "common/test.h"
2951c0b2f7Stbbdev #include "common/utils.h"
3051c0b2f7Stbbdev #include "common/utils_assert.h"
3151c0b2f7Stbbdev #include "common/graph_utils.h"
3251c0b2f7Stbbdev #include "common/spin_barrier.h"
3351c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
34478de5b1Stbbdev #include "common/concepts_common.h"
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev #include <string>
3751c0b2f7Stbbdev #include <thread>
3851c0b2f7Stbbdev #include <mutex>
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev //! \file test_async_node.cpp
4251c0b2f7Stbbdev //! \brief Test for [flow_graph.async_node] specification
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev 
4551c0b2f7Stbbdev class minimal_type {
4651c0b2f7Stbbdev     template<typename T>
4751c0b2f7Stbbdev     friend struct place_wrapper;
4851c0b2f7Stbbdev 
4951c0b2f7Stbbdev     int value;
5051c0b2f7Stbbdev 
5151c0b2f7Stbbdev public:
minimal_type()5251c0b2f7Stbbdev     minimal_type() : value(-1) {}
minimal_type(int v)5351c0b2f7Stbbdev     minimal_type(int v) : value(v) {}
minimal_type(const minimal_type & m)5451c0b2f7Stbbdev     minimal_type(const minimal_type &m) : value(m.value) { }
operator =(const minimal_type & m)5551c0b2f7Stbbdev     minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; }
5651c0b2f7Stbbdev };
5751c0b2f7Stbbdev 
5851c0b2f7Stbbdev template <typename T>
5951c0b2f7Stbbdev struct place_wrapper {
6051c0b2f7Stbbdev     typedef T wrapped_type;
6151c0b2f7Stbbdev     T value;
6251c0b2f7Stbbdev     std::thread::id thread_id;
6351c0b2f7Stbbdev 
place_wrapperplace_wrapper6451c0b2f7Stbbdev     place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}
6551c0b2f7Stbbdev 
6651c0b2f7Stbbdev     template <typename Q>
place_wrapperplace_wrapper6751c0b2f7Stbbdev     place_wrapper(const place_wrapper<Q>& v)
6851c0b2f7Stbbdev         : value(v.value), thread_id(v.thread_id)
6951c0b2f7Stbbdev     {}
7051c0b2f7Stbbdev 
7151c0b2f7Stbbdev     template <typename Q>
operator =place_wrapper7251c0b2f7Stbbdev     place_wrapper<Q>& operator=(const place_wrapper<Q>& v) {
7351c0b2f7Stbbdev         if (this != &v) {
7451c0b2f7Stbbdev             value = v.value;
7551c0b2f7Stbbdev             thread_id = v.thread_id;
7651c0b2f7Stbbdev         }
7751c0b2f7Stbbdev         return *this;
7851c0b2f7Stbbdev     }
7951c0b2f7Stbbdev 
8051c0b2f7Stbbdev };
8151c0b2f7Stbbdev 
8251c0b2f7Stbbdev template<typename T1, typename T2>
8351c0b2f7Stbbdev struct wrapper_helper {
checkwrapper_helper8451c0b2f7Stbbdev     static void check(const T1 &, const T2 &) { }
copy_valuewrapper_helper8551c0b2f7Stbbdev     static void copy_value(const T1 &in, T2 &out) { out = in; }
8651c0b2f7Stbbdev };
8751c0b2f7Stbbdev 
8851c0b2f7Stbbdev template<typename T1, typename T2>
8951c0b2f7Stbbdev struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
checkwrapper_helper9051c0b2f7Stbbdev     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
9151c0b2f7Stbbdev        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
9251c0b2f7Stbbdev        return;
9351c0b2f7Stbbdev     }
copy_valuewrapper_helper9451c0b2f7Stbbdev     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
9551c0b2f7Stbbdev         out.value = in.value;
9651c0b2f7Stbbdev     }
9751c0b2f7Stbbdev };
9851c0b2f7Stbbdev 
9951c0b2f7Stbbdev const int NUMBER_OF_MSGS = 10;
10051c0b2f7Stbbdev const int UNKNOWN_NUMBER_OF_ITEMS = -1;
10151c0b2f7Stbbdev std::atomic<int> async_body_exec_count;
10251c0b2f7Stbbdev std::atomic<int> async_activity_processed_msg_count;
10351c0b2f7Stbbdev std::atomic<int> end_body_exec_count;
10451c0b2f7Stbbdev 
10551c0b2f7Stbbdev // queueing required in test_reset for testing of cancellation
10651c0b2f7Stbbdev typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
10751c0b2f7Stbbdev typedef counting_async_node_type::gateway_type counting_gateway_type;
10851c0b2f7Stbbdev 
10951c0b2f7Stbbdev struct counting_async_unlimited_body {
11051c0b2f7Stbbdev 
counting_async_unlimited_bodycounting_async_unlimited_body11151c0b2f7Stbbdev     counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}
11251c0b2f7Stbbdev 
operator ()counting_async_unlimited_body11351c0b2f7Stbbdev     void operator()( const int &input, counting_gateway_type& gateway) {
11451c0b2f7Stbbdev         // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
11551c0b2f7Stbbdev         // doctest's INFO cause issues.
11651c0b2f7Stbbdev 
11751c0b2f7Stbbdev         // INFO( "Body execution with input == " << input << "\n");
11851c0b2f7Stbbdev         ++async_body_exec_count;
11951c0b2f7Stbbdev         if ( input == -1 ) {
12051c0b2f7Stbbdev             bool result = my_tgc.cancel_group_execution();
12151c0b2f7Stbbdev             // INFO( "Canceling graph execution\n" );
12251c0b2f7Stbbdev             CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
12351c0b2f7Stbbdev             utils::Sleep(50);
12451c0b2f7Stbbdev         }
12551c0b2f7Stbbdev         gateway.try_put(input);
12651c0b2f7Stbbdev     }
12751c0b2f7Stbbdev private:
12851c0b2f7Stbbdev     tbb::task_group_context& my_tgc;
12951c0b2f7Stbbdev };
13051c0b2f7Stbbdev 
13151c0b2f7Stbbdev struct counting_async_serial_body : counting_async_unlimited_body {
13251c0b2f7Stbbdev     typedef counting_async_unlimited_body base_type;
13351c0b2f7Stbbdev     int my_async_body_exec_count;
13451c0b2f7Stbbdev 
counting_async_serial_bodycounting_async_serial_body13551c0b2f7Stbbdev     counting_async_serial_body(tbb::task_group_context& tgc)
13651c0b2f7Stbbdev         : base_type(tgc), my_async_body_exec_count( 0 ) { }
13751c0b2f7Stbbdev 
operator ()counting_async_serial_body13851c0b2f7Stbbdev     void operator()( const int &input, counting_gateway_type& gateway ) {
13951c0b2f7Stbbdev         ++my_async_body_exec_count;
14051c0b2f7Stbbdev         base_type::operator()( input, gateway );
14151c0b2f7Stbbdev     }
14251c0b2f7Stbbdev };
14351c0b2f7Stbbdev 
test_reset()14451c0b2f7Stbbdev void test_reset() {
14551c0b2f7Stbbdev     const int N = NUMBER_OF_MSGS;
14651c0b2f7Stbbdev     async_body_exec_count = 0;
14751c0b2f7Stbbdev 
14851c0b2f7Stbbdev     tbb::task_group_context graph_ctx;
14951c0b2f7Stbbdev     tbb::flow::graph g(graph_ctx);
15051c0b2f7Stbbdev     counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );
15151c0b2f7Stbbdev 
15251c0b2f7Stbbdev     const int R = 3;
15351c0b2f7Stbbdev     std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
15451c0b2f7Stbbdev     for (size_t i = 0; i < R; ++i) {
15551c0b2f7Stbbdev         r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
15651c0b2f7Stbbdev     }
15751c0b2f7Stbbdev 
15851c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
15951c0b2f7Stbbdev         tbb::flow::make_edge(a, *r[i]);
16051c0b2f7Stbbdev     }
16151c0b2f7Stbbdev 
16251c0b2f7Stbbdev     INFO( "One body execution\n" );
16351c0b2f7Stbbdev     a.try_put(-1);
16451c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
16551c0b2f7Stbbdev        a.try_put(i);
16651c0b2f7Stbbdev     }
16751c0b2f7Stbbdev     g.wait_for_all();
16851c0b2f7Stbbdev     // should be canceled with only 1 item reaching the async_body and the counting receivers
16951c0b2f7Stbbdev     // and N items left in the node's queue
17051c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );
17151c0b2f7Stbbdev 
17251c0b2f7Stbbdev     counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
17351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
17451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1"  );
17551c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
17651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
17751c0b2f7Stbbdev     }
17851c0b2f7Stbbdev 
17951c0b2f7Stbbdev     // should clear the async_node queue, but retain its local count at 1 and keep all edges
18051c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_protocol);
18151c0b2f7Stbbdev 
18251c0b2f7Stbbdev     INFO( "N body executions\n" );
18351c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
18451c0b2f7Stbbdev        a.try_put(i);
18551c0b2f7Stbbdev     }
18651c0b2f7Stbbdev     g.wait_for_all();
18751c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
18851c0b2f7Stbbdev 
18951c0b2f7Stbbdev     // a total of N+1 items should have passed through the node body
19051c0b2f7Stbbdev     // the local body count should also be N+1
19151c0b2f7Stbbdev     // and the counting receivers should all have a count of N+1
19251c0b2f7Stbbdev     counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
19351c0b2f7Stbbdev     CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
19451c0b2f7Stbbdev                    "local and global body execution counts are different" );
19551c0b2f7Stbbdev     INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
19651c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1"  );
19751c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
19851c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
19951c0b2f7Stbbdev     }
20051c0b2f7Stbbdev 
20151c0b2f7Stbbdev     INFO( "N body executions with new bodies\n" );
20251c0b2f7Stbbdev     // should clear the async_node queue and reset its local count to 0, but keep all edges
20351c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_bodies);
20451c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
20551c0b2f7Stbbdev        a.try_put(i);
20651c0b2f7Stbbdev     }
20751c0b2f7Stbbdev     g.wait_for_all();
20851c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
20951c0b2f7Stbbdev 
21051c0b2f7Stbbdev     // a total of 2N+1 items should have passed through the node body
21151c0b2f7Stbbdev     // the local body count should be N
21251c0b2f7Stbbdev     // and the counting receivers should all have a count of 2N+1
21351c0b2f7Stbbdev     counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
21451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1"  );
21551c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N"  );
21651c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
21751c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
21851c0b2f7Stbbdev     }
21951c0b2f7Stbbdev 
22051c0b2f7Stbbdev     // should clear the async_node queue and keep its local count at N and remove all edges
22151c0b2f7Stbbdev     INFO( "N body executions with no edges\n" );
22251c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
22351c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
22451c0b2f7Stbbdev        a.try_put(i);
22551c0b2f7Stbbdev     }
22651c0b2f7Stbbdev     g.wait_for_all();
22751c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
22851c0b2f7Stbbdev 
22951c0b2f7Stbbdev     // a total of 3N+1 items should have passed through the node body
23051c0b2f7Stbbdev     // the local body count should now be 2*N
23151c0b2f7Stbbdev     // and the counting receivers should remain at a count of 2N+1
23251c0b2f7Stbbdev     counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
23351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1"  );
23451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N"  );
23551c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
23651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
23751c0b2f7Stbbdev     }
23851c0b2f7Stbbdev 
23951c0b2f7Stbbdev     // put back 1 edge to receiver 0
24051c0b2f7Stbbdev     INFO( "N body executions with 1 edge\n" );
24151c0b2f7Stbbdev     tbb::flow::make_edge(a, *r[0]);
24251c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
24351c0b2f7Stbbdev        a.try_put(i);
24451c0b2f7Stbbdev     }
24551c0b2f7Stbbdev     g.wait_for_all();
24651c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
24751c0b2f7Stbbdev 
24851c0b2f7Stbbdev     // a total of 4N+1 items should have passed through the node body
24951c0b2f7Stbbdev     // the local body count should now be 3*N
25051c0b2f7Stbbdev     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
25151c0b2f7Stbbdev     counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
25251c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1"  );
25351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N"  );
25451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
25551c0b2f7Stbbdev     for (int i = 1; i < R; ++i) {
25651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
25751c0b2f7Stbbdev     }
25851c0b2f7Stbbdev 
25951c0b2f7Stbbdev     // should clear the async_node queue and keep its local count at N and remove all edges
26051c0b2f7Stbbdev     INFO( "N body executions with no edges and new body\n" );
26151c0b2f7Stbbdev     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
26251c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
26351c0b2f7Stbbdev        a.try_put(i);
26451c0b2f7Stbbdev     }
26551c0b2f7Stbbdev     g.wait_for_all();
26651c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
26751c0b2f7Stbbdev 
26851c0b2f7Stbbdev     // a total of 4N+1 items should have passed through the node body
26951c0b2f7Stbbdev     // the local body count should now be 3*N
27051c0b2f7Stbbdev     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
27151c0b2f7Stbbdev     counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
27251c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1"  );
27351c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N"  );
27451c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
27551c0b2f7Stbbdev     for (int i = 1; i < R; ++i) {
27651c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
27751c0b2f7Stbbdev     }
27851c0b2f7Stbbdev }
27951c0b2f7Stbbdev 
28051c0b2f7Stbbdev 
28151c0b2f7Stbbdev #include <mutex>
28251c0b2f7Stbbdev 
28351c0b2f7Stbbdev template <typename T>
28451c0b2f7Stbbdev class async_activity_queue {
28551c0b2f7Stbbdev public:
push(const T & item)28651c0b2f7Stbbdev     void push( const T& item ) {
28751c0b2f7Stbbdev         std::lock_guard<mutex_t> lock( m_mutex );
28851c0b2f7Stbbdev         m_queue.push( item );
28951c0b2f7Stbbdev     }
29051c0b2f7Stbbdev 
try_pop(T & item)29151c0b2f7Stbbdev     bool try_pop( T& item ) {
29251c0b2f7Stbbdev         std::lock_guard<mutex_t> lock( m_mutex );
29351c0b2f7Stbbdev         if( m_queue.empty() )
29451c0b2f7Stbbdev             return false;
29551c0b2f7Stbbdev         item = m_queue.front();
29651c0b2f7Stbbdev         m_queue.pop();
29751c0b2f7Stbbdev         return true;
29851c0b2f7Stbbdev     }
29951c0b2f7Stbbdev 
empty()30051c0b2f7Stbbdev     bool empty() {
30151c0b2f7Stbbdev         std::lock_guard<mutex_t> lock( m_mutex );
30251c0b2f7Stbbdev         return m_queue.empty();
30351c0b2f7Stbbdev     }
30451c0b2f7Stbbdev 
30551c0b2f7Stbbdev private:
30651c0b2f7Stbbdev     typedef std::mutex mutex_t;
30751c0b2f7Stbbdev     mutex_t m_mutex;
30851c0b2f7Stbbdev     std::queue<T> m_queue;
30951c0b2f7Stbbdev };
31051c0b2f7Stbbdev 
31151c0b2f7Stbbdev template< typename Input, typename Output >
31251c0b2f7Stbbdev class async_activity : utils::NoAssign {
31351c0b2f7Stbbdev public:
31451c0b2f7Stbbdev     typedef Input input_type;
31551c0b2f7Stbbdev     typedef Output output_type;
31651c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
31751c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
31851c0b2f7Stbbdev 
31951c0b2f7Stbbdev     struct work_type {
32051c0b2f7Stbbdev         input_type input;
32151c0b2f7Stbbdev         gateway_type* gateway;
32251c0b2f7Stbbdev     };
32351c0b2f7Stbbdev 
32451c0b2f7Stbbdev     class ServiceThreadBody {
32551c0b2f7Stbbdev     public:
ServiceThreadBody(async_activity * activity)32651c0b2f7Stbbdev         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
operator ()()32751c0b2f7Stbbdev         void operator()() { my_activity->process(); }
32851c0b2f7Stbbdev     private:
32951c0b2f7Stbbdev         async_activity* my_activity;
33051c0b2f7Stbbdev     };
33151c0b2f7Stbbdev 
async_activity(int expected_items,bool deferred=false,int sleep_time=50)33251c0b2f7Stbbdev     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
33351c0b2f7Stbbdev         : my_expected_items(expected_items), my_sleep_time(sleep_time)
33451c0b2f7Stbbdev     {
33551c0b2f7Stbbdev         is_active = !deferred;
33651c0b2f7Stbbdev         my_quit = false;
33751c0b2f7Stbbdev         std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
33851c0b2f7Stbbdev     }
33951c0b2f7Stbbdev 
34051c0b2f7Stbbdev private:
34151c0b2f7Stbbdev 
async_activity(const async_activity &)34251c0b2f7Stbbdev     async_activity( const async_activity& )
34351c0b2f7Stbbdev         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
34451c0b2f7Stbbdev     {
34551c0b2f7Stbbdev         is_active = true;
34651c0b2f7Stbbdev     }
34751c0b2f7Stbbdev 
34851c0b2f7Stbbdev public:
~async_activity()34951c0b2f7Stbbdev     ~async_activity() {
35051c0b2f7Stbbdev         stop();
35151c0b2f7Stbbdev         my_service_thread.join();
35251c0b2f7Stbbdev     }
35351c0b2f7Stbbdev 
submit(const input_type & input,gateway_type & gateway)35451c0b2f7Stbbdev     void submit( const input_type &input, gateway_type& gateway ) {
35551c0b2f7Stbbdev         work_type work = {input, &gateway};
35651c0b2f7Stbbdev         my_work_queue.push( work );
35751c0b2f7Stbbdev     }
35851c0b2f7Stbbdev 
process()35951c0b2f7Stbbdev     void process() {
36051c0b2f7Stbbdev         do {
36151c0b2f7Stbbdev             work_type work;
36251c0b2f7Stbbdev             if( is_active && my_work_queue.try_pop( work ) ) {
36351c0b2f7Stbbdev                 utils::Sleep(my_sleep_time);
36451c0b2f7Stbbdev                 ++async_activity_processed_msg_count;
36551c0b2f7Stbbdev                 output_type output;
36651c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
36751c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::check(work.input, output);
36851c0b2f7Stbbdev                 work.gateway->try_put(output);
36951c0b2f7Stbbdev                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
37051c0b2f7Stbbdev                      int(async_activity_processed_msg_count) == my_expected_items ) {
37151c0b2f7Stbbdev                     work.gateway->release_wait();
37251c0b2f7Stbbdev                 }
37351c0b2f7Stbbdev             }
37451c0b2f7Stbbdev         } while( my_quit == false || !my_work_queue.empty());
37551c0b2f7Stbbdev     }
37651c0b2f7Stbbdev 
stop()37751c0b2f7Stbbdev     void stop() {
37851c0b2f7Stbbdev         my_quit = true;
37951c0b2f7Stbbdev     }
38051c0b2f7Stbbdev 
activate()38151c0b2f7Stbbdev     void activate() {
38251c0b2f7Stbbdev         is_active = true;
38351c0b2f7Stbbdev     }
38451c0b2f7Stbbdev 
should_reserve_each_time()38551c0b2f7Stbbdev     bool should_reserve_each_time() {
38651c0b2f7Stbbdev         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
38751c0b2f7Stbbdev             return true;
38851c0b2f7Stbbdev         else
38951c0b2f7Stbbdev             return false;
39051c0b2f7Stbbdev     }
39151c0b2f7Stbbdev 
39251c0b2f7Stbbdev private:
39351c0b2f7Stbbdev 
39451c0b2f7Stbbdev     const int my_expected_items;
39551c0b2f7Stbbdev     const int my_sleep_time;
39651c0b2f7Stbbdev     std::atomic< bool > is_active;
39751c0b2f7Stbbdev 
39851c0b2f7Stbbdev     async_activity_queue<work_type> my_work_queue;
39951c0b2f7Stbbdev 
40051c0b2f7Stbbdev     std::atomic< bool > my_quit;
40151c0b2f7Stbbdev 
40251c0b2f7Stbbdev     std::thread my_service_thread;
40351c0b2f7Stbbdev };
40451c0b2f7Stbbdev 
40551c0b2f7Stbbdev template<typename Input, typename Output>
40651c0b2f7Stbbdev struct basic_test {
40751c0b2f7Stbbdev     typedef Input input_type;
40851c0b2f7Stbbdev     typedef Output output_type;
40951c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
41051c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
41151c0b2f7Stbbdev 
basic_testbasic_test41251c0b2f7Stbbdev     basic_test() {}
41351c0b2f7Stbbdev 
runbasic_test41451c0b2f7Stbbdev     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
41551c0b2f7Stbbdev         async_activity<input_type, output_type> my_async_activity(async_expected_items);
41651c0b2f7Stbbdev 
41751c0b2f7Stbbdev         tbb::flow::graph g;
41851c0b2f7Stbbdev 
41951c0b2f7Stbbdev         tbb::flow::function_node< int, input_type > start_node(
42051c0b2f7Stbbdev             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
42151c0b2f7Stbbdev         );
42251c0b2f7Stbbdev         async_node_type offload_node(
42351c0b2f7Stbbdev             g, tbb::flow::unlimited,
42451c0b2f7Stbbdev             [&] (const input_type &input, gateway_type& gateway) {
42551c0b2f7Stbbdev                 ++async_body_exec_count;
42651c0b2f7Stbbdev                 if(my_async_activity.should_reserve_each_time())
42751c0b2f7Stbbdev                     gateway.reserve_wait();
42851c0b2f7Stbbdev                 my_async_activity.submit(input, gateway);
42951c0b2f7Stbbdev             }
43051c0b2f7Stbbdev         );
43151c0b2f7Stbbdev         tbb::flow::function_node< output_type > end_node(
43251c0b2f7Stbbdev             g, tbb::flow::unlimited,
43351c0b2f7Stbbdev             [&](const output_type& input) {
43451c0b2f7Stbbdev                 ++end_body_exec_count;
43551c0b2f7Stbbdev                 output_type output;
43651c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::check(input, output);
43751c0b2f7Stbbdev             }
43851c0b2f7Stbbdev         );
43951c0b2f7Stbbdev 
44051c0b2f7Stbbdev         tbb::flow::make_edge( start_node, offload_node );
44151c0b2f7Stbbdev         tbb::flow::make_edge( offload_node, end_node );
44251c0b2f7Stbbdev 
44351c0b2f7Stbbdev         async_body_exec_count = 0;
44451c0b2f7Stbbdev         async_activity_processed_msg_count = 0;
44551c0b2f7Stbbdev         end_body_exec_count = 0;
44651c0b2f7Stbbdev 
44751c0b2f7Stbbdev         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
44851c0b2f7Stbbdev             offload_node.gateway().reserve_wait();
44951c0b2f7Stbbdev         }
45051c0b2f7Stbbdev         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
45151c0b2f7Stbbdev             start_node.try_put(i);
45251c0b2f7Stbbdev         }
45351c0b2f7Stbbdev         g.wait_for_all();
45451c0b2f7Stbbdev         CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
45551c0b2f7Stbbdev         CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
45651c0b2f7Stbbdev         CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
45751c0b2f7Stbbdev         INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
45851c0b2f7Stbbdev               " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
45951c0b2f7Stbbdev               " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
46051c0b2f7Stbbdev         );
46151c0b2f7Stbbdev         return 0;
46251c0b2f7Stbbdev     }
46351c0b2f7Stbbdev 
46451c0b2f7Stbbdev };
46551c0b2f7Stbbdev 
test_copy_ctor()46651c0b2f7Stbbdev int test_copy_ctor() {
46751c0b2f7Stbbdev     const int N = NUMBER_OF_MSGS;
46851c0b2f7Stbbdev     async_body_exec_count = 0;
46951c0b2f7Stbbdev 
47051c0b2f7Stbbdev     tbb::flow::graph g;
47151c0b2f7Stbbdev 
47251c0b2f7Stbbdev     harness_counting_receiver<int> r1(g);
47351c0b2f7Stbbdev     harness_counting_receiver<int> r2(g);
47451c0b2f7Stbbdev 
47551c0b2f7Stbbdev     tbb::task_group_context graph_ctx;
47651c0b2f7Stbbdev     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
47751c0b2f7Stbbdev     counting_async_node_type b(a);
47851c0b2f7Stbbdev 
47951c0b2f7Stbbdev     tbb::flow::make_edge(a, r1);                             // C++11-style of making edges
48051c0b2f7Stbbdev     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // usual way of making edges
48151c0b2f7Stbbdev 
48251c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
48351c0b2f7Stbbdev        a.try_put(i);
48451c0b2f7Stbbdev     }
48551c0b2f7Stbbdev     g.wait_for_all();
48651c0b2f7Stbbdev 
48751c0b2f7Stbbdev     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
48851c0b2f7Stbbdev     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
48951c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
49051c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
49151c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" );
49251c0b2f7Stbbdev 
49351c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
49451c0b2f7Stbbdev        b.try_put(i);
49551c0b2f7Stbbdev     }
49651c0b2f7Stbbdev     g.wait_for_all();
49751c0b2f7Stbbdev 
49851c0b2f7Stbbdev     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
49951c0b2f7Stbbdev     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
50051c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
50151c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
50251c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
50351c0b2f7Stbbdev     return 0;
50451c0b2f7Stbbdev }
50551c0b2f7Stbbdev 
50651c0b2f7Stbbdev std::atomic<int> main_tid_count;
50751c0b2f7Stbbdev 
50851c0b2f7Stbbdev template<typename Input, typename Output>
50951c0b2f7Stbbdev struct spin_test {
51051c0b2f7Stbbdev     typedef Input input_type;
51151c0b2f7Stbbdev     typedef Output output_type;
51251c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
51351c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
51451c0b2f7Stbbdev 
51551c0b2f7Stbbdev     class end_body_type {
51651c0b2f7Stbbdev         typedef Output output_type;
51751c0b2f7Stbbdev         std::thread::id my_main_tid;
51851c0b2f7Stbbdev         utils::SpinBarrier *my_barrier;
51951c0b2f7Stbbdev     public:
end_body_type(std::thread::id t,utils::SpinBarrier & b)52051c0b2f7Stbbdev         end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
52151c0b2f7Stbbdev 
operator ()(const output_type &)52251c0b2f7Stbbdev         void operator()( const output_type & ) {
52351c0b2f7Stbbdev             ++end_body_exec_count;
52451c0b2f7Stbbdev             if (std::this_thread::get_id() == my_main_tid) {
52551c0b2f7Stbbdev                ++main_tid_count;
52651c0b2f7Stbbdev             }
527b15aabb3Stbbdev             my_barrier->wait();
52851c0b2f7Stbbdev         }
52951c0b2f7Stbbdev     };
53051c0b2f7Stbbdev 
spin_testspin_test53151c0b2f7Stbbdev     spin_test() {}
53251c0b2f7Stbbdev 
runspin_test53351c0b2f7Stbbdev     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
53451c0b2f7Stbbdev         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
53551c0b2f7Stbbdev         const int overall_message_count = nthreads * NUMBER_OF_MSGS;
53651c0b2f7Stbbdev         utils::SpinBarrier spin_barrier(nthreads);
53751c0b2f7Stbbdev 
53851c0b2f7Stbbdev         tbb::flow::graph g;
53951c0b2f7Stbbdev         tbb::flow::function_node<int, input_type> start_node(
54051c0b2f7Stbbdev             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
54151c0b2f7Stbbdev         );
54251c0b2f7Stbbdev         async_node_type offload_node(
54351c0b2f7Stbbdev             g, tbb::flow::unlimited,
54451c0b2f7Stbbdev             [&](const input_type &input, gateway_type& gateway) {
54551c0b2f7Stbbdev                 ++async_body_exec_count;
54651c0b2f7Stbbdev                 if(my_async_activity.should_reserve_each_time())
54751c0b2f7Stbbdev                     gateway.reserve_wait();
54851c0b2f7Stbbdev                 my_async_activity.submit(input, gateway);
54951c0b2f7Stbbdev             }
55051c0b2f7Stbbdev         );
55151c0b2f7Stbbdev         tbb::flow::function_node<output_type> end_node(
55251c0b2f7Stbbdev             g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
55351c0b2f7Stbbdev         );
55451c0b2f7Stbbdev 
55551c0b2f7Stbbdev         tbb::flow::make_edge( start_node, offload_node );
55651c0b2f7Stbbdev         tbb::flow::make_edge( offload_node, end_node );
55751c0b2f7Stbbdev 
55851c0b2f7Stbbdev         async_body_exec_count = 0;
55951c0b2f7Stbbdev         async_activity_processed_msg_count = 0;
56051c0b2f7Stbbdev         end_body_exec_count = 0;
56151c0b2f7Stbbdev         main_tid_count = 0;
56251c0b2f7Stbbdev 
56351c0b2f7Stbbdev         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
56451c0b2f7Stbbdev             offload_node.gateway().reserve_wait();
56551c0b2f7Stbbdev         }
56651c0b2f7Stbbdev         for (int i = 0; i < overall_message_count; ++i) {
56751c0b2f7Stbbdev             start_node.try_put(i);
56851c0b2f7Stbbdev         }
56951c0b2f7Stbbdev         g.wait_for_all();
57051c0b2f7Stbbdev         CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
57151c0b2f7Stbbdev                        "AsyncBody processed wrong number of signals" );
57251c0b2f7Stbbdev         CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
57351c0b2f7Stbbdev                        "AsyncActivity processed wrong number of signals" );
57451c0b2f7Stbbdev         CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
57551c0b2f7Stbbdev                        "EndBody processed wrong number of signals");
57651c0b2f7Stbbdev 
57751c0b2f7Stbbdev         INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");
57851c0b2f7Stbbdev 
57951c0b2f7Stbbdev         INFO("async_body_exec_count == " << int(async_body_exec_count) <<
58051c0b2f7Stbbdev              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
58151c0b2f7Stbbdev              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
58251c0b2f7Stbbdev         );
58351c0b2f7Stbbdev         return 0;
58451c0b2f7Stbbdev     }
58551c0b2f7Stbbdev 
58651c0b2f7Stbbdev };
58751c0b2f7Stbbdev 
test_for_spin_avoidance()58851c0b2f7Stbbdev void test_for_spin_avoidance() {
58951c0b2f7Stbbdev     const int nthreads = 4;
59051c0b2f7Stbbdev     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
591478de5b1Stbbdev     tbb::task_arena a(nthreads);
592478de5b1Stbbdev     a.execute([&] {
59351c0b2f7Stbbdev         spin_test<int, int>::run(nthreads);
594478de5b1Stbbdev     });
59551c0b2f7Stbbdev }
59651c0b2f7Stbbdev 
59751c0b2f7Stbbdev template< typename Input, typename Output >
run_tests()59851c0b2f7Stbbdev int run_tests() {
59951c0b2f7Stbbdev     basic_test<Input, Output>::run();
60051c0b2f7Stbbdev     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
60151c0b2f7Stbbdev     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
60251c0b2f7Stbbdev     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
60351c0b2f7Stbbdev     return 0;
60451c0b2f7Stbbdev }
60551c0b2f7Stbbdev 
60651c0b2f7Stbbdev #include "tbb/parallel_for.h"
60751c0b2f7Stbbdev template<typename Input, typename Output>
6085ab8e5f6SVertexwahn class enqueueing_on_inner_level {
60951c0b2f7Stbbdev     typedef Input input_type;
61051c0b2f7Stbbdev     typedef Output output_type;
61151c0b2f7Stbbdev     typedef async_activity<input_type, output_type> async_activity_type;
61251c0b2f7Stbbdev     typedef tbb::flow::async_node<Input, Output> async_node_type;
61351c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
61451c0b2f7Stbbdev 
61551c0b2f7Stbbdev     class body_graph_with_async {
61651c0b2f7Stbbdev     public:
body_graph_with_async(utils::SpinBarrier & barrier,async_activity_type & activity)61751c0b2f7Stbbdev         body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
61851c0b2f7Stbbdev             : spin_barrier(&barrier), my_async_activity(&activity) {}
61951c0b2f7Stbbdev 
operator ()(int) const62051c0b2f7Stbbdev         void operator()(int) const {
62151c0b2f7Stbbdev             tbb::flow::graph g;
62251c0b2f7Stbbdev             tbb::flow::function_node< int, input_type > start_node(
62351c0b2f7Stbbdev                 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
62451c0b2f7Stbbdev             );
62551c0b2f7Stbbdev             async_node_type offload_node(
62651c0b2f7Stbbdev                 g, tbb::flow::unlimited,
62751c0b2f7Stbbdev                 [&](const input_type &input, gateway_type& gateway) {
62851c0b2f7Stbbdev                     gateway.reserve_wait();
62951c0b2f7Stbbdev                     my_async_activity->submit( input, gateway );
63051c0b2f7Stbbdev                 }
63151c0b2f7Stbbdev             );
63251c0b2f7Stbbdev             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );
63351c0b2f7Stbbdev 
63451c0b2f7Stbbdev             tbb::flow::make_edge( start_node, offload_node );
63551c0b2f7Stbbdev             tbb::flow::make_edge( offload_node, end_node );
63651c0b2f7Stbbdev 
63751c0b2f7Stbbdev             start_node.try_put(1);
63851c0b2f7Stbbdev 
63951c0b2f7Stbbdev             spin_barrier->wait();
64051c0b2f7Stbbdev 
64151c0b2f7Stbbdev             my_async_activity->activate();
64251c0b2f7Stbbdev 
64351c0b2f7Stbbdev             g.wait_for_all();
64451c0b2f7Stbbdev         }
64551c0b2f7Stbbdev 
64651c0b2f7Stbbdev     private:
64751c0b2f7Stbbdev         utils::SpinBarrier* spin_barrier;
64851c0b2f7Stbbdev         async_activity_type* my_async_activity;
64951c0b2f7Stbbdev     };
65051c0b2f7Stbbdev 
65151c0b2f7Stbbdev public:
run()65251c0b2f7Stbbdev     static int run ()
65351c0b2f7Stbbdev     {
65451c0b2f7Stbbdev         const int nthreads = tbb::this_task_arena::max_concurrency();
65551c0b2f7Stbbdev         utils::SpinBarrier spin_barrier( nthreads );
65651c0b2f7Stbbdev 
65751c0b2f7Stbbdev         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
65851c0b2f7Stbbdev 
65951c0b2f7Stbbdev         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
66051c0b2f7Stbbdev         return 0;
66151c0b2f7Stbbdev     }
66251c0b2f7Stbbdev };
66351c0b2f7Stbbdev 
run_test_enqueueing_on_inner_level()6645ab8e5f6SVertexwahn int run_test_enqueueing_on_inner_level() {
6655ab8e5f6SVertexwahn     enqueueing_on_inner_level<int, int>::run();
66651c0b2f7Stbbdev     return 0;
66751c0b2f7Stbbdev }
66851c0b2f7Stbbdev 
66951c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
67051c0b2f7Stbbdev #include <array>
67151c0b2f7Stbbdev 
67251c0b2f7Stbbdev template<typename NodeType>
67351c0b2f7Stbbdev class AsyncActivity {
67451c0b2f7Stbbdev public:
67551c0b2f7Stbbdev     using gateway_t = typename NodeType::gateway_type;
67651c0b2f7Stbbdev 
67751c0b2f7Stbbdev     struct work_type {
67851c0b2f7Stbbdev         int input;
67951c0b2f7Stbbdev         gateway_t* gateway;
68051c0b2f7Stbbdev     };
68151c0b2f7Stbbdev 
__anon62ab872d0a02() 682b15aabb3Stbbdev     AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() {
68351c0b2f7Stbbdev         while(!end_of_work()) {
68451c0b2f7Stbbdev             work_type w;
68551c0b2f7Stbbdev             while( my_q.try_pop(w) ) {
68651c0b2f7Stbbdev                 int res = do_work(w.input);
68751c0b2f7Stbbdev                 w.gateway->try_put(res);
68851c0b2f7Stbbdev                 w.gateway->release_wait();
68951c0b2f7Stbbdev                 ++c;
69051c0b2f7Stbbdev             }
69151c0b2f7Stbbdev         }
692b15aabb3Stbbdev     }) {}
69351c0b2f7Stbbdev 
submit(int i,gateway_t * gateway)69451c0b2f7Stbbdev     void submit(int i, gateway_t* gateway) {
69551c0b2f7Stbbdev         work_type w = {i, gateway};
69651c0b2f7Stbbdev         gateway->reserve_wait();
69751c0b2f7Stbbdev         my_q.push(w);
69851c0b2f7Stbbdev     }
69951c0b2f7Stbbdev 
wait_for_all()70051c0b2f7Stbbdev     void wait_for_all() { thr.join(); }
70151c0b2f7Stbbdev 
70251c0b2f7Stbbdev private:
end_of_work()70351c0b2f7Stbbdev     bool end_of_work() { return c >= stop_limit; }
70451c0b2f7Stbbdev 
do_work(int & i)70551c0b2f7Stbbdev     int do_work(int& i) { return i + i; }
70651c0b2f7Stbbdev 
70751c0b2f7Stbbdev     async_activity_queue<work_type> my_q;
70851c0b2f7Stbbdev     size_t stop_limit;
70951c0b2f7Stbbdev     size_t c;
710b15aabb3Stbbdev     std::thread thr;
71151c0b2f7Stbbdev };
71251c0b2f7Stbbdev 
test_follows()71351c0b2f7Stbbdev void test_follows() {
71451c0b2f7Stbbdev     using namespace tbb::flow;
71551c0b2f7Stbbdev 
71651c0b2f7Stbbdev     using input_t = int;
71751c0b2f7Stbbdev     using output_t = int;
71851c0b2f7Stbbdev     using node_t = async_node<input_t, output_t>;
71951c0b2f7Stbbdev 
72051c0b2f7Stbbdev     graph g;
72151c0b2f7Stbbdev 
72251c0b2f7Stbbdev     AsyncActivity<node_t> async_activity(3);
72351c0b2f7Stbbdev 
72451c0b2f7Stbbdev     std::array<broadcast_node<input_t>, 3> preds = {
72551c0b2f7Stbbdev       {
72651c0b2f7Stbbdev         broadcast_node<input_t>(g),
72751c0b2f7Stbbdev         broadcast_node<input_t>(g),
72851c0b2f7Stbbdev         broadcast_node<input_t>(g)
72951c0b2f7Stbbdev       }
73051c0b2f7Stbbdev     };
73151c0b2f7Stbbdev 
73251c0b2f7Stbbdev     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
73351c0b2f7Stbbdev         async_activity.submit(input, &gtw);
73451c0b2f7Stbbdev     }, no_priority);
73551c0b2f7Stbbdev 
73651c0b2f7Stbbdev     buffer_node<output_t> buf(g);
73751c0b2f7Stbbdev     make_edge(node, buf);
73851c0b2f7Stbbdev 
73951c0b2f7Stbbdev     for(auto& pred: preds) {
74051c0b2f7Stbbdev         pred.try_put(1);
74151c0b2f7Stbbdev     }
74251c0b2f7Stbbdev 
74351c0b2f7Stbbdev     g.wait_for_all();
74451c0b2f7Stbbdev     async_activity.wait_for_all();
74551c0b2f7Stbbdev 
74651c0b2f7Stbbdev     output_t storage;
74751c0b2f7Stbbdev     CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
74851c0b2f7Stbbdev                   "Not exact edge quantity was made");
74951c0b2f7Stbbdev }
75051c0b2f7Stbbdev 
test_precedes()75151c0b2f7Stbbdev void test_precedes() {
75251c0b2f7Stbbdev     using namespace tbb::flow;
75351c0b2f7Stbbdev 
75451c0b2f7Stbbdev     using input_t = int;
75551c0b2f7Stbbdev     using output_t = int;
75651c0b2f7Stbbdev     using node_t = async_node<input_t, output_t>;
75751c0b2f7Stbbdev 
75851c0b2f7Stbbdev     graph g;
75951c0b2f7Stbbdev 
76051c0b2f7Stbbdev     AsyncActivity<node_t> async_activity(1);
76151c0b2f7Stbbdev 
76251c0b2f7Stbbdev     std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
76351c0b2f7Stbbdev 
76451c0b2f7Stbbdev     broadcast_node<input_t> start(g);
76551c0b2f7Stbbdev 
76651c0b2f7Stbbdev     node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
76751c0b2f7Stbbdev         async_activity.submit(input, &gtw);
76851c0b2f7Stbbdev     }, no_priority);
76951c0b2f7Stbbdev 
77051c0b2f7Stbbdev     make_edge(start, node);
77151c0b2f7Stbbdev 
77251c0b2f7Stbbdev     start.try_put(1);
77351c0b2f7Stbbdev 
77451c0b2f7Stbbdev     g.wait_for_all();
77551c0b2f7Stbbdev     async_activity.wait_for_all();
77651c0b2f7Stbbdev 
77751c0b2f7Stbbdev     for(auto& successor : successors) {
77851c0b2f7Stbbdev         output_t storage;
77951c0b2f7Stbbdev         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
78051c0b2f7Stbbdev                       "Not exact edge quantity was made");
78151c0b2f7Stbbdev     }
78251c0b2f7Stbbdev }
78351c0b2f7Stbbdev 
test_follows_and_precedes_api()78451c0b2f7Stbbdev void test_follows_and_precedes_api() {
78551c0b2f7Stbbdev     test_follows();
78651c0b2f7Stbbdev     test_precedes();
78751c0b2f7Stbbdev }
78851c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
78951c0b2f7Stbbdev 
79051c0b2f7Stbbdev //! Test async bodies processing
79151c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
79251c0b2f7Stbbdev TEST_CASE("Basic tests"){
79351c0b2f7Stbbdev     tbb::task_arena arena(utils::MaxThread);
79451c0b2f7Stbbdev     arena.execute(
__anon62ab872d0d02() 79551c0b2f7Stbbdev         [&]() {
79651c0b2f7Stbbdev             run_tests<int, int>();
79751c0b2f7Stbbdev             run_tests<minimal_type, minimal_type>();
79851c0b2f7Stbbdev             run_tests<int, minimal_type>();
79951c0b2f7Stbbdev         }
80051c0b2f7Stbbdev     );
80151c0b2f7Stbbdev }
80251c0b2f7Stbbdev 
80351c0b2f7Stbbdev //! NativeParallelFor test with various concurrency settings
80451c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
80551c0b2f7Stbbdev TEST_CASE("Lightweight tests"){
80651c0b2f7Stbbdev     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
80751c0b2f7Stbbdev }
80851c0b2f7Stbbdev 
80951c0b2f7Stbbdev //! Test reset and cancellation
81051c0b2f7Stbbdev //! \brief \ref error_guessing
81151c0b2f7Stbbdev TEST_CASE("Reset test"){
81251c0b2f7Stbbdev     test_reset();
81351c0b2f7Stbbdev }
81451c0b2f7Stbbdev 
81551c0b2f7Stbbdev //! Test
81651c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
81751c0b2f7Stbbdev TEST_CASE("Copy constructor test"){
81851c0b2f7Stbbdev     test_copy_ctor();
81951c0b2f7Stbbdev }
82051c0b2f7Stbbdev 
82151c0b2f7Stbbdev //! Test if main thread spins
82251c0b2f7Stbbdev //! \brief \ref stress
82351c0b2f7Stbbdev TEST_CASE("Spin avoidance test"){
82451c0b2f7Stbbdev     test_for_spin_avoidance();
82551c0b2f7Stbbdev }
82651c0b2f7Stbbdev 
8275ab8e5f6SVertexwahn //! Test nested enqueuing
82851c0b2f7Stbbdev //! \brief \ref error_guessing
8295ab8e5f6SVertexwahn TEST_CASE("Inner enqueuing test"){
8305ab8e5f6SVertexwahn     run_test_enqueueing_on_inner_level();
83151c0b2f7Stbbdev }
83251c0b2f7Stbbdev 
83351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
834*c4a799dfSJhaShweta1 //! Test deprecated follows and precedes API
83551c0b2f7Stbbdev //! \brief \ref error_guessing
836*c4a799dfSJhaShweta1 TEST_CASE("Test follows and precedes API"){
83751c0b2f7Stbbdev     test_follows_and_precedes_api();
83851c0b2f7Stbbdev }
83951c0b2f7Stbbdev #endif
840478de5b1Stbbdev 
841478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
842478de5b1Stbbdev //! \brief \ref error_guessing
843478de5b1Stbbdev TEST_CASE("constraints for async_node input") {
844478de5b1Stbbdev     struct InputObject {
845478de5b1Stbbdev         InputObject() = default;
846478de5b1Stbbdev         InputObject( const InputObject& ) = default;
847478de5b1Stbbdev     };
848478de5b1Stbbdev 
849478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::async_node, InputObject, int>);
850478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::async_node, int, int>);
851478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonCopyable, int>);
852478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonDefaultInitializable, int>);
853478de5b1Stbbdev }
854478de5b1Stbbdev 
855478de5b1Stbbdev template <typename Input, typename Output, typename Body>
856478de5b1Stbbdev concept can_call_async_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency,
857478de5b1Stbbdev                                              Body body, tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
858478de5b1Stbbdev     tbb::flow::async_node<Input, Output>(graph, concurrency, body);
859478de5b1Stbbdev     tbb::flow::async_node<Input, Output>(graph, concurrency, body, priority);
860478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
861478de5b1Stbbdev     tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
862478de5b1Stbbdev     tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
863478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
864478de5b1Stbbdev };
865478de5b1Stbbdev 
866478de5b1Stbbdev //! \brief \ref error_guessing
867478de5b1Stbbdev TEST_CASE("constraints for async_node body") {
868478de5b1Stbbdev     using input_type = int;
869478de5b1Stbbdev     using output_type = input_type;
870478de5b1Stbbdev     using namespace test_concepts::async_node_body;
871478de5b1Stbbdev 
872478de5b1Stbbdev     static_assert(can_call_async_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
873478de5b1Stbbdev     static_assert(!can_call_async_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
874478de5b1Stbbdev     static_assert(!can_call_async_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
875478de5b1Stbbdev     static_assert(!can_call_async_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
876478de5b1Stbbdev     static_assert(!can_call_async_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
877478de5b1Stbbdev     static_assert(!can_call_async_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
878478de5b1Stbbdev }
879478de5b1Stbbdev 
880478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
881