xref: /oneTBB/test/tbb/test_queue_node.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // TODO: Add overlapping put / receive tests
18 
19 #include "common/config.h"
20 
21 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
22 // parts in all of tests might make testing of the product, which is different from what is actually
23 // released.
24 #define __TBB_EXTRA_DEBUG 1
25 #include "tbb/flow_graph.h"
26 
27 #include "common/test.h"
28 #include "common/utils.h"
29 #include "common/utils_assert.h"
30 #include "common/checktype.h"
31 #include "common/graph_utils.h"
32 #include "common/test_follows_and_precedes_api.h"
33 
34 #include <cstdio>
35 
36 
37 //! \file test_queue_node.cpp
38 //! \brief Test for [flow_graph.queue_node] specification
39 
40 
41 #define N 1000
42 #define C 10
43 
44 template< typename T >
45 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
46     int count = 0;
47     while ( q.try_get(value) != true ) {
48         if (count < 1000000) {
49             ++count;
50         }
51         if (count == 1000000) {
52             // Perhaps, we observe the missed wakeup. Enqueue a task to wake up threads.
53             tbb::task_arena a(tbb::task_arena::attach{});
54             a.enqueue([]{});
55             ++count;
56         }
57     }
58 }
59 
60 template< typename T >
61 void check_item( T* next_value, T &value ) {
62     int tid = value / N;
63     int offset = value % N;
64     CHECK_MESSAGE( next_value[tid] == T(offset), "" );
65     ++next_value[tid];
66 }
67 
68 template< typename T >
69 struct parallel_puts : utils::NoAssign {
70 
71     tbb::flow::queue_node<T> &my_q;
72 
73     parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
74 
75     void operator()(int i) const {
76         for (int j = 0; j < N; ++j) {
77             bool msg = my_q.try_put( T(N*i + j) );
78             CHECK_MESSAGE( msg == true, "" );
79         }
80     }
81 
82 };
83 
84 template< typename T >
85 struct touches {
86 
87     bool **my_touches;
88     T **my_last_touch;
89     int my_num_threads;
90 
91     touches( int num_threads ) : my_num_threads(num_threads) {
92         my_last_touch = new T* [my_num_threads];
93         my_touches = new bool* [my_num_threads];
94         for ( int p = 0; p < my_num_threads; ++p) {
95             my_last_touch[p] = new T[my_num_threads];
96             for ( int p2 = 0; p2 < my_num_threads; ++p2)
97                 my_last_touch[p][p2] = -1;
98 
99             my_touches[p] = new bool[N*my_num_threads];
100             for ( int n = 0; n < N*my_num_threads; ++n)
101                 my_touches[p][n] = false;
102         }
103     }
104 
105     ~touches() {
106         for ( int p = 0; p < my_num_threads; ++p) {
107             delete [] my_touches[p];
108             delete [] my_last_touch[p];
109         }
110         delete [] my_touches;
111         delete [] my_last_touch;
112     }
113 
114     bool check( int tid, T v ) {
115         int v_tid = v / N;
116         if ( my_touches[tid][v] != false ) {
117             printf("Error: value seen twice by local thread\n");
118             return false;
119         }
120         if ( v <= my_last_touch[tid][v_tid] ) {
121             printf("Error: value seen in wrong order by local thread\n");
122             return false;
123         }
124         my_last_touch[tid][v_tid] = v;
125         my_touches[tid][v] = true;
126         return true;
127     }
128 
129     bool validate_touches() {
130         bool *all_touches = new bool[N*my_num_threads];
131         for ( int n = 0; n < N*my_num_threads; ++n)
132             all_touches[n] = false;
133 
134         for ( int p = 0; p < my_num_threads; ++p) {
135             for ( int n = 0; n < N*my_num_threads; ++n) {
136                 if ( my_touches[p][n] == true ) {
137                     CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
138                     all_touches[n] = true;
139                 }
140             }
141         }
142         for ( int n = 0; n < N*my_num_threads; ++n) {
143             if ( !all_touches[n] )
144                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
145             //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
146         }
147         delete [] all_touches;
148         return true;
149     }
150 
151 };
152 
153 template< typename T >
154 struct parallel_gets : utils::NoAssign {
155 
156     tbb::flow::queue_node<T> &my_q;
157     touches<T> &my_touches;
158 
159     parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
160 
161     void operator()(int tid) const {
162         for (int j = 0; j < N; ++j) {
163             T v;
164             spin_try_get( my_q, v );
165             my_touches.check( tid, v );
166         }
167     }
168 
169 };
170 
171 template< typename T >
172 struct parallel_put_get : utils::NoAssign {
173 
174     tbb::flow::queue_node<T> &my_q;
175     touches<T> &my_touches;
176 
177     parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
178 
179     void operator()(int tid) const {
180 
181         for ( int i = 0; i < N; i+=C ) {
182             int j_end = ( N < i + C ) ? N : i + C;
183             // dump about C values into the Q
184             for ( int j = i; j < j_end; ++j ) {
185                 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
186             }
187             // receiver about C values from the Q
188             for ( int j = i; j < j_end; ++j ) {
189                 T v;
190                 spin_try_get( my_q, v );
191                 my_touches.check( tid, v );
192             }
193         }
194     }
195 
196 };
197 
198 //
199 // Tests
200 //
201 // Item can be reserved, released, consumed ( single serial receiver )
202 //
203 template< typename T >
204 int test_reservation() {
205     tbb::flow::graph g;
206     T bogus_value(-1);
207 
208     // Simple tests
209     tbb::flow::queue_node<T> q(g);
210 
211     q.try_put(T(1));
212     q.try_put(T(2));
213     q.try_put(T(3));
214 
215     T v;
216     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
217     CHECK_MESSAGE( v == T(1), "" );
218     CHECK_MESSAGE( q.release_reservation() == true, "" );
219     v = bogus_value;
220     g.wait_for_all();
221     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
222     CHECK_MESSAGE( v == T(1), "" );
223     CHECK_MESSAGE( q.consume_reservation() == true, "" );
224     v = bogus_value;
225     g.wait_for_all();
226 
227     CHECK_MESSAGE( q.try_get(v) == true, "" );
228     CHECK_MESSAGE( v == T(2), "" );
229     v = bogus_value;
230     g.wait_for_all();
231 
232     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
233     CHECK_MESSAGE( v == T(3), "" );
234     CHECK_MESSAGE( q.release_reservation() == true, "" );
235     v = bogus_value;
236     g.wait_for_all();
237     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
238     CHECK_MESSAGE( v == T(3), "" );
239     CHECK_MESSAGE( q.consume_reservation() == true, "" );
240     v = bogus_value;
241     g.wait_for_all();
242 
243     return 0;
244 }
245 
246 //
247 // Tests
248 //
249 // multiple parallel senders, items in FIFO (relatively to sender) order
250 // multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
251 //   * overlapped puts / gets
252 //   * all puts finished before any getS
253 //
254 template< typename T >
255 int test_parallel(int num_threads) {
256     tbb::flow::graph g;
257     tbb::flow::queue_node<T> q(g);
258     tbb::flow::queue_node<T> q2(g);
259     tbb::flow::queue_node<T> q3(g);
260     {
261         Checker< T > my_check;
262         T bogus_value(-1);
263         T j = bogus_value;
264         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
265 
266         T *next_value = new T[num_threads];
267         for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
268 
269         for (int i = 0; i < num_threads * N; ++i ) {
270             spin_try_get( q, j );
271             check_item( next_value, j );
272             j = bogus_value;
273         }
274         for (int tid = 0; tid < num_threads; ++tid)  {
275             CHECK_MESSAGE( next_value[tid] == T(N), "" );
276         }
277         delete[] next_value;
278 
279         j = bogus_value;
280         g.wait_for_all();
281         CHECK_MESSAGE( q.try_get( j ) == false, "" );
282         CHECK_MESSAGE( j == bogus_value, "" );
283 
284         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
285 
286         {
287             touches< T > t( num_threads );
288             utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
289             g.wait_for_all();
290             CHECK_MESSAGE( t.validate_touches(), "" );
291         }
292         j = bogus_value;
293         CHECK_MESSAGE( q.try_get( j ) == false, "" );
294         CHECK_MESSAGE( j == bogus_value, "" );
295 
296         g.wait_for_all();
297         {
298             touches< T > t2( num_threads );
299             utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
300             g.wait_for_all();
301             CHECK_MESSAGE( t2.validate_touches(), "" );
302         }
303         j = bogus_value;
304         CHECK_MESSAGE( q.try_get( j ) == false, "" );
305         CHECK_MESSAGE( j == bogus_value, "" );
306 
307         tbb::flow::make_edge( q, q2 );
308         tbb::flow::make_edge( q2, q3 );
309 
310         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
311         {
312             touches< T > t3( num_threads );
313             utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
314             g.wait_for_all();
315             CHECK_MESSAGE( t3.validate_touches(), "" );
316         }
317         j = bogus_value;
318         g.wait_for_all();
319         CHECK_MESSAGE( q.try_get( j ) == false, "" );
320         g.wait_for_all();
321         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
322         g.wait_for_all();
323         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
324         CHECK_MESSAGE( j == bogus_value, "" );
325 
326         // test copy constructor
327         CHECK_MESSAGE( remove_successor( q, q2 ), "" );
328         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
329         tbb::flow::queue_node<T> q_copy(q);
330         j = bogus_value;
331         g.wait_for_all();
332         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
333         CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" );
334         {
335             touches< T > t( num_threads );
336             utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
337             g.wait_for_all();
338             CHECK_MESSAGE( t.validate_touches(), "" );
339         }
340         j = bogus_value;
341         CHECK_MESSAGE( q.try_get( j ) == false, "" );
342         CHECK_MESSAGE( j == bogus_value, "" );
343         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
344         CHECK_MESSAGE( j == bogus_value, "" );
345     }
346 
347     return 0;
348 }
349 
350 //
351 // Tests
352 //
353 // Predecessors cannot be registered
354 // Empty Q rejects item requests
355 // Single serial sender, items in FIFO order
356 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
357 //
358 
359 template< typename T >
360 int test_serial() {
361     tbb::flow::graph g;
362     tbb::flow::queue_node<T> q(g);
363     tbb::flow::queue_node<T> q2(g);
364     {   // destroy the graph after manipulating it, and see if all the items in the buffers
365         // have been destroyed before the graph
366         Checker<T> my_check;  // if CheckType< U > count constructions and destructions
367         T bogus_value(-1);
368         T j = bogus_value;
369 
370         //
371         // Rejects attempts to add / remove predecessor
372         // Rejects request from empty Q
373         //
374         CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" );
375         CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" );
376         CHECK_MESSAGE( q.try_get( j ) == false, "" );
377         CHECK_MESSAGE( j == bogus_value, "" );
378 
379         //
380         // Simple puts and gets
381         //
382 
383         for (int i = 0; i < N; ++i) {
384             bool msg = q.try_put( T(i) );
385             CHECK_MESSAGE( msg == true, "" );
386         }
387 
388 
389         for (int i = 0; i < N; ++i) {
390             j = bogus_value;
391             spin_try_get( q, j );
392             CHECK_MESSAGE( i == j, "" );
393         }
394         j = bogus_value;
395         g.wait_for_all();
396         CHECK_MESSAGE( q.try_get( j ) == false, "" );
397         CHECK_MESSAGE( j == bogus_value, "" );
398 
399         tbb::flow::make_edge( q, q2 );
400 
401         for (int i = 0; i < N; ++i) {
402             bool msg = q.try_put( T(i) );
403             CHECK_MESSAGE( msg == true, "" );
404         }
405 
406 
407         for (int i = 0; i < N; ++i) {
408             j = bogus_value;
409             spin_try_get( q2, j );
410             CHECK_MESSAGE( i == j, "" );
411         }
412         j = bogus_value;
413         g.wait_for_all();
414         CHECK_MESSAGE( q.try_get( j ) == false, "" );
415         g.wait_for_all();
416         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
417         CHECK_MESSAGE( j == bogus_value, "" );
418 
419         tbb::flow::remove_edge( q, q2 );
420         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
421         g.wait_for_all();
422         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
423         CHECK_MESSAGE( j == bogus_value, "" );
424         g.wait_for_all();
425         CHECK_MESSAGE( q.try_get( j ) == true, "" );
426         CHECK_MESSAGE( j == 1, "" );
427 
428         tbb::flow::queue_node<T> q3(g);
429         tbb::flow::make_edge( q, q2 );
430         tbb::flow::make_edge( q2, q3 );
431 
432         for (int i = 0; i < N; ++i) {
433             bool msg = q.try_put( T(i) );
434             CHECK_MESSAGE( msg == true, "" );
435         }
436 
437         for (int i = 0; i < N; ++i) {
438             j = bogus_value;
439             spin_try_get( q3, j );
440             CHECK_MESSAGE( i == j, "" );
441         }
442         j = bogus_value;
443         g.wait_for_all();
444         CHECK_MESSAGE( q.try_get( j ) == false, "" );
445         g.wait_for_all();
446         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
447         g.wait_for_all();
448         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
449         CHECK_MESSAGE( j == bogus_value, "" );
450 
451         tbb::flow::remove_edge( q,  q2 );
452         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
453         g.wait_for_all();
454         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
455         CHECK_MESSAGE( j == bogus_value, "" );
456         g.wait_for_all();
457         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
458         CHECK_MESSAGE( j == bogus_value, "" );
459         g.wait_for_all();
460         CHECK_MESSAGE( q.try_get( j ) == true, "" );
461         CHECK_MESSAGE( j == 1, "" );
462     }
463 
464     return 0;
465 }
466 
467 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
468 #include <array>
469 #include <vector>
470 void test_follows_and_precedes_api() {
471     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
472     std::vector<int> messages_for_precedes = {0, 1, 2};
473 
474     follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows);
475     follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes);
476 }
477 #endif
478 
479 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
480 void test_deduction_guides() {
481     using namespace tbb::flow;
482     graph g;
483     broadcast_node<int> br(g);
484     queue_node<int> q0(g);
485 
486 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
487     queue_node q1(follows(br));
488     static_assert(std::is_same_v<decltype(q1), queue_node<int>>);
489 
490     queue_node q2(precedes(br));
491     static_assert(std::is_same_v<decltype(q2), queue_node<int>>);
492 #endif
493 
494     queue_node q3(q0);
495     static_assert(std::is_same_v<decltype(q3), queue_node<int>>);
496     g.wait_for_all();
497 }
498 #endif
499 
500 //! Test serial, parallel behavior and reservation under parallelism
501 //! \brief \ref requirement \ref error_guessing
502 TEST_CASE("Parallel, serial test"){
503     for (int p = 2; p <= 4; ++p) {
504         tbb::task_arena arena(p);
505         arena.execute(
506             [&]() {
507                 test_serial<int>();
508                 test_serial<CheckType<int> >();
509                 test_parallel<int>(p);
510                 test_parallel<CheckType<int> >(p);
511             }
512         );
513 	}
514 }
515 
516 //! Test reset and cancellation
517 //! \brief \ref error_guessing
518 TEST_CASE("Resets test"){
519     INFO("Testing resets\n");
520     test_resets<int, tbb::flow::queue_node<int> >();
521     test_resets<float, tbb::flow::queue_node<float> >();
522 }
523 
524 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
525 //! Test follows and precedes API
526 //! \brief \ref error_guessing
527 TEST_CASE("Test follows and precedes API"){
528     test_follows_and_precedes_api();
529 }
530 #endif
531 
532 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
533 //! Test decution guides
534 //! \brief \ref requirement
535 TEST_CASE("Deduction guides"){
536     test_deduction_guides();
537 }
538 #endif
539 
540 //! Test operations on a reserved queue_node
541 //! \brief \ref error_guessing
542 TEST_CASE("queue_node with reservation"){
543     tbb::flow::graph g;
544 
545     tbb::flow::queue_node<int> q(g);
546 
547     bool res = q.try_put(42);
548     CHECK_MESSAGE( res, "queue_node must accept input." );
549 
550     int val = 1;
551     res = q.try_reserve(val);
552     CHECK_MESSAGE( res, "queue_node must reserve as it has an item." );
553     CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." );
554 
555     int out_arg = -1;
556     CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail.");
557     CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument.");
558 
559     out_arg = -1;
560     CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail.");
561     CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument.");
562     g.wait_for_all();
563 }
564