/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
1651c0b2f7Stbbdev
// The reset_node method has to be exposed so that a function_body can be reset.
1851c0b2f7Stbbdev
1951c0b2f7Stbbdev #include "common/config.h"
2051c0b2f7Stbbdev
2151c0b2f7Stbbdev #include "tbb/flow_graph.h"
2251c0b2f7Stbbdev
2351c0b2f7Stbbdev #include "common/test.h"
2451c0b2f7Stbbdev #include "common/utils.h"
2551c0b2f7Stbbdev #include "common/utils_assert.h"
26478de5b1Stbbdev #include "common/concepts_common.h"
2751c0b2f7Stbbdev
2851c0b2f7Stbbdev
//! \file test_input_node.cpp
//! \brief Test for [flow_graph.input_node] specification
3151c0b2f7Stbbdev
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev using tbb::detail::d1::graph_task;
3451c0b2f7Stbbdev using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
3551c0b2f7Stbbdev
3651c0b2f7Stbbdev const int N = 1000;
3751c0b2f7Stbbdev
3851c0b2f7Stbbdev template< typename T >
3951c0b2f7Stbbdev class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
4051c0b2f7Stbbdev
4151c0b2f7Stbbdev std::atomic<int> my_counters[N];
4251c0b2f7Stbbdev tbb::flow::graph& my_graph;
4351c0b2f7Stbbdev
4451c0b2f7Stbbdev public:
4551c0b2f7Stbbdev
test_push_receiver(tbb::flow::graph & g)4651c0b2f7Stbbdev test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
4751c0b2f7Stbbdev for (int i = 0; i < N; ++i )
4851c0b2f7Stbbdev my_counters[i] = 0;
4951c0b2f7Stbbdev }
5051c0b2f7Stbbdev
get_count(int i)5151c0b2f7Stbbdev int get_count( int i ) {
5251c0b2f7Stbbdev int v = my_counters[i];
5351c0b2f7Stbbdev return v;
5451c0b2f7Stbbdev }
5551c0b2f7Stbbdev
5651c0b2f7Stbbdev typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
5751c0b2f7Stbbdev
try_put_task(const T & v)5851c0b2f7Stbbdev graph_task* try_put_task( const T &v ) override {
5951c0b2f7Stbbdev int i = (int)v;
6051c0b2f7Stbbdev ++my_counters[i];
6151c0b2f7Stbbdev return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
6251c0b2f7Stbbdev }
6351c0b2f7Stbbdev
graph_reference() const6451c0b2f7Stbbdev tbb::flow::graph& graph_reference() const override {
6551c0b2f7Stbbdev return my_graph;
6651c0b2f7Stbbdev }
6751c0b2f7Stbbdev };
6851c0b2f7Stbbdev
6951c0b2f7Stbbdev template< typename T >
7051c0b2f7Stbbdev class my_input_body {
7151c0b2f7Stbbdev
7251c0b2f7Stbbdev unsigned my_count;
7351c0b2f7Stbbdev int *ninvocations;
7451c0b2f7Stbbdev
7551c0b2f7Stbbdev public:
7651c0b2f7Stbbdev
my_input_body()7757f524caSIlya Isaev my_input_body() : ninvocations(nullptr) { my_count = 0; }
my_input_body(int & _inv)7851c0b2f7Stbbdev my_input_body(int &_inv) : ninvocations(&_inv) { my_count = 0; }
7951c0b2f7Stbbdev
operator ()(tbb::flow_control & fc)8051c0b2f7Stbbdev T operator()( tbb::flow_control& fc ) {
8151c0b2f7Stbbdev T v = (T)my_count++;
8251c0b2f7Stbbdev if(ninvocations) ++(*ninvocations);
8351c0b2f7Stbbdev if ( (int)v < N ){
8451c0b2f7Stbbdev return v;
8551c0b2f7Stbbdev }else{
8651c0b2f7Stbbdev fc.stop();
8751c0b2f7Stbbdev return T();
8851c0b2f7Stbbdev }
8951c0b2f7Stbbdev }
9051c0b2f7Stbbdev
9151c0b2f7Stbbdev };
9251c0b2f7Stbbdev
9351c0b2f7Stbbdev template< typename T >
9451c0b2f7Stbbdev class function_body {
9551c0b2f7Stbbdev
9651c0b2f7Stbbdev std::atomic<int> *my_counters;
9751c0b2f7Stbbdev
9851c0b2f7Stbbdev public:
9951c0b2f7Stbbdev
function_body(std::atomic<int> * counters)10051c0b2f7Stbbdev function_body( std::atomic<int> *counters ) : my_counters(counters) {
10151c0b2f7Stbbdev for (int i = 0; i < N; ++i )
10251c0b2f7Stbbdev my_counters[i] = 0;
10351c0b2f7Stbbdev }
10451c0b2f7Stbbdev
operator ()(T v)10551c0b2f7Stbbdev bool operator()( T v ) {
10651c0b2f7Stbbdev ++my_counters[(int)v];
10751c0b2f7Stbbdev return true;
10851c0b2f7Stbbdev }
10951c0b2f7Stbbdev
11051c0b2f7Stbbdev };
11151c0b2f7Stbbdev
11251c0b2f7Stbbdev template< typename T >
test_single_dest()11351c0b2f7Stbbdev void test_single_dest() {
11451c0b2f7Stbbdev // push only
11551c0b2f7Stbbdev tbb::flow::graph g;
11651c0b2f7Stbbdev tbb::flow::input_node<T> src(g, my_input_body<T>() );
11751c0b2f7Stbbdev test_push_receiver<T> dest(g);
11851c0b2f7Stbbdev tbb::flow::make_edge( src, dest );
11951c0b2f7Stbbdev src.activate();
12051c0b2f7Stbbdev g.wait_for_all();
12151c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
12251c0b2f7Stbbdev CHECK_MESSAGE( dest.get_count(i) == 1, "" );
12351c0b2f7Stbbdev }
12451c0b2f7Stbbdev
12551c0b2f7Stbbdev // push only
12651c0b2f7Stbbdev std::atomic<int> counters3[N];
12751c0b2f7Stbbdev tbb::flow::input_node<T> src3(g, my_input_body<T>() );
12851c0b2f7Stbbdev src3.activate();
12951c0b2f7Stbbdev
13051c0b2f7Stbbdev function_body<T> b3( counters3 );
13151c0b2f7Stbbdev tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
13251c0b2f7Stbbdev tbb::flow::make_edge( src3, dest3 );
13351c0b2f7Stbbdev g.wait_for_all();
13451c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
13551c0b2f7Stbbdev int v = counters3[i];
13651c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" );
13751c0b2f7Stbbdev }
13851c0b2f7Stbbdev
13951c0b2f7Stbbdev // push & pull
14051c0b2f7Stbbdev tbb::flow::input_node<T> src2(g, my_input_body<T>() );
14151c0b2f7Stbbdev src2.activate();
14251c0b2f7Stbbdev std::atomic<int> counters2[N];
14351c0b2f7Stbbdev
14451c0b2f7Stbbdev function_body<T> b2( counters2 );
14551c0b2f7Stbbdev tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 );
14651c0b2f7Stbbdev tbb::flow::make_edge( src2, dest2 );
14751c0b2f7Stbbdev g.wait_for_all();
14851c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
14951c0b2f7Stbbdev int v = counters2[i];
15051c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" );
15151c0b2f7Stbbdev }
15251c0b2f7Stbbdev
15351c0b2f7Stbbdev // test copy constructor
15451c0b2f7Stbbdev tbb::flow::input_node<T> src_copy(src);
15551c0b2f7Stbbdev src_copy.activate();
15651c0b2f7Stbbdev test_push_receiver<T> dest_c(g);
15751c0b2f7Stbbdev CHECK_MESSAGE( src_copy.register_successor(dest_c), "" );
15851c0b2f7Stbbdev g.wait_for_all();
15951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
16051c0b2f7Stbbdev CHECK_MESSAGE( dest_c.get_count(i) == 1, "" );
16151c0b2f7Stbbdev }
16251c0b2f7Stbbdev }
16351c0b2f7Stbbdev
test_reset()16451c0b2f7Stbbdev void test_reset() {
16551c0b2f7Stbbdev // input_node -> function_node
16651c0b2f7Stbbdev tbb::flow::graph g;
16751c0b2f7Stbbdev std::atomic<int> counters3[N];
16851c0b2f7Stbbdev tbb::flow::input_node<int> src3(g, my_input_body<int>());
16951c0b2f7Stbbdev src3.activate();
17051c0b2f7Stbbdev tbb::flow::input_node<int> src_inactive(g, my_input_body<int>());
17151c0b2f7Stbbdev function_body<int> b3( counters3 );
17251c0b2f7Stbbdev tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3);
17351c0b2f7Stbbdev tbb::flow::make_edge( src3, dest3 );
17451c0b2f7Stbbdev // source_node already in active state. Let the graph run,
17551c0b2f7Stbbdev g.wait_for_all();
17651c0b2f7Stbbdev // check the array for each value.
17751c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
17851c0b2f7Stbbdev int v = counters3[i];
17951c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" );
18051c0b2f7Stbbdev counters3[i] = 0;
18151c0b2f7Stbbdev }
18251c0b2f7Stbbdev
18351c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts.
18451c0b2f7Stbbdev // and spawns task to run input
18551c0b2f7Stbbdev src3.activate();
18651c0b2f7Stbbdev
18751c0b2f7Stbbdev g.wait_for_all();
18851c0b2f7Stbbdev // check output queue again. Should be the same contents.
18951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
19051c0b2f7Stbbdev int v = counters3[i];
19151c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" );
19251c0b2f7Stbbdev counters3[i] = 0;
19351c0b2f7Stbbdev }
19451c0b2f7Stbbdev g.reset(); // doesn't reset the input_node_body to initial state, but does spawn a task
19551c0b2f7Stbbdev // to run the input_node.
19651c0b2f7Stbbdev
19751c0b2f7Stbbdev g.wait_for_all();
19851c0b2f7Stbbdev // array should be all zero
19951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
20051c0b2f7Stbbdev int v = counters3[i];
20151c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" );
20251c0b2f7Stbbdev }
20351c0b2f7Stbbdev
20451c0b2f7Stbbdev remove_edge(src3, dest3);
20551c0b2f7Stbbdev make_edge(src_inactive, dest3);
20651c0b2f7Stbbdev
20751c0b2f7Stbbdev // src_inactive doesn't run
20851c0b2f7Stbbdev g.wait_for_all();
20951c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
21051c0b2f7Stbbdev int v = counters3[i];
21151c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" );
21251c0b2f7Stbbdev }
21351c0b2f7Stbbdev
21451c0b2f7Stbbdev // run graph
21551c0b2f7Stbbdev src_inactive.activate();
21651c0b2f7Stbbdev g.wait_for_all();
21751c0b2f7Stbbdev // check output
21851c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
21951c0b2f7Stbbdev int v = counters3[i];
22051c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" );
22151c0b2f7Stbbdev counters3[i] = 0;
22251c0b2f7Stbbdev }
22351c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts
22451c0b2f7Stbbdev // src_inactive doesn't run
22551c0b2f7Stbbdev g.wait_for_all();
22651c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
22751c0b2f7Stbbdev int v = counters3[i];
22851c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" );
22951c0b2f7Stbbdev }
23051c0b2f7Stbbdev
23151c0b2f7Stbbdev // start it up
23251c0b2f7Stbbdev src_inactive.activate();
23351c0b2f7Stbbdev g.wait_for_all();
23451c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
23551c0b2f7Stbbdev int v = counters3[i];
23651c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" );
23751c0b2f7Stbbdev counters3[i] = 0;
23851c0b2f7Stbbdev }
23951c0b2f7Stbbdev g.reset(); // doesn't reset the input_node_body to initial state, and doesn't
24051c0b2f7Stbbdev // spawn a task to run the input_node.
24151c0b2f7Stbbdev
24251c0b2f7Stbbdev g.wait_for_all();
24351c0b2f7Stbbdev // array should be all zero
24451c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
24551c0b2f7Stbbdev int v = counters3[i];
24651c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" );
24751c0b2f7Stbbdev }
24851c0b2f7Stbbdev src_inactive.activate();
24951c0b2f7Stbbdev // input_node_body is already in final state, so input_node will not forward a message.
25051c0b2f7Stbbdev g.wait_for_all();
25151c0b2f7Stbbdev for (int i = 0; i < N; ++i ) {
25251c0b2f7Stbbdev int v = counters3[i];
25351c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" );
25451c0b2f7Stbbdev }
25551c0b2f7Stbbdev }
25651c0b2f7Stbbdev
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>

