xref: /oneTBB/test/tbb/test_queue_node.cpp (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // TODO: Add overlapping put / receive tests
18 
19 #include "common/config.h"
20 
// TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having these
// parts in all of the tests might mean that we test a product that is different from what is
// actually released.
24 #define __TBB_EXTRA_DEBUG 1
25 #include "tbb/flow_graph.h"
26 
27 #include "common/test.h"
28 #include "common/utils.h"
29 #include "common/utils_assert.h"
30 #include "common/checktype.h"
31 #include "common/graph_utils.h"
32 #include "common/test_follows_and_precedes_api.h"
33 
34 #include <cstdio>
35 
36 
37 //! \file test_queue_node.cpp
38 //! \brief Test for [flow_graph.queue_node] specification
39 
40 
// N is the number of items each sender puts into a queue; C is the batch size used by
// parallel_put_get. They are typed constants rather than macros so that the single-letter
// names are not textually substituted into headers that are included further down in this file.
constexpr int N = 1000;
constexpr int C = 10;
43 
44 template< typename T >
45 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
46     while ( q.try_get(value) != true ) ;
47 }
48 
49 template< typename T >
50 void check_item( T* next_value, T &value ) {
51     int tid = value / N;
52     int offset = value % N;
53     CHECK_MESSAGE( next_value[tid] == T(offset), "" );
54     ++next_value[tid];
55 }
56 
57 template< typename T >
58 struct parallel_puts : utils::NoAssign {
59 
60     tbb::flow::queue_node<T> &my_q;
61 
62     parallel_puts( tbb::flow::queue_node<T> &q ) : my_q(q) {}
63 
64     void operator()(int i) const {
65         for (int j = 0; j < N; ++j) {
66             bool msg = my_q.try_put( T(N*i + j) );
67             CHECK_MESSAGE( msg == true, "" );
68         }
69     }
70 
71 };
72 
73 
74 
75 template< typename T >
76 struct touches {
77 
78     bool **my_touches;
79     T **my_last_touch;
80     int my_num_threads;
81 
82     touches( int num_threads ) : my_num_threads(num_threads) {
83         my_last_touch = new T* [my_num_threads];
84         my_touches = new bool* [my_num_threads];
85         for ( int p = 0; p < my_num_threads; ++p) {
86             my_last_touch[p] = new T[my_num_threads];
87             for ( int p2 = 0; p2 < my_num_threads; ++p2)
88                 my_last_touch[p][p2] = -1;
89 
90             my_touches[p] = new bool[N*my_num_threads];
91             for ( int n = 0; n < N*my_num_threads; ++n)
92                 my_touches[p][n] = false;
93         }
94     }
95 
96     ~touches() {
97         for ( int p = 0; p < my_num_threads; ++p) {
98             delete [] my_touches[p];
99             delete [] my_last_touch[p];
100         }
101         delete [] my_touches;
102         delete [] my_last_touch;
103     }
104 
105     bool check( int tid, T v ) {
106         int v_tid = v / N;
107         if ( my_touches[tid][v] != false ) {
108             printf("Error: value seen twice by local thread\n");
109             return false;
110         }
111         if ( v <= my_last_touch[tid][v_tid] ) {
112             printf("Error: value seen in wrong order by local thread\n");
113             return false;
114         }
115         my_last_touch[tid][v_tid] = v;
116         my_touches[tid][v] = true;
117         return true;
118     }
119 
120     bool validate_touches() {
121         bool *all_touches = new bool[N*my_num_threads];
122         for ( int n = 0; n < N*my_num_threads; ++n)
123             all_touches[n] = false;
124 
125         for ( int p = 0; p < my_num_threads; ++p) {
126             for ( int n = 0; n < N*my_num_threads; ++n) {
127                 if ( my_touches[p][n] == true ) {
128                     CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
129                     all_touches[n] = true;
130                 }
131             }
132         }
133         for ( int n = 0; n < N*my_num_threads; ++n) {
134             if ( !all_touches[n] )
135                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
136             //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
137         }
138         delete [] all_touches;
139         return true;
140     }
141 
142 };
143 
144 template< typename T >
145 struct parallel_gets : utils::NoAssign {
146 
147     tbb::flow::queue_node<T> &my_q;
148     touches<T> &my_touches;
149 
150     parallel_gets( tbb::flow::queue_node<T> &q, touches<T> &t) : my_q(q), my_touches(t) {}
151 
152     void operator()(int tid) const {
153         for (int j = 0; j < N; ++j) {
154             T v;
155             spin_try_get( my_q, v );
156             my_touches.check( tid, v );
157         }
158     }
159 
160 };
161 
162 template< typename T >
163 struct parallel_put_get : utils::NoAssign {
164 
165     tbb::flow::queue_node<T> &my_q;
166     touches<T> &my_touches;
167 
168     parallel_put_get( tbb::flow::queue_node<T> &q, touches<T> &t ) : my_q(q), my_touches(t) {}
169 
170     void operator()(int tid) const {
171 
172         for ( int i = 0; i < N; i+=C ) {
173             int j_end = ( N < i + C ) ? N : i + C;
174             // dump about C values into the Q
175             for ( int j = i; j < j_end; ++j ) {
176                 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
177             }
178             // receiver about C values from the Q
179             for ( int j = i; j < j_end; ++j ) {
180                 T v;
181                 spin_try_get( my_q, v );
182                 my_touches.check( tid, v );
183             }
184         }
185     }
186 
187 };
188 
189 //
190 // Tests
191 //
192 // Item can be reserved, released, consumed ( single serial receiver )
193 //
194 template< typename T >
195 int test_reservation() {
196     tbb::flow::graph g;
197     T bogus_value(-1);
198 
199     // Simple tests
200     tbb::flow::queue_node<T> q(g);
201 
202     q.try_put(T(1));
203     q.try_put(T(2));
204     q.try_put(T(3));
205 
206     T v;
207     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
208     CHECK_MESSAGE( v == T(1), "" );
209     CHECK_MESSAGE( q.release_reservation() == true, "" );
210     v = bogus_value;
211     g.wait_for_all();
212     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
213     CHECK_MESSAGE( v == T(1), "" );
214     CHECK_MESSAGE( q.consume_reservation() == true, "" );
215     v = bogus_value;
216     g.wait_for_all();
217 
218     CHECK_MESSAGE( q.try_get(v) == true, "" );
219     CHECK_MESSAGE( v == T(2), "" );
220     v = bogus_value;
221     g.wait_for_all();
222 
223     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
224     CHECK_MESSAGE( v == T(3), "" );
225     CHECK_MESSAGE( q.release_reservation() == true, "" );
226     v = bogus_value;
227     g.wait_for_all();
228     CHECK_MESSAGE( q.reserve_item(v) == true, "" );
229     CHECK_MESSAGE( v == T(3), "" );
230     CHECK_MESSAGE( q.consume_reservation() == true, "" );
231     v = bogus_value;
232     g.wait_for_all();
233 
234     return 0;
235 }
236 
237 //
238 // Tests
239 //
// multiple parallel senders, items in FIFO order (relative to sender)
// multiple parallel senders, multiple parallel receivers, items in FIFO order (relative to sender/receiver) and all items received
//   * overlapped puts / gets
//   * all puts finished before any gets
244 //
245 template< typename T >
246 int test_parallel(int num_threads) {
247     tbb::flow::graph g;
248     tbb::flow::queue_node<T> q(g);
249     tbb::flow::queue_node<T> q2(g);
250     tbb::flow::queue_node<T> q3(g);
251     {
252         Checker< T > my_check;
253         T bogus_value(-1);
254         T j = bogus_value;
255         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
256 
257         T *next_value = new T[num_threads];
258         for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
259 
260         for (int i = 0; i < num_threads * N; ++i ) {
261             spin_try_get( q, j );
262             check_item( next_value, j );
263             j = bogus_value;
264         }
265         for (int tid = 0; tid < num_threads; ++tid)  {
266             CHECK_MESSAGE( next_value[tid] == T(N), "" );
267         }
268         delete[] next_value;
269 
270         j = bogus_value;
271         g.wait_for_all();
272         CHECK_MESSAGE( q.try_get( j ) == false, "" );
273         CHECK_MESSAGE( j == bogus_value, "" );
274 
275         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
276 
277         {
278             touches< T > t( num_threads );
279             utils::NativeParallelFor( num_threads, parallel_gets<T>(q, t) );
280             g.wait_for_all();
281             CHECK_MESSAGE( t.validate_touches(), "" );
282         }
283         j = bogus_value;
284         CHECK_MESSAGE( q.try_get( j ) == false, "" );
285         CHECK_MESSAGE( j == bogus_value, "" );
286 
287         g.wait_for_all();
288         {
289             touches< T > t2( num_threads );
290             utils::NativeParallelFor( num_threads, parallel_put_get<T>(q, t2) );
291             g.wait_for_all();
292             CHECK_MESSAGE( t2.validate_touches(), "" );
293         }
294         j = bogus_value;
295         CHECK_MESSAGE( q.try_get( j ) == false, "" );
296         CHECK_MESSAGE( j == bogus_value, "" );
297 
298         tbb::flow::make_edge( q, q2 );
299         tbb::flow::make_edge( q2, q3 );
300 
301         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
302         {
303             touches< T > t3( num_threads );
304             utils::NativeParallelFor( num_threads, parallel_gets<T>(q3, t3) );
305             g.wait_for_all();
306             CHECK_MESSAGE( t3.validate_touches(), "" );
307         }
308         j = bogus_value;
309         g.wait_for_all();
310         CHECK_MESSAGE( q.try_get( j ) == false, "" );
311         g.wait_for_all();
312         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
313         g.wait_for_all();
314         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
315         CHECK_MESSAGE( j == bogus_value, "" );
316 
317         // test copy constructor
318         CHECK_MESSAGE( remove_successor( q, q2 ), "" );
319         utils::NativeParallelFor( num_threads, parallel_puts<T>(q) );
320         tbb::flow::queue_node<T> q_copy(q);
321         j = bogus_value;
322         g.wait_for_all();
323         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
324         CHECK_MESSAGE( register_successor( q, q_copy ) == true, "" );
325         {
326             touches< T > t( num_threads );
327             utils::NativeParallelFor( num_threads, parallel_gets<T>(q_copy, t) );
328             g.wait_for_all();
329             CHECK_MESSAGE( t.validate_touches(), "" );
330         }
331         j = bogus_value;
332         CHECK_MESSAGE( q.try_get( j ) == false, "" );
333         CHECK_MESSAGE( j == bogus_value, "" );
334         CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
335         CHECK_MESSAGE( j == bogus_value, "" );
336     }
337 
338     return 0;
339 }
340 
341 //
342 // Tests
343 //
344 // Predecessors cannot be registered
345 // Empty Q rejects item requests
346 // Single serial sender, items in FIFO order
347 // Chained Qs ( 2 & 3 ), single sender, items at last Q in FIFO order
348 //
349 
350 template< typename T >
351 int test_serial() {
352     tbb::flow::graph g;
353     tbb::flow::queue_node<T> q(g);
354     tbb::flow::queue_node<T> q2(g);
355     {   // destroy the graph after manipulating it, and see if all the items in the buffers
356         // have been destroyed before the graph
357         Checker<T> my_check;  // if CheckType< U > count constructions and destructions
358         T bogus_value(-1);
359         T j = bogus_value;
360 
361         //
362         // Rejects attempts to add / remove predecessor
363         // Rejects request from empty Q
364         //
365         CHECK_MESSAGE( register_predecessor( q, q2 ) == false, "" );
366         CHECK_MESSAGE( remove_predecessor( q, q2 ) == false, "" );
367         CHECK_MESSAGE( q.try_get( j ) == false, "" );
368         CHECK_MESSAGE( j == bogus_value, "" );
369 
370         //
371         // Simple puts and gets
372         //
373 
374         for (int i = 0; i < N; ++i) {
375             bool msg = q.try_put( T(i) );
376             CHECK_MESSAGE( msg == true, "" );
377         }
378 
379 
380         for (int i = 0; i < N; ++i) {
381             j = bogus_value;
382             spin_try_get( q, j );
383             CHECK_MESSAGE( i == j, "" );
384         }
385         j = bogus_value;
386         g.wait_for_all();
387         CHECK_MESSAGE( q.try_get( j ) == false, "" );
388         CHECK_MESSAGE( j == bogus_value, "" );
389 
390         tbb::flow::make_edge( q, q2 );
391 
392         for (int i = 0; i < N; ++i) {
393             bool msg = q.try_put( T(i) );
394             CHECK_MESSAGE( msg == true, "" );
395         }
396 
397 
398         for (int i = 0; i < N; ++i) {
399             j = bogus_value;
400             spin_try_get( q2, j );
401             CHECK_MESSAGE( i == j, "" );
402         }
403         j = bogus_value;
404         g.wait_for_all();
405         CHECK_MESSAGE( q.try_get( j ) == false, "" );
406         g.wait_for_all();
407         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
408         CHECK_MESSAGE( j == bogus_value, "" );
409 
410         tbb::flow::remove_edge( q, q2 );
411         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
412         g.wait_for_all();
413         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
414         CHECK_MESSAGE( j == bogus_value, "" );
415         g.wait_for_all();
416         CHECK_MESSAGE( q.try_get( j ) == true, "" );
417         CHECK_MESSAGE( j == 1, "" );
418 
419         tbb::flow::queue_node<T> q3(g);
420         tbb::flow::make_edge( q, q2 );
421         tbb::flow::make_edge( q2, q3 );
422 
423         for (int i = 0; i < N; ++i) {
424             bool msg = q.try_put( T(i) );
425             CHECK_MESSAGE( msg == true, "" );
426         }
427 
428         for (int i = 0; i < N; ++i) {
429             j = bogus_value;
430             spin_try_get( q3, j );
431             CHECK_MESSAGE( i == j, "" );
432         }
433         j = bogus_value;
434         g.wait_for_all();
435         CHECK_MESSAGE( q.try_get( j ) == false, "" );
436         g.wait_for_all();
437         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
438         g.wait_for_all();
439         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
440         CHECK_MESSAGE( j == bogus_value, "" );
441 
442         tbb::flow::remove_edge( q,  q2 );
443         CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
444         g.wait_for_all();
445         CHECK_MESSAGE( q2.try_get( j ) == false, "" );
446         CHECK_MESSAGE( j == bogus_value, "" );
447         g.wait_for_all();
448         CHECK_MESSAGE( q3.try_get( j ) == false, "" );
449         CHECK_MESSAGE( j == bogus_value, "" );
450         g.wait_for_all();
451         CHECK_MESSAGE( q.try_get( j ) == true, "" );
452         CHECK_MESSAGE( j == 1, "" );
453     }
454 
455     return 0;
456 }
457 
458 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
459 #include <array>
460 #include <vector>
461 void test_follows_and_precedes_api() {
462     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
463     std::vector<int> messages_for_precedes = {0, 1, 2};
464 
465     follows_and_precedes_testing::test_follows <int, tbb::flow::queue_node<int>>(messages_for_follows);
466     follows_and_precedes_testing::test_precedes <int, tbb::flow::queue_node<int>>(messages_for_precedes);
467 }
468 #endif
469 
470 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
471 void test_deduction_guides() {
472     using namespace tbb::flow;
473     graph g;
474     broadcast_node<int> br(g);
475     queue_node<int> q0(g);
476 
477 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
478     queue_node q1(follows(br));
479     static_assert(std::is_same_v<decltype(q1), queue_node<int>>);
480 
481     queue_node q2(precedes(br));
482     static_assert(std::is_same_v<decltype(q2), queue_node<int>>);
483 #endif
484 
485     queue_node q3(q0);
486     static_assert(std::is_same_v<decltype(q3), queue_node<int>>);
487     g.wait_for_all();
488 }
489 #endif
490 
491 //! Test serial, parallel behavior and reservation under parallelism
492 //! \brief \ref requirement \ref error_guessing
493 TEST_CASE("Parallel, serial test"){
494     for (int p = 2; p <= 4; ++p) {
495         tbb::task_arena arena(p);
496         arena.execute(
497             [&]() {
498 
499                 test_serial<int>();
500                 test_serial<CheckType<int> >();
501                 test_parallel<int>(p);
502                 test_parallel<CheckType<int> >(p);
503 
504             }
505         );
506 	}
507 }
508 
509 //! Test reset and cancellation
510 //! \brief \ref error_guessing
511 TEST_CASE("Resets test"){
512     INFO("Testing resets\n");
513     test_resets<int, tbb::flow::queue_node<int> >();
514     test_resets<float, tbb::flow::queue_node<float> >();
515 }
516 
517 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
518 //! Test follows and precedes API
519 //! \brief \ref error_guessing
520 TEST_CASE("Test follows and precedes API"){
521     test_follows_and_precedes_api();
522 }
523 #endif
524 
525 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
527 //! \brief \ref requirement
528 TEST_CASE("Deduction guides"){
529     test_deduction_guides();
530 }
531 #endif
532 
533 //! Test operations on a reserved queue_node
534 //! \brief \ref error_guessing
535 TEST_CASE("queue_node with reservation"){
536     tbb::flow::graph g;
537 
538     tbb::flow::queue_node<int> q(g);
539 
540     bool res = q.try_put(42);
541     CHECK_MESSAGE( res, "queue_node must accept input." );
542 
543     int val = 1;
544     res = q.try_reserve(val);
545     CHECK_MESSAGE( res, "queue_node must reserve as it has an item." );
546     CHECK_MESSAGE( (val == 42), "queue_node must reserve once passed item." );
547 
548     int out_arg = -1;
549     CHECK_MESSAGE((q.try_reserve(out_arg) == false), "Reserving a reserved node should fail.");
550     CHECK_MESSAGE((out_arg == -1), "Reserving a reserved node should not update its argument.");
551 
552     out_arg = -1;
553     CHECK_MESSAGE((q.try_get(out_arg) == false), "Getting from reserved node should fail.");
554     CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument.");
555     g.wait_for_all();
556 
557 }
558