xref: /oneTBB/test/tbb/test_input_node.cpp (revision c21e688a)
/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev // have to expose the reset_node method to be able to reset a function_body
1851c0b2f7Stbbdev 
1951c0b2f7Stbbdev #include "common/config.h"
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "tbb/flow_graph.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "common/test.h"
2451c0b2f7Stbbdev #include "common/utils.h"
2551c0b2f7Stbbdev #include "common/utils_assert.h"
26478de5b1Stbbdev #include "common/concepts_common.h"
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev 
2951c0b2f7Stbbdev //! \file test_input_node.cpp
3051c0b2f7Stbbdev //! \brief Test for [flow_graph.input_node] specification
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev using tbb::detail::d1::graph_task;
3451c0b2f7Stbbdev using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev const int N = 1000;
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev template< typename T >
3951c0b2f7Stbbdev class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev     std::atomic<int> my_counters[N];
4251c0b2f7Stbbdev     tbb::flow::graph& my_graph;
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev public:
4551c0b2f7Stbbdev 
test_push_receiver(tbb::flow::graph & g)4651c0b2f7Stbbdev     test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
4751c0b2f7Stbbdev         for (int i = 0; i < N; ++i )
4851c0b2f7Stbbdev             my_counters[i] = 0;
4951c0b2f7Stbbdev     }
5051c0b2f7Stbbdev 
get_count(int i)5151c0b2f7Stbbdev     int get_count( int i ) {
5251c0b2f7Stbbdev         int v = my_counters[i];
5351c0b2f7Stbbdev         return v;
5451c0b2f7Stbbdev     }
5551c0b2f7Stbbdev 
5651c0b2f7Stbbdev     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
5751c0b2f7Stbbdev 
try_put_task(const T & v)5851c0b2f7Stbbdev     graph_task* try_put_task( const T &v ) override {
5951c0b2f7Stbbdev         int i = (int)v;
6051c0b2f7Stbbdev         ++my_counters[i];
6151c0b2f7Stbbdev         return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
6251c0b2f7Stbbdev     }
6351c0b2f7Stbbdev 
graph_reference() const6451c0b2f7Stbbdev     tbb::flow::graph& graph_reference() const override {
6551c0b2f7Stbbdev         return my_graph;
6651c0b2f7Stbbdev     }
6751c0b2f7Stbbdev };
6851c0b2f7Stbbdev 
6951c0b2f7Stbbdev template< typename T >
7051c0b2f7Stbbdev class my_input_body {
7151c0b2f7Stbbdev 
7251c0b2f7Stbbdev     unsigned my_count;
7351c0b2f7Stbbdev     int *ninvocations;
7451c0b2f7Stbbdev 
7551c0b2f7Stbbdev public:
7651c0b2f7Stbbdev 
my_input_body()7757f524caSIlya Isaev     my_input_body() : ninvocations(nullptr) { my_count = 0; }
my_input_body(int & _inv)7851c0b2f7Stbbdev     my_input_body(int &_inv) : ninvocations(&_inv)  { my_count = 0; }
7951c0b2f7Stbbdev 
operator ()(tbb::flow_control & fc)8051c0b2f7Stbbdev     T operator()( tbb::flow_control& fc ) {
8151c0b2f7Stbbdev         T v = (T)my_count++;
8251c0b2f7Stbbdev         if(ninvocations) ++(*ninvocations);
8351c0b2f7Stbbdev         if ( (int)v < N ){
8451c0b2f7Stbbdev             return v;
8551c0b2f7Stbbdev         }else{
8651c0b2f7Stbbdev             fc.stop();
8751c0b2f7Stbbdev             return T();
8851c0b2f7Stbbdev         }
8951c0b2f7Stbbdev     }
9051c0b2f7Stbbdev 
9151c0b2f7Stbbdev };
9251c0b2f7Stbbdev 
9351c0b2f7Stbbdev template< typename T >
9451c0b2f7Stbbdev class function_body {
9551c0b2f7Stbbdev 
9651c0b2f7Stbbdev     std::atomic<int> *my_counters;
9751c0b2f7Stbbdev 
9851c0b2f7Stbbdev public:
9951c0b2f7Stbbdev 
function_body(std::atomic<int> * counters)10051c0b2f7Stbbdev     function_body( std::atomic<int> *counters ) : my_counters(counters) {
10151c0b2f7Stbbdev         for (int i = 0; i < N; ++i )
10251c0b2f7Stbbdev             my_counters[i] = 0;
10351c0b2f7Stbbdev     }
10451c0b2f7Stbbdev 
operator ()(T v)10551c0b2f7Stbbdev     bool operator()( T v ) {
10651c0b2f7Stbbdev         ++my_counters[(int)v];
10751c0b2f7Stbbdev         return true;
10851c0b2f7Stbbdev     }
10951c0b2f7Stbbdev 
11051c0b2f7Stbbdev };
11151c0b2f7Stbbdev 
11251c0b2f7Stbbdev template< typename T >
test_single_dest()11351c0b2f7Stbbdev void test_single_dest() {
11451c0b2f7Stbbdev     // push only
11551c0b2f7Stbbdev     tbb::flow::graph g;
11651c0b2f7Stbbdev     tbb::flow::input_node<T> src(g, my_input_body<T>() );
11751c0b2f7Stbbdev     test_push_receiver<T> dest(g);
11851c0b2f7Stbbdev     tbb::flow::make_edge( src, dest );
11951c0b2f7Stbbdev     src.activate();
12051c0b2f7Stbbdev     g.wait_for_all();
12151c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
12251c0b2f7Stbbdev         CHECK_MESSAGE( dest.get_count(i) == 1, "" );
12351c0b2f7Stbbdev     }
12451c0b2f7Stbbdev 
12551c0b2f7Stbbdev     // push only
12651c0b2f7Stbbdev     std::atomic<int> counters3[N];
12751c0b2f7Stbbdev     tbb::flow::input_node<T> src3(g, my_input_body<T>() );
12851c0b2f7Stbbdev     src3.activate();
12951c0b2f7Stbbdev 
13051c0b2f7Stbbdev     function_body<T> b3( counters3 );
13151c0b2f7Stbbdev     tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
13251c0b2f7Stbbdev     tbb::flow::make_edge( src3, dest3 );
13351c0b2f7Stbbdev     g.wait_for_all();
13451c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
13551c0b2f7Stbbdev         int v = counters3[i];
13651c0b2f7Stbbdev         CHECK_MESSAGE( v == 1, "" );
13751c0b2f7Stbbdev     }
13851c0b2f7Stbbdev 
13951c0b2f7Stbbdev     // push & pull
14051c0b2f7Stbbdev     tbb::flow::input_node<T> src2(g, my_input_body<T>() );
14151c0b2f7Stbbdev     src2.activate();
14251c0b2f7Stbbdev     std::atomic<int> counters2[N];
14351c0b2f7Stbbdev 
14451c0b2f7Stbbdev     function_body<T> b2( counters2 );
14551c0b2f7Stbbdev     tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 );
14651c0b2f7Stbbdev     tbb::flow::make_edge( src2, dest2 );
14751c0b2f7Stbbdev     g.wait_for_all();
14851c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
14951c0b2f7Stbbdev         int v = counters2[i];
15051c0b2f7Stbbdev         CHECK_MESSAGE( v == 1, "" );
15151c0b2f7Stbbdev     }
15251c0b2f7Stbbdev 
15351c0b2f7Stbbdev     // test copy constructor
15451c0b2f7Stbbdev     tbb::flow::input_node<T> src_copy(src);
15551c0b2f7Stbbdev     src_copy.activate();
15651c0b2f7Stbbdev     test_push_receiver<T> dest_c(g);
15751c0b2f7Stbbdev     CHECK_MESSAGE( src_copy.register_successor(dest_c), "" );
15851c0b2f7Stbbdev     g.wait_for_all();
15951c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
16051c0b2f7Stbbdev         CHECK_MESSAGE( dest_c.get_count(i) == 1, "" );
16151c0b2f7Stbbdev     }
16251c0b2f7Stbbdev }
16351c0b2f7Stbbdev 
test_reset()16451c0b2f7Stbbdev void test_reset() {
16551c0b2f7Stbbdev     //    input_node -> function_node
16651c0b2f7Stbbdev     tbb::flow::graph g;
16751c0b2f7Stbbdev     std::atomic<int> counters3[N];
16851c0b2f7Stbbdev     tbb::flow::input_node<int> src3(g, my_input_body<int>());
16951c0b2f7Stbbdev     src3.activate();
17051c0b2f7Stbbdev     tbb::flow::input_node<int> src_inactive(g, my_input_body<int>());
17151c0b2f7Stbbdev     function_body<int> b3( counters3 );
17251c0b2f7Stbbdev     tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3);
17351c0b2f7Stbbdev     tbb::flow::make_edge( src3, dest3 );
17451c0b2f7Stbbdev     //    source_node already in active state.  Let the graph run,
17551c0b2f7Stbbdev     g.wait_for_all();
17651c0b2f7Stbbdev     //    check the array for each value.
17751c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
17851c0b2f7Stbbdev         int v = counters3[i];
17951c0b2f7Stbbdev         CHECK_MESSAGE( v == 1, "" );
18051c0b2f7Stbbdev         counters3[i] = 0;
18151c0b2f7Stbbdev     }
18251c0b2f7Stbbdev 
18351c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_bodies);  // <-- re-initializes the counts.
18451c0b2f7Stbbdev     // and spawns task to run input
18551c0b2f7Stbbdev     src3.activate();
18651c0b2f7Stbbdev 
18751c0b2f7Stbbdev     g.wait_for_all();
18851c0b2f7Stbbdev     //    check output queue again.  Should be the same contents.
18951c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
19051c0b2f7Stbbdev         int v = counters3[i];
19151c0b2f7Stbbdev         CHECK_MESSAGE( v == 1, "" );
19251c0b2f7Stbbdev         counters3[i] = 0;
19351c0b2f7Stbbdev     }
19451c0b2f7Stbbdev     g.reset();  // doesn't reset the input_node_body to initial state, but does spawn a task
19551c0b2f7Stbbdev                 // to run the input_node.
19651c0b2f7Stbbdev 
19751c0b2f7Stbbdev     g.wait_for_all();
19851c0b2f7Stbbdev     // array should be all zero
19951c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
20051c0b2f7Stbbdev         int v = counters3[i];
20151c0b2f7Stbbdev         CHECK_MESSAGE( v == 0, "" );
20251c0b2f7Stbbdev     }
20351c0b2f7Stbbdev 
20451c0b2f7Stbbdev     remove_edge(src3, dest3);
20551c0b2f7Stbbdev     make_edge(src_inactive, dest3);
20651c0b2f7Stbbdev 
20751c0b2f7Stbbdev     // src_inactive doesn't run
20851c0b2f7Stbbdev     g.wait_for_all();
20951c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
21051c0b2f7Stbbdev         int v = counters3[i];
21151c0b2f7Stbbdev         CHECK_MESSAGE( v == 0, "" );
21251c0b2f7Stbbdev     }
21351c0b2f7Stbbdev 
21451c0b2f7Stbbdev     // run graph
21551c0b2f7Stbbdev     src_inactive.activate();
21651c0b2f7Stbbdev     g.wait_for_all();
21751c0b2f7Stbbdev     // check output
21851c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
21951c0b2f7Stbbdev         int v = counters3[i];
22051c0b2f7Stbbdev         CHECK_MESSAGE( v == 1, "" );
22151c0b2f7Stbbdev         counters3[i] = 0;
22251c0b2f7Stbbdev     }
22351c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_bodies);  // <-- reinitializes the counts
22451c0b2f7Stbbdev     // src_inactive doesn't run
22551c0b2f7Stbbdev     g.wait_for_all();
22651c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
22751c0b2f7Stbbdev         int v = counters3[i];
22851c0b2f7Stbbdev         CHECK_MESSAGE( v == 0, "" );
22951c0b2f7Stbbdev     }
23051c0b2f7Stbbdev 
23151c0b2f7Stbbdev     // start it up
23251c0b2f7Stbbdev     src_inactive.activate();
23351c0b2f7Stbbdev     g.wait_for_all();
23451c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
23551c0b2f7Stbbdev         int v = counters3[i];
23651c0b2f7Stbbdev         CHECK_MESSAGE( v == 1, "" );
23751c0b2f7Stbbdev         counters3[i] = 0;
23851c0b2f7Stbbdev     }
23951c0b2f7Stbbdev     g.reset();  // doesn't reset the input_node_body to initial state, and doesn't
24051c0b2f7Stbbdev                 // spawn a task to run the input_node.
24151c0b2f7Stbbdev 
24251c0b2f7Stbbdev     g.wait_for_all();
24351c0b2f7Stbbdev     // array should be all zero
24451c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
24551c0b2f7Stbbdev         int v = counters3[i];
24651c0b2f7Stbbdev         CHECK_MESSAGE( v == 0, "" );
24751c0b2f7Stbbdev     }
24851c0b2f7Stbbdev     src_inactive.activate();
24951c0b2f7Stbbdev     // input_node_body is already in final state, so input_node will not forward a message.
25051c0b2f7Stbbdev     g.wait_for_all();
25151c0b2f7Stbbdev     for (int i = 0; i < N; ++i ) {
25251c0b2f7Stbbdev         int v = counters3[i];
25351c0b2f7Stbbdev         CHECK_MESSAGE( v == 0, "" );
25451c0b2f7Stbbdev     }
25551c0b2f7Stbbdev }
25651c0b2f7Stbbdev 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
//! Builds an input_node with precedes(...) and checks that each listed successor
//! ends up holding exactly one message.
void test_follows_and_precedes_api() {
    using namespace tbb::flow;

    graph g;

    std::array<buffer_node<bool>, 3> successors {{
        buffer_node<bool>(g),
        buffer_node<bool>(g),
        buffer_node<bool>(g)
    }};

    // The body lets its first call through and stops the flow on its second call.
    bool do_try_put = true;
    input_node<bool> src(
        precedes(successors[0], successors[1], successors[2]),
        [&](tbb::flow_control& fc) -> bool {
            const bool emit_now = do_try_put;
            do_try_put = !emit_now;
            if (!emit_now)
                fc.stop();
            return true;
        }
    );

    src.activate();
    g.wait_for_all();

    bool storage;
    for (auto& node : successors) {
        // The second try_get is attempted only if the first one succeeded.
        const bool got_first = node.try_get(storage);
        const bool got_second = got_first && node.try_get(storage);
        CHECK_MESSAGE((got_first && !got_second),
                      "Not exact edge quantity was made");
    }
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
29151c0b2f7Stbbdev 
29251c0b2f7Stbbdev //! Test push, push-pull behavior and copy constructor
29351c0b2f7Stbbdev //! \brief \ref error_guessing \ref requirement
29451c0b2f7Stbbdev TEST_CASE("Single destination tests"){
29551c0b2f7Stbbdev     for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) {
29651c0b2f7Stbbdev         tbb::task_arena arena(p);
29751c0b2f7Stbbdev         arena.execute(
__anone180743f0202() 29851c0b2f7Stbbdev             [&]() {
29951c0b2f7Stbbdev                 test_single_dest<int>();
30051c0b2f7Stbbdev                 test_single_dest<float>();
30151c0b2f7Stbbdev             }
30251c0b2f7Stbbdev         );
30351c0b2f7Stbbdev 	}
30451c0b2f7Stbbdev }
30551c0b2f7Stbbdev 
30651c0b2f7Stbbdev //! Test reset variants
30751c0b2f7Stbbdev //! \brief \ref error_guessing
30851c0b2f7Stbbdev TEST_CASE("Reset test"){
30951c0b2f7Stbbdev     test_reset();
31051c0b2f7Stbbdev }
31151c0b2f7Stbbdev 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Follows and precedes API") {
    // Only built when the node-set preview feature is enabled.
    test_follows_and_precedes_api();
}
#endif
31951c0b2f7Stbbdev 
32051c0b2f7Stbbdev //! Test try_get before activation
32151c0b2f7Stbbdev //! \brief \ref error_guessing
32251c0b2f7Stbbdev TEST_CASE("try_get before activation"){
32351c0b2f7Stbbdev     tbb::flow::graph g;
__anone180743f0302(tbb::flow_control& fc) 324478de5b1Stbbdev     tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
32551c0b2f7Stbbdev 
32651c0b2f7Stbbdev     int tmp = -1;
32751c0b2f7Stbbdev     CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed");
32851c0b2f7Stbbdev }
329478de5b1Stbbdev 
330478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
331478de5b1Stbbdev //! \brief \ref error_guessing
332478de5b1Stbbdev TEST_CASE("constraints for input_node output") {
333478de5b1Stbbdev     struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {};
334478de5b1Stbbdev 
335478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::input_node, Object>);
336478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::input_node, int>);
337478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyable>);
338478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyAssignable>);
339478de5b1Stbbdev }
340478de5b1Stbbdev 
341478de5b1Stbbdev template <typename Output, typename Body>
342478de5b1Stbbdev concept can_call_input_node_ctor = requires( tbb::flow::graph& graph, Body body, tbb::flow::buffer_node<int> f ) {
343478de5b1Stbbdev     tbb::flow::input_node<Output>(graph, body);
344478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
345478de5b1Stbbdev     tbb::flow::input_node<Output>(tbb::flow::precedes(f), body);
346478de5b1Stbbdev #endif
347478de5b1Stbbdev };
348478de5b1Stbbdev 
349478de5b1Stbbdev //! \brief \ref error_guessing
350478de5b1Stbbdev TEST_CASE("constraints for input_node body") {
351478de5b1Stbbdev     using output_type = int;
352478de5b1Stbbdev     using namespace test_concepts::input_node_body;
353478de5b1Stbbdev 
354478de5b1Stbbdev     static_assert(can_call_input_node_ctor<output_type, Correct<output_type>>);
355478de5b1Stbbdev     static_assert(!can_call_input_node_ctor<output_type, NonCopyable<output_type>>);
356478de5b1Stbbdev     static_assert(!can_call_input_node_ctor<output_type, NonDestructible<output_type>>);
357478de5b1Stbbdev     static_assert(!can_call_input_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
358478de5b1Stbbdev     static_assert(!can_call_input_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
359478de5b1Stbbdev     static_assert(!can_call_input_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
360478de5b1Stbbdev }
361478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
362