151c0b2f7Stbbdev /* 2*b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 1751c0b2f7Stbbdev // TODO: Add overlapping put / receive tests 1851c0b2f7Stbbdev 1951c0b2f7Stbbdev #include "common/config.h" 2051c0b2f7Stbbdev 2151c0b2f7Stbbdev // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 2251c0b2f7Stbbdev // parts in all of tests might make testing of the product, which is different from what is actually 2351c0b2f7Stbbdev // released. 2451c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1 2551c0b2f7Stbbdev #include "tbb/flow_graph.h" 2651c0b2f7Stbbdev 2751c0b2f7Stbbdev #include "common/test.h" 2851c0b2f7Stbbdev #include "common/utils.h" 2951c0b2f7Stbbdev #include "common/utils_assert.h" 3051c0b2f7Stbbdev #include "common/checktype.h" 3151c0b2f7Stbbdev #include "common/graph_utils.h" 3251c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h" 3351c0b2f7Stbbdev 3451c0b2f7Stbbdev #include <cstdio> 3551c0b2f7Stbbdev 3651c0b2f7Stbbdev 3751c0b2f7Stbbdev //! \file test_queue_node.cpp 3851c0b2f7Stbbdev //! \brief Test for [flow_graph.queue_node] specification 3951c0b2f7Stbbdev 4051c0b2f7Stbbdev 4151c0b2f7Stbbdev #define N 1000 4251c0b2f7Stbbdev #define C 10 4351c0b2f7Stbbdev 4451c0b2f7Stbbdev template< typename T > 4551c0b2f7Stbbdev void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) { 46*b15aabb3Stbbdev int count = 0; 47*b15aabb3Stbbdev while ( q.try_get(value) != true ) { 48*b15aabb3Stbbdev if (count < 1000000) { 49*b15aabb3Stbbdev ++count; 50*b15aabb3Stbbdev } 51*b15aabb3Stbbdev if (count == 1000000) { 52*b15aabb3Stbbdev // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads. 53*b15aabb3Stbbdev tbb::task_arena a(tbb::task_arena::attach{}); 54*b15aabb3Stbbdev a.enqueue([]{}); 55*b15aabb3Stbbdev ++count; 56*b15aabb3Stbbdev } 57*b15aabb3Stbbdev } 5851c0b2f7Stbbdev } 5951c0b2f7Stbbdev 6051c0b2f7Stbbdev template< typename T > 6151c0b2f7Stbbdev void check_item( T* next_value, T &value ) { 6251c0b2f7Stbbdev int tid = value / N; 6351c0b2f7Stbbdev int offset = value % N; 6451c0b2f7Stbbdev CHECK_MESSAGE( next_value[tid] == T(offset), "" ); 6551c0b2f7Stbbdev ++next_value[tid]; 6651c0b2f7Stbbdev } 6751c0b2f7Stbbdev 6851c0b2f7Stbbdev template< typename T > 6951c0b2f7Stbbdev struct parallel_puts : utils::NoAssign { 7051c0b2f7Stbbdev 7151c0b2f7Stbbdev tbb::flow::queue_node<T> &my_q; 7251c0b2f7Stbbdev 7351c0b2f7Stbbdev parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {} 7451c0b2f7Stbbdev 7551c0b2f7Stbbdev void operator()(int i) const { 7651c0b2f7Stbbdev for (int j = 0; j < N; ++j) { 7751c0b2f7Stbbdev bool msg = my_q.try_put( T(N*i + j) ); 7851c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" ); 7951c0b2f7Stbbdev } 8051c0b2f7Stbbdev } 8151c0b2f7Stbbdev 8251c0b2f7Stbbdev }; 8351c0b2f7Stbbdev 8451c0b2f7Stbbdev template< typename T > 8551c0b2f7Stbbdev struct touches { 8651c0b2f7Stbbdev 8751c0b2f7Stbbdev bool **my_touches; 8851c0b2f7Stbbdev T **my_last_touch; 8951c0b2f7Stbbdev int my_num_threads; 9051c0b2f7Stbbdev 9151c0b2f7Stbbdev touches( int num_threads ) : my_num_threads(num_threads) { 9251c0b2f7Stbbdev my_last_touch = new T* [my_num_threads]; 9351c0b2f7Stbbdev my_touches = new bool* [my_num_threads]; 9451c0b2f7Stbbdev for ( int p = 0; p < my_num_threads; ++p) { 9551c0b2f7Stbbdev my_last_touch[p] = new T[my_num_threads]; 9651c0b2f7Stbbdev for ( int p2 = 0; p2 < my_num_threads; ++p2) 9751c0b2f7Stbbdev my_last_touch[p][p2] = -1; 9851c0b2f7Stbbdev 9951c0b2f7Stbbdev my_touches[p] = new bool[N*my_num_threads]; 10051c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n) 10151c0b2f7Stbbdev my_touches[p][n] = false; 10251c0b2f7Stbbdev } 10351c0b2f7Stbbdev } 10451c0b2f7Stbbdev 10551c0b2f7Stbbdev ~touches() { 10651c0b2f7Stbbdev for ( int p = 0; p < my_num_threads; ++p) { 10751c0b2f7Stbbdev delete [] my_touches[p]; 10851c0b2f7Stbbdev delete [] my_last_touch[p]; 10951c0b2f7Stbbdev } 11051c0b2f7Stbbdev delete [] my_touches; 11151c0b2f7Stbbdev delete [] my_last_touch; 11251c0b2f7Stbbdev } 11351c0b2f7Stbbdev 11451c0b2f7Stbbdev bool check( int tid, T v ) { 11551c0b2f7Stbbdev int v_tid = v / N; 11651c0b2f7Stbbdev if ( my_touches[tid][v] != false ) { 11751c0b2f7Stbbdev printf("Error: value seen twice by local thread\n"); 11851c0b2f7Stbbdev return false; 11951c0b2f7Stbbdev } 12051c0b2f7Stbbdev if ( v <= my_last_touch[tid][v_tid] ) { 12151c0b2f7Stbbdev printf("Error: value seen in wrong order by local thread\n"); 12251c0b2f7Stbbdev return false; 12351c0b2f7Stbbdev } 12451c0b2f7Stbbdev my_last_touch[tid][v_tid] = v; 12551c0b2f7Stbbdev my_touches[tid][v] = true; 12651c0b2f7Stbbdev return true; 12751c0b2f7Stbbdev } 12851c0b2f7Stbbdev 12951c0b2f7Stbbdev bool validate_touches() { 13051c0b2f7Stbbdev bool *all_touches = new bool[N*my_num_threads]; 13151c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n) 13251c0b2f7Stbbdev all_touches[n] = false; 13351c0b2f7Stbbdev 13451c0b2f7Stbbdev for ( int p = 0; p < my_num_threads; ++p) { 13551c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n) { 13651c0b2f7Stbbdev if ( my_touches[p][n] == true ) { 13751c0b2f7Stbbdev CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" ); 13851c0b2f7Stbbdev all_touches[n] = true; 13951c0b2f7Stbbdev } 14051c0b2f7Stbbdev } 14151c0b2f7Stbbdev } 14251c0b2f7Stbbdev for ( int n = 0; n < N*my_num_threads; ++n) { 14351c0b2f7Stbbdev if ( !all_touches[n] ) 14451c0b2f7Stbbdev printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads); 14551c0b2f7Stbbdev //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" ); 14651c0b2f7Stbbdev } 14751c0b2f7Stbbdev delete [] all_touches; 14851c0b2f7Stbbdev return true; 14951c0b2f7Stbbdev } 15051c0b2f7Stbbdev 15151c0b2f7Stbbdev }; 15251c0b2f7Stbbdev 15351c0b2f7Stbbdev template< typename T > 15451c0b2f7Stbbdev struct parallel_gets : utils::NoAssign { 15551c0b2f7Stbbdev 15651c0b2f7Stbbdev tbb::flow::queue_node<T> &my_q; 15751c0b2f7Stbbdev touches<T> &my_touches; 15851c0b2f7Stbbdev 15951c0b2f7Stbbdev parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {} 16051c0b2f7Stbbdev 16151c0b2f7Stbbdev void operator()(int tid) const { 16251c0b2f7Stbbdev for (int j = 0; j < N; ++j) { 16351c0b2f7Stbbdev T v; 16451c0b2f7Stbbdev spin_try_get( my_q, v ); 16551c0b2f7Stbbdev my_touches.check( tid, v ); 16651c0b2f7Stbbdev } 16751c0b2f7Stbbdev } 16851c0b2f7Stbbdev 16951c0b2f7Stbbdev }; 17051c0b2f7Stbbdev 17151c0b2f7Stbbdev template< typename T > 17251c0b2f7Stbbdev struct parallel_put_get : utils::NoAssign { 17351c0b2f7Stbbdev 17451c0b2f7Stbbdev tbb::flow::queue_node<T> &my_q; 17551c0b2f7Stbbdev touches<T> &my_touches; 17651c0b2f7Stbbdev 17751c0b2f7Stbbdev parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {} 17851c0b2f7Stbbdev 17951c0b2f7Stbbdev void operator()(int tid) const { 18051c0b2f7Stbbdev 18151c0b2f7Stbbdev for ( int i = 0; i < N; i+=C ) { 18251c0b2f7Stbbdev int j_end = ( N < i + C ) ? N : i + C; 18351c0b2f7Stbbdev // dump about C values into the Q 18451c0b2f7Stbbdev for ( int j = i; j < j_end; ++j ) { 18551c0b2f7Stbbdev CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" ); 18651c0b2f7Stbbdev } 18751c0b2f7Stbbdev // receiver about C values from the Q 18851c0b2f7Stbbdev for ( int j = i; j < j_end; ++j ) { 18951c0b2f7Stbbdev T v; 19051c0b2f7Stbbdev spin_try_get( my_q, v ); 19151c0b2f7Stbbdev my_touches.check( tid, v ); 19251c0b2f7Stbbdev } 19351c0b2f7Stbbdev } 19451c0b2f7Stbbdev } 19551c0b2f7Stbbdev 19651c0b2f7Stbbdev }; 19751c0b2f7Stbbdev 19851c0b2f7Stbbdev // 19951c0b2f7Stbbdev // Tests 20051c0b2f7Stbbdev // 20151c0b2f7Stbbdev // Item can be reserved, released, consumed ( single serial receiver ) 20251c0b2f7Stbbdev // 20351c0b2f7Stbbdev template< typename T > 20451c0b2f7Stbbdev int test_reservation() { 20551c0b2f7Stbbdev tbb::flow::graph g; 20651c0b2f7Stbbdev T bogus_value(-1); 20751c0b2f7Stbbdev 20851c0b2f7Stbbdev // Simple tests 20951c0b2f7Stbbdev tbb::flow::queue_node<T> q(g); 21051c0b2f7Stbbdev 21151c0b2f7Stbbdev q.try_put(T(1)); 21251c0b2f7Stbbdev q.try_put(T(2)); 21351c0b2f7Stbbdev q.try_put(T(3)); 21451c0b2f7Stbbdev 21551c0b2f7Stbbdev T v; 21651c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 21751c0b2f7Stbbdev CHECK_MESSAGE( v == T(1), "" ); 21851c0b2f7Stbbdev CHECK_MESSAGE( q.release_reservation() == true, "" ); 21951c0b2f7Stbbdev v = bogus_value; 22051c0b2f7Stbbdev g.wait_for_all(); 22151c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 22251c0b2f7Stbbdev CHECK_MESSAGE( v == T(1), "" ); 22351c0b2f7Stbbdev CHECK_MESSAGE( q.consume_reservation() == true, "" ); 22451c0b2f7Stbbdev v = bogus_value; 22551c0b2f7Stbbdev g.wait_for_all(); 22651c0b2f7Stbbdev 22751c0b2f7Stbbdev CHECK_MESSAGE( q.try_get(v) == true, "" ); 22851c0b2f7Stbbdev CHECK_MESSAGE( v == T(2), "" ); 22951c0b2f7Stbbdev v = bogus_value; 23051c0b2f7Stbbdev g.wait_for_all(); 23151c0b2f7Stbbdev 23251c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 23351c0b2f7Stbbdev CHECK_MESSAGE( v == T(3), "" ); 23451c0b2f7Stbbdev CHECK_MESSAGE( q.release_reservation() == true, "" ); 23551c0b2f7Stbbdev v = bogus_value; 23651c0b2f7Stbbdev g.wait_for_all(); 23751c0b2f7Stbbdev CHECK_MESSAGE( q.reserve_item(v) == true, "" ); 23851c0b2f7Stbbdev CHECK_MESSAGE( v == T(3), "" ); 23951c0b2f7Stbbdev CHECK_MESSAGE( q.consume_reservation() == true, "" ); 24051c0b2f7Stbbdev v = bogus_value; 24151c0b2f7Stbbdev g.wait_for_all(); 24251c0b2f7Stbbdev 24351c0b2f7Stbbdev return 0; 24451c0b2f7Stbbdev } 24551c0b2f7Stbbdev 24651c0b2f7Stbbdev // 24751c0b2f7Stbbdev // Tests 24851c0b2f7Stbbdev // 24951c0b2f7Stbbdev // multiple parallel senders, items in FIFO (relatively to sender) order 25051c0b2f7Stbbdev // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received 25151c0b2f7Stbbdev // * overlapped puts / gets 25251c0b2f7Stbbdev // * all puts finished before any getS 25351c0b2f7Stbbdev // 25451c0b2f7Stbbdev template< typename T > 25551c0b2f7Stbbdev int test_parallel(int num_threads) { 25651c0b2f7Stbbdev tbb::flow::graph g; 25751c0b2f7Stbbdev tbb::flow::queue_node<T> q(g); 25851c0b2f7Stbbdev tbb::flow::queue_node<T> q2(g); 25951c0b2f7Stbbdev tbb::flow::queue_node<T> q3(g); 26051c0b2f7Stbbdev { 26151c0b2f7Stbbdev Checker< T > my_check; 26251c0b2f7Stbbdev T bogus_value(-1); 26351c0b2f7Stbbdev T j = bogus_value; 26451c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 26551c0b2f7Stbbdev 26651c0b2f7Stbbdev T *next_value = new T[num_threads]; 26751c0b2f7Stbbdev for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0); 26851c0b2f7Stbbdev 26951c0b2f7Stbbdev for (int i = 0; i < num_threads * N; ++i ) { 27051c0b2f7Stbbdev spin_try_get( q, j ); 27151c0b2f7Stbbdev check_item( next_value, j ); 27251c0b2f7Stbbdev j = bogus_value; 27351c0b2f7Stbbdev } 27451c0b2f7Stbbdev for (int tid = 0; tid < num_threads; ++tid) { 27551c0b2f7Stbbdev CHECK_MESSAGE( next_value[tid] == T(N), "" ); 27651c0b2f7Stbbdev } 27751c0b2f7Stbbdev delete[] next_value; 27851c0b2f7Stbbdev 27951c0b2f7Stbbdev j = bogus_value; 28051c0b2f7Stbbdev g.wait_for_all(); 28151c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 28251c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 28351c0b2f7Stbbdev 28451c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 28551c0b2f7Stbbdev 28651c0b2f7Stbbdev { 28751c0b2f7Stbbdev touches< T > t( num_threads ); 28851c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) ); 28951c0b2f7Stbbdev g.wait_for_all(); 29051c0b2f7Stbbdev CHECK_MESSAGE( t.validate_touches(), "" ); 29151c0b2f7Stbbdev } 29251c0b2f7Stbbdev j = bogus_value; 29351c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 29451c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 29551c0b2f7Stbbdev 29651c0b2f7Stbbdev g.wait_for_all(); 29751c0b2f7Stbbdev { 29851c0b2f7Stbbdev touches< T > t2( num_threads ); 29951c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) ); 30051c0b2f7Stbbdev g.wait_for_all(); 30151c0b2f7Stbbdev CHECK_MESSAGE( t2.validate_touches(), "" ); 30251c0b2f7Stbbdev } 30351c0b2f7Stbbdev j = bogus_value; 30451c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 30551c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 30651c0b2f7Stbbdev 30751c0b2f7Stbbdev tbb::flow::make_edge( q, q2 ); 30851c0b2f7Stbbdev tbb::flow::make_edge( q2, q3 ); 30951c0b2f7Stbbdev 31051c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 31151c0b2f7Stbbdev { 31251c0b2f7Stbbdev touches< T > t3( num_threads ); 31351c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) ); 31451c0b2f7Stbbdev g.wait_for_all(); 31551c0b2f7Stbbdev CHECK_MESSAGE( t3.validate_touches(), "" ); 31651c0b2f7Stbbdev } 31751c0b2f7Stbbdev j = bogus_value; 31851c0b2f7Stbbdev g.wait_for_all(); 31951c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 32051c0b2f7Stbbdev g.wait_for_all(); 32151c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 32251c0b2f7Stbbdev g.wait_for_all(); 32351c0b2f7Stbbdev CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 32451c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 32551c0b2f7Stbbdev 32651c0b2f7Stbbdev // test copy constructor 32751c0b2f7Stbbdev CHECK_MESSAGE( remove_successor( q, q2 ), "" ); 32851c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_puts<T>(q) ); 32951c0b2f7Stbbdev tbb::flow::queue_node<T> q_copy(q); 33051c0b2f7Stbbdev j = bogus_value; 33151c0b2f7Stbbdev g.wait_for_all(); 33251c0b2f7Stbbdev CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 33351c0b2f7Stbbdev CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" ); 33451c0b2f7Stbbdev { 33551c0b2f7Stbbdev touches< T > t( num_threads ); 33651c0b2f7Stbbdev utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) ); 33751c0b2f7Stbbdev g.wait_for_all(); 33851c0b2f7Stbbdev CHECK_MESSAGE( t.validate_touches(), "" ); 33951c0b2f7Stbbdev } 34051c0b2f7Stbbdev j = bogus_value; 34151c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 34251c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 34351c0b2f7Stbbdev CHECK_MESSAGE( q_copy.try_get( j ) == false, "" ); 34451c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 34551c0b2f7Stbbdev } 34651c0b2f7Stbbdev 34751c0b2f7Stbbdev return 0; 34851c0b2f7Stbbdev } 34951c0b2f7Stbbdev 35051c0b2f7Stbbdev // 35151c0b2f7Stbbdev // Tests 35251c0b2f7Stbbdev // 35351c0b2f7Stbbdev // Predecessors cannot be registered 35451c0b2f7Stbbdev // Empty Q rejects item requests 35551c0b2f7Stbbdev // Single serial sender, items in FIFO order 35651c0b2f7Stbbdev // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order 35751c0b2f7Stbbdev // 35851c0b2f7Stbbdev 35951c0b2f7Stbbdev template< typename T > 36051c0b2f7Stbbdev int test_serial() { 36151c0b2f7Stbbdev tbb::flow::graph g; 36251c0b2f7Stbbdev tbb::flow::queue_node<T> q(g); 36351c0b2f7Stbbdev tbb::flow::queue_node<T> q2(g); 36451c0b2f7Stbbdev { // destroy the graph after manipulating it, and see if all the items in the buffers 36551c0b2f7Stbbdev // have been destroyed before the graph 36651c0b2f7Stbbdev Checker<T> my_check; // if CheckType< U > count constructions and destructions 36751c0b2f7Stbbdev T bogus_value(-1); 36851c0b2f7Stbbdev T j = bogus_value; 36951c0b2f7Stbbdev 37051c0b2f7Stbbdev // 37151c0b2f7Stbbdev // Rejects attempts to add / remove predecessor 37251c0b2f7Stbbdev // Rejects request from empty Q 37351c0b2f7Stbbdev // 37451c0b2f7Stbbdev CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" ); 37551c0b2f7Stbbdev CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" ); 37651c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 37751c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 37851c0b2f7Stbbdev 37951c0b2f7Stbbdev // 38051c0b2f7Stbbdev // Simple puts and gets 38151c0b2f7Stbbdev // 38251c0b2f7Stbbdev 38351c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 38451c0b2f7Stbbdev bool msg = q.try_put( T(i) ); 38551c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" ); 38651c0b2f7Stbbdev } 38751c0b2f7Stbbdev 38851c0b2f7Stbbdev 38951c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 39051c0b2f7Stbbdev j = bogus_value; 39151c0b2f7Stbbdev spin_try_get( q, j ); 39251c0b2f7Stbbdev CHECK_MESSAGE( i == j, "" ); 39351c0b2f7Stbbdev } 39451c0b2f7Stbbdev j = bogus_value; 39551c0b2f7Stbbdev g.wait_for_all(); 39651c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 39751c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 39851c0b2f7Stbbdev 39951c0b2f7Stbbdev tbb::flow::make_edge( q, q2 ); 40051c0b2f7Stbbdev 40151c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 40251c0b2f7Stbbdev bool msg = q.try_put( T(i) ); 40351c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" ); 40451c0b2f7Stbbdev } 40551c0b2f7Stbbdev 40651c0b2f7Stbbdev 40751c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 40851c0b2f7Stbbdev j = bogus_value; 40951c0b2f7Stbbdev spin_try_get( q2, j ); 41051c0b2f7Stbbdev CHECK_MESSAGE( i == j, "" ); 41151c0b2f7Stbbdev } 41251c0b2f7Stbbdev j = bogus_value; 41351c0b2f7Stbbdev g.wait_for_all(); 41451c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 41551c0b2f7Stbbdev g.wait_for_all(); 41651c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 41751c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 41851c0b2f7Stbbdev 41951c0b2f7Stbbdev tbb::flow::remove_edge( q, q2 ); 42051c0b2f7Stbbdev CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 42151c0b2f7Stbbdev g.wait_for_all(); 42251c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 42351c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 42451c0b2f7Stbbdev g.wait_for_all(); 42551c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == true, "" ); 42651c0b2f7Stbbdev CHECK_MESSAGE( j == 1, "" ); 42751c0b2f7Stbbdev 42851c0b2f7Stbbdev tbb::flow::queue_node<T> q3(g); 42951c0b2f7Stbbdev tbb::flow::make_edge( q, q2 ); 43051c0b2f7Stbbdev tbb::flow::make_edge( q2, q3 ); 43151c0b2f7Stbbdev 43251c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 43351c0b2f7Stbbdev bool msg = q.try_put( T(i) ); 43451c0b2f7Stbbdev CHECK_MESSAGE( msg == true, "" ); 43551c0b2f7Stbbdev } 43651c0b2f7Stbbdev 43751c0b2f7Stbbdev for (int i = 0; i < N; ++i) { 43851c0b2f7Stbbdev j = bogus_value; 43951c0b2f7Stbbdev spin_try_get( q3, j ); 44051c0b2f7Stbbdev CHECK_MESSAGE( i == j, "" ); 44151c0b2f7Stbbdev } 44251c0b2f7Stbbdev j = bogus_value; 44351c0b2f7Stbbdev g.wait_for_all(); 44451c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == false, "" ); 44551c0b2f7Stbbdev g.wait_for_all(); 44651c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 44751c0b2f7Stbbdev g.wait_for_all(); 44851c0b2f7Stbbdev CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 44951c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 45051c0b2f7Stbbdev 45151c0b2f7Stbbdev tbb::flow::remove_edge( q, q2 ); 45251c0b2f7Stbbdev CHECK_MESSAGE( q.try_put( 1 ) == true, "" ); 45351c0b2f7Stbbdev g.wait_for_all(); 45451c0b2f7Stbbdev CHECK_MESSAGE( q2.try_get( j ) == false, "" ); 45551c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 45651c0b2f7Stbbdev g.wait_for_all(); 45751c0b2f7Stbbdev CHECK_MESSAGE( q3.try_get( j ) == false, "" ); 45851c0b2f7Stbbdev CHECK_MESSAGE( j == bogus_value, "" ); 45951c0b2f7Stbbdev g.wait_for_all(); 46051c0b2f7Stbbdev CHECK_MESSAGE( q.try_get( j ) == true, "" ); 46151c0b2f7Stbbdev CHECK_MESSAGE( j == 1, "" ); 46251c0b2f7Stbbdev } 46351c0b2f7Stbbdev 46451c0b2f7Stbbdev return 0; 46551c0b2f7Stbbdev } 46651c0b2f7Stbbdev 46751c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 46851c0b2f7Stbbdev #include <array> 46951c0b2f7Stbbdev #include <vector> 47051c0b2f7Stbbdev void test_follows_and_precedes_api() { 47151c0b2f7Stbbdev std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 47251c0b2f7Stbbdev std::vector<int> messages_for_precedes = {0, 1, 2}; 47351c0b2f7Stbbdev 47451c0b2f7Stbbdev follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows); 47551c0b2f7Stbbdev follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes); 47651c0b2f7Stbbdev } 47751c0b2f7Stbbdev #endif 47851c0b2f7Stbbdev 47951c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 48051c0b2f7Stbbdev void test_deduction_guides() { 48151c0b2f7Stbbdev using namespace tbb::flow; 48251c0b2f7Stbbdev graph g; 48351c0b2f7Stbbdev broadcast_node<int> br(g); 48451c0b2f7Stbbdev queue_node<int> q0(g); 48551c0b2f7Stbbdev 48651c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 48751c0b2f7Stbbdev queue_node q1(follows(br)); 48851c0b2f7Stbbdev static_assert(std::is_same_v<decltype(q1), queue_node<int>>); 48951c0b2f7Stbbdev 49051c0b2f7Stbbdev queue_node q2(precedes(br)); 49151c0b2f7Stbbdev static_assert(std::is_same_v<decltype(q2), queue_node<int>>); 49251c0b2f7Stbbdev #endif 49351c0b2f7Stbbdev 49451c0b2f7Stbbdev queue_node q3(q0); 49551c0b2f7Stbbdev static_assert(std::is_same_v<decltype(q3), queue_node<int>>); 49651c0b2f7Stbbdev g.wait_for_all(); 49751c0b2f7Stbbdev } 49851c0b2f7Stbbdev #endif 49951c0b2f7Stbbdev 50051c0b2f7Stbbdev //! Test serial, parallel behavior and reservation under parallelism 50151c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing 50251c0b2f7Stbbdev TEST_CASE("Parallel, serial test"){ 50351c0b2f7Stbbdev for (int p = 2; p <= 4; ++p) { 50451c0b2f7Stbbdev tbb::task_arena arena(p); 50551c0b2f7Stbbdev arena.execute( 50651c0b2f7Stbbdev [&]() { 50751c0b2f7Stbbdev test_serial<int>(); 50851c0b2f7Stbbdev test_serial<CheckType<int> >(); 50951c0b2f7Stbbdev test_parallel<int>(p); 51051c0b2f7Stbbdev test_parallel<CheckType<int> >(p); 51151c0b2f7Stbbdev } 51251c0b2f7Stbbdev ); 51351c0b2f7Stbbdev } 51451c0b2f7Stbbdev } 51551c0b2f7Stbbdev 51651c0b2f7Stbbdev //! Test reset and cancellation 51751c0b2f7Stbbdev //! \brief \ref error_guessing 51851c0b2f7Stbbdev TEST_CASE("Resets test"){ 51951c0b2f7Stbbdev INFO("Testing resets\n"); 52051c0b2f7Stbbdev test_resets<int, tbb::flow::queue_node<int> >(); 52151c0b2f7Stbbdev test_resets<float, tbb::flow::queue_node<float> >(); 52251c0b2f7Stbbdev } 52351c0b2f7Stbbdev 52451c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 52551c0b2f7Stbbdev //! Test follows and precedes API 52651c0b2f7Stbbdev //! \brief \ref error_guessing 52751c0b2f7Stbbdev TEST_CASE("Test follows and precedes API"){ 52851c0b2f7Stbbdev test_follows_and_precedes_api(); 52951c0b2f7Stbbdev } 53051c0b2f7Stbbdev #endif 53151c0b2f7Stbbdev 53251c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 53351c0b2f7Stbbdev //! Test decution guides 53451c0b2f7Stbbdev //! \brief \ref requirement 53551c0b2f7Stbbdev TEST_CASE("Deduction guides"){ 53651c0b2f7Stbbdev test_deduction_guides(); 53751c0b2f7Stbbdev } 53851c0b2f7Stbbdev #endif 53951c0b2f7Stbbdev 54051c0b2f7Stbbdev //! Test operations on a reserved queue_node 54151c0b2f7Stbbdev //! \brief \ref error_guessing 54251c0b2f7Stbbdev TEST_CASE("queue_node with reservation"){ 54351c0b2f7Stbbdev tbb::flow::graph g; 54451c0b2f7Stbbdev 54551c0b2f7Stbbdev tbb::flow::queue_node<int> q(g); 54651c0b2f7Stbbdev 54751c0b2f7Stbbdev bool res = q.try_put(42); 54851c0b2f7Stbbdev CHECK_MESSAGE( res, "queue_node must accept input." ); 54951c0b2f7Stbbdev 55051c0b2f7Stbbdev int val = 1; 55151c0b2f7Stbbdev res = q.try_reserve(val); 55251c0b2f7Stbbdev CHECK_MESSAGE( res, "queue_node must reserve as it has an item." ); 55351c0b2f7Stbbdev CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." ); 55451c0b2f7Stbbdev 55551c0b2f7Stbbdev int out_arg = -1; 55651c0b2f7Stbbdev CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail."); 55751c0b2f7Stbbdev CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument."); 55851c0b2f7Stbbdev 55951c0b2f7Stbbdev out_arg = -1; 56051c0b2f7Stbbdev CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail."); 56151c0b2f7Stbbdev CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument."); 56251c0b2f7Stbbdev g.wait_for_all(); 56351c0b2f7Stbbdev } 564