//! Checks that constructing an input_node with precedes(...) makes exactly one edge to each successor.
/** The body lets one message through and stops on its second call, so every
    buffer_node must yield exactly one item. */
void test_follows_and_precedes_api() {
    using namespace tbb::flow;

    graph g;

    std::array<buffer_node<bool>, 3> successors {{
        buffer_node<bool>(g),
        buffer_node<bool>(g),
        buffer_node<bool>(g)
    }};

    // true on the first call; flipped afterwards so the second call stops the flow
    bool do_try_put = true;
    input_node<bool> src(
        precedes(successors[0], successors[1], successors[2]),
        [&](tbb::flow_control& fc) -> bool {
            if (!do_try_put) {
                fc.stop();
            }
            do_try_put = !do_try_put;
            return true;
        }
    );

    src.activate();
    g.wait_for_all();

    bool storage;
    for (auto& successor : successors) {
        // The second try_get is only attempted when the first one succeeded.
        const bool got_first = successor.try_get(storage);
        const bool exactly_one = got_first && !successor.try_get(storage);
        CHECK_MESSAGE(exactly_one, "Not exact edge quantity was made");
    }
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
29151c0b2f7Stbbdev
29251c0b2f7Stbbdev //! Test push, push-pull behavior and copy constructor
29351c0b2f7Stbbdev //! \brief \ref error_guessing \ref requirement
29451c0b2f7Stbbdev TEST_CASE("Single destination tests"){
29551c0b2f7Stbbdev for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) {
29651c0b2f7Stbbdev tbb::task_arena arena(p);
29751c0b2f7Stbbdev arena.execute(
__anone180743f0202() 29851c0b2f7Stbbdev [&]() {
29951c0b2f7Stbbdev test_single_dest<int>();
30051c0b2f7Stbbdev test_single_dest<float>();
30151c0b2f7Stbbdev }
30251c0b2f7Stbbdev );
30351c0b2f7Stbbdev }
30451c0b2f7Stbbdev }
30551c0b2f7Stbbdev
30651c0b2f7Stbbdev //! Test reset variants
30751c0b2f7Stbbdev //! \brief \ref error_guessing
30851c0b2f7Stbbdev TEST_CASE("Reset test"){
30951c0b2f7Stbbdev test_reset();
31051c0b2f7Stbbdev }
31151c0b2f7Stbbdev
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Follows and precedes API") {
    // Compiled only when the node-set preview feature macro is enabled.
    test_follows_and_precedes_api();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
