xref: /oneTBB/test/tbb/test_queue_node.cpp (revision 552f342b)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // TODO: Add overlapping put / receive tests
18 
19 #include "common/config.h"
20 
21 #include "tbb/flow_graph.h"
22 #include "tbb/global_control.h"
23 
24 #include "common/test.h"
25 #include "common/utils.h"
26 #include "common/utils_assert.h"
27 #include "common/checktype.h"
28 #include "common/graph_utils.h"
29 #include "common/test_follows_and_precedes_api.h"
30 
31 #include <cstdio>
32 
33 
34 //! \file test_queue_node.cpp
35 //! \brief Test for [flow_graph.queue_node] specification
36 
37 
38 #define N 1000
39 #define C 10
40 
41 template< typename T >
spin_try_get(tbb::flow::queue_node<T> & q,T & value)42 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
43     int count = 0;
44     while ( q.try_get(value) != true ) {
45         if (count < 1000000) {
46             ++count;
47         }
48         if (count == 1000000) {
49             // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads.
50             tbb::task_arena a(tbb::task_arena::attach{});
51             a.enqueue([]{});
52             ++count;
53         }
54     }
55 }
56 
57 template< typename T >
check_item(T * next_value,T & value)58 void check_item( T* next_value, T &value ) {
59     int tid = value / N;
60     int offset = value % N;
61     CHECK_MESSAGE( next_value[tid] == T(offset), "" );
62     ++next_value[tid];
63 }
64 
65 template< typename T >
66 struct parallel_puts : utils::NoAssign {
67 
68     tbb::flow::queue_node<T> &my_q;
69 
parallel_putsparallel_puts70     parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
71 
operator ()parallel_puts72     void operator()(int i) const {
73         for (int j = 0; j < N; ++j) {
74             bool msg = my_q.try_put( T(N*i + j) );
75             CHECK_MESSAGE( msg == true, "" );
76         }
77     }
78 
79 };
80 
81 template< typename T >
82 struct touches {
83 
84     bool **my_touches;
85     T **my_last_touch;
86     int my_num_threads;
87 
touchestouches88     touches( int num_threads ) : my_num_threads(num_threads) {
89         my_last_touch = new T* [my_num_threads];
90         my_touches = new bool* [my_num_threads];
91         for ( int p = 0; p < my_num_threads; ++p) {
92             my_last_touch[p] = new T[my_num_threads];
93             for ( int p2 = 0; p2 < my_num_threads; ++p2)
94                 my_last_touch[p][p2] = -1;
95 
96             my_touches[p] = new bool[N*my_num_threads];
97             for ( int n = 0; n < N*my_num_threads; ++n)
98                 my_touches[p][n] = false;
99         }
100     }
101 
~touchestouches102     ~touches() {
103         for ( int p = 0; p < my_num_threads; ++p) {
104             delete [] my_touches[p];
105             delete [] my_last_touch[p];
106         }
107         delete [] my_touches;
108         delete [] my_last_touch;
109     }
110 
checktouches111     bool check( int tid, T v ) {
112         int v_tid = v / N;
113         if ( my_touches[tid][v] != false ) {
114             printf("Error: value seen twice by local thread\n");
115             return false;
116         }
117         if ( v <= my_last_touch[tid][v_tid] ) {
118             printf("Error: value seen in wrong order by local thread\n");
119             return false;
120         }
121         my_last_touch[tid][v_tid] = v;
122         my_touches[tid][v] = true;
123         return true;
124     }
125 
validate_touchestouches126     bool validate_touches() {
127         bool *all_touches = new bool[N*my_num_threads];
128         for ( int n = 0; n < N*my_num_threads; ++n)
129             all_touches[n] = false;
130 
131         for ( int p = 0; p < my_num_threads; ++p) {
132             for ( int n = 0; n < N*my_num_threads; ++n) {
133                 if ( my_touches[p][n] == true ) {
134                     CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
135                     all_touches[n] = true;
136                 }
137             }
138         }
139         for ( int n = 0; n < N*my_num_threads; ++n) {
140             if ( !all_touches[n] )
141                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
142             //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
143         }
144         delete [] all_touches;
145         return true;
146     }
147 
148 };
149 
150 template< typename T >
151 struct parallel_gets : utils::NoAssign {
152 
153     tbb::flow::queue_node<T> &my_q;
154     touches<T> &my_touches;
155 
parallel_getsparallel_gets156     parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
157 
operator ()parallel_gets158     void operator()(int tid) const {
159         for (int j = 0; j < N; ++j) {
160             T v;
161             spin_try_get( my_q, v );
162             my_touches.check( tid, v );
163         }
164     }
165 
166 };
167 
168 template< typename T >
169 struct parallel_put_get : utils::NoAssign {
170 
171     tbb::flow::queue_node<T> &my_q;
172     touches<T> &my_touches;
173 
parallel_put_getparallel_put_get174     parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
175 
operator ()parallel_put_get176     void operator()(int tid) const {
177 
178         for ( int i = 0; i < N; i+=C ) {
179             int j_end = ( N < i + C ) ? N : i + C;
180             // dump about C values into the Q
181             for ( int j = i; j < j_end; ++j ) {
182                 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
183             }
184             // receiver about C values from the Q
185             for ( int j = i; j < j_end; ++j ) {
186                 T v;
187                 spin_try_get( my_q, v );
188                 my_touches.check( tid, v );
189             }
190         }
191     }
192 
193 };
194 
195 //
196 // Tests
197 //
198 // Item can be reserved, released, consumed ( single serial receiver )
199 //
200 template< typename T >
test_reservation()201 int test_reservation() {
202     tbb::flow::graph g;
203     T bogus_value(-1);
204 
205     // Simple tests
206     tbb::flow::queue_node<T> q(g);
207 
208     q.try_put(T(1));
209     q.try_put(T(2));
210     q.try_put(T(3));
211 
212     T v;
213     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
214     CHECK_MESSAGE( v == T(1), "" );
215     CHECK_MESSAGE( q.release_reservation() == true, "" );
216     v = bogus_value;
217     g.wait_for_all();
218     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
219     CHECK_MESSAGE( v == T(1), "" );
220     CHECK_MESSAGE( q.consume_reservation() == true, "" );
221     v = bogus_value;
222     g.wait_for_all();
223 
224     CHECK_MESSAGE( q.try_get(v) == true, "" );
225     CHECK_MESSAGE( v == T(2), "" );
226     v = bogus_value;
227     g.wait_for_all();
228 
229     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
230     CHECK_MESSAGE( v == T(3), "" );
231     CHECK_MESSAGE( q.release_reservation() == true, "" );
232     v = bogus_value;
233     g.wait_for_all();
234     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
235     CHECK_MESSAGE( v == T(3), "" );
236     CHECK_MESSAGE( q.consume_reservation() == true, "" );
237     v = bogus_value;
238     g.wait_for_all();
239 
240     return 0;
241 }
242 
243 //
244 // Tests
245 //
246 // multiple parallel senders, items in FIFO (relatively to sender) order
247 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
248 //   * overlapped puts / gets
249 //   * all puts finished before any getS
250 //
251 template< typename T >
test_parallel(int num_threads)252 int test_parallel(int num_threads) {
253     tbb::flow::graph g;
254     tbb::flow::queue_node<T> q(g);
255     tbb::flow::queue_node<T> q2(g);
256     tbb::flow::queue_node<T> q3(g);
257     {
258         Checker< T > my_check;
259         T bogus_value(-1);
260         T j = bogus_value;
261         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
262 
263         T *next_value = new T[num_threads];
264         for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
265 
266         for (int i = 0; i < num_threads * N; ++i ) {
267             spin_try_get( q, j );
268             check_item( next_value, j );
269             j = bogus_value;
270         }
271         for (int tid = 0; tid < num_threads; ++tid)  {
272             CHECK_MESSAGE( next_value[tid] == T(N), "" );
273         }
274         delete[] next_value;
275 
276         j = bogus_value;
277         g.wait_for_all();
278         CHECK_MESSAGE( q.try_get( j ) == false, "" );
279         CHECK_MESSAGE( j == bogus_value, "" );
280 
281         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
282 
283         {
284             touches< T > t( num_threads );
285             utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
286             g.wait_for_all();
287             CHECK_MESSAGE( t.validate_touches(), "" );
288         }
289         j = bogus_value;
290         CHECK_MESSAGE( q.try_get( j ) == false, "" );
291         CHECK_MESSAGE( j == bogus_value, "" );
292 
293         g.wait_for_all();
294         {
295             touches< T > t2( num_threads );
296             utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
297             g.wait_for_all();
298             CHECK_MESSAGE( t2.validate_touches(), "" );
299         }
300         j = bogus_value;
301         CHECK_MESSAGE( q.try_get( j ) == false, "" );
302         CHECK_MESSAGE( j == bogus_value, "" );
303 
304         tbb::flow::make_edge( q, q2 );
305         tbb::flow::make_edge( q2, q3 );
306 
307         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
308         {
309             touches< T > t3( num_threads );
310             utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
311             g.wait_for_all();
312             CHECK_MESSAGE( t3.validate_touches(), "" );
313         }
314         j = bogus_value;
315         g.wait_for_all();
316         CHECK_MESSAGE( q.try_get( j ) == false, "" );
317         g.wait_for_all();
318         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
319         g.wait_for_all();
320         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
321         CHECK_MESSAGE( j == bogus_value, "" );
322 
323         // test copy constructor
324         CHECK_MESSAGE( remove_successor( q, q2 ), "" );
325         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
326         tbb::flow::queue_node<T> q_copy(q);
327         j = bogus_value;
328         g.wait_for_all();
329         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
330         CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" );
331         {
332             touches< T > t( num_threads );
333             utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
334             g.wait_for_all();
335             CHECK_MESSAGE( t.validate_touches(), "" );
336         }
337         j = bogus_value;
338         CHECK_MESSAGE( q.try_get( j ) == false, "" );
339         CHECK_MESSAGE( j == bogus_value, "" );
340         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
341         CHECK_MESSAGE( j == bogus_value, "" );
342     }
343 
344     return 0;
345 }
346 
347 //
348 // Tests
349 //
350 // Predecessors cannot be registered
351 // Empty Q rejects item requests
352 // Single serial sender, items in FIFO order
353 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
354 //
355 
356 template< typename T >
test_serial()357 int test_serial() {
358     tbb::flow::graph g;
359     tbb::flow::queue_node<T> q(g);
360     tbb::flow::queue_node<T> q2(g);
361     {   // destroy the graph after manipulating it, and see if all the items in the buffers
362         // have been destroyed before the graph
363         Checker<T> my_check;  // if CheckType< U > count constructions and destructions
364         T bogus_value(-1);
365         T j = bogus_value;
366 
367         //
368         // Rejects attempts to add / remove predecessor
369         // Rejects request from empty Q
370         //
371         CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" );
372         CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" );
373         CHECK_MESSAGE( q.try_get( j ) == false, "" );
374         CHECK_MESSAGE( j == bogus_value, "" );
375 
376         //
377         // Simple puts and gets
378         //
379 
380         for (int i = 0; i < N; ++i) {
381             bool msg = q.try_put( T(i) );
382             CHECK_MESSAGE( msg == true, "" );
383         }
384 
385 
386         for (int i = 0; i < N; ++i) {
387             j = bogus_value;
388             spin_try_get( q, j );
389             CHECK_MESSAGE( i == j, "" );
390         }
391         j = bogus_value;
392         g.wait_for_all();
393         CHECK_MESSAGE( q.try_get( j ) == false, "" );
394         CHECK_MESSAGE( j == bogus_value, "" );
395 
396         tbb::flow::make_edge( q, q2 );
397 
398         for (int i = 0; i < N; ++i) {
399             bool msg = q.try_put( T(i) );
400             CHECK_MESSAGE( msg == true, "" );
401         }
402 
403 
404         for (int i = 0; i < N; ++i) {
405             j = bogus_value;
406             spin_try_get( q2, j );
407             CHECK_MESSAGE( i == j, "" );
408         }
409         j = bogus_value;
410         g.wait_for_all();
411         CHECK_MESSAGE( q.try_get( j ) == false, "" );
412         g.wait_for_all();
413         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
414         CHECK_MESSAGE( j == bogus_value, "" );
415 
416         tbb::flow::remove_edge( q, q2 );
417         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
418         g.wait_for_all();
419         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
420         CHECK_MESSAGE( j == bogus_value, "" );
421         g.wait_for_all();
422         CHECK_MESSAGE( q.try_get( j ) == true, "" );
423         CHECK_MESSAGE( j == 1, "" );
424 
425         tbb::flow::queue_node<T> q3(g);
426         tbb::flow::make_edge( q, q2 );
427         tbb::flow::make_edge( q2, q3 );
428 
429         for (int i = 0; i < N; ++i) {
430             bool msg = q.try_put( T(i) );
431             CHECK_MESSAGE( msg == true, "" );
432         }
433 
434         for (int i = 0; i < N; ++i) {
435             j = bogus_value;
436             spin_try_get( q3, j );
437             CHECK_MESSAGE( i == j, "" );
438         }
439         j = bogus_value;
440         g.wait_for_all();
441         CHECK_MESSAGE( q.try_get( j ) == false, "" );
442         g.wait_for_all();
443         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
444         g.wait_for_all();
445         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
446         CHECK_MESSAGE( j == bogus_value, "" );
447 
448         tbb::flow::remove_edge( q,  q2 );
449         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
450         g.wait_for_all();
451         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
452         CHECK_MESSAGE( j == bogus_value, "" );
453         g.wait_for_all();
454         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
455         CHECK_MESSAGE( j == bogus_value, "" );
456         g.wait_for_all();
457         CHECK_MESSAGE( q.try_get( j ) == true, "" );
458         CHECK_MESSAGE( j == 1, "" );
459     }
460 
461     return 0;
462 }
463 
464 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
465 #include <array>
466 #include <vector>
test_follows_and_precedes_api()467 void test_follows_and_precedes_api() {
468     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
469     std::vector<int> messages_for_precedes = {0, 1, 2};
470 
471     follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows);
472     follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes);
473 }
474 #endif
475 
476 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()477 void test_deduction_guides() {
478     using namespace tbb::flow;
479     graph g;
480     broadcast_node<int> br(g);
481     queue_node<int> q0(g);
482 
483 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
484     queue_node q1(follows(br));
485     static_assert(std::is_same_v<decltype(q1), queue_node<int>>);
486 
487     queue_node q2(precedes(br));
488     static_assert(std::is_same_v<decltype(q2), queue_node<int>>);
489 #endif
490 
491     queue_node q3(q0);
492     static_assert(std::is_same_v<decltype(q3), queue_node<int>>);
493     g.wait_for_all();
494 }
495 #endif
496 
497 //! Test serial, parallel behavior and reservation under parallelism
498 //! \brief \ref requirement \ref error_guessing
499 TEST_CASE("Parallel, serial test"){
500     for (int p = 2; p <= 4; ++p) {
501         tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
502         tbb::task_arena arena(p);
503         arena.execute(
__anon9007ffb40202() 504             [&]() {
505                 test_serial<int>();
506                 test_serial<CheckType<int> >();
507                 test_parallel<int>(p);
508                 test_parallel<CheckType<int> >(p);
509             }
510         );
511 	}
512 }
513 
514 //! Test reset and cancellation
515 //! \brief \ref error_guessing
516 TEST_CASE("Resets test"){
517     INFO("Testing resets\n");
518     test_resets<int, tbb::flow::queue_node<int> >();
519     test_resets<float, tbb::flow::queue_node<float> >();
520 }
521 
522 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
523 //! Test follows and precedes API
524 //! \brief \ref error_guessing
525 TEST_CASE("Test follows and precedes API"){
526     test_follows_and_precedes_api();
527 }
528 #endif
529 
530 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
531 //! Test decution guides
532 //! \brief \ref requirement
533 TEST_CASE("Deduction guides"){
534     test_deduction_guides();
535 }
536 #endif
537 
538 //! Test operations on a reserved queue_node
539 //! \brief \ref error_guessing
540 TEST_CASE("queue_node with reservation"){
541     tbb::flow::graph g;
542 
543     tbb::flow::queue_node<int> q(g);
544 
545     bool res = q.try_put(42);
546     CHECK_MESSAGE( res, "queue_node must accept input." );
547 
548     int val = 1;
549     res = q.try_reserve(val);
550     CHECK_MESSAGE( res, "queue_node must reserve as it has an item." );
551     CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." );
552 
553     int out_arg = -1;
554     CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail.");
555     CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument.");
556 
557     out_arg = -1;
558     CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail.");
559     CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument.");
560     g.wait_for_all();
561 }
562