1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/config.h"
18
19 #include "tbb/flow_graph.h"
20 #include "tbb/global_control.h"
21
22 #include "common/test.h"
23 #include "common/utils.h"
24 #include "common/utils_assert.h"
25 #include "common/test_follows_and_precedes_api.h"
26 #include "common/concepts_common.h"
27
28 #include <cstdio>
29 #include <atomic>
30
31
32 //! \file test_sequencer_node.cpp
33 //! \brief Test for [flow_graph.sequencer_node] specification
34
35
//! Number of distinct sequence values (0 .. N-1) pushed through each sequencer under test.
#define N 1000
//! Number of consecutive values a thread claims per iteration in parallel_put_get.
#define C 10
38
//! Sequencer body used by the serial and parallel tests: the sequence number
//! of an item is the item's own value converted to size_t.
template< typename T >
struct seq_inspector {
    size_t operator()(const T &v) const { return static_cast<size_t>(v); }
};
43
44 template< typename T >
wait_try_get(tbb::flow::graph & g,tbb::flow::sequencer_node<T> & q,T & value)45 bool wait_try_get( tbb::flow::graph &g, tbb::flow::sequencer_node<T> &q, T &value ) {
46 g.wait_for_all();
47 return q.try_get(value);
48 }
49
50 template< typename T >
spin_try_get(tbb::flow::queue_node<T> & q,T & value)51 void spin_try_get( tbb::flow::queue_node<T> &q, T &value ) {
52 while ( q.try_get(value) != true ) ;
53 }
54
55 template< typename T >
56 struct parallel_puts : utils::NoAssign {
57
58 tbb::flow::sequencer_node<T> &my_q;
59 int my_num_threads;
60
parallel_putsparallel_puts61 parallel_puts( tbb::flow::sequencer_node<T> &q, int num_threads ) : my_q(q), my_num_threads(num_threads) {}
62
operator ()parallel_puts63 void operator()(int tid) const {
64 for (int j = tid; j < N; j+=my_num_threads) {
65 bool msg = my_q.try_put( T(j) );
66 CHECK_MESSAGE( msg == true, "" );
67 }
68 }
69
70 };
71
72 template< typename T >
73 struct touches {
74
75 bool **my_touches;
76 T *my_last_touch;
77 int my_num_threads;
78
touchestouches79 touches( int num_threads ) : my_num_threads(num_threads) {
80 my_last_touch = new T[my_num_threads];
81 my_touches = new bool* [my_num_threads];
82 for ( int p = 0; p < my_num_threads; ++p) {
83 my_last_touch[p] = T(-1);
84 my_touches[p] = new bool[N];
85 for ( int n = 0; n < N; ++n)
86 my_touches[p][n] = false;
87 }
88 }
89
~touchestouches90 ~touches() {
91 for ( int p = 0; p < my_num_threads; ++p) {
92 delete [] my_touches[p];
93 }
94 delete [] my_touches;
95 delete [] my_last_touch;
96 }
97
checktouches98 bool check( int tid, T v ) {
99 if ( my_touches[tid][v] != false ) {
100 printf("Error: value seen twice by local thread\n");
101 return false;
102 }
103 if ( v <= my_last_touch[tid] ) {
104 printf("Error: value seen in wrong order by local thread\n");
105 return false;
106 }
107 my_last_touch[tid] = v;
108 my_touches[tid][v] = true;
109 return true;
110 }
111
validate_touchestouches112 bool validate_touches() {
113 bool *all_touches = new bool[N];
114 for ( int n = 0; n < N; ++n)
115 all_touches[n] = false;
116
117 for ( int p = 0; p < my_num_threads; ++p) {
118 for ( int n = 0; n < N; ++n) {
119 if ( my_touches[p][n] == true ) {
120 CHECK_MESSAGE( ( all_touches[n] == false), "value see by more than one thread\n" );
121 all_touches[n] = true;
122 }
123 }
124 }
125 for ( int n = 0; n < N; ++n) {
126 if ( !all_touches[n] )
127 printf("No touch at %d, my_num_threads = %d\n", n, my_num_threads);
128 //CHECK_MESSAGE( ( all_touches[n] == true), "value not seen by any thread\n" );
129 }
130 delete [] all_touches;
131 return true;
132 }
133
134 };
135
136 template< typename T >
137 struct parallel_gets : utils::NoAssign {
138
139 tbb::flow::sequencer_node<T> &my_q;
140 int my_num_threads;
141 touches<T> &my_touches;
142
parallel_getsparallel_gets143 parallel_gets( tbb::flow::sequencer_node<T> &q, int num_threads, touches<T> &t ) : my_q(q), my_num_threads(num_threads), my_touches(t) {}
144
operator ()parallel_gets145 void operator()(int tid) const {
146 for (int j = tid; j < N; j+=my_num_threads) {
147 T v;
148 spin_try_get( my_q, v );
149 my_touches.check( tid, v );
150 }
151 }
152
153 };
154
155 template< typename T >
156 struct parallel_put_get : utils::NoAssign {
157
158 tbb::flow::sequencer_node<T> &my_s1;
159 tbb::flow::sequencer_node<T> &my_s2;
160 int my_num_threads;
161 std::atomic< int > &my_counter;
162 touches<T> &my_touches;
163
parallel_put_getparallel_put_get164 parallel_put_get( tbb::flow::sequencer_node<T> &s1, tbb::flow::sequencer_node<T> &s2, int num_threads,
165 std::atomic<int> &counter, touches<T> &t ) : my_s1(s1), my_s2(s2), my_num_threads(num_threads), my_counter(counter), my_touches(t) {}
166
operator ()parallel_put_get167 void operator()(int tid) const {
168 int i_start = 0;
169
170 while ( (i_start = my_counter.fetch_add(C)) < N ) {
171 int i_end = ( N < i_start + C ) ? N : i_start + C;
172 for (int i = i_start; i < i_end; ++i) {
173 bool msg = my_s1.try_put( T(i) );
174 CHECK_MESSAGE( msg == true, "" );
175 }
176
177 for (int i = i_start; i < i_end; ++i) {
178 T v;
179 spin_try_get( my_s2, v );
180 my_touches.check( tid, v );
181 }
182 }
183 }
184
185 };
186
187 //
188 // Tests
189 //
190 // multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
191 // chained sequencers, multiple parallel senders, multiple receivers, properly sequenced (relative to receiver) at output
192 //
193
194 template< typename T >
test_parallel(int num_threads)195 int test_parallel(int num_threads) {
196 tbb::flow::graph g;
197
198 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
199 utils::NativeParallelFor( num_threads, parallel_puts<T>(s, num_threads) );
200 {
201 touches<T> t( num_threads );
202 utils::NativeParallelFor( num_threads, parallel_gets<T>(s, num_threads, t) );
203 g.wait_for_all();
204 CHECK_MESSAGE( t.validate_touches(), "" );
205 }
206 T bogus_value(-1);
207 T j = bogus_value;
208 CHECK_MESSAGE( s.try_get( j ) == false, "" );
209 CHECK_MESSAGE( j == bogus_value, "" );
210 g.wait_for_all();
211
212 tbb::flow::sequencer_node<T> s1(g, seq_inspector<T>());
213 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
214 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
215 tbb::flow::make_edge( s1, s2 );
216 tbb::flow::make_edge( s2, s3 );
217
218 {
219 touches<T> t( num_threads );
220 std::atomic<int> counter;
221 counter = 0;
222 utils::NativeParallelFor( num_threads, parallel_put_get<T>(s1, s3, num_threads, counter, t) );
223 g.wait_for_all();
224 t.validate_touches();
225 }
226 g.wait_for_all();
227 CHECK_MESSAGE( s1.try_get( j ) == false, "" );
228 g.wait_for_all();
229 CHECK_MESSAGE( s2.try_get( j ) == false, "" );
230 g.wait_for_all();
231 CHECK_MESSAGE( s3.try_get( j ) == false, "" );
232 CHECK_MESSAGE( j == bogus_value, "" );
233
234 // test copy constructor
235 tbb::flow::sequencer_node<T> s_copy(s);
236 utils::NativeParallelFor( num_threads, parallel_puts<T>(s_copy, num_threads) );
237 for (int i = 0; i < N; ++i) {
238 j = bogus_value;
239 spin_try_get( s_copy, j );
240 CHECK_MESSAGE( i == j, "" );
241 }
242 j = bogus_value;
243 g.wait_for_all();
244 CHECK_MESSAGE( s_copy.try_get( j ) == false, "" );
245 CHECK_MESSAGE( j == bogus_value, "" );
246
247 return 0;
248 }
249
250
251 //
252 // Tests
253 //
254 // No predecessors can be registered
255 // Request from empty buffer fails
256 // In-order puts, single sender, single receiver, properly sequenced at output
257 // Reverse-order puts, single sender, single receiver, properly sequenced at output
258 // Chained sequencers (3), in-order and reverse-order tests, properly sequenced at output
259 //
260
261 template< typename T >
test_serial()262 int test_serial() {
263 tbb::flow::graph g;
264 T bogus_value(-1);
265
266 tbb::flow::sequencer_node<T> s(g, seq_inspector<T>());
267 tbb::flow::sequencer_node<T> s2(g, seq_inspector<T>());
268 T j = bogus_value;
269
270 //
271 // Rejects attempts to add / remove predecessor
272 // Rejects request from empty Q
273 //
274 CHECK_MESSAGE( register_predecessor( s, s2 ) == false, "" );
275 CHECK_MESSAGE( remove_predecessor( s, s2 ) == false, "" );
276 CHECK_MESSAGE( s.try_get( j ) == false, "" );
277 CHECK_MESSAGE( j == bogus_value, "" );
278
279 //
280 // In-order simple puts and gets
281 //
282
283 for (int i = 0; i < N; ++i) {
284 bool msg = s.try_put( T(i) );
285 CHECK_MESSAGE( msg == true, "" );
286 CHECK_MESSAGE(!s.try_put( T(i) ), ""); // second attempt to put should reject
287 }
288
289
290 for (int i = 0; i < N; ++i) {
291 j = bogus_value;
292 CHECK_MESSAGE(wait_try_get( g, s, j ) == true, "");
293 CHECK_MESSAGE( i == j, "" );
294 CHECK_MESSAGE(!s.try_put( T(i) ),"" ); // after retrieving value, subsequent put should fail
295 }
296 j = bogus_value;
297 g.wait_for_all();
298 CHECK_MESSAGE( s.try_get( j ) == false, "" );
299 CHECK_MESSAGE( j == bogus_value, "" );
300
301 //
302 // Reverse-order simple puts and gets
303 //
304
305 for (int i = N-1; i >= 0; --i) {
306 bool msg = s2.try_put( T(i) );
307 CHECK_MESSAGE( msg == true, "" );
308 }
309
310 for (int i = 0; i < N; ++i) {
311 j = bogus_value;
312 CHECK_MESSAGE(wait_try_get( g, s2, j ) == true, "");
313 CHECK_MESSAGE( i == j, "" );
314 }
315 j = bogus_value;
316 g.wait_for_all();
317 CHECK_MESSAGE( s2.try_get( j ) == false, "" );
318 CHECK_MESSAGE( j == bogus_value, "" );
319
320 //
321 // Chained in-order simple puts and gets
322 //
323
324 tbb::flow::sequencer_node<T> s3(g, seq_inspector<T>());
325 tbb::flow::sequencer_node<T> s4(g, seq_inspector<T>());
326 tbb::flow::sequencer_node<T> s5(g, seq_inspector<T>());
327 tbb::flow::make_edge( s3, s4 );
328 tbb::flow::make_edge( s4, s5 );
329
330 for (int i = 0; i < N; ++i) {
331 bool msg = s3.try_put( T(i) );
332 CHECK_MESSAGE( msg == true, "" );
333 }
334
335 for (int i = 0; i < N; ++i) {
336 j = bogus_value;
337 CHECK_MESSAGE(wait_try_get( g, s5, j ) == true, "");
338 CHECK_MESSAGE( i == j, "" );
339 }
340 j = bogus_value;
341 CHECK_MESSAGE( wait_try_get( g, s3, j ) == false, "" );
342 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
343 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
344 CHECK_MESSAGE( j == bogus_value, "" );
345
346 g.wait_for_all();
347 tbb::flow::remove_edge( s3, s4 );
348 CHECK_MESSAGE( s3.try_put( N ) == true, "" );
349 CHECK_MESSAGE( wait_try_get( g, s4, j ) == false, "" );
350 CHECK_MESSAGE( j == bogus_value, "" );
351 CHECK_MESSAGE( wait_try_get( g, s5, j ) == false, "" );
352 CHECK_MESSAGE( j == bogus_value, "" );
353 CHECK_MESSAGE( wait_try_get( g, s3, j ) == true, "" );
354 CHECK_MESSAGE( j == N, "" );
355
356 //
357 // Chained reverse-order simple puts and gets
358 //
359
360 tbb::flow::sequencer_node<T> s6(g, seq_inspector<T>());
361 tbb::flow::sequencer_node<T> s7(g, seq_inspector<T>());
362 tbb::flow::sequencer_node<T> s8(g, seq_inspector<T>());
363 tbb::flow::make_edge( s6, s7 );
364 tbb::flow::make_edge( s7, s8 );
365
366 for (int i = N-1; i >= 0; --i) {
367 bool msg = s6.try_put( T(i) );
368 CHECK_MESSAGE( msg == true, "" );
369 }
370
371 for (int i = 0; i < N; ++i) {
372 j = bogus_value;
373 CHECK_MESSAGE( wait_try_get( g, s8, j ) == true, "" );
374 CHECK_MESSAGE( i == j, "" );
375 }
376 j = bogus_value;
377 CHECK_MESSAGE( wait_try_get( g, s6, j ) == false, "" );
378 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
379 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
380 CHECK_MESSAGE( j == bogus_value, "" );
381
382 g.wait_for_all();
383 tbb::flow::remove_edge( s6, s7 );
384 CHECK_MESSAGE( s6.try_put( N ) == true, "" );
385 CHECK_MESSAGE( wait_try_get( g, s7, j ) == false, "" );
386 CHECK_MESSAGE( j == bogus_value, "" );
387 CHECK_MESSAGE( wait_try_get( g, s8, j ) == false, "" );
388 CHECK_MESSAGE( j == bogus_value, "" );
389 CHECK_MESSAGE( wait_try_get( g, s6, j ) == true, "" );
390 CHECK_MESSAGE( j == N, "" );
391
392 return 0;
393 }
394
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
//! Runs the shared follows/precedes helpers for sequencer_node<int>, using a
//! sequencer body that returns the message value as its sequence number.
void test_follows_and_precedes_api() {
    std::array<int, 3> messages_for_follows = { {0, 1, 2} };
    std::vector<int> messages_for_precedes = {0, 1, 2};

    follows_and_precedes_testing::test_follows
        <int, tbb::flow::sequencer_node<int>>
        (messages_for_follows, [](const int& i) -> std::size_t { return i; });

    follows_and_precedes_testing::test_precedes
        <int, tbb::flow::sequencer_node<int>>
        (messages_for_precedes, [](const int& i) -> std::size_t { return i; });
}
#endif
411
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Checks at compile time that sequencer_node constructed without explicit
//! template arguments from the given body (and by copy) is sequencer_node<int>.
template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;
    broadcast_node<int> br(g);

    sequencer_node s1(g, body);
    static_assert(std::is_same_v<decltype(s1), sequencer_node<int>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    sequencer_node s2(follows(br), body);
    static_assert(std::is_same_v<decltype(s2), sequencer_node<int>>);
#endif

    sequencer_node s3(s1);
    static_assert(std::is_same_v<decltype(s3), sequencer_node<int>>);
}

