/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
1651c0b2f7Stbbdev
// TODO: Add overlapping put / receive tests
1851c0b2f7Stbbdev
#include "common/config.h"

#include "tbb/flow_graph.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/utils_assert.h"
#include "common/checktype.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"

#include <cstdio>
#include <vector>
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev
//! \file test_queue_node.cpp
//! \brief Test for [flow_graph.queue_node] specification
3651c0b2f7Stbbdev
3751c0b2f7Stbbdev
// Number of items each producer puts into a queue; item values are encoded as N*producer + offset.
#define N 1000
// Batch size used by parallel_put_get: how many items a thread puts before it starts getting.
#define C 10
4051c0b2f7Stbbdev
4151c0b2f7Stbbdev template< typename T >
spin_try_get(tbb::flow::queue_node<T> & q,T & value)4251c0b2f7Stbbdev void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
43b15aabb3Stbbdev int count = 0;
44b15aabb3Stbbdev while ( q.try_get(value) != true ) {
45b15aabb3Stbbdev if (count < 1000000) {
46b15aabb3Stbbdev ++count;
47b15aabb3Stbbdev }
48b15aabb3Stbbdev if (count == 1000000) {
49b15aabb3Stbbdev // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads.
50b15aabb3Stbbdev tbb::task_arena a(tbb::task_arena::attach{});
51b15aabb3Stbbdev a.enqueue([]{});
52b15aabb3Stbbdev ++count;
53b15aabb3Stbbdev }
54b15aabb3Stbbdev }
5551c0b2f7Stbbdev }
5651c0b2f7Stbbdev
5751c0b2f7Stbbdev template< typename T >
check_item(T * next_value,T & value)5851c0b2f7Stbbdev void check_item( T* next_value, T &value ) {
5951c0b2f7Stbbdev int tid = value / N;
6051c0b2f7Stbbdev int offset = value % N;
6151c0b2f7Stbbdev CHECK_MESSAGE( next_value[tid] == T(offset), "" );
6251c0b2f7Stbbdev ++next_value[tid];
6351c0b2f7Stbbdev }
6451c0b2f7Stbbdev
6551c0b2f7Stbbdev template< typename T >
6651c0b2f7Stbbdev struct parallel_puts : utils::NoAssign {
6751c0b2f7Stbbdev
6851c0b2f7Stbbdev tbb::flow::queue_node<T> &my_q;
6951c0b2f7Stbbdev
parallel_putsparallel_puts7051c0b2f7Stbbdev parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
7151c0b2f7Stbbdev
operator ()parallel_puts7251c0b2f7Stbbdev void operator()(int i) const {
7351c0b2f7Stbbdev for (int j = 0; j < N; ++j) {
7451c0b2f7Stbbdev bool msg = my_q.try_put( T(N*i + j) );
7551c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" );
7651c0b2f7Stbbdev }
7751c0b2f7Stbbdev }
7851c0b2f7Stbbdev
7951c0b2f7Stbbdev };
8051c0b2f7Stbbdev
8151c0b2f7Stbbdev template< typename T >
8251c0b2f7Stbbdev struct touches {
8351c0b2f7Stbbdev
8451c0b2f7Stbbdev bool **my_touches;
8551c0b2f7Stbbdev T **my_last_touch;
8651c0b2f7Stbbdev int my_num_threads;
8751c0b2f7Stbbdev
touchestouches8851c0b2f7Stbbdev touches( int num_threads ) : my_num_threads(num_threads) {
8951c0b2f7Stbbdev my_last_touch = new T* [my_num_threads];
9051c0b2f7Stbbdev my_touches = new bool* [my_num_threads];
9151c0b2f7Stbbdev for ( int p = 0; p < my_num_threads; ++p) {
9251c0b2f7Stbbdev my_last_touch[p] = new T[my_num_threads];
9351c0b2f7Stbbdev for ( int p2 = 0; p2 < my_num_threads; ++p2)
9451c0b2f7Stbbdev my_last_touch[p][p2] = -1;
9551c0b2f7Stbbdev
9651c0b2f7Stbbdev my_touches[p] = new bool[N*my_num_threads];
9751c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n)
9851c0b2f7Stbbdev my_touches[p][n] = false;
9951c0b2f7Stbbdev }
10051c0b2f7Stbbdev }
10151c0b2f7Stbbdev
~touchestouches10251c0b2f7Stbbdev ~touches() {
10351c0b2f7Stbbdev for ( int p = 0; p < my_num_threads; ++p) {
10451c0b2f7Stbbdev delete [] my_touches[p];
10551c0b2f7Stbbdev delete [] my_last_touch[p];
10651c0b2f7Stbbdev }
10751c0b2f7Stbbdev delete [] my_touches;
10851c0b2f7Stbbdev delete [] my_last_touch;
10951c0b2f7Stbbdev }
11051c0b2f7Stbbdev
checktouches11151c0b2f7Stbbdev bool check( int tid, T v ) {
11251c0b2f7Stbbdev int v_tid = v / N;
11351c0b2f7Stbbdev if ( my_touches[tid][v] != false ) {
11451c0b2f7Stbbdev printf("Error: value seen twice by local thread\n");
11551c0b2f7Stbbdev return false;
11651c0b2f7Stbbdev }
11751c0b2f7Stbbdev if ( v <= my_last_touch[tid][v_tid] ) {
11851c0b2f7Stbbdev printf("Error: value seen in wrong order by local thread\n");
11951c0b2f7Stbbdev return false;
12051c0b2f7Stbbdev }
12151c0b2f7Stbbdev my_last_touch[tid][v_tid] = v;
12251c0b2f7Stbbdev my_touches[tid][v] = true;
12351c0b2f7Stbbdev return true;
12451c0b2f7Stbbdev }
12551c0b2f7Stbbdev
validate_touchestouches12651c0b2f7Stbbdev bool validate_touches() {
12751c0b2f7Stbbdev bool *all_touches = new bool[N*my_num_threads];
12851c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n)
12951c0b2f7Stbbdev all_touches[n] = false;
13051c0b2f7Stbbdev
13151c0b2f7Stbbdev for ( int p = 0; p < my_num_threads; ++p) {
13251c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n) {
13351c0b2f7Stbbdev if ( my_touches[p][n] == true ) {
13451c0b2f7Stbbdev CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
13551c0b2f7Stbbdev all_touches[n] = true;
13651c0b2f7Stbbdev }
13751c0b2f7Stbbdev }
13851c0b2f7Stbbdev }
13951c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n) {
14051c0b2f7Stbbdev if ( !all_touches[n] )
14151c0b2f7Stbbdev printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
14251c0b2f7Stbbdev //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
14351c0b2f7Stbbdev }
14451c0b2f7Stbbdev delete [] all_touches;
14551c0b2f7Stbbdev return true;
14651c0b2f7Stbbdev }
14751c0b2f7Stbbdev
14851c0b2f7Stbbdev };
14951c0b2f7Stbbdev
15051c0b2f7Stbbdev template< typename T >
15151c0b2f7Stbbdev struct parallel_gets : utils::NoAssign {
15251c0b2f7Stbbdev
15351c0b2f7Stbbdev tbb::flow::queue_node<T> &my_q;
15451c0b2f7Stbbdev touches<T> &my_touches;
15551c0b2f7Stbbdev
parallel_getsparallel_gets15651c0b2f7Stbbdev parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
15751c0b2f7Stbbdev
operator ()parallel_gets15851c0b2f7Stbbdev void operator()(int tid) const {
15951c0b2f7Stbbdev for (int j = 0; j < N; ++j) {
16051c0b2f7Stbbdev T v;
16151c0b2f7Stbbdev spin_try_get( my_q, v );
16251c0b2f7Stbbdev my_touches.check( tid, v );
16351c0b2f7Stbbdev }
16451c0b2f7Stbbdev }
16551c0b2f7Stbbdev
16651c0b2f7Stbbdev };
16751c0b2f7Stbbdev
16851c0b2f7Stbbdev template< typename T >
16951c0b2f7Stbbdev struct parallel_put_get : utils::NoAssign {
17051c0b2f7Stbbdev
17151c0b2f7Stbbdev tbb::flow::queue_node<T> &my_q;
17251c0b2f7Stbbdev touches<T> &my_touches;
17351c0b2f7Stbbdev
parallel_put_getparallel_put_get17451c0b2f7Stbbdev parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
17551c0b2f7Stbbdev
operator ()parallel_put_get17651c0b2f7Stbbdev void operator()(int tid) const {
17751c0b2f7Stbbdev
17851c0b2f7Stbbdev for ( int i = 0; i < N; i+=C ) {
17951c0b2f7Stbbdev int j_end = ( N < i + C ) ? N : i + C;
18051c0b2f7Stbbdev // dump about C values into the Q
18151c0b2f7Stbbdev for ( int j = i; j < j_end; ++j ) {
18251c0b2f7Stbbdev CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
18351c0b2f7Stbbdev }
18451c0b2f7Stbbdev // receiver about C values from the Q
18551c0b2f7Stbbdev for ( int j = i; j < j_end; ++j ) {
18651c0b2f7Stbbdev T v;
18751c0b2f7Stbbdev spin_try_get( my_q, v );
18851c0b2f7Stbbdev my_touches.check( tid, v );
18951c0b2f7Stbbdev }
19051c0b2f7Stbbdev }
19151c0b2f7Stbbdev }
19251c0b2f7Stbbdev
19351c0b2f7Stbbdev };
19451c0b2f7Stbbdev
19551c0b2f7Stbbdev //
19651c0b2f7Stbbdev // Tests
19751c0b2f7Stbbdev //
19851c0b2f7Stbbdev // Item can be reserved, released, consumed ( single serial receiver )
19951c0b2f7Stbbdev //
20051c0b2f7Stbbdev template< typename T >
test_reservation()20151c0b2f7Stbbdev int test_reservation() {
20251c0b2f7Stbbdev tbb::flow::graph g;
20351c0b2f7Stbbdev T bogus_value(-1);
20451c0b2f7Stbbdev
20551c0b2f7Stbbdev // Simple tests
20651c0b2f7Stbbdev tbb::flow::queue_node<T> q(g);
20751c0b2f7Stbbdev
20851c0b2f7Stbbdev q.try_put(T(1));
20951c0b2f7Stbbdev q.try_put(T(2));
21051c0b2f7Stbbdev q.try_put(T(3));
21151c0b2f7Stbbdev
21251c0b2f7Stbbdev T v;
21351c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" );
21451c0b2f7Stbbdev CHECK_MESSAGE( v == T(1), "" );
21551c0b2f7Stbbdev CHECK_MESSAGE( q.release_reservation() == true, "" );
21651c0b2f7Stbbdev v = bogus_value;
21751c0b2f7Stbbdev g.wait_for_all();
21851c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" );
21951c0b2f7Stbbdev CHECK_MESSAGE( v == T(1), "" );
22051c0b2f7Stbbdev CHECK_MESSAGE( q.consume_reservation() == true, "" );
22151c0b2f7Stbbdev v = bogus_value;
22251c0b2f7Stbbdev g.wait_for_all();
22351c0b2f7Stbbdev
22451c0b2f7Stbbdev CHECK_MESSAGE( q.try_get(v) == true, "" );
22551c0b2f7Stbbdev CHECK_MESSAGE( v == T(2), "" );
22651c0b2f7Stbbdev v = bogus_value;
22751c0b2f7Stbbdev g.wait_for_all();
22851c0b2f7Stbbdev
22951c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" );
23051c0b2f7Stbbdev CHECK_MESSAGE( v == T(3), "" );
23151c0b2f7Stbbdev CHECK_MESSAGE( q.release_reservation() == true, "" );
23251c0b2f7Stbbdev v = bogus_value;
23351c0b2f7Stbbdev g.wait_for_all();
23451c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" );
23551c0b2f7Stbbdev CHECK_MESSAGE( v == T(3), "" );
23651c0b2f7Stbbdev CHECK_MESSAGE( q.consume_reservation() == true, "" );
23751c0b2f7Stbbdev v = bogus_value;
23851c0b2f7Stbbdev g.wait_for_all();
23951c0b2f7Stbbdev
24051c0b2f7Stbbdev return 0;
24151c0b2f7Stbbdev }
24251c0b2f7Stbbdev
24351c0b2f7Stbbdev //
24451c0b2f7Stbbdev // Tests
24551c0b2f7Stbbdev //
24651c0b2f7Stbbdev // multiple parallel senders, items in FIFO (relatively to sender) order
24751c0b2f7Stbbdev // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
24851c0b2f7Stbbdev // * overlapped puts / gets
24951c0b2f7Stbbdev // * all puts finished before any getS
25051c0b2f7Stbbdev //
25151c0b2f7Stbbdev template< typename T >
test_parallel(int num_threads)25251c0b2f7Stbbdev int test_parallel(int num_threads) {
25351c0b2f7Stbbdev tbb::flow::graph g;
25451c0b2f7Stbbdev tbb::flow::queue_node<T> q(g);
25551c0b2f7Stbbdev tbb::flow::queue_node<T> q2(g);
25651c0b2f7Stbbdev tbb::flow::queue_node<T> q3(g);
25751c0b2f7Stbbdev {
25851c0b2f7Stbbdev Checker< T > my_check;
25951c0b2f7Stbbdev T bogus_value(-1);
26051c0b2f7Stbbdev T j = bogus_value;
26151c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
26251c0b2f7Stbbdev
26351c0b2f7Stbbdev T *next_value = new T[num_threads];
26451c0b2f7Stbbdev for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
26551c0b2f7Stbbdev
26651c0b2f7Stbbdev for (int i = 0; i < num_threads * N; ++i ) {
26751c0b2f7Stbbdev spin_try_get( q, j );
26851c0b2f7Stbbdev check_item( next_value, j );
26951c0b2f7Stbbdev j = bogus_value;
27051c0b2f7Stbbdev }
27151c0b2f7Stbbdev for (int tid = 0; tid < num_threads; ++tid) {
27251c0b2f7Stbbdev CHECK_MESSAGE( next_value[tid] == T(N), "" );
27351c0b2f7Stbbdev }
27451c0b2f7Stbbdev delete[] next_value;
27551c0b2f7Stbbdev
27651c0b2f7Stbbdev j = bogus_value;
27751c0b2f7Stbbdev g.wait_for_all();
27851c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
27951c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
28051c0b2f7Stbbdev
28151c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
28251c0b2f7Stbbdev
28351c0b2f7Stbbdev {
28451c0b2f7Stbbdev touches< T > t( num_threads );
28551c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
28651c0b2f7Stbbdev g.wait_for_all();
28751c0b2f7Stbbdev CHECK_MESSAGE( t.validate_touches(), "" );
28851c0b2f7Stbbdev }
28951c0b2f7Stbbdev j = bogus_value;
29051c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
29151c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
29251c0b2f7Stbbdev
29351c0b2f7Stbbdev g.wait_for_all();
29451c0b2f7Stbbdev {
29551c0b2f7Stbbdev touches< T > t2( num_threads );
29651c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
29751c0b2f7Stbbdev g.wait_for_all();
29851c0b2f7Stbbdev CHECK_MESSAGE( t2.validate_touches(), "" );
29951c0b2f7Stbbdev }
30051c0b2f7Stbbdev j = bogus_value;
30151c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
30251c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
30351c0b2f7Stbbdev
30451c0b2f7Stbbdev tbb::flow::make_edge( q, q2 );
30551c0b2f7Stbbdev tbb::flow::make_edge( q2, q3 );
30651c0b2f7Stbbdev
30751c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
30851c0b2f7Stbbdev {
30951c0b2f7Stbbdev touches< T > t3( num_threads );
31051c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
31151c0b2f7Stbbdev g.wait_for_all();
31251c0b2f7Stbbdev CHECK_MESSAGE( t3.validate_touches(), "" );
31351c0b2f7Stbbdev }
31451c0b2f7Stbbdev j = bogus_value;
31551c0b2f7Stbbdev g.wait_for_all();
31651c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
31751c0b2f7Stbbdev g.wait_for_all();
31851c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" );
31951c0b2f7Stbbdev g.wait_for_all();
32051c0b2f7Stbbdev CHECK_MESSAGE( q3.try_get( j ) == false, "" );
32151c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
32251c0b2f7Stbbdev
32351c0b2f7Stbbdev // test copy constructor
32451c0b2f7Stbbdev CHECK_MESSAGE( remove_successor( q, q2 ), "" );
32551c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
32651c0b2f7Stbbdev tbb::flow::queue_node<T> q_copy(q);
32751c0b2f7Stbbdev j = bogus_value;
32851c0b2f7Stbbdev g.wait_for_all();
32951c0b2f7Stbbdev CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
33051c0b2f7Stbbdev CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" );
33151c0b2f7Stbbdev {
33251c0b2f7Stbbdev touches< T > t( num_threads );
33351c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
33451c0b2f7Stbbdev g.wait_for_all();
33551c0b2f7Stbbdev CHECK_MESSAGE( t.validate_touches(), "" );
33651c0b2f7Stbbdev }
33751c0b2f7Stbbdev j = bogus_value;
33851c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
33951c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
34051c0b2f7Stbbdev CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
34151c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
34251c0b2f7Stbbdev }
34351c0b2f7Stbbdev
34451c0b2f7Stbbdev return 0;
34551c0b2f7Stbbdev }
34651c0b2f7Stbbdev
34751c0b2f7Stbbdev //
34851c0b2f7Stbbdev // Tests
34951c0b2f7Stbbdev //
35051c0b2f7Stbbdev // Predecessors cannot be registered
35151c0b2f7Stbbdev // Empty Q rejects item requests
35251c0b2f7Stbbdev // Single serial sender, items in FIFO order
35351c0b2f7Stbbdev // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
35451c0b2f7Stbbdev //
35551c0b2f7Stbbdev
35651c0b2f7Stbbdev template< typename T >
test_serial()35751c0b2f7Stbbdev int test_serial() {
35851c0b2f7Stbbdev tbb::flow::graph g;
35951c0b2f7Stbbdev tbb::flow::queue_node<T> q(g);
36051c0b2f7Stbbdev tbb::flow::queue_node<T> q2(g);
36151c0b2f7Stbbdev { // destroy the graph after manipulating it, and see if all the items in the buffers
36251c0b2f7Stbbdev // have been destroyed before the graph
36351c0b2f7Stbbdev Checker<T> my_check; // if CheckType< U > count constructions and destructions
36451c0b2f7Stbbdev T bogus_value(-1);
36551c0b2f7Stbbdev T j = bogus_value;
36651c0b2f7Stbbdev
36751c0b2f7Stbbdev //
36851c0b2f7Stbbdev // Rejects attempts to add / remove predecessor
36951c0b2f7Stbbdev // Rejects request from empty Q
37051c0b2f7Stbbdev //
37151c0b2f7Stbbdev CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" );
37251c0b2f7Stbbdev CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" );
37351c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
37451c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
37551c0b2f7Stbbdev
37651c0b2f7Stbbdev //
37751c0b2f7Stbbdev // Simple puts and gets
37851c0b2f7Stbbdev //
37951c0b2f7Stbbdev
38051c0b2f7Stbbdev for (int i = 0; i < N; ++i) {
38151c0b2f7Stbbdev bool msg = q.try_put( T(i) );
38251c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" );
38351c0b2f7Stbbdev }
38451c0b2f7Stbbdev
38551c0b2f7Stbbdev
38651c0b2f7Stbbdev for (int i = 0; i < N; ++i) {
38751c0b2f7Stbbdev j = bogus_value;
38851c0b2f7Stbbdev spin_try_get( q, j );
38951c0b2f7Stbbdev CHECK_MESSAGE( i == j, "" );
39051c0b2f7Stbbdev }
39151c0b2f7Stbbdev j = bogus_value;
39251c0b2f7Stbbdev g.wait_for_all();
39351c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
39451c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
39551c0b2f7Stbbdev
39651c0b2f7Stbbdev tbb::flow::make_edge( q, q2 );
39751c0b2f7Stbbdev
39851c0b2f7Stbbdev for (int i = 0; i < N; ++i) {
39951c0b2f7Stbbdev bool msg = q.try_put( T(i) );
40051c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" );
40151c0b2f7Stbbdev }
40251c0b2f7Stbbdev
40351c0b2f7Stbbdev
40451c0b2f7Stbbdev for (int i = 0; i < N; ++i) {
40551c0b2f7Stbbdev j = bogus_value;
40651c0b2f7Stbbdev spin_try_get( q2, j );
40751c0b2f7Stbbdev CHECK_MESSAGE( i == j, "" );
40851c0b2f7Stbbdev }
40951c0b2f7Stbbdev j = bogus_value;
41051c0b2f7Stbbdev g.wait_for_all();
41151c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
41251c0b2f7Stbbdev g.wait_for_all();
41351c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" );
41451c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
41551c0b2f7Stbbdev
41651c0b2f7Stbbdev tbb::flow::remove_edge( q, q2 );
41751c0b2f7Stbbdev CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
41851c0b2f7Stbbdev g.wait_for_all();
41951c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" );
42051c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
42151c0b2f7Stbbdev g.wait_for_all();
42251c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == true, "" );
42351c0b2f7Stbbdev CHECK_MESSAGE( j == 1, "" );
42451c0b2f7Stbbdev
42551c0b2f7Stbbdev tbb::flow::queue_node<T> q3(g);
42651c0b2f7Stbbdev tbb::flow::make_edge( q, q2 );
42751c0b2f7Stbbdev tbb::flow::make_edge( q2, q3 );
42851c0b2f7Stbbdev
42951c0b2f7Stbbdev for (int i = 0; i < N; ++i) {
43051c0b2f7Stbbdev bool msg = q.try_put( T(i) );
43151c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" );
43251c0b2f7Stbbdev }
43351c0b2f7Stbbdev
43451c0b2f7Stbbdev for (int i = 0; i < N; ++i) {
43551c0b2f7Stbbdev j = bogus_value;
43651c0b2f7Stbbdev spin_try_get( q3, j );
43751c0b2f7Stbbdev CHECK_MESSAGE( i == j, "" );
43851c0b2f7Stbbdev }
43951c0b2f7Stbbdev j = bogus_value;
44051c0b2f7Stbbdev g.wait_for_all();
44151c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" );
44251c0b2f7Stbbdev g.wait_for_all();
44351c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" );
44451c0b2f7Stbbdev g.wait_for_all();
44551c0b2f7Stbbdev CHECK_MESSAGE( q3.try_get( j ) == false, "" );
44651c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
44751c0b2f7Stbbdev
44851c0b2f7Stbbdev tbb::flow::remove_edge( q, q2 );
44951c0b2f7Stbbdev CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
45051c0b2f7Stbbdev g.wait_for_all();
45151c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" );
45251c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
45351c0b2f7Stbbdev g.wait_for_all();
45451c0b2f7Stbbdev CHECK_MESSAGE( q3.try_get( j ) == false, "" );
45551c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" );
45651c0b2f7Stbbdev g.wait_for_all();
45751c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == true, "" );
45851c0b2f7Stbbdev CHECK_MESSAGE( j == 1, "" );
45951c0b2f7Stbbdev }
46051c0b2f7Stbbdev
46151c0b2f7Stbbdev return 0;
46251c0b2f7Stbbdev }
46351c0b2f7Stbbdev
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
//! Runs the shared follows / precedes checks for queue_node<int> with the messages 0, 1, 2.
void test_follows_and_precedes_api() {
    std::array<int, 3> messages_for_follows = { {0, 1, 2} };
    std::vector<int> messages_for_precedes = {0, 1, 2};

    follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows);
    follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes);
}
#endif
47551c0b2f7Stbbdev
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Checks at compile time that queue_node's template argument is deduced as int
//! when constructing from follows / precedes node sets and from another queue_node.
void test_deduction_guides() {
    using namespace tbb::flow;
    graph g;
    broadcast_node<int> br(g);
    queue_node<int> q0(g);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    queue_node q1(follows(br));
    static_assert(std::is_same_v<decltype(q1), queue_node<int>>);

    queue_node q2(precedes(br));
    static_assert(std::is_same_v<decltype(q2), queue_node<int>>);
#endif

    queue_node q3(q0);
    static_assert(std::is_same_v<decltype(q3), queue_node<int>>);
    g.wait_for_all();
}
#endif
49651c0b2f7Stbbdev
49751c0b2f7Stbbdev //! Test serial, parallel behavior and reservation under parallelism
49851c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
49951c0b2f7Stbbdev TEST_CASE("Parallel, serial test"){
50051c0b2f7Stbbdev for (int p = 2; p <= 4; ++p) {
501*552f342bSPavel tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
50251c0b2f7Stbbdev tbb::task_arena arena(p);
50351c0b2f7Stbbdev arena.execute(
__anon9007ffb40202() 50451c0b2f7Stbbdev [&]() {
50551c0b2f7Stbbdev test_serial<int>();
50651c0b2f7Stbbdev test_serial<CheckType<int> >();
50751c0b2f7Stbbdev test_parallel<int>(p);
50851c0b2f7Stbbdev test_parallel<CheckType<int> >(p);
50951c0b2f7Stbbdev }
51051c0b2f7Stbbdev );
51151c0b2f7Stbbdev }
51251c0b2f7Stbbdev }
51351c0b2f7Stbbdev
51451c0b2f7Stbbdev //! Test reset and cancellation
51551c0b2f7Stbbdev //! \brief \ref error_guessing
51651c0b2f7Stbbdev TEST_CASE("Resets test"){
51751c0b2f7Stbbdev INFO("Testing resets\n");
51851c0b2f7Stbbdev test_resets<int, tbb::flow::queue_node<int> >();
51951c0b2f7Stbbdev test_resets<float, tbb::flow::queue_node<float> >();
52051c0b2f7Stbbdev }
52151c0b2f7Stbbdev
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows and precedes API"){
    test_follows_and_precedes_api();
}
#endif
52951c0b2f7Stbbdev
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Deduction guides"){
    test_deduction_guides();
}
#endif
53751c0b2f7Stbbdev
53851c0b2f7Stbbdev //! Test operations on a reserved queue_node
53951c0b2f7Stbbdev //! \brief \ref error_guessing
54051c0b2f7Stbbdev TEST_CASE("queue_node with reservation"){
54151c0b2f7Stbbdev tbb::flow::graph g;
54251c0b2f7Stbbdev
54351c0b2f7Stbbdev tbb::flow::queue_node<int> q(g);
54451c0b2f7Stbbdev
54551c0b2f7Stbbdev bool res = q.try_put(42);
54651c0b2f7Stbbdev CHECK_MESSAGE( res, "queue_node must accept input." );
54751c0b2f7Stbbdev
54851c0b2f7Stbbdev int val = 1;
54951c0b2f7Stbbdev res = q.try_reserve(val);
55051c0b2f7Stbbdev CHECK_MESSAGE( res, "queue_node must reserve as it has an item." );
55151c0b2f7Stbbdev CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." );
55251c0b2f7Stbbdev
55351c0b2f7Stbbdev int out_arg = -1;
55451c0b2f7Stbbdev CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail.");
55551c0b2f7Stbbdev CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument.");
55651c0b2f7Stbbdev
55751c0b2f7Stbbdev out_arg = -1;
55851c0b2f7Stbbdev CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail.");
55951c0b2f7Stbbdev CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument.");
56051c0b2f7Stbbdev g.wait_for_all();
56151c0b2f7Stbbdev }
562