xref: /oneTBB/test/tbb/test_sequencer_node.cpp (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/utils_assert.h"
24 #include "common/test_follows_and_precedes_api.h"
25 
26 #include <cstdio>
27 #include <atomic>
28 
29 
30 //! \file test_sequencer_node.cpp
31 //! \brief Test for [flow_graph.sequencer_node] specification
32 
33 
34 #define N 1000
35 #define C 10
36 
37 template< typename T >
38 struct seq_inspector {
39     size_t operator()(const T &v) const { return size_t(v); }
40 };
41 
42 template< typename T >
43 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
44     g.wait_for_all();
45     return q.try_get(value);
46 }
47 
48 template< typename T >
49 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
50     while ( q.try_get(value) != true ) ;
51 }
52 
53 template< typename T >
54 struct parallel_puts : utils::NoAssign {
55 
56     tbb::flow::sequencer_node<T> &my_q;
57     int my_num_threads;
58 
59     parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
60 
61     void operator()(int tid) const {
62         for (int j = tid; j < N; j+=my_num_threads) {
63             bool msg = my_q.try_put( T(j) );
64             CHECK_MESSAGE( msg == true, "" );
65         }
66     }
67 
68 };
69 
70 template< typename T >
71 struct touches {
72 
73     bool **my_touches;
74     T *my_last_touch;
75     int my_num_threads;
76 
77     touches( int num_threads ) : my_num_threads(num_threads) {
78         my_last_touch = new T[my_num_threads];
79         my_touches = new bool* [my_num_threads];
80         for ( int p = 0; p < my_num_threads; ++p) {
81             my_last_touch[p] = T(-1);
82             my_touches[p] = new bool[N];
83             for ( int n = 0; n < N; ++n)
84                 my_touches[p][n] = false;
85         }
86     }
87 
88     ~touches() {
89         for ( int p = 0; p < my_num_threads; ++p) {
90             delete [] my_touches[p];
91         }
92         delete [] my_touches;
93         delete [] my_last_touch;
94     }
95 
96     bool check( int tid, T v ) {
97         if ( my_touches[tid][v] != false ) {
98             printf("Error: value seen twice by local thread\n");
99             return false;
100         }
101         if ( v <= my_last_touch[tid] ) {
102             printf("Error: value seen in wrong order by local thread\n");
103             return false;
104         }
105         my_last_touch[tid] = v;
106         my_touches[tid][v] = true;
107         return true;
108     }
109 
110     bool validate_touches() {
111         bool *all_touches = new bool[N];
112         for ( int n = 0; n < N; ++n)
113             all_touches[n] = false;
114 
115         for ( int p = 0; p < my_num_threads; ++p) {
116             for ( int n = 0; n < N; ++n) {
117                 if ( my_touches[p][n] == true ) {
118                     CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
119                     all_touches[n] = true;
120                 }
121             }
122         }
123         for ( int n = 0; n < N; ++n) {
124             if ( !all_touches[n] )
125                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
126             //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
127         }
128         delete [] all_touches;
129         return true;
130     }
131 
132 };
133 
134 template< typename T >
135 struct parallel_gets : utils::NoAssign {
136 
137     tbb::flow::sequencer_node<T> &my_q;
138     int my_num_threads;
139     touches<T> &my_touches;
140 
141     parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
142 
143     void operator()(int tid) const {
144         for (int j = tid; j < N; j+=my_num_threads) {
145             T v;
146             spin_try_get( my_q, v );
147             my_touches.check( tid, v );
148         }
149     }
150 
151 };
152 
153 template< typename T >
154 struct parallel_put_get : utils::NoAssign {
155 
156     tbb::flow::sequencer_node<T> &my_s1;
157     tbb::flow::sequencer_node<T> &my_s2;
158     int my_num_threads;
159     std::atomic< int > &my_counter;
160     touches<T> &my_touches;
161 
162     parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
163                       std::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
164 
165     void operator()(int tid) const {
166         int i_start = 0;
167 
168         while ( (i_start = my_counter.fetch_add(C)) < N ) {
169             int i_end = ( N < i_start + C ) ? N : i_start + C;
170             for (int i = i_start; i < i_end; ++i) {
171                 bool msg = my_s1.try_put( T(i) );
172                 CHECK_MESSAGE( msg == true, "" );
173             }
174 
175             for (int i = i_start; i < i_end; ++i) {
176                 T v;
177                 spin_try_get( my_s2, v );
178                 my_touches.check( tid, v );
179             }
180         }
181     }
182 
183 };
184 
185 //
186 // Tests
187 //
188 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
189 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
190 //
191 
192 template< typename T >
193 int test_parallel(int num_threads) {
194     tbb::flow::graph g;
195 
196     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
197     utils::NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
198     {
199         touches<T> t( num_threads );
200         utils::NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
201         g.wait_for_all();
202         CHECK_MESSAGE( t.validate_touches(), "" );
203     }
204     T bogus_value(-1);
205     T j = bogus_value;
206     CHECK_MESSAGE( s.try_get( j ) == false, "" );
207     CHECK_MESSAGE( j == bogus_value, "" );
208     g.wait_for_all();
209 
210     tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
211     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
212     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
213     tbb::flow::make_edge( s1, s2 );
214     tbb::flow::make_edge( s2, s3 );
215 
216     {
217         touches<T> t( num_threads );
218         std::atomic<int> counter;
219         counter = 0;
220         utils::NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
221         g.wait_for_all();
222         t.validate_touches();
223     }
224     g.wait_for_all();
225     CHECK_MESSAGE( s1.try_get( j ) == false, "" );
226     g.wait_for_all();
227     CHECK_MESSAGE( s2.try_get( j ) == false, "" );
228     g.wait_for_all();
229     CHECK_MESSAGE( s3.try_get( j ) == false, "" );
230     CHECK_MESSAGE( j == bogus_value, "" );
231 
232     // test copy constructor
233     tbb::flow::sequencer_node<T> s_copy(s);
234     utils::NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
235     for (int i = 0; i < N; ++i) {
236         j = bogus_value;
237         spin_try_get( s_copy, j );
238         CHECK_MESSAGE( i == j, "" );
239     }
240     j = bogus_value;
241     g.wait_for_all();
242     CHECK_MESSAGE( s_copy.try_get( j ) == false, "" );
243     CHECK_MESSAGE( j == bogus_value, "" );
244 
245     return 0;
246 }
247 
248 
249 //
250 // Tests
251 //
252 // No predecessors can be registered
253 // Request from empty buffer fails
254 // In-order puts, single sender, single receiver, properly sequenced at output
255 // Reverse-order puts, single sender, single receiver, properly sequenced at output
256 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
257 //
258 
259 template< typename T >
260 int test_serial() {
261     tbb::flow::graph g;
262     T bogus_value(-1);
263 
264     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
265     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
266     T j = bogus_value;
267 
268     //
269     // Rejects attempts to add / remove predecessor
270     // Rejects request from empty Q
271     //
272     CHECK_MESSAGE( register_predecessor( s, s2 ) == false, "" );
273     CHECK_MESSAGE( remove_predecessor( s, s2 ) == false, "" );
274     CHECK_MESSAGE( s.try_get( j ) == false, "" );
275     CHECK_MESSAGE( j == bogus_value, "" );
276 
277     //
278     // In-order simple puts and gets
279     //
280 
281     for (int i = 0; i < N; ++i) {
282         bool msg = s.try_put( T(i) );
283         CHECK_MESSAGE( msg == true, "" );
284         CHECK_MESSAGE(!s.try_put( T(i) ), "");  // second attempt to put should reject
285     }
286 
287 
288     for (int i = 0; i < N; ++i) {
289         j = bogus_value;
290         CHECK_MESSAGE(wait_try_get( g, s, j ) == true, "");
291         CHECK_MESSAGE( i == j, "" );
292         CHECK_MESSAGE(!s.try_put( T(i) ),"" );  // after retrieving value, subsequent put should fail
293     }
294     j = bogus_value;
295     g.wait_for_all();
296     CHECK_MESSAGE( s.try_get( j ) == false, "" );
297     CHECK_MESSAGE( j == bogus_value, "" );
298 
299     //
300     // Reverse-order simple puts and gets
301     //
302 
303     for (int i = N-1; i >= 0; --i) {
304         bool msg = s2.try_put( T(i) );
305         CHECK_MESSAGE( msg == true, "" );
306     }
307 
308     for (int i = 0; i < N; ++i) {
309         j = bogus_value;
310         CHECK_MESSAGE(wait_try_get( g, s2, j ) == true, "");
311         CHECK_MESSAGE( i == j, "" );
312     }
313     j = bogus_value;
314     g.wait_for_all();
315     CHECK_MESSAGE( s2.try_get( j ) == false, "" );
316     CHECK_MESSAGE( j == bogus_value, "" );
317 
318     //
319     // Chained in-order simple puts and gets
320     //
321 
322     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
323     tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
324     tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
325     tbb::flow::make_edge( s3, s4 );
326     tbb::flow::make_edge( s4, s5 );
327 
328     for (int i = 0; i < N; ++i) {
329         bool msg = s3.try_put( T(i) );
330         CHECK_MESSAGE( msg == true, "" );
331     }
332 
333     for (int i = 0; i < N; ++i) {
334         j = bogus_value;
335         CHECK_MESSAGE(wait_try_get( g, s5, j ) == true, "");
336         CHECK_MESSAGE( i == j, "" );
337     }
338     j = bogus_value;
339     CHECK_MESSAGE( wait_try_get( g, s3, j ) == false, "" );
340     CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
341     CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
342     CHECK_MESSAGE( j == bogus_value, "" );
343 
344     g.wait_for_all();
345     tbb::flow::remove_edge( s3, s4 );
346     CHECK_MESSAGE( s3.try_put( N ) == true, "" );
347     CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
348     CHECK_MESSAGE( j == bogus_value, "" );
349     CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
350     CHECK_MESSAGE( j == bogus_value, "" );
351     CHECK_MESSAGE( wait_try_get( g, s3, j ) == true, "" );
352     CHECK_MESSAGE( j == N, "" );
353 
354     //
355     // Chained reverse-order simple puts and gets
356     //
357 
358     tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
359     tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
360     tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
361     tbb::flow::make_edge( s6, s7 );
362     tbb::flow::make_edge( s7, s8 );
363 
364     for (int i = N-1; i >= 0; --i) {
365         bool msg = s6.try_put( T(i) );
366         CHECK_MESSAGE( msg == true, "" );
367     }
368 
369     for (int i = 0; i < N; ++i) {
370         j = bogus_value;
371         CHECK_MESSAGE( wait_try_get( g, s8, j ) == true, "" );
372         CHECK_MESSAGE( i == j, "" );
373     }
374     j = bogus_value;
375     CHECK_MESSAGE( wait_try_get( g, s6, j ) == false, "" );
376     CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
377     CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
378     CHECK_MESSAGE( j == bogus_value, "" );
379 
380     g.wait_for_all();
381     tbb::flow::remove_edge( s6, s7 );
382     CHECK_MESSAGE( s6.try_put( N ) == true, "" );
383     CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
384     CHECK_MESSAGE( j == bogus_value, "" );
385     CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
386     CHECK_MESSAGE( j == bogus_value, "" );
387     CHECK_MESSAGE( wait_try_get( g, s6, j ) == true, "" );
388     CHECK_MESSAGE( j == N, "" );
389 
390     return 0;
391 }
392 
393 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
394 #include <array>
395 #include <vector>
396 void test_follows_and_precedes_api() {
397     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
398     std::vector<int> messages_for_precedes = {0, 1, 2};
399 
400     follows_and_precedes_testing::test_follows
401         <int, tbb::flow::sequencer_node<int>>
402         (messages_for_follows, [](const int& i) { return i; });
403 
404     follows_and_precedes_testing::test_precedes
405         <int, tbb::flow::sequencer_node<int>>
406         (messages_for_precedes, [](const int& i) { return i; });
407 }
408 #endif
409 
410 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
411 template <typename Body>
412 void test_deduction_guides_common(Body body) {
413     using namespace tbb::flow;
414     graph g;
415     broadcast_node<int> br(g);
416 
417     sequencer_node s1(g, body);
418     static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);
419 
420 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
421     sequencer_node s2(follows(br), body);
422     static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
423 #endif
424 
425     sequencer_node s3(s1);
426     static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
427 }
428 
429 int sequencer_body_f(const int&) { return 1; }
430 
431 void test_deduction_guides() {
432     test_deduction_guides_common([](const int&)->int { return 1; });
433     test_deduction_guides_common([](const int&) mutable ->int { return 1; });
434     test_deduction_guides_common(sequencer_body_f);
435 }
436 #endif
437 
438 //! Test sequencer with various request orders and parallelism levels
439 //! \brief \ref requirement \ref error_guessing
440 TEST_CASE("Serial and parallel test"){
441     for (int p = 2; p <= 4; ++p) {
442         tbb::task_arena arena(p);
443         arena.execute(
444             [&]() {
445 
446                 test_serial<int>();
447                 test_parallel<int>(p);
448 
449             }
450         );
451 	}
452 }
453 
454 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
455 //! Test decution guides
456 //! \brief \ref requirement
457 TEST_CASE("Test follows and precedes API"){
458     test_follows_and_precedes_api();
459 }
460 #endif
461 
462 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
463 //! Test decution guides
464 //! \brief \ref requirement
465 TEST_CASE("Test deduction guides"){
466     test_deduction_guides();
467 }
468 #endif
469