xref: /oneTBB/test/tbb/test_sequencer_node.cpp (revision fa3268c3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/utils_assert.h"
24 #include "common/test_follows_and_precedes_api.h"
25 #include "common/concepts_common.h"
26 
27 #include <cstdio>
28 #include <atomic>
29 
30 
31 //! \file test_sequencer_node.cpp
32 //! \brief Test for [flow_graph.sequencer_node] specification
33 
34 
35 #define N 1000
36 #define C 10
37 
38 template< typename T >
39 struct seq_inspector {
40     size_t operator()(const T &v) const { return size_t(v); }
41 };
42 
43 template< typename T >
44 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
45     g.wait_for_all();
46     return q.try_get(value);
47 }
48 
49 template< typename T >
50 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
51     while ( q.try_get(value) != true ) ;
52 }
53 
54 template< typename T >
55 struct parallel_puts : utils::NoAssign {
56 
57     tbb::flow::sequencer_node<T> &my_q;
58     int my_num_threads;
59 
60     parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
61 
62     void operator()(int tid) const {
63         for (int j = tid; j < N; j+=my_num_threads) {
64             bool msg = my_q.try_put( T(j) );
65             CHECK_MESSAGE( msg == true, "" );
66         }
67     }
68 
69 };
70 
71 template< typename T >
72 struct touches {
73 
74     bool **my_touches;
75     T *my_last_touch;
76     int my_num_threads;
77 
78     touches( int num_threads ) : my_num_threads(num_threads) {
79         my_last_touch = new T[my_num_threads];
80         my_touches = new bool* [my_num_threads];
81         for ( int p = 0; p < my_num_threads; ++p) {
82             my_last_touch[p] = T(-1);
83             my_touches[p] = new bool[N];
84             for ( int n = 0; n < N; ++n)
85                 my_touches[p][n] = false;
86         }
87     }
88 
89     ~touches() {
90         for ( int p = 0; p < my_num_threads; ++p) {
91             delete [] my_touches[p];
92         }
93         delete [] my_touches;
94         delete [] my_last_touch;
95     }
96 
97     bool check( int tid, T v ) {
98         if ( my_touches[tid][v] != false ) {
99             printf("Error: value seen twice by local thread\n");
100             return false;
101         }
102         if ( v <= my_last_touch[tid] ) {
103             printf("Error: value seen in wrong order by local thread\n");
104             return false;
105         }
106         my_last_touch[tid] = v;
107         my_touches[tid][v] = true;
108         return true;
109     }
110 
111     bool validate_touches() {
112         bool *all_touches = new bool[N];
113         for ( int n = 0; n < N; ++n)
114             all_touches[n] = false;
115 
116         for ( int p = 0; p < my_num_threads; ++p) {
117             for ( int n = 0; n < N; ++n) {
118                 if ( my_touches[p][n] == true ) {
119                     CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
120                     all_touches[n] = true;
121                 }
122             }
123         }
124         for ( int n = 0; n < N; ++n) {
125             if ( !all_touches[n] )
126                 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
127             //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
128         }
129         delete [] all_touches;
130         return true;
131     }
132 
133 };
134 
135 template< typename T >
136 struct parallel_gets : utils::NoAssign {
137 
138     tbb::flow::sequencer_node<T> &my_q;
139     int my_num_threads;
140     touches<T> &my_touches;
141 
142     parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
143 
144     void operator()(int tid) const {
145         for (int j = tid; j < N; j+=my_num_threads) {
146             T v;
147             spin_try_get( my_q, v );
148             my_touches.check( tid, v );
149         }
150     }
151 
152 };
153 
154 template< typename T >
155 struct parallel_put_get : utils::NoAssign {
156 
157     tbb::flow::sequencer_node<T> &my_s1;
158     tbb::flow::sequencer_node<T> &my_s2;
159     int my_num_threads;
160     std::atomic< int > &my_counter;
161     touches<T> &my_touches;
162 
163     parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
164                       std::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
165 
166     void operator()(int tid) const {
167         int i_start = 0;
168 
169         while ( (i_start = my_counter.fetch_add(C)) < N ) {
170             int i_end = ( N < i_start + C ) ? N : i_start + C;
171             for (int i = i_start; i < i_end; ++i) {
172                 bool msg = my_s1.try_put( T(i) );
173                 CHECK_MESSAGE( msg == true, "" );
174             }
175 
176             for (int i = i_start; i < i_end; ++i) {
177                 T v;
178                 spin_try_get( my_s2, v );
179                 my_touches.check( tid, v );
180             }
181         }
182     }
183 
184 };
185 
186 //
187 // Tests
188 //
189 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
190 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
191 //
192 
193 template< typename T >
194 int test_parallel(int num_threads) {
195     tbb::flow::graph g;
196 
197     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
198     utils::NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
199     {
200         touches<T> t( num_threads );
201         utils::NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
202         g.wait_for_all();
203         CHECK_MESSAGE( t.validate_touches(), "" );
204     }
205     T bogus_value(-1);
206     T j = bogus_value;
207     CHECK_MESSAGE( s.try_get( j ) == false, "" );
208     CHECK_MESSAGE( j == bogus_value, "" );
209     g.wait_for_all();
210 
211     tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
212     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
213     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
214     tbb::flow::make_edge( s1, s2 );
215     tbb::flow::make_edge( s2, s3 );
216 
217     {
218         touches<T> t( num_threads );
219         std::atomic<int> counter;
220         counter = 0;
221         utils::NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
222         g.wait_for_all();
223         t.validate_touches();
224     }
225     g.wait_for_all();
226     CHECK_MESSAGE( s1.try_get( j ) == false, "" );
227     g.wait_for_all();
228     CHECK_MESSAGE( s2.try_get( j ) == false, "" );
229     g.wait_for_all();
230     CHECK_MESSAGE( s3.try_get( j ) == false, "" );
231     CHECK_MESSAGE( j == bogus_value, "" );
232 
233     // test copy constructor
234     tbb::flow::sequencer_node<T> s_copy(s);
235     utils::NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
236     for (int i = 0; i < N; ++i) {
237         j = bogus_value;
238         spin_try_get( s_copy, j );
239         CHECK_MESSAGE( i == j, "" );
240     }
241     j = bogus_value;
242     g.wait_for_all();
243     CHECK_MESSAGE( s_copy.try_get( j ) == false, "" );
244     CHECK_MESSAGE( j == bogus_value, "" );
245 
246     return 0;
247 }
248 
249 
250 //
251 // Tests
252 //
253 // No predecessors can be registered
254 // Request from empty buffer fails
255 // In-order puts, single sender, single receiver, properly sequenced at output
256 // Reverse-order puts, single sender, single receiver, properly sequenced at output
257 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
258 //
259 
260 template< typename T >
261 int test_serial() {
262     tbb::flow::graph g;
263     T bogus_value(-1);
264 
265     tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
266     tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
267     T j = bogus_value;
268 
269     //
270     // Rejects attempts to add / remove predecessor
271     // Rejects request from empty Q
272     //
273     CHECK_MESSAGE( register_predecessor( s, s2 ) == false, "" );
274     CHECK_MESSAGE( remove_predecessor( s, s2 ) == false, "" );
275     CHECK_MESSAGE( s.try_get( j ) == false, "" );
276     CHECK_MESSAGE( j == bogus_value, "" );
277 
278     //
279     // In-order simple puts and gets
280     //
281 
282     for (int i = 0; i < N; ++i) {
283         bool msg = s.try_put( T(i) );
284         CHECK_MESSAGE( msg == true, "" );
285         CHECK_MESSAGE(!s.try_put( T(i) ), "");  // second attempt to put should reject
286     }
287 
288 
289     for (int i = 0; i < N; ++i) {
290         j = bogus_value;
291         CHECK_MESSAGE(wait_try_get( g, s, j ) == true, "");
292         CHECK_MESSAGE( i == j, "" );
293         CHECK_MESSAGE(!s.try_put( T(i) ),"" );  // after retrieving value, subsequent put should fail
294     }
295     j = bogus_value;
296     g.wait_for_all();
297     CHECK_MESSAGE( s.try_get( j ) == false, "" );
298     CHECK_MESSAGE( j == bogus_value, "" );
299 
300     //
301     // Reverse-order simple puts and gets
302     //
303 
304     for (int i = N-1; i >= 0; --i) {
305         bool msg = s2.try_put( T(i) );
306         CHECK_MESSAGE( msg == true, "" );
307     }
308 
309     for (int i = 0; i < N; ++i) {
310         j = bogus_value;
311         CHECK_MESSAGE(wait_try_get( g, s2, j ) == true, "");
312         CHECK_MESSAGE( i == j, "" );
313     }
314     j = bogus_value;
315     g.wait_for_all();
316     CHECK_MESSAGE( s2.try_get( j ) == false, "" );
317     CHECK_MESSAGE( j == bogus_value, "" );
318 
319     //
320     // Chained in-order simple puts and gets
321     //
322 
323     tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
324     tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
325     tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
326     tbb::flow::make_edge( s3, s4 );
327     tbb::flow::make_edge( s4, s5 );
328 
329     for (int i = 0; i < N; ++i) {
330         bool msg = s3.try_put( T(i) );
331         CHECK_MESSAGE( msg == true, "" );
332     }
333 
334     for (int i = 0; i < N; ++i) {
335         j = bogus_value;
336         CHECK_MESSAGE(wait_try_get( g, s5, j ) == true, "");
337         CHECK_MESSAGE( i == j, "" );
338     }
339     j = bogus_value;
340     CHECK_MESSAGE( wait_try_get( g, s3, j ) == false, "" );
341     CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
342     CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
343     CHECK_MESSAGE( j == bogus_value, "" );
344 
345     g.wait_for_all();
346     tbb::flow::remove_edge( s3, s4 );
347     CHECK_MESSAGE( s3.try_put( N ) == true, "" );
348     CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
349     CHECK_MESSAGE( j == bogus_value, "" );
350     CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
351     CHECK_MESSAGE( j == bogus_value, "" );
352     CHECK_MESSAGE( wait_try_get( g, s3, j ) == true, "" );
353     CHECK_MESSAGE( j == N, "" );
354 
355     //
356     // Chained reverse-order simple puts and gets
357     //
358 
359     tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
360     tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
361     tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
362     tbb::flow::make_edge( s6, s7 );
363     tbb::flow::make_edge( s7, s8 );
364 
365     for (int i = N-1; i >= 0; --i) {
366         bool msg = s6.try_put( T(i) );
367         CHECK_MESSAGE( msg == true, "" );
368     }
369 
370     for (int i = 0; i < N; ++i) {
371         j = bogus_value;
372         CHECK_MESSAGE( wait_try_get( g, s8, j ) == true, "" );
373         CHECK_MESSAGE( i == j, "" );
374     }
375     j = bogus_value;
376     CHECK_MESSAGE( wait_try_get( g, s6, j ) == false, "" );
377     CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
378     CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
379     CHECK_MESSAGE( j == bogus_value, "" );
380 
381     g.wait_for_all();
382     tbb::flow::remove_edge( s6, s7 );
383     CHECK_MESSAGE( s6.try_put( N ) == true, "" );
384     CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
385     CHECK_MESSAGE( j == bogus_value, "" );
386     CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
387     CHECK_MESSAGE( j == bogus_value, "" );
388     CHECK_MESSAGE( wait_try_get( g, s6, j ) == true, "" );
389     CHECK_MESSAGE( j == N, "" );
390 
391     return 0;
392 }
393 
394 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
395 #include <array>
396 #include <vector>
397 void test_follows_and_precedes_api() {
398     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
399     std::vector<int> messages_for_precedes = {0, 1, 2};
400 
401     follows_and_precedes_testing::test_follows
402         <int, tbb::flow::sequencer_node<int>>
403         (messages_for_follows, [](const int& i) -> std::size_t { return i; });
404 
405     follows_and_precedes_testing::test_precedes
406         <int, tbb::flow::sequencer_node<int>>
407         (messages_for_precedes, [](const int& i) -> std::size_t { return i; });
408 }
409 #endif
410 
411 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
412 template <typename Body>
413 void test_deduction_guides_common(Body body) {
414     using namespace tbb::flow;
415     graph g;
416     broadcast_node<int> br(g);
417 
418     sequencer_node s1(g, body);
419     static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);
420 
421 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
422     sequencer_node s2(follows(br), body);
423     static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
424 #endif
425 
426     sequencer_node s3(s1);
427     static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
428 }
429 
430 std::size_t sequencer_body_f(const int&) { return 1; }
431 
432 void test_deduction_guides() {
433     test_deduction_guides_common([](const int&)->std::size_t { return 1; });
434     test_deduction_guides_common([](const int&) mutable ->std::size_t { return 1; });
435     test_deduction_guides_common(sequencer_body_f);
436 }
437 #endif
438 
439 //! Test sequencer with various request orders and parallelism levels
440 //! \brief \ref requirement \ref error_guessing
441 TEST_CASE("Serial and parallel test"){
442     for (int p = 2; p <= 4; ++p) {
443         tbb::task_arena arena(p);
444         arena.execute(
445             [&]() {
446                 test_serial<int>();
447                 test_parallel<int>(p);
448             }
449         );
450 	}
451 }
452 
453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
454 //! Test decution guides
455 //! \brief \ref requirement
456 TEST_CASE("Test follows and precedes API"){
457     test_follows_and_precedes_api();
458 }
459 #endif
460 
461 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
462 //! Test decution guides
463 //! \brief \ref requirement
464 TEST_CASE("Test deduction guides"){
465     test_deduction_guides();
466 }
467 #endif
468 
469 #if __TBB_CPP20_CONCEPTS_PRESENT
470 //! \brief \ref error_guessing
471 TEST_CASE("constraints for sequencer_node object") {
472     struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {};
473 
474     static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, Object>);
475     static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, int>);
476     static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyable>);
477     static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyAssignable>);
478 }
479 
480 template <typename T, typename Sequencer>
481 concept can_call_sequencer_node_ctor = requires( tbb::flow::graph& graph, Sequencer seq,
482                                                  tbb::flow::buffer_node<int>& f ) {
483     tbb::flow::sequencer_node<T>(graph, seq);
484 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
485     tbb::flow::sequencer_node<T>(tbb::flow::follows(f), seq);
486 #endif
487 };
488 
489 //! \brief \ref error_guessing
490 TEST_CASE("constraints for sequencer_node sequencer") {
491     using type = int;
492     using namespace test_concepts::sequencer;
493 
494     static_assert(can_call_sequencer_node_ctor<type, Correct<type>>);
495     static_assert(!can_call_sequencer_node_ctor<type, NonCopyable<type>>);
496     static_assert(!can_call_sequencer_node_ctor<type, NonDestructible<type>>);
497     static_assert(!can_call_sequencer_node_ctor<type, NoOperatorRoundBrackets<type>>);
498     static_assert(!can_call_sequencer_node_ctor<type, WrongInputOperatorRoundBrackets<type>>);
499     static_assert(!can_call_sequencer_node_ctor<type, WrongReturnOperatorRoundBrackets<type>>);
500 }
501 #endif // __TBB_CPP20_CONCEPTS_PRESENT
502