1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/config.h"
18
19 #include "tbb/flow_graph.h"
20 #include "tbb/global_control.h"
21
22 #include "common/test.h"
23 #include "common/utils.h"
24 #include "common/graph_utils.h"
25 #include "common/test_follows_and_precedes_api.h"
26
27
28 //! \file test_buffer_node.cpp
29 //! \brief Test for [flow_graph.buffer_node] specification
30
31
32 #define N 1000
33 #define C 10
34
35 template< typename T >
spin_try_get(tbb::flow::buffer_node<T> & b,T & value)36 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) {
37 while ( b.try_get(value) != true ) {}
38 }
39
40 template< typename T >
check_item(T * count_value,T & value)41 void check_item( T* count_value, T &value ) {
42 count_value[value / N] += value % N;
43 }
44
45 template< typename T >
46 struct parallel_puts : utils::NoAssign {
47
48 tbb::flow::buffer_node<T> &my_b;
49
parallel_putsparallel_puts50 parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {}
51
operator ()parallel_puts52 void operator()(int i) const {
53 for (int j = 0; j < N; ++j) {
54 bool msg = my_b.try_put( T(N*i + j) );
55 CHECK_MESSAGE( msg == true, "" );
56 }
57 }
58 };
59
60 template< typename T >
61 struct touches {
62
63 bool **my_touches;
64 int my_num_threads;
65
touchestouches66 touches( int num_threads ) : my_num_threads(num_threads) {
67 my_touches = new bool* [my_num_threads];
68 for ( int p = 0; p < my_num_threads; ++p) {
69 my_touches[p] = new bool[N];
70 for ( int n = 0; n < N; ++n)
71 my_touches[p][n] = false;
72 }
73 }
74
~touchestouches75 ~touches() {
76 for ( int p = 0; p < my_num_threads; ++p) {
77 delete [] my_touches[p];
78 }
79 delete [] my_touches;
80 }
81
checktouches82 bool check( T v ) {
83 CHECK_MESSAGE( my_touches[v/N][v%N] == false, "" );
84 my_touches[v/N][v%N] = true;
85 return true;
86 }
87
validate_touchestouches88 bool validate_touches() {
89 for ( int p = 0; p < my_num_threads; ++p) {
90 for ( int n = 0; n < N; ++n) {
91 CHECK_MESSAGE( my_touches[p][n] == true, "" );
92 }
93 }
94 return true;
95 }
96 };
97
98 template< typename T >
99 struct parallel_gets : utils::NoAssign {
100
101 tbb::flow::buffer_node<T> &my_b;
102 touches<T> &my_touches;
103
parallel_getsparallel_gets104 parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {}
105
operator ()parallel_gets106 void operator()(int) const {
107 for (int j = 0; j < N; ++j) {
108 T v;
109 spin_try_get( my_b, v );
110 my_touches.check( v );
111 }
112 }
113
114 };
115
116 template< typename T >
117 struct parallel_put_get : utils::NoAssign {
118
119 tbb::flow::buffer_node<T> &my_b;
120 touches<T> &my_touches;
121
parallel_put_getparallel_put_get122 parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {}
123
operator ()parallel_put_get124 void operator()(int tid) const {
125
126 for ( int i = 0; i < N; i+=C ) {
127 int j_end = ( N < i + C ) ? N : i + C;
128 // dump about C values into the buffer
129 for ( int j = i; j < j_end; ++j ) {
130 CHECK_MESSAGE( my_b.try_put( T (N*tid + j ) ) == true, "" );
131 }
132 // receiver about C values from the buffer
133 for ( int j = i; j < j_end; ++j ) {
134 T v;
135 spin_try_get( my_b, v );
136 my_touches.check( v );
137 }
138 }
139 }
140
141 };
142
143 //
144 // Tests
145 //
146 // Item can be reserved, released, consumed ( single serial receiver )
147 //
148 template< typename T >
test_reservation()149 int test_reservation() {
150 tbb::flow::graph g;
151 T bogus_value(-1);
152
153 // Simple tests
154 tbb::flow::buffer_node<T> b(g);
155
156 b.try_put(T(1));
157 b.try_put(T(2));
158 b.try_put(T(3));
159
160 T v, vsum;
161 CHECK_MESSAGE( b.try_reserve(v) == true, "" );
162 CHECK_MESSAGE( b.try_release() == true, "" );
163 v = bogus_value;
164 g.wait_for_all();
165 CHECK_MESSAGE( b.try_reserve(v) == true, "" );
166 CHECK_MESSAGE( b.try_consume() == true, "" );
167 vsum += v;
168 v = bogus_value;
169 g.wait_for_all();
170
171 CHECK_MESSAGE( b.try_get(v) == true, "" );
172 vsum += v;
173 v = bogus_value;
174 g.wait_for_all();
175
176 CHECK_MESSAGE( b.try_reserve(v) == true, "" );
177 CHECK_MESSAGE( b.try_release() == true, "" );
178 v = bogus_value;
179 g.wait_for_all();
180 CHECK_MESSAGE( b.try_reserve(v) == true, "" );
181 CHECK_MESSAGE( b.try_consume() == true, "" );
182 vsum += v;
183 CHECK_MESSAGE( vsum == T(6), "");
184 v = bogus_value;
185 g.wait_for_all();
186
187 return 0;
188 }
189
190 //
191 // Tests
192 //
193 // multiple parallel senders, items in arbitrary order
194 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received
195 // * overlapped puts / gets
// * all puts finished before any gets
197 //
198 template< typename T >
test_parallel(int num_threads)199 int test_parallel(int num_threads) {
200 tbb::flow::graph g;
201 tbb::flow::buffer_node<T> b(g);
202 tbb::flow::buffer_node<T> b2(g);
203 tbb::flow::buffer_node<T> b3(g);
204 T bogus_value(-1);
205 T j = bogus_value;
206
207 NativeParallelFor( num_threads, parallel_puts<T>(b) );
208
209 T *next_value = new T[num_threads];
210 for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
211
212 for (int i = 0; i < num_threads * N; ++i ) {
213 spin_try_get( b, j );
214 check_item( next_value, j );
215 j = bogus_value;
216 }
217 for (int tid = 0; tid < num_threads; ++tid) {
218 CHECK_MESSAGE( next_value[tid] == T((N*(N-1))/2), "" );
219 }
220
221 j = bogus_value;
222 g.wait_for_all();
223 CHECK_MESSAGE( b.try_get( j ) == false, "" );
224 CHECK_MESSAGE( j == bogus_value, "" );
225
226 NativeParallelFor( num_threads, parallel_puts<T>(b) );
227
228 {
229 touches< T > t( num_threads );
230 NativeParallelFor( num_threads, parallel_gets<T>(b, t) );
231 g.wait_for_all();
232 CHECK_MESSAGE( t.validate_touches(), "" );
233 }
234 j = bogus_value;
235 CHECK_MESSAGE( b.try_get( j ) == false, "" );
236 CHECK_MESSAGE( j == bogus_value, "" );
237
238 g.wait_for_all();
239 {
240 touches< T > t( num_threads );
241 NativeParallelFor( num_threads, parallel_put_get<T>(b, t) );
242 g.wait_for_all();
243 CHECK_MESSAGE( t.validate_touches(), "" );
244 }
245 j = bogus_value;
246 CHECK_MESSAGE( b.try_get( j ) == false, "" );
247 CHECK_MESSAGE( j == bogus_value, "" );
248
249 tbb::flow::make_edge( b, b2 );
250 tbb::flow::make_edge( b2, b3 );
251
252 NativeParallelFor( num_threads, parallel_puts<T>(b) );
253 {
254 touches< T > t( num_threads );
255 NativeParallelFor( num_threads, parallel_gets<T>(b3, t) );
256 g.wait_for_all();
257 CHECK_MESSAGE( t.validate_touches(), "" );
258 }
259 j = bogus_value;
260 g.wait_for_all();
261 CHECK_MESSAGE( b.try_get( j ) == false, "" );
262 g.wait_for_all();
263 CHECK_MESSAGE( b2.try_get( j ) == false, "" );
264 g.wait_for_all();
265 CHECK_MESSAGE( b3.try_get( j ) == false, "" );
266 CHECK_MESSAGE( j == bogus_value, "" );
267
268 // test copy constructor
269 CHECK_MESSAGE( b.remove_successor( b2 ), "" );
270 // fill up b:
271 NativeParallelFor( num_threads, parallel_puts<T>(b) );
272 // copy b:
273 tbb::flow::buffer_node<T> b_copy(b);
274
275 // b_copy should be empty
276 j = bogus_value;
277 g.wait_for_all();
278 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
279
280 // hook them together:
281 CHECK_MESSAGE( b.register_successor(b_copy) == true, "" );
282 // try to get content from b_copy
283 {
284 touches< T > t( num_threads );
285 NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) );
286 g.wait_for_all();
287 CHECK_MESSAGE( t.validate_touches(), "" );
288 }
289 // now both should be empty
290 j = bogus_value;
291 g.wait_for_all();
292 CHECK_MESSAGE( b.try_get( j ) == false, "" );
293 g.wait_for_all();
294 CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
295 CHECK_MESSAGE( j == bogus_value, "" );
296
297 delete [] next_value;
298 return 0;
299 }
300
301 //
302 // Tests
303 //
304 // Predecessors cannot be registered
305 // Empty buffer rejects item requests
306 // Single serial sender, items in arbitrary order
307 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order
308 //
309
310 #define TBB_INTERNAL_NAMESPACE detail::d1
311 using tbb::TBB_INTERNAL_NAMESPACE::register_predecessor;
312 using tbb::TBB_INTERNAL_NAMESPACE::remove_predecessor;
313
314 template< typename T >
test_serial()315 int test_serial() {
316 tbb::flow::graph g;
317 T bogus_value(-1);
318
319 tbb::flow::buffer_node<T> b(g);
320 tbb::flow::buffer_node<T> b2(g);
321 T j = bogus_value;
322
323 //
324 // Rejects attempts to add / remove predecessor
325 // Rejects request from empty buffer
326 //
327 CHECK_MESSAGE( register_predecessor<T>( b, b2 ) == false, "" );
328 CHECK_MESSAGE( remove_predecessor<T>( b, b2 ) == false, "" );
329 CHECK_MESSAGE( b.try_get( j ) == false, "" );
330 CHECK_MESSAGE( j == bogus_value, "" );
331
332 //
333 // Simple puts and gets
334 //
335
336 for (int i = 0; i < N; ++i) {
337 bool msg = b.try_put( T(i) );
338 CHECK_MESSAGE( msg == true, "" );
339 }
340
341 T vsum = T(0);
342 for (int i = 0; i < N; ++i) {
343 j = bogus_value;
344 spin_try_get( b, j );
345 vsum += j;
346 }
347 CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
348 j = bogus_value;
349 g.wait_for_all();
350 CHECK_MESSAGE( b.try_get( j ) == false, "" );
351 CHECK_MESSAGE( j == bogus_value, "" );
352
353 tbb::flow::make_edge(b, b2);
354
355 vsum = T(0);
356 for (int i = 0; i < N; ++i) {
357 bool msg = b.try_put( T(i) );
358 CHECK_MESSAGE( msg == true, "" );
359 }
360
361 for (int i = 0; i < N; ++i) {
362 j = bogus_value;
363 spin_try_get( b2, j );
364 vsum += j;
365 }
366 CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
367 j = bogus_value;
368 g.wait_for_all();
369 CHECK_MESSAGE( b.try_get( j ) == false, "" );
370 g.wait_for_all();
371 CHECK_MESSAGE( b2.try_get( j ) == false, "" );
372 CHECK_MESSAGE( j == bogus_value, "" );
373
374 tbb::flow::remove_edge(b, b2);
375 CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
376 g.wait_for_all();
377 CHECK_MESSAGE( b2.try_get( j ) == false, "" );
378 CHECK_MESSAGE( j == bogus_value, "" );
379 g.wait_for_all();
380 CHECK_MESSAGE( b.try_get( j ) == true, "" );
381 CHECK_MESSAGE( j == 1, "" );
382
383 tbb::flow::buffer_node<T> b3(g);
384 tbb::flow::make_edge( b, b2 );
385 tbb::flow::make_edge( b2, b3 );
386
387 vsum = T(0);
388 for (int i = 0; i < N; ++i) {
389 bool msg = b.try_put( T(i) );
390 CHECK_MESSAGE( msg == true, "" );
391 }
392
393 for (int i = 0; i < N; ++i) {
394 j = bogus_value;
395 spin_try_get( b3, j );
396 vsum += j;
397 }
398 CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
399 j = bogus_value;
400 g.wait_for_all();
401 CHECK_MESSAGE( b.try_get( j ) == false, "" );
402 g.wait_for_all();
403 CHECK_MESSAGE( b2.try_get( j ) == false, "" );
404 g.wait_for_all();
405 CHECK_MESSAGE( b3.try_get( j ) == false, "" );
406 CHECK_MESSAGE( j == bogus_value, "" );
407
408 tbb::flow::remove_edge(b, b2);
409 CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
410 g.wait_for_all();
411 CHECK_MESSAGE( b2.try_get( j ) == false, "" );
412 CHECK_MESSAGE( j == bogus_value, "" );
413 g.wait_for_all();
414 CHECK_MESSAGE( b3.try_get( j ) == false, "" );
415 CHECK_MESSAGE( j == bogus_value, "" );
416 g.wait_for_all();
417 CHECK_MESSAGE( b.try_get( j ) == true, "" );
418 CHECK_MESSAGE( j == 1, "" );
419
420 return 0;
421 }
422
423 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
424 #include <array>
425 #include <vector>
test_follows_and_precedes_api()426 void test_follows_and_precedes_api() {
427 using msg_t = tbb::flow::continue_msg;
428
429 std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
430 std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()};
431
432 follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
433 follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes);
434 }
435 #endif
436
437 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()438 void test_deduction_guides() {
439 using namespace tbb::flow;
440 graph g;
441 broadcast_node<int> br(g);
442 buffer_node<int> b0(g);
443
444 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
445 buffer_node b1(follows(br));
446 static_assert(std::is_same_v<decltype(b1), buffer_node<int>>);
447
448 buffer_node b2(precedes(br));
449 static_assert(std::is_same_v<decltype(b2), buffer_node<int>>);
450 #endif
451
452 buffer_node b3(b0);
453 static_assert(std::is_same_v<decltype(b3), buffer_node<int>>);
454 g.wait_for_all();
455 }
456 #endif
457
458 #include <iomanip>
459
460 //! Test buffer_node with parallel and serial neighbours
461 //! \brief \ref requirement \ref error_guessing
462 TEST_CASE("Serial and parallel test"){
463 for (int p = 2; p <= 4; ++p) {
464 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
465 tbb::task_arena arena(p);
466 arena.execute(
__anon4e62daa90102() 467 [&]() {
468 test_serial<int>();
469 test_parallel<int>(p);
470 }
471 );
472 }
473 }
474
475 //! Test reset and cancellation behavior
476 //! \brief \ref error_guessing
477 TEST_CASE("Resets"){
478 test_resets<int,tbb::flow::buffer_node<int> >();
479 test_resets<float,tbb::flow::buffer_node<float> >();
480 }
481
482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
483 //! Test deprecated follows and precedes API
484 //! \brief \ref error_guessing
485 TEST_CASE("Follows and precedes API"){
486 test_follows_and_precedes_api();
487 }
488 #endif
489
490 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
491 //! Test deduction guides
//! \brief \ref requirement
493 TEST_CASE("Deduction guides"){
494 test_deduction_guides();
495 }
496 #endif
497