151c0b2f7Stbbdev /*
2*89b2e0e3SOlga Malysheva Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
20b15aabb3Stbbdev
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev
2551c0b2f7Stbbdev #include "common/test.h"
2651c0b2f7Stbbdev #include "common/utils.h"
2751c0b2f7Stbbdev #include "common/utils_assert.h"
2851c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
296edb5c3aSVladimir Serov #include "tbb/global_control.h"
3051c0b2f7Stbbdev
3151c0b2f7Stbbdev #include <atomic>
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev
3451c0b2f7Stbbdev //! \file test_limiter_node.cpp
3551c0b2f7Stbbdev //! \brief Test for [flow_graph.limiter_node] specification
3651c0b2f7Stbbdev
3751c0b2f7Stbbdev
3851c0b2f7Stbbdev const int L = 10;
3951c0b2f7Stbbdev const int N = 1000;
4051c0b2f7Stbbdev
4151c0b2f7Stbbdev using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
4251c0b2f7Stbbdev using tbb::detail::d1::graph_task;
4351c0b2f7Stbbdev
4451c0b2f7Stbbdev template< typename T >
4551c0b2f7Stbbdev struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
4651c0b2f7Stbbdev T next_value;
4751c0b2f7Stbbdev tbb::flow::graph& my_graph;
4851c0b2f7Stbbdev
serial_receiverserial_receiver4951c0b2f7Stbbdev serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
5051c0b2f7Stbbdev
try_put_taskserial_receiver5151c0b2f7Stbbdev graph_task* try_put_task( const T &v ) override {
5251c0b2f7Stbbdev CHECK_MESSAGE( next_value++ == v, "" );
5351c0b2f7Stbbdev return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
5451c0b2f7Stbbdev }
5551c0b2f7Stbbdev
graph_referenceserial_receiver5651c0b2f7Stbbdev tbb::flow::graph& graph_reference() const override {
5751c0b2f7Stbbdev return my_graph;
5851c0b2f7Stbbdev }
5951c0b2f7Stbbdev };
6051c0b2f7Stbbdev
6151c0b2f7Stbbdev template< typename T >
6251c0b2f7Stbbdev struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
6351c0b2f7Stbbdev
6451c0b2f7Stbbdev std::atomic<int> my_count;
6551c0b2f7Stbbdev tbb::flow::graph& my_graph;
6651c0b2f7Stbbdev
parallel_receiverparallel_receiver6751c0b2f7Stbbdev parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
6851c0b2f7Stbbdev
try_put_taskparallel_receiver6951c0b2f7Stbbdev graph_task* try_put_task( const T &/*v*/ ) override {
7051c0b2f7Stbbdev ++my_count;
7151c0b2f7Stbbdev return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
7251c0b2f7Stbbdev }
7351c0b2f7Stbbdev
graph_referenceparallel_receiver7451c0b2f7Stbbdev tbb::flow::graph& graph_reference() const override {
7551c0b2f7Stbbdev return my_graph;
7651c0b2f7Stbbdev }
7751c0b2f7Stbbdev };
7851c0b2f7Stbbdev
7951c0b2f7Stbbdev template< typename T >
8051c0b2f7Stbbdev struct empty_sender : public tbb::flow::sender<T> {
8151c0b2f7Stbbdev typedef typename tbb::flow::sender<T>::successor_type successor_type;
8251c0b2f7Stbbdev
register_successorempty_sender8351c0b2f7Stbbdev bool register_successor( successor_type & ) override { return false; }
remove_successorempty_sender8451c0b2f7Stbbdev bool remove_successor( successor_type & ) override { return false; }
8551c0b2f7Stbbdev };
8651c0b2f7Stbbdev
8751c0b2f7Stbbdev
8851c0b2f7Stbbdev template< typename T >
8951c0b2f7Stbbdev struct put_body : utils::NoAssign {
9051c0b2f7Stbbdev
9151c0b2f7Stbbdev tbb::flow::limiter_node<T> &my_lim;
9251c0b2f7Stbbdev std::atomic<int> &my_accept_count;
9351c0b2f7Stbbdev
put_bodyput_body9451c0b2f7Stbbdev put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
9551c0b2f7Stbbdev my_lim(lim), my_accept_count(accept_count) {}
9651c0b2f7Stbbdev
operator ()put_body9751c0b2f7Stbbdev void operator()( int ) const {
9851c0b2f7Stbbdev for ( int i = 0; i < L; ++i ) {
9951c0b2f7Stbbdev bool msg = my_lim.try_put( T(i) );
10051c0b2f7Stbbdev if ( msg == true )
10151c0b2f7Stbbdev ++my_accept_count;
10251c0b2f7Stbbdev }
10351c0b2f7Stbbdev }
10451c0b2f7Stbbdev };
10551c0b2f7Stbbdev
10651c0b2f7Stbbdev template< typename T >
10751c0b2f7Stbbdev struct put_dec_body : utils::NoAssign {
10851c0b2f7Stbbdev
10951c0b2f7Stbbdev tbb::flow::limiter_node<T> &my_lim;
11051c0b2f7Stbbdev std::atomic<int> &my_accept_count;
11151c0b2f7Stbbdev
put_dec_bodyput_dec_body11251c0b2f7Stbbdev put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
11351c0b2f7Stbbdev my_lim(lim), my_accept_count(accept_count) {}
11451c0b2f7Stbbdev
operator ()put_dec_body11551c0b2f7Stbbdev void operator()( int ) const {
11651c0b2f7Stbbdev int local_accept_count = 0;
11751c0b2f7Stbbdev while ( local_accept_count < N ) {
11851c0b2f7Stbbdev bool msg = my_lim.try_put( T(local_accept_count) );
11951c0b2f7Stbbdev if ( msg == true ) {
12051c0b2f7Stbbdev ++local_accept_count;
12151c0b2f7Stbbdev ++my_accept_count;
12249e08aacStbbdev my_lim.decrementer().try_put( tbb::flow::continue_msg() );
12351c0b2f7Stbbdev }
12451c0b2f7Stbbdev }
12551c0b2f7Stbbdev }
12651c0b2f7Stbbdev
12751c0b2f7Stbbdev };
12851c0b2f7Stbbdev
12951c0b2f7Stbbdev template< typename T >
test_puts_with_decrements(int num_threads,tbb::flow::limiter_node<T> & lim,tbb::flow::graph & g)13051c0b2f7Stbbdev void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
13151c0b2f7Stbbdev parallel_receiver<T> r(g);
13251c0b2f7Stbbdev empty_sender< tbb::flow::continue_msg > s;
13351c0b2f7Stbbdev std::atomic<int> accept_count;
13451c0b2f7Stbbdev accept_count = 0;
13551c0b2f7Stbbdev tbb::flow::make_edge( lim, r );
13649e08aacStbbdev tbb::flow::make_edge(s, lim.decrementer());
13751c0b2f7Stbbdev
13851c0b2f7Stbbdev // test puts with decrements
13951c0b2f7Stbbdev utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
14051c0b2f7Stbbdev int c = accept_count;
14151c0b2f7Stbbdev CHECK_MESSAGE( c == N*num_threads, "" );
14251c0b2f7Stbbdev CHECK_MESSAGE( r.my_count == N*num_threads, "" );
14351c0b2f7Stbbdev }
14451c0b2f7Stbbdev
14551c0b2f7Stbbdev //
14651c0b2f7Stbbdev // Tests
14751c0b2f7Stbbdev //
14851c0b2f7Stbbdev // limiter only forwards below the limit, multiple parallel senders / single receiver
14951c0b2f7Stbbdev // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
15051c0b2f7Stbbdev //
15151c0b2f7Stbbdev //
15251c0b2f7Stbbdev template< typename T >
test_parallel(int num_threads)15351c0b2f7Stbbdev int test_parallel(int num_threads) {
15451c0b2f7Stbbdev
15551c0b2f7Stbbdev // test puts with no decrements
15651c0b2f7Stbbdev for ( int i = 0; i < L; ++i ) {
15751c0b2f7Stbbdev tbb::flow::graph g;
15851c0b2f7Stbbdev tbb::flow::limiter_node< T > lim(g, i);
15951c0b2f7Stbbdev parallel_receiver<T> r(g);
16051c0b2f7Stbbdev std::atomic<int> accept_count;
16151c0b2f7Stbbdev accept_count = 0;
16251c0b2f7Stbbdev tbb::flow::make_edge( lim, r );
16351c0b2f7Stbbdev // test puts with no decrements
16451c0b2f7Stbbdev utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
16551c0b2f7Stbbdev g.wait_for_all();
16651c0b2f7Stbbdev int c = accept_count;
16751c0b2f7Stbbdev CHECK_MESSAGE( c == i, "" );
16851c0b2f7Stbbdev }
16951c0b2f7Stbbdev
17051c0b2f7Stbbdev // test puts with decrements
17151c0b2f7Stbbdev for ( int i = 1; i < L; ++i ) {
17251c0b2f7Stbbdev tbb::flow::graph g;
17351c0b2f7Stbbdev tbb::flow::limiter_node< T > lim(g, i);
17451c0b2f7Stbbdev test_puts_with_decrements(num_threads, lim, g);
17551c0b2f7Stbbdev tbb::flow::limiter_node< T > lim_copy( lim );
17651c0b2f7Stbbdev test_puts_with_decrements(num_threads, lim_copy, g);
17751c0b2f7Stbbdev }
17851c0b2f7Stbbdev
17951c0b2f7Stbbdev return 0;
18051c0b2f7Stbbdev }
18151c0b2f7Stbbdev
18251c0b2f7Stbbdev //
18351c0b2f7Stbbdev // Tests
18451c0b2f7Stbbdev //
18551c0b2f7Stbbdev // limiter only forwards below the limit, single sender / single receiver
18651c0b2f7Stbbdev // at reject, a put to decrement, will cause next message to be accepted
18751c0b2f7Stbbdev //
18851c0b2f7Stbbdev template< typename T >
test_serial()18951c0b2f7Stbbdev int test_serial() {
19051c0b2f7Stbbdev
19151c0b2f7Stbbdev // test puts with no decrements
19251c0b2f7Stbbdev for ( int i = 0; i < L; ++i ) {
19351c0b2f7Stbbdev tbb::flow::graph g;
19451c0b2f7Stbbdev tbb::flow::limiter_node< T > lim(g, i);
19551c0b2f7Stbbdev serial_receiver<T> r(g);
19651c0b2f7Stbbdev tbb::flow::make_edge( lim, r );
19751c0b2f7Stbbdev for ( int j = 0; j < L; ++j ) {
19851c0b2f7Stbbdev bool msg = lim.try_put( T(j) );
19951c0b2f7Stbbdev CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
20051c0b2f7Stbbdev }
20151c0b2f7Stbbdev g.wait_for_all();
20251c0b2f7Stbbdev }
20351c0b2f7Stbbdev
20451c0b2f7Stbbdev // test puts with decrements
20551c0b2f7Stbbdev for ( int i = 1; i < L; ++i ) {
20651c0b2f7Stbbdev tbb::flow::graph g;
20751c0b2f7Stbbdev tbb::flow::limiter_node< T > lim(g, i);
20851c0b2f7Stbbdev serial_receiver<T> r(g);
20951c0b2f7Stbbdev empty_sender< tbb::flow::continue_msg > s;
21051c0b2f7Stbbdev tbb::flow::make_edge( lim, r );
21149e08aacStbbdev tbb::flow::make_edge(s, lim.decrementer());
21251c0b2f7Stbbdev for ( int j = 0; j < N; ++j ) {
21351c0b2f7Stbbdev bool msg = lim.try_put( T(j) );
21451c0b2f7Stbbdev CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
21551c0b2f7Stbbdev if ( msg == false ) {
21649e08aacStbbdev lim.decrementer().try_put( tbb::flow::continue_msg() );
21751c0b2f7Stbbdev msg = lim.try_put( T(j) );
21851c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" );
21951c0b2f7Stbbdev }
22051c0b2f7Stbbdev }
22151c0b2f7Stbbdev }
22251c0b2f7Stbbdev return 0;
22351c0b2f7Stbbdev }
22451c0b2f7Stbbdev
225*89b2e0e3SOlga Malysheva // reported bug in limiter (https://community.intel.com/t5/Intel-oneAPI-Threading-Building/multifun-node-try-put-several-messages-to-one-successor-crashes/m-p/922844)
22651c0b2f7Stbbdev #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node
22751c0b2f7Stbbdev #define LIMITER_OUTPUT 0 // port number of the integer output
22851c0b2f7Stbbdev
22951c0b2f7Stbbdev typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
23051c0b2f7Stbbdev
23151c0b2f7Stbbdev std::atomic<size_t> emit_count;
23251c0b2f7Stbbdev std::atomic<size_t> emit_sum;
23351c0b2f7Stbbdev std::atomic<size_t> receive_count;
23451c0b2f7Stbbdev std::atomic<size_t> receive_sum;
23551c0b2f7Stbbdev
23651c0b2f7Stbbdev struct mfnode_body {
23751c0b2f7Stbbdev int max_cnt;
23851c0b2f7Stbbdev std::atomic<int>* my_cnt;
mfnode_bodymfnode_body23951c0b2f7Stbbdev mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { }
operator ()mfnode_body24051c0b2f7Stbbdev void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
24151c0b2f7Stbbdev int lcnt = ++(*my_cnt);
24251c0b2f7Stbbdev if(lcnt > max_cnt) {
24351c0b2f7Stbbdev return;
24451c0b2f7Stbbdev }
24551c0b2f7Stbbdev // put one continue_msg to the decrement of the limiter.
24651c0b2f7Stbbdev if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
24751c0b2f7Stbbdev CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
24851c0b2f7Stbbdev }
24951c0b2f7Stbbdev {
25051c0b2f7Stbbdev // put messages to the input of the limiter_node until it rejects.
25151c0b2f7Stbbdev while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
25251c0b2f7Stbbdev emit_sum += lcnt;
25351c0b2f7Stbbdev ++emit_count;
25451c0b2f7Stbbdev }
25551c0b2f7Stbbdev }
25651c0b2f7Stbbdev }
25751c0b2f7Stbbdev };
25851c0b2f7Stbbdev
25951c0b2f7Stbbdev struct fn_body {
operator ()fn_body26051c0b2f7Stbbdev int operator()(const int &in) {
26151c0b2f7Stbbdev receive_sum += in;
26251c0b2f7Stbbdev ++receive_count;
26351c0b2f7Stbbdev return in;
26451c0b2f7Stbbdev }
26551c0b2f7Stbbdev };
26651c0b2f7Stbbdev
26751c0b2f7Stbbdev // +------------+
26851c0b2f7Stbbdev // +---------+ | v
26951c0b2f7Stbbdev // | mf_node |0---+ +----------+ +----------+
27051c0b2f7Stbbdev // +->| |1---------->| lim_node |--------->| fn_node |--+
27151c0b2f7Stbbdev // | +---------+ +----------+ +----------+ |
27251c0b2f7Stbbdev // | |
27351c0b2f7Stbbdev // | |
27451c0b2f7Stbbdev // +-------------------------------------------------------------+
27551c0b2f7Stbbdev //
27651c0b2f7Stbbdev void
test_multifunction_to_limiter(int _max,int _nparallel)27751c0b2f7Stbbdev test_multifunction_to_limiter(int _max, int _nparallel) {
27851c0b2f7Stbbdev tbb::flow::graph g;
27951c0b2f7Stbbdev emit_count = 0;
28051c0b2f7Stbbdev emit_sum = 0;
28151c0b2f7Stbbdev receive_count = 0;
28251c0b2f7Stbbdev receive_sum = 0;
28351c0b2f7Stbbdev std::atomic<int> local_cnt;
28451c0b2f7Stbbdev local_cnt = 0;
28551c0b2f7Stbbdev mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
28651c0b2f7Stbbdev tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
28751c0b2f7Stbbdev tbb::flow::limiter_node<int> lim_node(g, _nparallel);
28851c0b2f7Stbbdev tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
28949e08aacStbbdev tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
29051c0b2f7Stbbdev tbb::flow::make_edge(lim_node, fn_node);
29151c0b2f7Stbbdev tbb::flow::make_edge(fn_node, mf_node);
29251c0b2f7Stbbdev
29351c0b2f7Stbbdev mf_node.try_put(1);
29451c0b2f7Stbbdev g.wait_for_all();
29551c0b2f7Stbbdev CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
29651c0b2f7Stbbdev CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
29751c0b2f7Stbbdev
29851c0b2f7Stbbdev // reset, test again
29951c0b2f7Stbbdev g.reset();
30051c0b2f7Stbbdev emit_count = 0;
30151c0b2f7Stbbdev emit_sum = 0;
30251c0b2f7Stbbdev receive_count = 0;
30351c0b2f7Stbbdev receive_sum = 0;
3045e91b2c0SVladislav Shchapov local_cnt = 0;
30551c0b2f7Stbbdev mf_node.try_put(1);
30651c0b2f7Stbbdev g.wait_for_all();
30751c0b2f7Stbbdev CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
30851c0b2f7Stbbdev CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
30951c0b2f7Stbbdev }
31051c0b2f7Stbbdev
31151c0b2f7Stbbdev
31251c0b2f7Stbbdev void
test_continue_msg_reception()31351c0b2f7Stbbdev test_continue_msg_reception() {
31451c0b2f7Stbbdev tbb::flow::graph g;
31551c0b2f7Stbbdev tbb::flow::limiter_node<int> ln(g,2);
31651c0b2f7Stbbdev tbb::flow::queue_node<int> qn(g);
31751c0b2f7Stbbdev tbb::flow::make_edge(ln, qn);
31849e08aacStbbdev ln.decrementer().try_put(tbb::flow::continue_msg());
31951c0b2f7Stbbdev ln.try_put(42);
32051c0b2f7Stbbdev g.wait_for_all();
32151c0b2f7Stbbdev int outint;
32251c0b2f7Stbbdev CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
32351c0b2f7Stbbdev }
32451c0b2f7Stbbdev
32551c0b2f7Stbbdev
32651c0b2f7Stbbdev //
32751c0b2f7Stbbdev // This test ascertains that if a message is not successfully put
32851c0b2f7Stbbdev // to a successor, the message is not dropped but released.
32951c0b2f7Stbbdev //
33051c0b2f7Stbbdev
test_reserve_release_messages()33151c0b2f7Stbbdev void test_reserve_release_messages() {
33251c0b2f7Stbbdev using namespace tbb::flow;
33351c0b2f7Stbbdev graph g;
33451c0b2f7Stbbdev
33551c0b2f7Stbbdev //making two queue_nodes: one broadcast_node and one limiter_node
33651c0b2f7Stbbdev queue_node<int> input_queue(g);
33751c0b2f7Stbbdev queue_node<int> output_queue(g);
33851c0b2f7Stbbdev broadcast_node<int> broad(g);
33951c0b2f7Stbbdev limiter_node<int, int> limit(g,2); //threshold of 2
34051c0b2f7Stbbdev
34151c0b2f7Stbbdev //edges
34251c0b2f7Stbbdev make_edge(input_queue, limit);
34351c0b2f7Stbbdev make_edge(limit, output_queue);
34449e08aacStbbdev make_edge(broad,limit.decrementer());
34551c0b2f7Stbbdev
34651c0b2f7Stbbdev int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
34751c0b2f7Stbbdev
34851c0b2f7Stbbdev input_queue.try_put(list[0]); // succeeds
34951c0b2f7Stbbdev input_queue.try_put(list[1]); // succeeds
35051c0b2f7Stbbdev input_queue.try_put(list[2]); // fails, stored in upstream buffer
35151c0b2f7Stbbdev g.wait_for_all();
35251c0b2f7Stbbdev
35351c0b2f7Stbbdev remove_edge(limit, output_queue); //remove successor
35451c0b2f7Stbbdev
35551c0b2f7Stbbdev //sending message to the decrement port of the limiter
35651c0b2f7Stbbdev broad.try_put(1); //failed message retrieved.
35751c0b2f7Stbbdev g.wait_for_all();
35851c0b2f7Stbbdev
3590815661eSIlya Mishin tbb::flow::make_edge(limit, output_queue); //putting the successor back
36051c0b2f7Stbbdev
36151c0b2f7Stbbdev broad.try_put(1); //drop the count
36251c0b2f7Stbbdev
36351c0b2f7Stbbdev input_queue.try_put(list[3]); //success
36451c0b2f7Stbbdev g.wait_for_all();
36551c0b2f7Stbbdev
36651c0b2f7Stbbdev int var=0;
36751c0b2f7Stbbdev
36851c0b2f7Stbbdev for (int i=0; i<4; i++) {
36951c0b2f7Stbbdev output_queue.try_get(var);
37051c0b2f7Stbbdev CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
37151c0b2f7Stbbdev g.wait_for_all();
37251c0b2f7Stbbdev }
37351c0b2f7Stbbdev }
37451c0b2f7Stbbdev
test_decrementer()37551c0b2f7Stbbdev void test_decrementer() {
37651c0b2f7Stbbdev const int threshold = 5;
37751c0b2f7Stbbdev tbb::flow::graph g;
37851c0b2f7Stbbdev tbb::flow::limiter_node<int, int> limit(g, threshold);
37951c0b2f7Stbbdev tbb::flow::queue_node<int> queue(g);
38051c0b2f7Stbbdev make_edge(limit, queue);
38151c0b2f7Stbbdev int m = 0;
38251c0b2f7Stbbdev CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
38349e08aacStbbdev CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
38451c0b2f7Stbbdev "Limiter node decrementer's port does not accept message." );
38551c0b2f7Stbbdev CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
38649e08aacStbbdev CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate
38751c0b2f7Stbbdev "Limiter node decrementer's port does not accept message." );
38851c0b2f7Stbbdev for( int i = 0; i < threshold; ++i )
38951c0b2f7Stbbdev CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
39051c0b2f7Stbbdev CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
39151c0b2f7Stbbdev g.wait_for_all();
39251c0b2f7Stbbdev int expected[] = {0, 2, 3, 4, 5, 6};
39351c0b2f7Stbbdev int actual = -1; m = 0;
39451c0b2f7Stbbdev while( queue.try_get(actual) )
39551c0b2f7Stbbdev CHECK_MESSAGE( actual == expected[m++], "" );
39651c0b2f7Stbbdev CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
39751c0b2f7Stbbdev g.wait_for_all();
39851c0b2f7Stbbdev
39951c0b2f7Stbbdev const size_t threshold2 = size_t(-1);
40051c0b2f7Stbbdev tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
40151c0b2f7Stbbdev make_edge(limit2, queue);
40251c0b2f7Stbbdev CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
40351c0b2f7Stbbdev long long decrement_value = (long long)( size_t(-1)/2 );
40449e08aacStbbdev CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
40551c0b2f7Stbbdev "Limiter node decrementer's port does not accept message" );
40651c0b2f7Stbbdev CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
40749e08aacStbbdev CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
40851c0b2f7Stbbdev "Limiter node decrementer's port does not accept message" );
40951c0b2f7Stbbdev CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
41051c0b2f7Stbbdev int expected2[] = {1, 2};
41151c0b2f7Stbbdev actual = -1; m = 0;
41251c0b2f7Stbbdev while( queue.try_get(actual) )
41351c0b2f7Stbbdev CHECK_MESSAGE( actual == expected2[m++], "" );
41451c0b2f7Stbbdev CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
41551c0b2f7Stbbdev g.wait_for_all();
41625e2b1ddSIlya Mishin
41725e2b1ddSIlya Mishin const size_t threshold3 = 10;
41825e2b1ddSIlya Mishin tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
41925e2b1ddSIlya Mishin make_edge(limit3, queue);
42025e2b1ddSIlya Mishin long long decrement_value3 = 3;
42125e2b1ddSIlya Mishin CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
42225e2b1ddSIlya Mishin "Limiter node decrementer's port does not accept message" );
42325e2b1ddSIlya Mishin
42425e2b1ddSIlya Mishin m = 0;
42525e2b1ddSIlya Mishin while( limit3.try_put( m ) ){ m++; };
42625e2b1ddSIlya Mishin CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
42725e2b1ddSIlya Mishin
42825e2b1ddSIlya Mishin actual = -1; m = 0;
42925e2b1ddSIlya Mishin while( queue.try_get(actual) ){
43025e2b1ddSIlya Mishin CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
43125e2b1ddSIlya Mishin }
43225e2b1ddSIlya Mishin
43325e2b1ddSIlya Mishin g.wait_for_all();
43425e2b1ddSIlya Mishin CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
43551c0b2f7Stbbdev }
43651c0b2f7Stbbdev
test_try_put_without_successors()43751c0b2f7Stbbdev void test_try_put_without_successors() {
43851c0b2f7Stbbdev tbb::flow::graph g;
43955f9b178SIvan Kochin int try_put_num{3};
44051c0b2f7Stbbdev tbb::flow::buffer_node<int> bn(g);
44151c0b2f7Stbbdev tbb::flow::limiter_node<int> ln(g, try_put_num);
4427567de93SIlya Mishin
44351c0b2f7Stbbdev tbb::flow::make_edge(bn, ln);
4447567de93SIlya Mishin
44555f9b178SIvan Kochin int i = 1;
44651c0b2f7Stbbdev for (; i <= try_put_num; i++)
44751c0b2f7Stbbdev bn.try_put(i);
44851c0b2f7Stbbdev
44955f9b178SIvan Kochin std::atomic<int> counter{0};
45051c0b2f7Stbbdev tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
45151c0b2f7Stbbdev [&](int input) {
45251c0b2f7Stbbdev counter += input;
45351c0b2f7Stbbdev return int{};
45451c0b2f7Stbbdev }
45551c0b2f7Stbbdev );
4567567de93SIlya Mishin
4570815661eSIlya Mishin tbb::flow::make_edge(ln, fn);
4587567de93SIlya Mishin
45951c0b2f7Stbbdev g.wait_for_all();
46051c0b2f7Stbbdev CHECK((counter == i * try_put_num / 2));
46151c0b2f7Stbbdev
46251c0b2f7Stbbdev // Check the lost message
46351c0b2f7Stbbdev tbb::flow::remove_edge(bn, ln);
46449e08aacStbbdev ln.decrementer().try_put(tbb::flow::continue_msg());
46551c0b2f7Stbbdev bn.try_put(try_put_num + 1);
46651c0b2f7Stbbdev g.wait_for_all();
46751c0b2f7Stbbdev CHECK((counter == i * try_put_num / 2));
46851c0b2f7Stbbdev
46951c0b2f7Stbbdev }
47051c0b2f7Stbbdev
47151c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
47251c0b2f7Stbbdev #include <array>
47351c0b2f7Stbbdev #include <vector>
test_follows_and_precedes_api()47451c0b2f7Stbbdev void test_follows_and_precedes_api() {
47551c0b2f7Stbbdev using msg_t = tbb::flow::continue_msg;
47651c0b2f7Stbbdev
47751c0b2f7Stbbdev std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
47851c0b2f7Stbbdev std::vector<msg_t> messages_for_precedes = {msg_t()};
47951c0b2f7Stbbdev
48051c0b2f7Stbbdev follows_and_precedes_testing::test_follows
48151c0b2f7Stbbdev <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
48251c0b2f7Stbbdev follows_and_precedes_testing::test_precedes
48351c0b2f7Stbbdev <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
48451c0b2f7Stbbdev
48551c0b2f7Stbbdev }
48651c0b2f7Stbbdev #endif
48751c0b2f7Stbbdev
48851c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()48951c0b2f7Stbbdev void test_deduction_guides() {
49051c0b2f7Stbbdev using namespace tbb::flow;
49151c0b2f7Stbbdev
49251c0b2f7Stbbdev graph g;
49351c0b2f7Stbbdev broadcast_node<int> br(g);
49451c0b2f7Stbbdev limiter_node<int> l0(g, 100);
49551c0b2f7Stbbdev
49651c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
49751c0b2f7Stbbdev limiter_node l1(follows(br), 100);
49851c0b2f7Stbbdev static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
49951c0b2f7Stbbdev
50051c0b2f7Stbbdev limiter_node l2(precedes(br), 100);
50151c0b2f7Stbbdev static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
50251c0b2f7Stbbdev #endif
50351c0b2f7Stbbdev
50451c0b2f7Stbbdev limiter_node l3(l0);
50551c0b2f7Stbbdev static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
50651c0b2f7Stbbdev }
50751c0b2f7Stbbdev #endif
50851c0b2f7Stbbdev
test_decrement_while_try_put_task()5090815661eSIlya Mishin void test_decrement_while_try_put_task() {
5100815661eSIlya Mishin constexpr int threshold = 50000;
5110815661eSIlya Mishin
5120815661eSIlya Mishin tbb::flow::graph graph{};
5130815661eSIlya Mishin std::atomic<int> processed;
5140815661eSIlya Mishin tbb::flow::input_node<int> input{ graph, [&](tbb::flow_control & fc) -> int {
5150815661eSIlya Mishin static int i = {};
5160815661eSIlya Mishin if (i++ >= threshold) fc.stop();
5170815661eSIlya Mishin return i;
5180815661eSIlya Mishin }};
5190815661eSIlya Mishin tbb::flow::limiter_node<int, int> blockingNode{ graph, 1 };
5200815661eSIlya Mishin tbb::flow::multifunction_node<int, std::tuple<int>> processing{ graph, tbb::flow::serial,
5210815661eSIlya Mishin [&](const int & value, typename decltype(processing)::output_ports_type & out) {
5220815661eSIlya Mishin if (value != threshold)
5230815661eSIlya Mishin std::get<0>(out).try_put(1);
5240815661eSIlya Mishin processed.store(value);
5250815661eSIlya Mishin }};
5260815661eSIlya Mishin
5270815661eSIlya Mishin tbb::flow::make_edge(input, blockingNode);
5280815661eSIlya Mishin tbb::flow::make_edge(blockingNode, processing);
5290815661eSIlya Mishin tbb::flow::make_edge(processing, blockingNode.decrementer());
5300815661eSIlya Mishin
5310815661eSIlya Mishin input.activate();
5320815661eSIlya Mishin
5330815661eSIlya Mishin graph.wait_for_all();
5340815661eSIlya Mishin CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work");
5350815661eSIlya Mishin }
5360815661eSIlya Mishin
5370815661eSIlya Mishin
53851c0b2f7Stbbdev //! Test puts on limiter_node with decrements and varying parallelism levels
53951c0b2f7Stbbdev //! \brief \ref error_guessing
54051c0b2f7Stbbdev TEST_CASE("Serial and parallel tests") {
54151c0b2f7Stbbdev for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
54251c0b2f7Stbbdev tbb::task_arena arena(i);
54351c0b2f7Stbbdev arena.execute(
__anon70523bc50402() 54451c0b2f7Stbbdev [i]() {
54551c0b2f7Stbbdev test_serial<int>();
54651c0b2f7Stbbdev test_parallel<int>(i);
54751c0b2f7Stbbdev }
54851c0b2f7Stbbdev );
54951c0b2f7Stbbdev }
55051c0b2f7Stbbdev }
55151c0b2f7Stbbdev
55251c0b2f7Stbbdev //! Test initial put of continue_msg on decrementer port does not stop message flow
55351c0b2f7Stbbdev //! \brief \ref error_guessing
55451c0b2f7Stbbdev TEST_CASE("Test continue_msg reception") {
55551c0b2f7Stbbdev test_continue_msg_reception();
55651c0b2f7Stbbdev }
55751c0b2f7Stbbdev
5580815661eSIlya Mishin //! Test put message on decrementer port does not stop message flow
5590815661eSIlya Mishin //! \brief \ref error_guessing
5600815661eSIlya Mishin TEST_CASE("Test try_put to decrementer while try_put to limiter_node") {
5610815661eSIlya Mishin test_decrement_while_try_put_task();
5620815661eSIlya Mishin }
5630815661eSIlya Mishin
56451c0b2f7Stbbdev //! Test multifunction_node connected to limiter_node
56551c0b2f7Stbbdev //! \brief \ref error_guessing
56651c0b2f7Stbbdev TEST_CASE("Multifunction connected to limiter") {
56751c0b2f7Stbbdev test_multifunction_to_limiter(30,3);
56851c0b2f7Stbbdev test_multifunction_to_limiter(300,13);
56951c0b2f7Stbbdev test_multifunction_to_limiter(3000,1);
57051c0b2f7Stbbdev }
57151c0b2f7Stbbdev
57251c0b2f7Stbbdev //! Test message release if successor doesn't accept
57351c0b2f7Stbbdev //! \brief \ref requirement
57451c0b2f7Stbbdev TEST_CASE("Message is released if successor does not accept") {
57551c0b2f7Stbbdev test_reserve_release_messages();
57651c0b2f7Stbbdev }
57751c0b2f7Stbbdev
57851c0b2f7Stbbdev //! Test decrementer
57951c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
58051c0b2f7Stbbdev TEST_CASE("Decrementer") {
58151c0b2f7Stbbdev test_decrementer();
58251c0b2f7Stbbdev }
58351c0b2f7Stbbdev
58451c0b2f7Stbbdev //! Test try_put() without successor
58551c0b2f7Stbbdev //! \brief \ref error_guessing
58651c0b2f7Stbbdev TEST_CASE("Test try_put() without successors") {
58751c0b2f7Stbbdev test_try_put_without_successors();
58851c0b2f7Stbbdev }
58951c0b2f7Stbbdev
59051c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
59151c0b2f7Stbbdev //! Test follows and precedes API
59251c0b2f7Stbbdev //! \brief \ref error_guessing
59351c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) {
59451c0b2f7Stbbdev test_follows_and_precedes_api();
59551c0b2f7Stbbdev }
59651c0b2f7Stbbdev #endif
59751c0b2f7Stbbdev
59851c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
59951c0b2f7Stbbdev //! Test deduction guides
60051c0b2f7Stbbdev //! \brief \ref requirement
60151c0b2f7Stbbdev TEST_CASE( "Deduction guides" ) {
60251c0b2f7Stbbdev test_deduction_guides();
60351c0b2f7Stbbdev }
60451c0b2f7Stbbdev #endif
6056edb5c3aSVladimir Serov
6066edb5c3aSVladimir Serov struct TestLargeStruct {
6076edb5c3aSVladimir Serov char bytes[512]{ 0 };
6086edb5c3aSVladimir Serov };
6096edb5c3aSVladimir Serov
610534428d0SVladimir Serov //! Test correct node deallocation while using small_object_pool.
611534428d0SVladimir Serov //! (see https://github.com/oneapi-src/oneTBB/issues/639)
612534428d0SVladimir Serov //! \brief \ref error_guessing
613534428d0SVladimir Serov TEST_CASE("Test correct node deallocation while using small_object_pool") {
6146edb5c3aSVladimir Serov tbb::flow::graph graph;
615534428d0SVladimir Serov tbb::flow::queue_node<TestLargeStruct> input_node( graph );
616534428d0SVladimir Serov tbb::flow::function_node<TestLargeStruct> func( graph, tbb::flow::serial,
__anon70523bc50502(const TestLargeStruct& input) 617534428d0SVladimir Serov [](const TestLargeStruct& input) { return input; } );
6186edb5c3aSVladimir Serov
6196edb5c3aSVladimir Serov tbb::flow::make_edge( input_node, func );
6206edb5c3aSVladimir Serov CHECK( input_node.try_put( TestLargeStruct{} ) );
6216edb5c3aSVladimir Serov graph.wait_for_all();
6226edb5c3aSVladimir Serov
6235fc0a5f6SAlex tbb::task_scheduler_handle handle{ tbb::attach{} };
624d1667d51SVladimir Serov tbb::finalize( handle, std::nothrow );
6256edb5c3aSVladimir Serov }
626