//! Free-function sequencer body used as one of the deduction inputs.
std::size_t sequencer_body_f(const int&) { return 1; }

//! Exercises deduction with a lambda, a mutable lambda and a free function.
void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->std::size_t { return 1; });
    test_deduction_guides_common([](const int&) mutable ->std::size_t { return 1; });
    test_deduction_guides_common(sequencer_body_f);
}
#endif
439
440 //! Test sequencer with various request orders and parallelism levels
441 //! \brief \ref requirement \ref error_guessing
442 TEST_CASE("Serial and parallel test"){
443 for (int p = 2; p <= 4; ++p) {
444 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
445 tbb::task_arena arena(p);
446 arena.execute(
__anonf3616eba0502() 447 [&]() {
448 test_serial<int>();
449 test_parallel<int>(p);
450 }
451 );
452 }
453 }
454
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref requirement
TEST_CASE("Test follows and precedes API"){
    test_follows_and_precedes_api();
}
#endif
462
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Test deduction guides"){
    test_deduction_guides();
}
#endif
470
#if __TBB_CPP20_CONCEPTS_PRESENT
//! Compile-time checks of which item types sequencer_node can be instantiated with
//! \brief \ref error_guessing
TEST_CASE("constraints for sequencer_node object") {
    struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {};

    static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, Object>);
    static_assert(utils::well_formed_instantiation<tbb::flow::sequencer_node, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyable>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::sequencer_node, test_concepts::NonCopyAssignable>);
}

