xref: /oneTBB/test/tbb/test_queue_node.cpp (revision 6caecf96)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // TODO: Add overlapping put / receive tests
18 
19 #include "common/config.h"
20 
21 #include "tbb/flow_graph.h"
22 
23 #include "common/test.h"
24 #include "common/utils.h"
25 #include "common/utils_assert.h"
26 #include "common/checktype.h"
27 #include "common/graph_utils.h"
28 #include "common/test_follows_and_precedes_api.h"
29 
30 #include <cstdio>
31 
32 
33 //! \file test_queue_node.cpp
34 //! \brief Test for [flow_graph.queue_node] specification
35 
36 
//! Number of items each producer thread puts into a queue during the tests.
constexpr int N = 1000;
//! Number of items put (and then received) per batch by the interleaved put/get functor.
constexpr int C = 10;
39 
40 template< typename T >
41 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
42     int count = 0;
43     while ( q.try_get(value) != true ) {
44         if (count < 1000000) {
45             ++count;
46         }
47         if (count == 1000000) {
48             // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads.
49             tbb::task_arena a(tbb::task_arena::attach{});
50             a.enqueue([]{});
51             ++count;
52         }
53     }
54 }
55 
56 template< typename T >
57 void check_item( T* next_value, T &value ) {
58     int tid = value / N;
59     int offset = value % N;
60     CHECK_MESSAGE( next_value[tid] == T(offset), "" );
61     ++next_value[tid];
62 }
63 
64 template< typename T >
65 struct parallel_puts : utils::NoAssign {
66 
67     tbb::flow::queue_node<T> &my_q;
68 
69     parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
70 
71     void operator()(int i) const {
72         for (int j = 0; j < N; ++j) {
73             bool msg = my_q.try_put( T(N*i + j) );
74             CHECK_MESSAGE( msg == true, "" );
75         }
76     }
77 
78 };
79 
80 template< typename T >
81 struct touches {
82 
83     bool **my_touches;
84     T **my_last_touch;
85     int my_num_threads;
86 
87     touches( int num_threads ) : my_num_threads(num_threads) {
88         my_last_touch = new T* [my_num_threads];
89         my_touches = new bool* [my_num_threads];
90         for ( int p = 0; p < my_num_threads; ++p) {
91             my_last_touch[p] = new T[my_num_threads];
92             for ( int p2 = 0; p2 < my_num_threads; ++p2)
93                 my_last_touch[p][p2] = -1;
94 
95             my_touches[p] = new bool[N*my_num_threads];
96             for ( int n = 0; n < N*my_num_threads; ++n)
97                 my_touches[p][n] = false;
98         }
99     }
100 
101     ~touches() {
102         for ( int p = 0; p < my_num_threads; ++p) {
103             delete [] my_touches[p];
104             delete [] my_last_touch[p];
105         }
106         delete [] my_touches;
107         delete [] my_last_touch;
108     }
109 
110     bool check( int tid, T v ) {
111         int v_tid = v / N;
112         if ( my_touches[tid][v] != false ) {
113             printf("Error: value seen twice by local thread\n");
114             return false;
115         }
116         if ( v <= my_last_touch[tid][v_tid] ) {
117             printf("Error: value seen in wrong order by local thread\n");
118             return false;
119         }
120         my_last_touch[tid][v_tid] = v;
121         my_touches[tid][v] = true;
122         return true;
123     }
124 
125     bool validate_touches() {
126         bool *all_touches = new bool[N*my_num_threads];
127         for ( int n = 0; n < N*my_num_threads; ++n)
128             all_touches[n] = false;
129 
130         for ( int p = 0; p < my_num_threads; ++p) {
131             for ( int n = 0; n < N*my_num_threads; ++n) {
132                 if ( my_touches[p][n] == true ) {
133                     CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
134                     all_touches[n] = true;
135                 }
136             }
137         }
138         for ( int n = 0; n < N*my_num_threads; ++n) {
139             if ( !all_touches[n] )
140                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
141             //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
142         }
143         delete [] all_touches;
144         return true;
145     }
146 
147 };
148 
149 template< typename T >
150 struct parallel_gets : utils::NoAssign {
151 
152     tbb::flow::queue_node<T> &my_q;
153     touches<T> &my_touches;
154 
155     parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
156 
157     void operator()(int tid) const {
158         for (int j = 0; j < N; ++j) {
159             T v;
160             spin_try_get( my_q, v );
161             my_touches.check( tid, v );
162         }
163     }
164 
165 };
166 
167 template< typename T >
168 struct parallel_put_get : utils::NoAssign {
169 
170     tbb::flow::queue_node<T> &my_q;
171     touches<T> &my_touches;
172 
173     parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
174 
175     void operator()(int tid) const {
176 
177         for ( int i = 0; i < N; i+=C ) {
178             int j_end = ( N < i + C ) ? N : i + C;
179             // dump about C values into the Q
180             for ( int j = i; j < j_end; ++j ) {
181                 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
182             }
183             // receiver about C values from the Q
184             for ( int j = i; j < j_end; ++j ) {
185                 T v;
186                 spin_try_get( my_q, v );
187                 my_touches.check( tid, v );
188             }
189         }
190     }
191 
192 };
193 
194 //
195 // Tests
196 //
197 // Item can be reserved, released, consumed ( single serial receiver )
198 //
199 template< typename T >
200 int test_reservation() {
201     tbb::flow::graph g;
202     T bogus_value(-1);
203 
204     // Simple tests
205     tbb::flow::queue_node<T> q(g);
206 
207     q.try_put(T(1));
208     q.try_put(T(2));
209     q.try_put(T(3));
210 
211     T v;
212     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
213     CHECK_MESSAGE( v == T(1), "" );
214     CHECK_MESSAGE( q.release_reservation() == true, "" );
215     v = bogus_value;
216     g.wait_for_all();
217     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
218     CHECK_MESSAGE( v == T(1), "" );
219     CHECK_MESSAGE( q.consume_reservation() == true, "" );
220     v = bogus_value;
221     g.wait_for_all();
222 
223     CHECK_MESSAGE( q.try_get(v) == true, "" );
224     CHECK_MESSAGE( v == T(2), "" );
225     v = bogus_value;
226     g.wait_for_all();
227 
228     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
229     CHECK_MESSAGE( v == T(3), "" );
230     CHECK_MESSAGE( q.release_reservation() == true, "" );
231     v = bogus_value;
232     g.wait_for_all();
233     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
234     CHECK_MESSAGE( v == T(3), "" );
235     CHECK_MESSAGE( q.consume_reservation() == true, "" );
236     v = bogus_value;
237     g.wait_for_all();
238 
239     return 0;
240 }
241 
242 //
243 // Tests
244 //
245 // multiple parallel senders, items in FIFO (relatively to sender) order
246 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
247 //   * overlapped puts / gets
//   * all puts finished before any gets
249 //
250 template< typename T >
251 int test_parallel(int num_threads) {
252     tbb::flow::graph g;
253     tbb::flow::queue_node<T> q(g);
254     tbb::flow::queue_node<T> q2(g);
255     tbb::flow::queue_node<T> q3(g);
256     {
257         Checker< T > my_check;
258         T bogus_value(-1);
259         T j = bogus_value;
260         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
261 
262         T *next_value = new T[num_threads];
263         for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
264 
265         for (int i = 0; i < num_threads * N; ++i ) {
266             spin_try_get( q, j );
267             check_item( next_value, j );
268             j = bogus_value;
269         }
270         for (int tid = 0; tid < num_threads; ++tid)  {
271             CHECK_MESSAGE( next_value[tid] == T(N), "" );
272         }
273         delete[] next_value;
274 
275         j = bogus_value;
276         g.wait_for_all();
277         CHECK_MESSAGE( q.try_get( j ) == false, "" );
278         CHECK_MESSAGE( j == bogus_value, "" );
279 
280         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
281 
282         {
283             touches< T > t( num_threads );
284             utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
285             g.wait_for_all();
286             CHECK_MESSAGE( t.validate_touches(), "" );
287         }
288         j = bogus_value;
289         CHECK_MESSAGE( q.try_get( j ) == false, "" );
290         CHECK_MESSAGE( j == bogus_value, "" );
291 
292         g.wait_for_all();
293         {
294             touches< T > t2( num_threads );
295             utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
296             g.wait_for_all();
297             CHECK_MESSAGE( t2.validate_touches(), "" );
298         }
299         j = bogus_value;
300         CHECK_MESSAGE( q.try_get( j ) == false, "" );
301         CHECK_MESSAGE( j == bogus_value, "" );
302 
303         tbb::flow::make_edge( q, q2 );
304         tbb::flow::make_edge( q2, q3 );
305 
306         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
307         {
308             touches< T > t3( num_threads );
309             utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
310             g.wait_for_all();
311             CHECK_MESSAGE( t3.validate_touches(), "" );
312         }
313         j = bogus_value;
314         g.wait_for_all();
315         CHECK_MESSAGE( q.try_get( j ) == false, "" );
316         g.wait_for_all();
317         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
318         g.wait_for_all();
319         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
320         CHECK_MESSAGE( j == bogus_value, "" );
321 
322         // test copy constructor
323         CHECK_MESSAGE( remove_successor( q, q2 ), "" );
324         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
325         tbb::flow::queue_node<T> q_copy(q);
326         j = bogus_value;
327         g.wait_for_all();
328         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
329         CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" );
330         {
331             touches< T > t( num_threads );
332             utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
333             g.wait_for_all();
334             CHECK_MESSAGE( t.validate_touches(), "" );
335         }
336         j = bogus_value;
337         CHECK_MESSAGE( q.try_get( j ) == false, "" );
338         CHECK_MESSAGE( j == bogus_value, "" );
339         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
340         CHECK_MESSAGE( j == bogus_value, "" );
341     }
342 
343     return 0;
344 }
345 
346 //
347 // Tests
348 //
349 // Predecessors cannot be registered
350 // Empty Q rejects item requests
351 // Single serial sender, items in FIFO order
352 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
353 //
354 
355 template< typename T >
356 int test_serial() {
357     tbb::flow::graph g;
358     tbb::flow::queue_node<T> q(g);
359     tbb::flow::queue_node<T> q2(g);
360     {   // destroy the graph after manipulating it, and see if all the items in the buffers
361         // have been destroyed before the graph
362         Checker<T> my_check;  // if CheckType< U > count constructions and destructions
363         T bogus_value(-1);
364         T j = bogus_value;
365 
366         //
367         // Rejects attempts to add / remove predecessor
368         // Rejects request from empty Q
369         //
370         CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" );
371         CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" );
372         CHECK_MESSAGE( q.try_get( j ) == false, "" );
373         CHECK_MESSAGE( j == bogus_value, "" );
374 
375         //
376         // Simple puts and gets
377         //
378 
379         for (int i = 0; i < N; ++i) {
380             bool msg = q.try_put( T(i) );
381             CHECK_MESSAGE( msg == true, "" );
382         }
383 
384 
385         for (int i = 0; i < N; ++i) {
386             j = bogus_value;
387             spin_try_get( q, j );
388             CHECK_MESSAGE( i == j, "" );
389         }
390         j = bogus_value;
391         g.wait_for_all();
392         CHECK_MESSAGE( q.try_get( j ) == false, "" );
393         CHECK_MESSAGE( j == bogus_value, "" );
394 
395         tbb::flow::make_edge( q, q2 );
396 
397         for (int i = 0; i < N; ++i) {
398             bool msg = q.try_put( T(i) );
399             CHECK_MESSAGE( msg == true, "" );
400         }
401 
402 
403         for (int i = 0; i < N; ++i) {
404             j = bogus_value;
405             spin_try_get( q2, j );
406             CHECK_MESSAGE( i == j, "" );
407         }
408         j = bogus_value;
409         g.wait_for_all();
410         CHECK_MESSAGE( q.try_get( j ) == false, "" );
411         g.wait_for_all();
412         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
413         CHECK_MESSAGE( j == bogus_value, "" );
414 
415         tbb::flow::remove_edge( q, q2 );
416         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
417         g.wait_for_all();
418         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
419         CHECK_MESSAGE( j == bogus_value, "" );
420         g.wait_for_all();
421         CHECK_MESSAGE( q.try_get( j ) == true, "" );
422         CHECK_MESSAGE( j == 1, "" );
423 
424         tbb::flow::queue_node<T> q3(g);
425         tbb::flow::make_edge( q, q2 );
426         tbb::flow::make_edge( q2, q3 );
427 
428         for (int i = 0; i < N; ++i) {
429             bool msg = q.try_put( T(i) );
430             CHECK_MESSAGE( msg == true, "" );
431         }
432 
433         for (int i = 0; i < N; ++i) {
434             j = bogus_value;
435             spin_try_get( q3, j );
436             CHECK_MESSAGE( i == j, "" );
437         }
438         j = bogus_value;
439         g.wait_for_all();
440         CHECK_MESSAGE( q.try_get( j ) == false, "" );
441         g.wait_for_all();
442         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
443         g.wait_for_all();
444         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
445         CHECK_MESSAGE( j == bogus_value, "" );
446 
447         tbb::flow::remove_edge( q,  q2 );
448         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
449         g.wait_for_all();
450         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
451         CHECK_MESSAGE( j == bogus_value, "" );
452         g.wait_for_all();
453         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
454         CHECK_MESSAGE( j == bogus_value, "" );
455         g.wait_for_all();
456         CHECK_MESSAGE( q.try_get( j ) == true, "" );
457         CHECK_MESSAGE( j == 1, "" );
458     }
459 
460     return 0;
461 }
462 
463 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
464 #include <array>
465 #include <vector>
466 void test_follows_and_precedes_api() {
467     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
468     std::vector<int> messages_for_precedes = {0, 1, 2};
469 
470     follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows);
471     follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes);
472 }
473 #endif
474 
475 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
476 void test_deduction_guides() {
477     using namespace tbb::flow;
478     graph g;
479     broadcast_node<int> br(g);
480     queue_node<int> q0(g);
481 
482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
483     queue_node q1(follows(br));
484     static_assert(std::is_same_v<decltype(q1), queue_node<int>>);
485 
486     queue_node q2(precedes(br));
487     static_assert(std::is_same_v<decltype(q2), queue_node<int>>);
488 #endif
489 
490     queue_node q3(q0);
491     static_assert(std::is_same_v<decltype(q3), queue_node<int>>);
492     g.wait_for_all();
493 }
494 #endif
495 
496 //! Test serial, parallel behavior and reservation under parallelism
497 //! \brief \ref requirement \ref error_guessing
498 TEST_CASE("Parallel, serial test"){
499     for (int p = 2; p <= 4; ++p) {
500         tbb::task_arena arena(p);
501         arena.execute(
502             [&]() {
503                 test_serial<int>();
504                 test_serial<CheckType<int> >();
505                 test_parallel<int>(p);
506                 test_parallel<CheckType<int> >(p);
507             }
508         );
509 	}
510 }
511 
512 //! Test reset and cancellation
513 //! \brief \ref error_guessing
514 TEST_CASE("Resets test"){
515     INFO("Testing resets\n");
516     test_resets<int, tbb::flow::queue_node<int> >();
517     test_resets<float, tbb::flow::queue_node<float> >();
518 }
519 
520 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
521 //! Test follows and precedes API
522 //! \brief \ref error_guessing
523 TEST_CASE("Test follows and precedes API"){
524     test_follows_and_precedes_api();
525 }
526 #endif
527 
528 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
530 //! \brief \ref requirement
531 TEST_CASE("Deduction guides"){
532     test_deduction_guides();
533 }
534 #endif
535 
536 //! Test operations on a reserved queue_node
537 //! \brief \ref error_guessing
538 TEST_CASE("queue_node with reservation"){
539     tbb::flow::graph g;
540 
541     tbb::flow::queue_node<int> q(g);
542 
543     bool res = q.try_put(42);
544     CHECK_MESSAGE( res, "queue_node must accept input." );
545 
546     int val = 1;
547     res = q.try_reserve(val);
548     CHECK_MESSAGE( res, "queue_node must reserve as it has an item." );
549     CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." );
550 
551     int out_arg = -1;
552     CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail.");
553     CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument.");
554 
555     out_arg = -1;
556     CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail.");
557     CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument.");
558     g.wait_for_all();
559 }
560