xref: /oneTBB/test/tbb/test_sequencer_node.cpp (revision 8c9445de)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 #include "tbb/global_control.h"
21 
22 #include "common/test.h"
23 #include "common/utils.h"
24 #include "common/utils_assert.h"
25 #include "common/test_follows_and_precedes_api.h"
26 #include "common/concepts_common.h"
27 
28 #include <cstdio>
29 #include <atomic>
30 
31 
32 //! \file test_sequencer_node.cpp
33 //! \brief Test for [flow_graph.sequencer_node] specification
34 
35 
// Number of items pushed through each sequencer under test; the loops in this
// file use the values 0..N-1 as both payload and sequence id.
#define N 1000
// Batch size: how many consecutive ids a thread claims from the shared counter
// per iteration in parallel_put_get.
#define C 10
38 
// Sequencer body used by the sequencer_node instances in this file:
// the sequence number of an item is the item itself converted to size_t.
template< typename T >
struct seq_inspector {
    size_t operator()(const T &item) const {
        const size_t order = size_t(item);
        return order;
    }
};
43 
44 template< typename T >
45 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
46     g.wait_for_all();
47     return q.try_get(value);
48 }
49 
50 template< typename T >
51 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
52     while ( q.try_get(value) != true ) ;
53 }
54 
55 template< typename T >
56 struct parallel_puts : utils::NoAssign {
57 
58     tbb::flow::sequencer_node<T> &my_q;
59     int my_num_threads;
60 
61     parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
62 
63     void operator()(int tid) const {
64         for (int j = tid; j < N; j+=my_num_threads) {
65             bool msg = my_q.try_put( T(j) );
66             CHECK_MESSAGE( msg == true, "" );
67         }
68     }
69 
70 };
71 
72 template< typename T >
73 struct touches {
74 
75     bool **my_touches;
76     T *my_last_touch;
77     int my_num_threads;
78 
79     touches( int num_threads ) : my_num_threads(num_threads) {
80         my_last_touch = new T[my_num_threads];
81         my_touches = new bool* [my_num_threads];
82         for ( int p = 0; p < my_num_threads; ++p) {
83             my_last_touch[p] = T(-1);
84             my_touches[p] = new bool[N];
85             for ( int n = 0; n < N; ++n)
86                 my_touches[p][n] = false;
87         }
88     }
89 
90     ~touches() {
91         for ( int p = 0; p < my_num_threads; ++p) {
92             delete [] my_touches[p];
93         }
94         delete [] my_touches;
95         delete [] my_last_touch;
96     }
97 
98     bool check( int tid, T v ) {
99         if ( my_touches[tid][v] != false ) {
100             printf("Error: value seen twice by local thread\n");
101             return false;
102         }
103         if ( v <= my_last_touch[tid] ) {
104             printf("Error: value seen in wrong order by local thread\n");
105             return false;
106         }
107         my_last_touch[tid] = v;
108         my_touches[tid][v] = true;
109         return true;
110     }
111 
112     bool validate_touches() {
113         bool *all_touches = new bool[N];
114         for ( int n = 0; n < N; ++n)
115             all_touches[n] = false;
116 
117         for ( int p = 0; p < my_num_threads; ++p) {
118             for ( int n = 0; n < N; ++n) {
119                 if ( my_touches[p][n] == true ) {
120                     CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
121                     all_touches[n] = true;
122                 }
123             }
124         }
125         for ( int n = 0; n < N; ++n) {
126             if ( !all_touches[n] )
127                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
128             //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
129         }
130         delete [] all_touches;
131         return true;
132     }
133 
134 };
135 
136 template< typename T >
137 struct parallel_gets : utils::NoAssign {
138 
139     tbb::flow::sequencer_node<T> &my_q;
140     int my_num_threads;
141     touches<T> &my_touches;
142 
143     parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
144 
145     void operator()(int tid) const {
146         for (int j = tid; j < N; j+=my_num_threads) {
147             T v;
148             spin_try_get( my_q, v );
149             my_touches.check( tid, v );
150         }
151     }
152 
153 };
154 
155 template< typename T >
156 struct parallel_put_get : utils::NoAssign {
157 
158     tbb::flow::sequencer_node<T> &my_s1;
159     tbb::flow::sequencer_node<T> &my_s2;
160     int my_num_threads;
161     std::atomic< int > &my_counter;
162     touches<T> &my_touches;
163 
164     parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
165                       std::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
166 
167     void operator()(int tid) const {
168         int i_start = 0;
169 
170         while ( (i_start = my_counter.fetch_add(C)) < N ) {
171             int i_end = ( N < i_start + C ) ? N : i_start + C;
172             for (int i = i_start; i < i_end; ++i) {
173                 bool msg = my_s1.try_put( T(i) );
174                 CHECK_MESSAGE( msg == true, "" );
175             }
176 
177             for (int i = i_start; i < i_end; ++i) {
178                 T v;
179                 spin_try_get( my_s2, v );
180                 my_touches.check( tid, v );
181             }
182         }
183     }
184 
185 };
186 
187 //
188 // Tests
189 //
190 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
191 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
192 //
193 
194 template< typename T >
195 int test_parallel(int num_threads) {
196     tbb::flow::graph g;
197 
198     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
199     utils::NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
200     {
201         touches<T> t( num_threads );
202         utils::NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
203         g.wait_for_all();
204         CHECK_MESSAGE( t.validate_touches(), "" );
205     }
206     T bogus_value(-1);
207     T j = bogus_value;
208     CHECK_MESSAGE( s.try_get( j ) == false, "" );
209     CHECK_MESSAGE( j == bogus_value, "" );
210     g.wait_for_all();
211 
212     tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
213     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
214     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
215     tbb::flow::make_edge( s1, s2 );
216     tbb::flow::make_edge( s2, s3 );
217 
218     {
219         touches<T> t( num_threads );
220         std::atomic<int> counter;
221         counter = 0;
222         utils::NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
223         g.wait_for_all();
224         t.validate_touches();
225     }
226     g.wait_for_all();
227     CHECK_MESSAGE( s1.try_get( j ) == false, "" );
228     g.wait_for_all();
229     CHECK_MESSAGE( s2.try_get( j ) == false, "" );
230     g.wait_for_all();
231     CHECK_MESSAGE( s3.try_get( j ) == false, "" );
232     CHECK_MESSAGE( j == bogus_value, "" );
233 
234     // test copy constructor
235     tbb::flow::sequencer_node<T> s_copy(s);
236     utils::NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
237     for (int i = 0; i < N; ++i) {
238         j = bogus_value;
239         spin_try_get( s_copy, j );
240         CHECK_MESSAGE( i == j, "" );
241     }
242     j = bogus_value;
243     g.wait_for_all();
244     CHECK_MESSAGE( s_copy.try_get( j ) == false, "" );
245     CHECK_MESSAGE( j == bogus_value, "" );
246 
247     return 0;
248 }
249 
250 
251 //
252 // Tests
253 //
254 // No predecessors can be registered
255 // Request from empty buffer fails
256 // In-order puts, single sender, single receiver, properly sequenced at output
257 // Reverse-order puts, single sender, single receiver, properly sequenced at output
258 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
259 //
260 
261 template< typename T >
262 int test_serial() {
263     tbb::flow::graph g;
264     T bogus_value(-1);
265 
266     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
267     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
268     T j = bogus_value;
269 
270     //
271     // Rejects attempts to add / remove predecessor
272     // Rejects request from empty Q
273     //
274     CHECK_MESSAGE( register_predecessor( s, s2 ) == false, "" );
275     CHECK_MESSAGE( remove_predecessor( s, s2 ) == false, "" );
276     CHECK_MESSAGE( s.try_get( j ) == false, "" );
277     CHECK_MESSAGE( j == bogus_value, "" );
278 
279     //
280     // In-order simple puts and gets
281     //
282 
283     for (int i = 0; i < N; ++i) {
284         bool msg = s.try_put( T(i) );
285         CHECK_MESSAGE( msg == true, "" );
286         CHECK_MESSAGE(!s.try_put( T(i) ), "");  // second attempt to put should reject
287     }
288 
289 
290     for (int i = 0; i < N; ++i) {
291         j = bogus_value;
292         CHECK_MESSAGE(wait_try_get( g, s, j ) == true, "");
293         CHECK_MESSAGE( i == j, "" );
294         CHECK_MESSAGE(!s.try_put( T(i) ),"" );  // after retrieving value, subsequent put should fail
295     }
296     j = bogus_value;
297     g.wait_for_all();
298     CHECK_MESSAGE( s.try_get( j ) == false, "" );
299     CHECK_MESSAGE( j == bogus_value, "" );
300 
301     //
302     // Reverse-order simple puts and gets
303     //
304 
305     for (int i = N-1; i >= 0; --i) {
306         bool msg = s2.try_put( T(i) );
307         CHECK_MESSAGE( msg == true, "" );
308     }
309 
310     for (int i = 0; i < N; ++i) {
311         j = bogus_value;
312         CHECK_MESSAGE(wait_try_get( g, s2, j ) == true, "");
313         CHECK_MESSAGE( i == j, "" );
314     }
315     j = bogus_value;
316     g.wait_for_all();
317     CHECK_MESSAGE( s2.try_get( j ) == false, "" );
318     CHECK_MESSAGE( j == bogus_value, "" );
319 
320     //
321     // Chained in-order simple puts and gets
322     //
323 
324     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
325     tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
326     tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
327     tbb::flow::make_edge( s3, s4 );
328     tbb::flow::make_edge( s4, s5 );
329 
330     for (int i = 0; i < N; ++i) {
331         bool msg = s3.try_put( T(i) );
332         CHECK_MESSAGE( msg == true, "" );
333     }
334 
335     for (int i = 0; i < N; ++i) {
336         j = bogus_value;
337         CHECK_MESSAGE(wait_try_get( g, s5, j ) == true, "");
338         CHECK_MESSAGE( i == j, "" );
339     }
340     j = bogus_value;
341     CHECK_MESSAGE( wait_try_get( g, s3, j ) == false, "" );
342     CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
343     CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
344     CHECK_MESSAGE( j == bogus_value, "" );
345 
346     g.wait_for_all();
347     tbb::flow::remove_edge( s3, s4 );
348     CHECK_MESSAGE( s3.try_put( N ) == true, "" );
349     CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
350     CHECK_MESSAGE( j == bogus_value, "" );
351     CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
352     CHECK_MESSAGE( j == bogus_value, "" );
353     CHECK_MESSAGE( wait_try_get( g, s3, j ) == true, "" );
354     CHECK_MESSAGE( j == N, "" );
355 
356     //
357     // Chained reverse-order simple puts and gets
358     //
359 
360     tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
361     tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
362     tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
363     tbb::flow::make_edge( s6, s7 );
364     tbb::flow::make_edge( s7, s8 );
365 
366     for (int i = N-1; i >= 0; --i) {
367         bool msg = s6.try_put( T(i) );
368         CHECK_MESSAGE( msg == true, "" );
369     }
370 
371     for (int i = 0; i < N; ++i) {
372         j = bogus_value;
373         CHECK_MESSAGE( wait_try_get( g, s8, j ) == true, "" );
374         CHECK_MESSAGE( i == j, "" );
375     }
376     j = bogus_value;
377     CHECK_MESSAGE( wait_try_get( g, s6, j ) == false, "" );
378     CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
379     CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
380     CHECK_MESSAGE( j == bogus_value, "" );
381 
382     g.wait_for_all();
383     tbb::flow::remove_edge( s6, s7 );
384     CHECK_MESSAGE( s6.try_put( N ) == true, "" );
385     CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
386     CHECK_MESSAGE( j == bogus_value, "" );
387     CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
388     CHECK_MESSAGE( j == bogus_value, "" );
389     CHECK_MESSAGE( wait_try_get( g, s6, j ) == true, "" );
390     CHECK_MESSAGE( j == N, "" );
391 
392     return 0;
393 }
394 
395 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
396 #include <array>
397 #include <vector>
398 void test_follows_and_precedes_api() {
399     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
400     std::vector<int> messages_for_precedes = {0, 1, 2};
401 
402     follows_and_precedes_testing::test_follows
403         <int, tbb::flow::sequencer_node<int>>
404         (messages_for_follows, [](const int& i) -> std::size_t { return i; });
405 
406     follows_and_precedes_testing::test_precedes
407         <int, tbb::flow::sequencer_node<int>>
408         (messages_for_precedes, [](const int& i) -> std::size_t { return i; });
409 }
410 #endif
411 
412 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
413 template <typename Body>
414 void test_deduction_guides_common(Body body) {
415     using namespace tbb::flow;
416     graph g;
417     broadcast_node<int> br(g);
418 
419     sequencer_node s1(g, body);
420     static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);
421 
422 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
423     sequencer_node s2(follows(br), body);
424     static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
425 #endif
426 
427     sequencer_node s3(s1);
428     static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
429 }
430 
// Free-function sequencer body for the deduction-guide test:
// ignores its argument and always yields 1.
std::size_t sequencer_body_f(const int& /*unused*/) {
    const std::size_t fixed_order = 1;
    return fixed_order;
}
432 
433 void test_deduction_guides() {
434     test_deduction_guides_common([](const int&)->std::size_t { return 1; });
435     test_deduction_guides_common([](const int&) mutable ->std::size_t { return 1; });
436     test_deduction_guides_common(sequencer_body_f);
437 }
438 #endif
439 
440 //! Test sequencer with various request orders and parallelism levels
441 //! \brief \ref requirement \ref error_guessing
442 TEST_CASE("Serial and parallel test"){
443     for (int p = 2; p <= 4; ++p) {
444         tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
445         tbb::task_arena arena(p);
446         arena.execute(
447             [&]() {
448                 test_serial<int>();
449                 test_parallel<int>(p);
450             }
451         );
452 	}
453 }
454 
455 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref requirement
TEST_CASE("Test follows and precedes API"){
    test_follows_and_precedes_api();
}
461 #endif
462 
463 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Test deduction guides"){
    test_deduction_guides();
}
469 #endif
470 
471 #if __TBB_CPP20_CONCEPTS_PRESENT
472 //! \brief \ref error_guessing
473 TEST_CASE("constraints for sequencer_node object") {
474     struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {};
475 
476     static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, Object>);
477     static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, int>);
478     static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyable>);
479     static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyAssignable>);
480 }
481 
482 template <typename T, typename Sequencer>
483 concept can_call_sequencer_node_ctor = requires( tbb::flow::graph& graph, Sequencer seq,
484                                                  tbb::flow::buffer_node<int>& f ) {
485     tbb::flow::sequencer_node<T>(graph, seq);
486 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
487     tbb::flow::sequencer_node<T>(tbb::flow::follows(f), seq);
488 #endif
489 };
490 
491 //! \brief \ref error_guessing
492 TEST_CASE("constraints for sequencer_node sequencer") {
493     using type = int;
494     using namespace test_concepts::sequencer;
495 
496     static_assert(can_call_sequencer_node_ctor<type, Correct<type>>);
497     static_assert(!can_call_sequencer_node_ctor<type, NonCopyable<type>>);
498     static_assert(!can_call_sequencer_node_ctor<type, NonDestructible<type>>);
499     static_assert(!can_call_sequencer_node_ctor<type, NoOperatorRoundBrackets<type>>);
500     static_assert(!can_call_sequencer_node_ctor<type, WrongInputOperatorRoundBrackets<type>>);
501     static_assert(!can_call_sequencer_node_ctor<type, WrongReturnOperatorRoundBrackets<type>>);
502 }
503 #endif // __TBB_CPP20_CONCEPTS_PRESENT
504