//! Satisfied when sequencer_node<T> can be constructed from a graph (and, with
//! the node-set preview enabled, from follows(...)) together with a Sequencer body.
template <typename T, typename Sequencer>
concept can_call_sequencer_node_ctor = requires( tbb::flow::graph& graph, Sequencer seq,
                                                 tbb::flow::buffer_node<int>& f ) {
    tbb::flow::sequencer_node<T>(graph, seq);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::sequencer_node<T>(tbb::flow::follows(f), seq);
#endif
};

//! Compile-time checks of which sequencer body types the constructors accept
//! \brief \ref error_guessing
TEST_CASE("constraints for sequencer_node sequencer") {
    using type = int;
    using namespace test_concepts::sequencer;

    static_assert(can_call_sequencer_node_ctor<type, Correct<type>>);
    static_assert(!can_call_sequencer_node_ctor<type, NonCopyable<type>>);
    static_assert(!can_call_sequencer_node_ctor<type, NonDestructible<type>>);
    static_assert(!can_call_sequencer_node_ctor<type, NoOperatorRoundBrackets<type>>);
    static_assert(!can_call_sequencer_node_ctor<type, WrongInputOperatorRoundBrackets<type>>);
    static_assert(!can_call_sequencer_node_ctor<type, WrongReturnOperatorRoundBrackets<type>>);
}
#endif // __TBB_CPP20_CONCEPTS_PRESENT
504