/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

#include "tbb/flow_graph.h"

#include "tbb/task.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/utils_assert.h"
#include "common/graph_utils.h"
#include "common/spin_barrier.h"
#include "common/test_follows_and_precedes_api.h"
#include "common/concepts_common.h"

#include <string>
#include <thread>
#include <mutex>
#include <queue>


//! \file test_async_node.cpp
//! \brief Test for [flow_graph.async_node] specification
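
// The tests below drive an async_node whose body hands work off to a user-managed
// service thread (the async_activity helpers further down) and receives results back
// through the node's gateway. They exercise graph reset semantics, node copy
// construction, spin avoidance in wait_for_all(), graphs nested inside parallel work,
// and the follows/precedes preview API.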

class minimal_type {
    template<typename T>
    friend struct place_wrapper;

    int value;

public:
    minimal_type() : value(-1) {}
    minimal_type(int v) : value(v) {}
    minimal_type(const minimal_type &m) : value(m.value) { }
    minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; }
};

template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    template <typename Q>
    place_wrapper<Q>& operator=(const place_wrapper<Q>& v) {
        if (this != &v) {
            value = v.value;
            thread_id = v.thread_id;
        }
        return *this;
    }

};

template<typename T1, typename T2>
struct wrapper_helper {
    static void check(const T1 &, const T2 &) { }
    static void copy_value(const T1 &in, T2 &out) { out = in; }
};

template<typename T1, typename T2>
struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
    static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
        return;
    }
    static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
        out.value = in.value;
    }
};

const int NUMBER_OF_MSGS = 10;
const int UNKNOWN_NUMBER_OF_ITEMS = -1;
std::atomic<int> async_body_exec_count;
std::atomic<int> async_activity_processed_msg_count;
std::atomic<int> end_body_exec_count;

// queueing required in test_reset for testing of cancellation
typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
typedef counting_async_node_type::gateway_type counting_gateway_type;

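// The counting bodies below bump the global async_body_exec_count on every invocation.
// An input of -1 additionally cancels the graph's task_group_context, which test_reset()
// uses to leave unprocessed items in the node's queue before exercising graph::reset().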
struct counting_async_unlimited_body {

    counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}

    void operator()( const int &input, counting_gateway_type& gateway) {
        // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
        // doctest's INFO cause issues.

        // INFO( "Body execution with input == " << input << "\n");
        ++async_body_exec_count;
        if ( input == -1 ) {
            bool result = my_tgc.cancel_group_execution();
            // INFO( "Canceling graph execution\n" );
            CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
            utils::Sleep(50);
        }
        gateway.try_put(input);
    }
private:
    tbb::task_group_context& my_tgc;
};

struct counting_async_serial_body : counting_async_unlimited_body {
    typedef counting_async_unlimited_body base_type;
    int my_async_body_exec_count;

    counting_async_serial_body(tbb::task_group_context& tgc)
        : base_type(tgc), my_async_body_exec_count( 0 ) { }

    void operator()( const int &input, counting_gateway_type& gateway ) {
        ++my_async_body_exec_count;
        base_type::operator()( input, gateway );
    }
};

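// test_reset() checks how graph::reset() flags affect an async_node (a summary of the
// behavior exercised below): rf_reset_protocol discards items queued in the node but keeps
// bodies and edges; rf_reset_bodies additionally restores each body to a copy of its
// original (so the serial body's local counter starts over); rf_clear_edges removes all
// outgoing edges while leaving the body state alone.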
void test_reset() {
    const int N = NUMBER_OF_MSGS;
    async_body_exec_count = 0;

    tbb::task_group_context graph_ctx;
    tbb::flow::graph g(graph_ctx);
    counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );

    const int R = 3;
    std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
    for (size_t i = 0; i < R; ++i) {
        r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
    }

    for (int i = 0; i < R; ++i) {
        tbb::flow::make_edge(a, *r[i]);
    }

    INFO( "One body execution\n" );
    a.try_put(-1);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    // should be canceled with only 1 item reaching the async_body and the counting receivers
    // and N items left in the node's queue
    CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );

    counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
    }

    // should clear the async_node queue, but retain its local count at 1 and keep all edges
    g.reset(tbb::flow::rf_reset_protocol);

    INFO( "N body executions\n" );
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of N+1 items should have passed through the node body
    // the local body count should also be N+1
    // and the counting receivers should all have a count of N+1
    counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
                   "local and global body execution counts are different" );
    INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
    }

    INFO( "N body executions with new bodies\n" );
    // should clear the async_node queue and reset its local count to 0, but keep all edges
    g.reset(tbb::flow::rf_reset_bodies);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 2N+1 items should have passed through the node body
    // the local body count should be N
    // and the counting receivers should all have a count of 2N+1
    counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1" );
    CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }

    // should clear the async_node queue and keep its local count at N and remove all edges
    INFO( "N body executions with no edges\n" );
    g.reset(tbb::flow::rf_clear_edges);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 3N+1 items should have passed through the node body
    // the local body count should now be 2*N
    // and the counting receivers should remain at a count of 2N+1
    counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1" );
    CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N" );
    for (int i = 0; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }

    // put back 1 edge to receiver 0
    INFO( "N body executions with 1 edge\n" );
    tbb::flow::make_edge(a, *r[0]);
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 4N+1 items should have passed through the node body
    // the local body count should now be 3*N
    // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
    counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1" );
    CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N" );
    CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
    for (int i = 1; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }

    // should clear the async_node queue, replace its body (resetting the local count), and remove all edges
    INFO( "N body executions with no edges and new body\n" );
    g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();
    CHECK_MESSAGE( ( g.is_cancelled() == false), "task group unexpectedly canceled" );

    // a total of 5N+1 items should have passed through the node body
    // the local body count should be N again (the body was reset)
    // and the counting receivers should be unchanged: r[0] at 3N+1, the rest at 2N+1
    counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
    CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1" );
    CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N" );
    CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
    for (int i = 1; i < R; ++i) {
        CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
    }
}
template <typename T>
class async_activity_queue {
public:
    void push( const T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        m_queue.push( item );
    }

    bool try_pop( T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        if( m_queue.empty() )
            return false;
        item = m_queue.front();
        m_queue.pop();
        return true;
    }

    bool empty() {
        std::lock_guard<mutex_t> lock( m_mutex );
        return m_queue.empty();
    }

private:
    typedef std::mutex mutex_t;
    mutex_t m_mutex;
    std::queue<T> m_queue;
};

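// async_activity models asynchronous work done outside the graph on a dedicated service
// thread: async_node bodies call submit() to enqueue the input together with a pointer to
// the node's gateway, and the service thread later pushes the result back with
// gateway::try_put(). reserve_wait()/release_wait() bracket the in-flight work so that
// graph::wait_for_all() does not return while results are still pending.
//
// One common pattern, mirroring the lambdas used further below (the tests also use a
// variant that issues a single reserve_wait() up front when the item count is known):
//
//     [&](const input_type& input, gateway_type& gateway) {
//         gateway.reserve_wait();               // tell the graph more work is coming
//         my_async_activity.submit(input, gateway);
//     }
//
// and, on the service thread, once a result is ready:
//
//     work.gateway->try_put(output);
//     work.gateway->release_wait();             // balance the earlier reserve_wait()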
template< typename Input, typename Output >
class async_activity : utils::NoAssign {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    struct work_type {
        input_type input;
        gateway_type* gateway;
    };

    class ServiceThreadBody {
    public:
        ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
        void operator()() { my_activity->process(); }
    private:
        async_activity* my_activity;
    };

    async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
        : my_expected_items(expected_items), my_sleep_time(sleep_time)
    {
        is_active = !deferred;
        my_quit = false;
        std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
    }

private:

    async_activity( const async_activity& )
        : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
    {
        is_active = true;
    }

public:
    ~async_activity() {
        stop();
        my_service_thread.join();
    }

    void submit( const input_type &input, gateway_type& gateway ) {
        work_type work = {input, &gateway};
        my_work_queue.push( work );
    }

    void process() {
        do {
            work_type work;
            if( is_active && my_work_queue.try_pop( work ) ) {
                utils::Sleep(my_sleep_time);
                ++async_activity_processed_msg_count;
                output_type output;
                wrapper_helper<output_type, output_type>::copy_value(work.input, output);
                wrapper_helper<output_type, output_type>::check(work.input, output);
                work.gateway->try_put(output);
                if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
                     int(async_activity_processed_msg_count) == my_expected_items ) {
                    work.gateway->release_wait();
                }
            }
        } while( my_quit == false || !my_work_queue.empty());
    }

    void stop() {
        my_quit = true;
    }

    void activate() {
        is_active = true;
    }

    bool should_reserve_each_time() {
        return my_expected_items == UNKNOWN_NUMBER_OF_ITEMS;
    }

private:

    const int my_expected_items;
    const int my_sleep_time;
    std::atomic< bool > is_active;

    async_activity_queue<work_type> my_work_queue;

    std::atomic< bool > my_quit;

    std::thread my_service_thread;
};

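// basic_test wires up the pipeline  function_node -> async_node -> function_node.
// The async_node body offloads each message to async_activity; the activity's service
// thread later puts the result back through the gateway, and the final function_node
// counts completions. When the expected number of items is known in advance, a single
// reserve_wait() is issued before the first try_put() instead of one per message.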
template<typename Input, typename Output>
struct basic_test {
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    basic_test() {}

    static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
        async_activity<input_type, output_type> my_async_activity(async_expected_items);

        tbb::flow::graph g;

        tbb::flow::function_node< int, input_type > start_node(
            g, tbb::flow::unlimited, [](int input) { return input_type(input); }
        );
        async_node_type offload_node(
            g, tbb::flow::unlimited,
            [&] (const input_type &input, gateway_type& gateway) {
                ++async_body_exec_count;
                if(my_async_activity.should_reserve_each_time())
                    gateway.reserve_wait();
                my_async_activity.submit(input, gateway);
            }
        );
        tbb::flow::function_node< output_type > end_node(
            g, tbb::flow::unlimited,
            [&](const output_type& input) {
                ++end_body_exec_count;
                output_type output;
                wrapper_helper<output_type, output_type>::check(input, output);
            }
        );

        tbb::flow::make_edge( start_node, offload_node );
        tbb::flow::make_edge( offload_node, end_node );

        async_body_exec_count = 0;
        async_activity_processed_msg_count = 0;
        end_body_exec_count = 0;

        if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
            offload_node.gateway().reserve_wait();
        }
        for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
            start_node.try_put(i);
        }
        g.wait_for_all();
        CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
        CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
        CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
        INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
        );
        return 0;
    }

};

int test_copy_ctor() {
    const int N = NUMBER_OF_MSGS;
    async_body_exec_count = 0;

    tbb::flow::graph g;

    harness_counting_receiver<int> r1(g);
    harness_counting_receiver<int> r2(g);

    tbb::task_group_context graph_ctx;
    counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
    counting_async_node_type b(a);

    tbb::flow::make_edge(a, r1);                             // connect node a directly; its single output port is implied
    tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // connect node b through an explicit output_port accessor

    for (int i = 0; i < N; ++i) {
        a.try_put(i);
    }
    g.wait_for_all();

    INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
    INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
    CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
    CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 should not have received any items yet" );

    for (int i = 0; i < N; ++i) {
        b.try_put(i);
    }
    g.wait_for_all();

    INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
    INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
    CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
    CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
    CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
    return 0;
}

std::atomic<int> main_tid_count;

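// spin_test stresses wait_for_all(): every end_node body blocks on a SpinBarrier that
// requires nthreads participants, so completing the graph needs nthreads distinct threads
// executing end bodies at the same time. main_tid_count records how many of those bodies
// ran on the thread that called wait_for_all(), i.e. whether the main thread joined the
// work instead of spinning idly.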
template<typename Input, typename Output>
struct spin_test {
    typedef Input input_type;
    typedef Output output_type;
    typedef tbb::flow::async_node< input_type, output_type > async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    class end_body_type {
        typedef Output output_type;
        std::thread::id my_main_tid;
        utils::SpinBarrier *my_barrier;
    public:
        end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }

        void operator()( const output_type & ) {
            ++end_body_exec_count;
            if (std::this_thread::get_id() == my_main_tid) {
                ++main_tid_count;
            }
            my_barrier->wait();
        }
    };

    spin_test() {}

    static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
        async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
        const int overall_message_count = nthreads * NUMBER_OF_MSGS;
        utils::SpinBarrier spin_barrier(nthreads);

        tbb::flow::graph g;
        tbb::flow::function_node<int, input_type> start_node(
            g, tbb::flow::unlimited, [](int input) { return input_type(input); }
        );
        async_node_type offload_node(
            g, tbb::flow::unlimited,
            [&](const input_type &input, gateway_type& gateway) {
                ++async_body_exec_count;
                if(my_async_activity.should_reserve_each_time())
                    gateway.reserve_wait();
                my_async_activity.submit(input, gateway);
            }
        );
        tbb::flow::function_node<output_type> end_node(
            g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
        );

        tbb::flow::make_edge( start_node, offload_node );
        tbb::flow::make_edge( offload_node, end_node );

        async_body_exec_count = 0;
        async_activity_processed_msg_count = 0;
        end_body_exec_count = 0;
        main_tid_count = 0;

        if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
            offload_node.gateway().reserve_wait();
        }
        for (int i = 0; i < overall_message_count; ++i) {
            start_node.try_put(i);
        }
        g.wait_for_all();
        CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
                       "AsyncBody processed wrong number of signals" );
        CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
                       "AsyncActivity processed wrong number of signals" );
        CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
                       "EndBody processed wrong number of signals");

        INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");

        INFO("async_body_exec_count == " << int(async_body_exec_count) <<
             " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
             " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
        );
        return 0;
    }

};

void test_for_spin_avoidance() {
    const int nthreads = 4;
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
    tbb::task_arena a(nthreads);
    a.execute([&] {
        spin_test<int, int>::run(nthreads);
    });
}

template< typename Input, typename Output >
int run_tests() {
    basic_test<Input, Output>::run();
    basic_test<Input, Output>::run(NUMBER_OF_MSGS);
    basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
    basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
    return 0;
}

#include "tbb/parallel_for.h"
template<typename Input, typename Output>
class enqueueing_on_inner_level {
    typedef Input input_type;
    typedef Output output_type;
    typedef async_activity<input_type, output_type> async_activity_type;
    typedef tbb::flow::async_node<Input, Output> async_node_type;
    typedef typename async_node_type::gateway_type gateway_type;

    class body_graph_with_async {
    public:
        body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
            : spin_barrier(&barrier), my_async_activity(&activity) {}

        void operator()(int) const {
            tbb::flow::graph g;
            tbb::flow::function_node< int, input_type > start_node(
                g, tbb::flow::unlimited, [](int input) { return input_type(input); }
            );
            async_node_type offload_node(
                g, tbb::flow::unlimited,
                [&](const input_type &input, gateway_type& gateway) {
                    gateway.reserve_wait();
                    my_async_activity->submit( input, gateway );
                }
            );
            tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );

            tbb::flow::make_edge( start_node, offload_node );
            tbb::flow::make_edge( offload_node, end_node );

            start_node.try_put(1);

            spin_barrier->wait();

            my_async_activity->activate();

            g.wait_for_all();
        }

    private:
        utils::SpinBarrier* spin_barrier;
        async_activity_type* my_async_activity;
    };

public:
    static int run() {
        const int nthreads = tbb::this_task_arena::max_concurrency();
        utils::SpinBarrier spin_barrier( nthreads );

        async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );

        tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
        return 0;
    }
};

int run_test_enqueueing_on_inner_level() {
    enqueueing_on_inner_level<int, int>::run();
    return 0;
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>

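// A simpler AsyncActivity used by the follows/precedes tests below: submit() reserves the
// wait and enqueues the work, the service thread doubles each input, puts the result back
// through the gateway, releases the wait, and exits once stop_limit items have been handled.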
template<typename NodeType>
class AsyncActivity {
public:
    using gateway_t = typename NodeType::gateway_type;

    struct work_type {
        int input;
        gateway_t* gateway;
    };

    AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() {
        while(!end_of_work()) {
            work_type w;
            while( my_q.try_pop(w) ) {
                int res = do_work(w.input);
                w.gateway->try_put(res);
                w.gateway->release_wait();
                ++c;
            }
        }
    }) {}

    void submit(int i, gateway_t* gateway) {
        work_type w = {i, gateway};
        gateway->reserve_wait();
        my_q.push(w);
    }

    void wait_for_all() { thr.join(); }

private:
    bool end_of_work() { return c >= stop_limit; }

    int do_work(int& i) { return i + i; }

    async_activity_queue<work_type> my_q;
    size_t stop_limit;
    size_t c;
    std::thread thr;
};

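// test_follows constructs the async_node with follows(preds...), which connects the listed
// predecessors to the node at construction time; test_precedes does the same on the output
// side with precedes(...). Each test then verifies that exactly the expected number of
// results arrives downstream.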
void test_follows() {
    using namespace tbb::flow;

    using input_t = int;
    using output_t = int;
    using node_t = async_node<input_t, output_t>;

    graph g;

    AsyncActivity<node_t> async_activity(3);

    std::array<broadcast_node<input_t>, 3> preds = {
        {
            broadcast_node<input_t>(g),
            broadcast_node<input_t>(g),
            broadcast_node<input_t>(g)
        }
    };

    node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
        async_activity.submit(input, &gtw);
    }, no_priority);

    buffer_node<output_t> buf(g);
    make_edge(node, buf);

    for(auto& pred: preds) {
        pred.try_put(1);
    }

    g.wait_for_all();
    async_activity.wait_for_all();

    output_t storage;
    CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
                  "Wrong number of messages passed through the edges created by follows()");
}

void test_precedes() {
    using namespace tbb::flow;

    using input_t = int;
    using output_t = int;
    using node_t = async_node<input_t, output_t>;

    graph g;

    AsyncActivity<node_t> async_activity(1);

    std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };

    broadcast_node<input_t> start(g);

    node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
        async_activity.submit(input, &gtw);
    }, no_priority);

    make_edge(start, node);

    start.try_put(1);

    g.wait_for_all();
    async_activity.wait_for_all();

    for(auto& successor : successors) {
        output_t storage;
        CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
                      "Wrong number of messages passed through the edges created by precedes()");
    }
}

void test_follows_and_precedes_api() {
    test_follows();
    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test async bodies processing
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Basic tests"){
    tbb::task_arena arena(utils::MaxThread);
    arena.execute(
        [&]() {
            run_tests<int, int>();
            run_tests<minimal_type, minimal_type>();
            run_tests<int, minimal_type>();
        }
    );
}

//! NativeParallelFor test with various concurrency settings
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Lightweight tests"){
    lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
}

//! Test reset and cancellation
//! \brief \ref error_guessing
TEST_CASE("Reset test"){
    test_reset();
}

//! Test async_node copy constructor
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Copy constructor test"){
    test_copy_ctor();
}

//! Test if main thread spins
//! \brief \ref stress
TEST_CASE("Spin avoidance test"){
    test_for_spin_avoidance();
}

//! Test nested enqueuing
//! \brief \ref error_guessing
TEST_CASE("Inner enqueuing test"){
    run_test_enqueueing_on_inner_level();
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test deprecated follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows and precedes API"){
    test_follows_and_precedes_api();
}
#endif

#if __TBB_CPP20_CONCEPTS_PRESENT
//! \brief \ref error_guessing
TEST_CASE("constraints for async_node input") {
    struct InputObject {
        InputObject() = default;
        InputObject( const InputObject& ) = default;
    };

    static_assert(utils::well_formed_instantiation<tbb::flow::async_node, InputObject, int>);
    static_assert(utils::well_formed_instantiation<tbb::flow::async_node, int, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonCopyable, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonDefaultInitializable, int>);
}

template <typename Input, typename Output, typename Body>
concept can_call_async_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency,
                                             Body body, tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
    tbb::flow::async_node<Input, Output>(graph, concurrency, body);
    tbb::flow::async_node<Input, Output>(graph, concurrency, body, priority);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
    tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
};

//! \brief \ref error_guessing
TEST_CASE("constraints for async_node body") {
    using input_type = int;
    using output_type = input_type;
    using namespace test_concepts::async_node_body;

    static_assert(can_call_async_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
}

#endif // __TBB_CPP20_CONCEPTS_PRESENT