xref: /oneTBB/test/tbb/test_limiter_node.cpp (revision 89b2e0e3)
151c0b2f7Stbbdev /*
2*89b2e0e3SOlga Malysheva     Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
20b15aabb3Stbbdev 
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev 
2551c0b2f7Stbbdev #include "common/test.h"
2651c0b2f7Stbbdev #include "common/utils.h"
2751c0b2f7Stbbdev #include "common/utils_assert.h"
2851c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
296edb5c3aSVladimir Serov #include "tbb/global_control.h"
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev #include <atomic>
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev 
3451c0b2f7Stbbdev //! \file test_limiter_node.cpp
3551c0b2f7Stbbdev //! \brief Test for [flow_graph.limiter_node] specification
3651c0b2f7Stbbdev 
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev const int L = 10;
3951c0b2f7Stbbdev const int N = 1000;
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
4251c0b2f7Stbbdev using tbb::detail::d1::graph_task;
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev template< typename T >
4551c0b2f7Stbbdev struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
4651c0b2f7Stbbdev    T next_value;
4751c0b2f7Stbbdev    tbb::flow::graph& my_graph;
4851c0b2f7Stbbdev 
serial_receiverserial_receiver4951c0b2f7Stbbdev    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
5051c0b2f7Stbbdev 
try_put_taskserial_receiver5151c0b2f7Stbbdev    graph_task* try_put_task( const T &v ) override {
5251c0b2f7Stbbdev        CHECK_MESSAGE( next_value++  == v, "" );
5351c0b2f7Stbbdev        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
5451c0b2f7Stbbdev    }
5551c0b2f7Stbbdev 
graph_referenceserial_receiver5651c0b2f7Stbbdev     tbb::flow::graph& graph_reference() const override {
5751c0b2f7Stbbdev         return my_graph;
5851c0b2f7Stbbdev     }
5951c0b2f7Stbbdev };
6051c0b2f7Stbbdev 
6151c0b2f7Stbbdev template< typename T >
6251c0b2f7Stbbdev struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
6351c0b2f7Stbbdev 
6451c0b2f7Stbbdev     std::atomic<int> my_count;
6551c0b2f7Stbbdev     tbb::flow::graph& my_graph;
6651c0b2f7Stbbdev 
parallel_receiverparallel_receiver6751c0b2f7Stbbdev     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
6851c0b2f7Stbbdev 
try_put_taskparallel_receiver6951c0b2f7Stbbdev     graph_task* try_put_task( const T &/*v*/ ) override {
7051c0b2f7Stbbdev        ++my_count;
7151c0b2f7Stbbdev        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
7251c0b2f7Stbbdev     }
7351c0b2f7Stbbdev 
graph_referenceparallel_receiver7451c0b2f7Stbbdev     tbb::flow::graph& graph_reference() const override {
7551c0b2f7Stbbdev         return my_graph;
7651c0b2f7Stbbdev     }
7751c0b2f7Stbbdev };
7851c0b2f7Stbbdev 
7951c0b2f7Stbbdev template< typename T >
8051c0b2f7Stbbdev struct empty_sender : public tbb::flow::sender<T> {
8151c0b2f7Stbbdev         typedef typename tbb::flow::sender<T>::successor_type successor_type;
8251c0b2f7Stbbdev 
register_successorempty_sender8351c0b2f7Stbbdev         bool register_successor( successor_type & ) override { return false; }
remove_successorempty_sender8451c0b2f7Stbbdev         bool remove_successor( successor_type & ) override { return false; }
8551c0b2f7Stbbdev };
8651c0b2f7Stbbdev 
8751c0b2f7Stbbdev 
8851c0b2f7Stbbdev template< typename T >
8951c0b2f7Stbbdev struct put_body : utils::NoAssign {
9051c0b2f7Stbbdev 
9151c0b2f7Stbbdev     tbb::flow::limiter_node<T> &my_lim;
9251c0b2f7Stbbdev     std::atomic<int> &my_accept_count;
9351c0b2f7Stbbdev 
put_bodyput_body9451c0b2f7Stbbdev     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
9551c0b2f7Stbbdev         my_lim(lim), my_accept_count(accept_count) {}
9651c0b2f7Stbbdev 
operator ()put_body9751c0b2f7Stbbdev     void operator()( int ) const {
9851c0b2f7Stbbdev         for ( int i = 0; i < L; ++i ) {
9951c0b2f7Stbbdev             bool msg = my_lim.try_put( T(i) );
10051c0b2f7Stbbdev             if ( msg == true )
10151c0b2f7Stbbdev                ++my_accept_count;
10251c0b2f7Stbbdev         }
10351c0b2f7Stbbdev     }
10451c0b2f7Stbbdev };
10551c0b2f7Stbbdev 
10651c0b2f7Stbbdev template< typename T >
10751c0b2f7Stbbdev struct put_dec_body : utils::NoAssign {
10851c0b2f7Stbbdev 
10951c0b2f7Stbbdev     tbb::flow::limiter_node<T> &my_lim;
11051c0b2f7Stbbdev     std::atomic<int> &my_accept_count;
11151c0b2f7Stbbdev 
put_dec_bodyput_dec_body11251c0b2f7Stbbdev     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
11351c0b2f7Stbbdev         my_lim(lim), my_accept_count(accept_count) {}
11451c0b2f7Stbbdev 
operator ()put_dec_body11551c0b2f7Stbbdev     void operator()( int ) const {
11651c0b2f7Stbbdev         int local_accept_count = 0;
11751c0b2f7Stbbdev         while ( local_accept_count < N ) {
11851c0b2f7Stbbdev             bool msg = my_lim.try_put( T(local_accept_count) );
11951c0b2f7Stbbdev             if ( msg == true ) {
12051c0b2f7Stbbdev                 ++local_accept_count;
12151c0b2f7Stbbdev                 ++my_accept_count;
12249e08aacStbbdev                 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
12351c0b2f7Stbbdev             }
12451c0b2f7Stbbdev         }
12551c0b2f7Stbbdev     }
12651c0b2f7Stbbdev 
12751c0b2f7Stbbdev };
12851c0b2f7Stbbdev 
12951c0b2f7Stbbdev template< typename T >
test_puts_with_decrements(int num_threads,tbb::flow::limiter_node<T> & lim,tbb::flow::graph & g)13051c0b2f7Stbbdev void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
13151c0b2f7Stbbdev     parallel_receiver<T> r(g);
13251c0b2f7Stbbdev     empty_sender< tbb::flow::continue_msg > s;
13351c0b2f7Stbbdev     std::atomic<int> accept_count;
13451c0b2f7Stbbdev     accept_count = 0;
13551c0b2f7Stbbdev     tbb::flow::make_edge( lim, r );
13649e08aacStbbdev     tbb::flow::make_edge(s, lim.decrementer());
13751c0b2f7Stbbdev 
13851c0b2f7Stbbdev     // test puts with decrements
13951c0b2f7Stbbdev     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
14051c0b2f7Stbbdev     int c = accept_count;
14151c0b2f7Stbbdev     CHECK_MESSAGE( c == N*num_threads, "" );
14251c0b2f7Stbbdev     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
14351c0b2f7Stbbdev }
14451c0b2f7Stbbdev 
14551c0b2f7Stbbdev //
14651c0b2f7Stbbdev // Tests
14751c0b2f7Stbbdev //
14851c0b2f7Stbbdev // limiter only forwards below the limit, multiple parallel senders / single receiver
14951c0b2f7Stbbdev // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
15051c0b2f7Stbbdev //
15151c0b2f7Stbbdev //
15251c0b2f7Stbbdev template< typename T >
test_parallel(int num_threads)15351c0b2f7Stbbdev int test_parallel(int num_threads) {
15451c0b2f7Stbbdev 
15551c0b2f7Stbbdev    // test puts with no decrements
15651c0b2f7Stbbdev    for ( int i = 0; i < L; ++i ) {
15751c0b2f7Stbbdev        tbb::flow::graph g;
15851c0b2f7Stbbdev        tbb::flow::limiter_node< T > lim(g, i);
15951c0b2f7Stbbdev        parallel_receiver<T> r(g);
16051c0b2f7Stbbdev        std::atomic<int> accept_count;
16151c0b2f7Stbbdev        accept_count = 0;
16251c0b2f7Stbbdev        tbb::flow::make_edge( lim, r );
16351c0b2f7Stbbdev        // test puts with no decrements
16451c0b2f7Stbbdev        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
16551c0b2f7Stbbdev        g.wait_for_all();
16651c0b2f7Stbbdev        int c = accept_count;
16751c0b2f7Stbbdev        CHECK_MESSAGE( c == i, "" );
16851c0b2f7Stbbdev    }
16951c0b2f7Stbbdev 
17051c0b2f7Stbbdev    // test puts with decrements
17151c0b2f7Stbbdev    for ( int i = 1; i < L; ++i ) {
17251c0b2f7Stbbdev        tbb::flow::graph g;
17351c0b2f7Stbbdev        tbb::flow::limiter_node< T > lim(g, i);
17451c0b2f7Stbbdev        test_puts_with_decrements(num_threads, lim, g);
17551c0b2f7Stbbdev        tbb::flow::limiter_node< T > lim_copy( lim );
17651c0b2f7Stbbdev        test_puts_with_decrements(num_threads, lim_copy, g);
17751c0b2f7Stbbdev    }
17851c0b2f7Stbbdev 
17951c0b2f7Stbbdev    return 0;
18051c0b2f7Stbbdev }
18151c0b2f7Stbbdev 
18251c0b2f7Stbbdev //
18351c0b2f7Stbbdev // Tests
18451c0b2f7Stbbdev //
18551c0b2f7Stbbdev // limiter only forwards below the limit, single sender / single receiver
18651c0b2f7Stbbdev // at reject, a put to decrement, will cause next message to be accepted
18751c0b2f7Stbbdev //
18851c0b2f7Stbbdev template< typename T >
test_serial()18951c0b2f7Stbbdev int test_serial() {
19051c0b2f7Stbbdev 
19151c0b2f7Stbbdev    // test puts with no decrements
19251c0b2f7Stbbdev    for ( int i = 0; i < L; ++i ) {
19351c0b2f7Stbbdev        tbb::flow::graph g;
19451c0b2f7Stbbdev        tbb::flow::limiter_node< T > lim(g, i);
19551c0b2f7Stbbdev        serial_receiver<T> r(g);
19651c0b2f7Stbbdev        tbb::flow::make_edge( lim, r );
19751c0b2f7Stbbdev        for ( int j = 0; j < L; ++j ) {
19851c0b2f7Stbbdev            bool msg = lim.try_put( T(j) );
19951c0b2f7Stbbdev            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
20051c0b2f7Stbbdev        }
20151c0b2f7Stbbdev        g.wait_for_all();
20251c0b2f7Stbbdev    }
20351c0b2f7Stbbdev 
20451c0b2f7Stbbdev    // test puts with decrements
20551c0b2f7Stbbdev    for ( int i = 1; i < L; ++i ) {
20651c0b2f7Stbbdev        tbb::flow::graph g;
20751c0b2f7Stbbdev        tbb::flow::limiter_node< T > lim(g, i);
20851c0b2f7Stbbdev        serial_receiver<T> r(g);
20951c0b2f7Stbbdev        empty_sender< tbb::flow::continue_msg > s;
21051c0b2f7Stbbdev        tbb::flow::make_edge( lim, r );
21149e08aacStbbdev        tbb::flow::make_edge(s, lim.decrementer());
21251c0b2f7Stbbdev        for ( int j = 0; j < N; ++j ) {
21351c0b2f7Stbbdev            bool msg = lim.try_put( T(j) );
21451c0b2f7Stbbdev            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
21551c0b2f7Stbbdev            if ( msg == false ) {
21649e08aacStbbdev                lim.decrementer().try_put( tbb::flow::continue_msg() );
21751c0b2f7Stbbdev                msg = lim.try_put( T(j) );
21851c0b2f7Stbbdev                CHECK_MESSAGE( msg == true, "" );
21951c0b2f7Stbbdev            }
22051c0b2f7Stbbdev        }
22151c0b2f7Stbbdev    }
22251c0b2f7Stbbdev    return 0;
22351c0b2f7Stbbdev }
22451c0b2f7Stbbdev 
225*89b2e0e3SOlga Malysheva // reported bug in limiter (https://community.intel.com/t5/Intel-oneAPI-Threading-Building/multifun-node-try-put-several-messages-to-one-successor-crashes/m-p/922844)
22651c0b2f7Stbbdev #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
22751c0b2f7Stbbdev #define LIMITER_OUTPUT 0    // port number of the integer output
22851c0b2f7Stbbdev 
22951c0b2f7Stbbdev typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
23051c0b2f7Stbbdev 
23151c0b2f7Stbbdev std::atomic<size_t> emit_count;
23251c0b2f7Stbbdev std::atomic<size_t> emit_sum;
23351c0b2f7Stbbdev std::atomic<size_t> receive_count;
23451c0b2f7Stbbdev std::atomic<size_t> receive_sum;
23551c0b2f7Stbbdev 
23651c0b2f7Stbbdev struct mfnode_body {
23751c0b2f7Stbbdev     int max_cnt;
23851c0b2f7Stbbdev     std::atomic<int>* my_cnt;
mfnode_bodymfnode_body23951c0b2f7Stbbdev     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
operator ()mfnode_body24051c0b2f7Stbbdev     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
24151c0b2f7Stbbdev         int lcnt = ++(*my_cnt);
24251c0b2f7Stbbdev         if(lcnt > max_cnt) {
24351c0b2f7Stbbdev             return;
24451c0b2f7Stbbdev         }
24551c0b2f7Stbbdev         // put one continue_msg to the decrement of the limiter.
24651c0b2f7Stbbdev         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
24751c0b2f7Stbbdev             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
24851c0b2f7Stbbdev         }
24951c0b2f7Stbbdev         {
25051c0b2f7Stbbdev             // put messages to the input of the limiter_node until it rejects.
25151c0b2f7Stbbdev             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
25251c0b2f7Stbbdev                 emit_sum += lcnt;
25351c0b2f7Stbbdev                 ++emit_count;
25451c0b2f7Stbbdev             }
25551c0b2f7Stbbdev         }
25651c0b2f7Stbbdev     }
25751c0b2f7Stbbdev };
25851c0b2f7Stbbdev 
25951c0b2f7Stbbdev struct fn_body {
operator ()fn_body26051c0b2f7Stbbdev     int operator()(const int &in) {
26151c0b2f7Stbbdev         receive_sum += in;
26251c0b2f7Stbbdev         ++receive_count;
26351c0b2f7Stbbdev         return in;
26451c0b2f7Stbbdev     }
26551c0b2f7Stbbdev };
26651c0b2f7Stbbdev 
26751c0b2f7Stbbdev //                   +------------+
26851c0b2f7Stbbdev //    +---------+    |            v
26951c0b2f7Stbbdev //    | mf_node |0---+       +----------+          +----------+
27051c0b2f7Stbbdev // +->|         |1---------->| lim_node |--------->| fn_node  |--+
27151c0b2f7Stbbdev // |  +---------+            +----------+          +----------+  |
27251c0b2f7Stbbdev // |                                                             |
27351c0b2f7Stbbdev // |                                                             |
27451c0b2f7Stbbdev // +-------------------------------------------------------------+
27551c0b2f7Stbbdev //
27651c0b2f7Stbbdev void
test_multifunction_to_limiter(int _max,int _nparallel)27751c0b2f7Stbbdev test_multifunction_to_limiter(int _max, int _nparallel) {
27851c0b2f7Stbbdev     tbb::flow::graph g;
27951c0b2f7Stbbdev     emit_count = 0;
28051c0b2f7Stbbdev     emit_sum = 0;
28151c0b2f7Stbbdev     receive_count = 0;
28251c0b2f7Stbbdev     receive_sum = 0;
28351c0b2f7Stbbdev     std::atomic<int> local_cnt;
28451c0b2f7Stbbdev     local_cnt = 0;
28551c0b2f7Stbbdev     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
28651c0b2f7Stbbdev     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
28751c0b2f7Stbbdev     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
28851c0b2f7Stbbdev     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
28949e08aacStbbdev     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
29051c0b2f7Stbbdev     tbb::flow::make_edge(lim_node, fn_node);
29151c0b2f7Stbbdev     tbb::flow::make_edge(fn_node, mf_node);
29251c0b2f7Stbbdev 
29351c0b2f7Stbbdev     mf_node.try_put(1);
29451c0b2f7Stbbdev     g.wait_for_all();
29551c0b2f7Stbbdev     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
29651c0b2f7Stbbdev     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
29751c0b2f7Stbbdev 
29851c0b2f7Stbbdev     // reset, test again
29951c0b2f7Stbbdev     g.reset();
30051c0b2f7Stbbdev     emit_count = 0;
30151c0b2f7Stbbdev     emit_sum = 0;
30251c0b2f7Stbbdev     receive_count = 0;
30351c0b2f7Stbbdev     receive_sum = 0;
3045e91b2c0SVladislav Shchapov     local_cnt = 0;
30551c0b2f7Stbbdev     mf_node.try_put(1);
30651c0b2f7Stbbdev     g.wait_for_all();
30751c0b2f7Stbbdev     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
30851c0b2f7Stbbdev     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
30951c0b2f7Stbbdev }
31051c0b2f7Stbbdev 
31151c0b2f7Stbbdev 
31251c0b2f7Stbbdev void
test_continue_msg_reception()31351c0b2f7Stbbdev test_continue_msg_reception() {
31451c0b2f7Stbbdev     tbb::flow::graph g;
31551c0b2f7Stbbdev     tbb::flow::limiter_node<int> ln(g,2);
31651c0b2f7Stbbdev     tbb::flow::queue_node<int>   qn(g);
31751c0b2f7Stbbdev     tbb::flow::make_edge(ln, qn);
31849e08aacStbbdev     ln.decrementer().try_put(tbb::flow::continue_msg());
31951c0b2f7Stbbdev     ln.try_put(42);
32051c0b2f7Stbbdev     g.wait_for_all();
32151c0b2f7Stbbdev     int outint;
32251c0b2f7Stbbdev     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
32351c0b2f7Stbbdev }
32451c0b2f7Stbbdev 
32551c0b2f7Stbbdev 
32651c0b2f7Stbbdev //
32751c0b2f7Stbbdev // This test ascertains that if a message is not successfully put
32851c0b2f7Stbbdev // to a successor, the message is not dropped but released.
32951c0b2f7Stbbdev //
33051c0b2f7Stbbdev 
test_reserve_release_messages()33151c0b2f7Stbbdev void test_reserve_release_messages() {
33251c0b2f7Stbbdev     using namespace tbb::flow;
33351c0b2f7Stbbdev     graph g;
33451c0b2f7Stbbdev 
33551c0b2f7Stbbdev     //making two queue_nodes: one broadcast_node and one limiter_node
33651c0b2f7Stbbdev     queue_node<int> input_queue(g);
33751c0b2f7Stbbdev     queue_node<int> output_queue(g);
33851c0b2f7Stbbdev     broadcast_node<int> broad(g);
33951c0b2f7Stbbdev     limiter_node<int, int> limit(g,2); //threshold of 2
34051c0b2f7Stbbdev 
34151c0b2f7Stbbdev     //edges
34251c0b2f7Stbbdev     make_edge(input_queue, limit);
34351c0b2f7Stbbdev     make_edge(limit, output_queue);
34449e08aacStbbdev     make_edge(broad,limit.decrementer());
34551c0b2f7Stbbdev 
34651c0b2f7Stbbdev     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
34751c0b2f7Stbbdev 
34851c0b2f7Stbbdev     input_queue.try_put(list[0]); // succeeds
34951c0b2f7Stbbdev     input_queue.try_put(list[1]); // succeeds
35051c0b2f7Stbbdev     input_queue.try_put(list[2]); // fails, stored in upstream buffer
35151c0b2f7Stbbdev     g.wait_for_all();
35251c0b2f7Stbbdev 
35351c0b2f7Stbbdev     remove_edge(limit, output_queue); //remove successor
35451c0b2f7Stbbdev 
35551c0b2f7Stbbdev     //sending message to the decrement port of the limiter
35651c0b2f7Stbbdev     broad.try_put(1); //failed message retrieved.
35751c0b2f7Stbbdev     g.wait_for_all();
35851c0b2f7Stbbdev 
3590815661eSIlya Mishin     tbb::flow::make_edge(limit, output_queue); //putting the successor back
36051c0b2f7Stbbdev 
36151c0b2f7Stbbdev     broad.try_put(1);  //drop the count
36251c0b2f7Stbbdev 
36351c0b2f7Stbbdev     input_queue.try_put(list[3]);  //success
36451c0b2f7Stbbdev     g.wait_for_all();
36551c0b2f7Stbbdev 
36651c0b2f7Stbbdev     int var=0;
36751c0b2f7Stbbdev 
36851c0b2f7Stbbdev     for (int i=0; i<4; i++) {
36951c0b2f7Stbbdev         output_queue.try_get(var);
37051c0b2f7Stbbdev         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
37151c0b2f7Stbbdev         g.wait_for_all();
37251c0b2f7Stbbdev     }
37351c0b2f7Stbbdev }
37451c0b2f7Stbbdev 
test_decrementer()37551c0b2f7Stbbdev void test_decrementer() {
37651c0b2f7Stbbdev     const int threshold = 5;
37751c0b2f7Stbbdev     tbb::flow::graph g;
37851c0b2f7Stbbdev     tbb::flow::limiter_node<int, int> limit(g, threshold);
37951c0b2f7Stbbdev     tbb::flow::queue_node<int> queue(g);
38051c0b2f7Stbbdev     make_edge(limit, queue);
38151c0b2f7Stbbdev     int m = 0;
38251c0b2f7Stbbdev     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
38349e08aacStbbdev     CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
38451c0b2f7Stbbdev                    "Limiter node decrementer's port does not accept message." );
38551c0b2f7Stbbdev     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
38649e08aacStbbdev     CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
38751c0b2f7Stbbdev                    "Limiter node decrementer's port does not accept message." );
38851c0b2f7Stbbdev     for( int i = 0; i < threshold; ++i )
38951c0b2f7Stbbdev         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
39051c0b2f7Stbbdev     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
39151c0b2f7Stbbdev     g.wait_for_all();
39251c0b2f7Stbbdev     int expected[] = {0, 2, 3, 4, 5, 6};
39351c0b2f7Stbbdev     int actual = -1; m = 0;
39451c0b2f7Stbbdev     while( queue.try_get(actual) )
39551c0b2f7Stbbdev         CHECK_MESSAGE( actual == expected[m++], "" );
39651c0b2f7Stbbdev     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
39751c0b2f7Stbbdev     g.wait_for_all();
39851c0b2f7Stbbdev 
39951c0b2f7Stbbdev     const size_t threshold2 = size_t(-1);
40051c0b2f7Stbbdev     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
40151c0b2f7Stbbdev     make_edge(limit2, queue);
40251c0b2f7Stbbdev     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
40351c0b2f7Stbbdev     long long decrement_value = (long long)( size_t(-1)/2 );
40449e08aacStbbdev     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
40551c0b2f7Stbbdev                    "Limiter node decrementer's port does not accept message" );
40651c0b2f7Stbbdev     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
40749e08aacStbbdev     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
40851c0b2f7Stbbdev                    "Limiter node decrementer's port does not accept message" );
40951c0b2f7Stbbdev     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
41051c0b2f7Stbbdev     int expected2[] = {1, 2};
41151c0b2f7Stbbdev     actual = -1; m = 0;
41251c0b2f7Stbbdev     while( queue.try_get(actual) )
41351c0b2f7Stbbdev         CHECK_MESSAGE( actual == expected2[m++], "" );
41451c0b2f7Stbbdev     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
41551c0b2f7Stbbdev     g.wait_for_all();
41625e2b1ddSIlya Mishin 
41725e2b1ddSIlya Mishin     const size_t threshold3 = 10;
41825e2b1ddSIlya Mishin     tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
41925e2b1ddSIlya Mishin     make_edge(limit3, queue);
42025e2b1ddSIlya Mishin     long long decrement_value3 = 3;
42125e2b1ddSIlya Mishin     CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
42225e2b1ddSIlya Mishin                    "Limiter node decrementer's port does not accept message" );
42325e2b1ddSIlya Mishin 
42425e2b1ddSIlya Mishin     m = 0;
42525e2b1ddSIlya Mishin     while( limit3.try_put( m ) ){ m++; };
42625e2b1ddSIlya Mishin     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
42725e2b1ddSIlya Mishin 
42825e2b1ddSIlya Mishin     actual = -1; m = 0;
42925e2b1ddSIlya Mishin     while( queue.try_get(actual) ){
43025e2b1ddSIlya Mishin         CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
43125e2b1ddSIlya Mishin     }
43225e2b1ddSIlya Mishin 
43325e2b1ddSIlya Mishin     g.wait_for_all();
43425e2b1ddSIlya Mishin     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
43551c0b2f7Stbbdev }
43651c0b2f7Stbbdev 
test_try_put_without_successors()43751c0b2f7Stbbdev void test_try_put_without_successors() {
43851c0b2f7Stbbdev     tbb::flow::graph g;
43955f9b178SIvan Kochin     int try_put_num{3};
44051c0b2f7Stbbdev     tbb::flow::buffer_node<int> bn(g);
44151c0b2f7Stbbdev     tbb::flow::limiter_node<int> ln(g, try_put_num);
4427567de93SIlya Mishin 
44351c0b2f7Stbbdev     tbb::flow::make_edge(bn, ln);
4447567de93SIlya Mishin 
44555f9b178SIvan Kochin     int i = 1;
44651c0b2f7Stbbdev     for (; i <= try_put_num; i++)
44751c0b2f7Stbbdev         bn.try_put(i);
44851c0b2f7Stbbdev 
44955f9b178SIvan Kochin     std::atomic<int> counter{0};
45051c0b2f7Stbbdev     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
45151c0b2f7Stbbdev         [&](int input) {
45251c0b2f7Stbbdev             counter += input;
45351c0b2f7Stbbdev             return int{};
45451c0b2f7Stbbdev         }
45551c0b2f7Stbbdev     );
4567567de93SIlya Mishin 
4570815661eSIlya Mishin     tbb::flow::make_edge(ln, fn);
4587567de93SIlya Mishin 
45951c0b2f7Stbbdev     g.wait_for_all();
46051c0b2f7Stbbdev     CHECK((counter == i * try_put_num / 2));
46151c0b2f7Stbbdev 
46251c0b2f7Stbbdev     // Check the lost message
46351c0b2f7Stbbdev     tbb::flow::remove_edge(bn, ln);
46449e08aacStbbdev     ln.decrementer().try_put(tbb::flow::continue_msg());
46551c0b2f7Stbbdev     bn.try_put(try_put_num + 1);
46651c0b2f7Stbbdev     g.wait_for_all();
46751c0b2f7Stbbdev     CHECK((counter == i * try_put_num / 2));
46851c0b2f7Stbbdev 
46951c0b2f7Stbbdev }
47051c0b2f7Stbbdev 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
// Runs the shared follows()/precedes() checks for limiter_node<continue_msg, continue_msg>.
// The trailing 1000 is forwarded to the shared test helpers (presumably as the
// limiter threshold - see common/test_follows_and_precedes_api.h).
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;
    using limiter_t = tbb::flow::limiter_node<msg_t, msg_t>;

    std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = {msg_t()};

    follows_and_precedes_testing::test_follows<msg_t, limiter_t>(messages_for_follows, 1000);
    follows_and_precedes_testing::test_precedes<msg_t, limiter_t>(messages_for_precedes, 1000);

}
#endif
48751c0b2f7Stbbdev 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Compile-time check: class template argument deduction must yield
// limiter_node<int> for the follows/precedes constructors (when the node-set
// preview is enabled) and for copy construction.
void test_deduction_guides() {
    using namespace tbb::flow;

    graph g;
    broadcast_node<int> source(g);
    limiter_node<int> original(g, 100);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    limiter_node after_source(follows(source), 100);
    static_assert(std::is_same_v<decltype(after_source), limiter_node<int>>);

    limiter_node before_source(precedes(source), 100);
    static_assert(std::is_same_v<decltype(before_source), limiter_node<int>>);
#endif

    limiter_node copied(original);
    static_assert(std::is_same_v<decltype(copied), limiter_node<int>>);
}
#endif
50851c0b2f7Stbbdev 
test_decrement_while_try_put_task()5090815661eSIlya Mishin void test_decrement_while_try_put_task() {
5100815661eSIlya Mishin     constexpr int threshold = 50000;
5110815661eSIlya Mishin 
5120815661eSIlya Mishin     tbb::flow::graph graph{};
5130815661eSIlya Mishin     std::atomic<int> processed;
5140815661eSIlya Mishin     tbb::flow::input_node<int> input{ graph, [&](tbb::flow_control & fc) -> int {
5150815661eSIlya Mishin         static int i = {};
5160815661eSIlya Mishin         if (i++ >= threshold) fc.stop();
5170815661eSIlya Mishin         return i;
5180815661eSIlya Mishin     }};
5190815661eSIlya Mishin     tbb::flow::limiter_node<int, int> blockingNode{ graph, 1 };
5200815661eSIlya Mishin     tbb::flow::multifunction_node<int, std::tuple<int>> processing{ graph, tbb::flow::serial,
5210815661eSIlya Mishin         [&](const int & value, typename decltype(processing)::output_ports_type & out) {
5220815661eSIlya Mishin             if (value != threshold)
5230815661eSIlya Mishin                 std::get<0>(out).try_put(1);
5240815661eSIlya Mishin             processed.store(value);
5250815661eSIlya Mishin         }};
5260815661eSIlya Mishin 
5270815661eSIlya Mishin     tbb::flow::make_edge(input, blockingNode);
5280815661eSIlya Mishin     tbb::flow::make_edge(blockingNode, processing);
5290815661eSIlya Mishin     tbb::flow::make_edge(processing, blockingNode.decrementer());
5300815661eSIlya Mishin 
5310815661eSIlya Mishin     input.activate();
5320815661eSIlya Mishin 
5330815661eSIlya Mishin     graph.wait_for_all();
5340815661eSIlya Mishin     CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work");
5350815661eSIlya Mishin }
5360815661eSIlya Mishin 
5370815661eSIlya Mishin 
53851c0b2f7Stbbdev //! Test puts on limiter_node with decrements and varying parallelism levels
53951c0b2f7Stbbdev //! \brief \ref error_guessing
54051c0b2f7Stbbdev TEST_CASE("Serial and parallel tests") {
54151c0b2f7Stbbdev     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
54251c0b2f7Stbbdev         tbb::task_arena arena(i);
54351c0b2f7Stbbdev         arena.execute(
__anon70523bc50402() 54451c0b2f7Stbbdev             [i]() {
54551c0b2f7Stbbdev                 test_serial<int>();
54651c0b2f7Stbbdev                 test_parallel<int>(i);
54751c0b2f7Stbbdev             }
54851c0b2f7Stbbdev         );
54951c0b2f7Stbbdev     }
55051c0b2f7Stbbdev }
55151c0b2f7Stbbdev 
//! Test that an initial put of continue_msg to the decrementer port does not stop the message flow
//! \brief \ref error_guessing
TEST_CASE("Test continue_msg reception") {
    test_continue_msg_reception();
}
55751c0b2f7Stbbdev 
//! Test that putting a message to the decrementer port does not stop the message flow
//! \brief \ref error_guessing
TEST_CASE("Test try_put to decrementer while try_put to limiter_node") {
    test_decrement_while_try_put_task();
}
5630815661eSIlya Mishin 
//! Test multifunction_node connected to limiter_node
//! \brief \ref error_guessing
TEST_CASE("Multifunction connected to limiter") {
    // arguments: (number of emitting multifunction_node invocations, limiter threshold)
    test_multifunction_to_limiter(30,3);
    test_multifunction_to_limiter(300,13);
    test_multifunction_to_limiter(3000,1);
}
57151c0b2f7Stbbdev 
//! Test that a message is released (not dropped) if the successor doesn't accept it
//! \brief \ref requirement
TEST_CASE("Message is released if successor does not accept") {
    test_reserve_release_messages();
}
57751c0b2f7Stbbdev 
//! Test the decrementer port with integral decrement types
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Decrementer") {
    test_decrementer();
}
58351c0b2f7Stbbdev 
//! Test try_put() to a limiter_node that has no successors yet
//! \brief \ref error_guessing
TEST_CASE("Test try_put() without successors") {
    test_try_put_without_successors();
}
58951c0b2f7Stbbdev 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API (only built when the node-set preview is enabled)
//! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) {
    test_follows_and_precedes_api();
}
#endif
59751c0b2f7Stbbdev 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides (only built when C++17 deduction guides are available)
//! \brief \ref requirement
TEST_CASE( "Deduction guides" ) {
    test_deduction_guides();
}
#endif
6056edb5c3aSVladimir Serov 
// Message type with a 512-byte, zero-initialized payload.
struct TestLargeStruct {
    char bytes[512] = {};
};
6096edb5c3aSVladimir Serov 
610534428d0SVladimir Serov //! Test correct node deallocation while using small_object_pool.
611534428d0SVladimir Serov //! (see https://github.com/oneapi-src/oneTBB/issues/639)
612534428d0SVladimir Serov //! \brief \ref error_guessing
613534428d0SVladimir Serov TEST_CASE("Test correct node deallocation while using small_object_pool") {
6146edb5c3aSVladimir Serov     tbb::flow::graph graph;
615534428d0SVladimir Serov     tbb::flow::queue_node<TestLargeStruct> input_node( graph );
616534428d0SVladimir Serov     tbb::flow::function_node<TestLargeStruct> func( graph, tbb::flow::serial,
__anon70523bc50502(const TestLargeStruct& input) 617534428d0SVladimir Serov         [](const TestLargeStruct& input) { return input; } );
6186edb5c3aSVladimir Serov 
6196edb5c3aSVladimir Serov     tbb::flow::make_edge( input_node, func );
6206edb5c3aSVladimir Serov     CHECK( input_node.try_put( TestLargeStruct{} ) );
6216edb5c3aSVladimir Serov     graph.wait_for_all();
6226edb5c3aSVladimir Serov 
6235fc0a5f6SAlex     tbb::task_scheduler_handle handle{ tbb::attach{} };
624d1667d51SVladimir Serov     tbb::finalize( handle, std::nothrow );
6256edb5c3aSVladimir Serov }
626