31951c0b2f7Stbbdev
32051c0b2f7Stbbdev //! Test try_get before activation
32151c0b2f7Stbbdev //! \brief \ref error_guessing
32251c0b2f7Stbbdev TEST_CASE("try_get before activation"){
32351c0b2f7Stbbdev tbb::flow::graph g;
__anone180743f0302(tbb::flow_control& fc) 324478de5b1Stbbdev tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
32551c0b2f7Stbbdev
32651c0b2f7Stbbdev int tmp = -1;
32751c0b2f7Stbbdev CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed");
32851c0b2f7Stbbdev }
329478de5b1Stbbdev
#if __TBB_CPP20_CONCEPTS_PRESENT
//! Compile-time check of which output types input_node can be instantiated with.
//! \brief \ref error_guessing
TEST_CASE("constraints for input_node output") {
    // Local type deriving from both the Copyable and CopyAssignable helpers.
    struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {};

    // Instantiations that must be well-formed.
    static_assert(utils::well_formed_instantiation<tbb::flow::input_node, Object>);
    static_assert(utils::well_formed_instantiation<tbb::flow::input_node, int>);

    // Instantiations that must be rejected.
    static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyable>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyAssignable>);
}

//! Satisfied when input_node<Output> can be constructed from a graph (and, with the
//! node-set preview enabled, from precedes(...)) together with a body of type Body.
template <typename Output, typename Body>
concept can_call_input_node_ctor =
    requires(tbb::flow::graph& g, Body input_body, tbb::flow::buffer_node<int> successor) {
        tbb::flow::input_node<Output>(g, input_body);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
        tbb::flow::input_node<Output>(tbb::flow::precedes(successor), input_body);
#endif
    };

//! Compile-time check of which body types the input_node constructors accept.
//! \brief \ref error_guessing
TEST_CASE("constraints for input_node body") {
    using output_type = int;
    using namespace test_concepts::input_node_body;

    // The one body type that must be accepted.
    static_assert(can_call_input_node_ctor<output_type, Correct<output_type>>);

    // Each remaining body type must be rejected.
    static_assert(!can_call_input_node_ctor<output_type, NonCopyable<output_type>>);
    static_assert(!can_call_input_node_ctor<output_type, NonDestructible<output_type>>);
    static_assert(!can_call_input_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
    static_assert(!can_call_input_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
    static_assert(!can_call_input_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
}
#endif // __TBB_CPP20_CONCEPTS_PRESENT
362