xref: /oneTBB/test/tbb/test_buffer_node.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
20 // parts in all of tests might make testing of the product, which is different from what is actually
21 // released.
22 #define __TBB_EXTRA_DEBUG 1
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/graph_utils.h"
28 #include "common/test_follows_and_precedes_api.h"
29 
30 
31 //! \file test_buffer_node.cpp
32 //! \brief Test for [flow_graph.buffer_node] specification
33 
34 
35 #define N 1000
36 #define C 10
37 
38 template< typename T >
39 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) {
40     while ( b.try_get(value) != true ) {}
41 }
42 
43 template< typename T >
44 void check_item( T* count_value, T &value ) {
45     count_value[value / N] += value % N;
46 }
47 
48 template< typename T >
49 struct parallel_puts : utils::NoAssign {
50 
51     tbb::flow::buffer_node<T> &my_b;
52 
53     parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {}
54 
55     void operator()(int i) const {
56         for (int j = 0; j < N; ++j) {
57             bool msg = my_b.try_put( T(N*i + j) );
58             CHECK_MESSAGE( msg == true, "" );
59         }
60     }
61 };
62 
63 template< typename T >
64 struct touches {
65 
66     bool **my_touches;
67     int my_num_threads;
68 
69     touches( int num_threads ) : my_num_threads(num_threads) {
70         my_touches = new bool* [my_num_threads];
71         for ( int p = 0; p < my_num_threads; ++p) {
72             my_touches[p] = new bool[N];
73             for ( int n = 0; n < N; ++n)
74                 my_touches[p][n] = false;
75         }
76     }
77 
78     ~touches() {
79         for ( int p = 0; p < my_num_threads; ++p) {
80             delete [] my_touches[p];
81         }
82         delete [] my_touches;
83     }
84 
85     bool check( T v ) {
86         CHECK_MESSAGE( my_touches[v/N][v%N] == false, "" );
87         my_touches[v/N][v%N] = true;
88         return true;
89     }
90 
91     bool validate_touches() {
92         for ( int p = 0; p < my_num_threads; ++p) {
93             for ( int n = 0; n < N; ++n) {
94                 CHECK_MESSAGE( my_touches[p][n] == true, "" );
95             }
96         }
97         return true;
98     }
99 };
100 
101 template< typename T >
102 struct parallel_gets : utils::NoAssign {
103 
104     tbb::flow::buffer_node<T> &my_b;
105     touches<T> &my_touches;
106 
107     parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {}
108 
109     void operator()(int) const {
110         for (int j = 0; j < N; ++j) {
111             T v;
112             spin_try_get( my_b, v );
113             my_touches.check( v );
114         }
115     }
116 
117 };
118 
119 template< typename T >
120 struct parallel_put_get : utils::NoAssign {
121 
122     tbb::flow::buffer_node<T> &my_b;
123     touches<T> &my_touches;
124 
125     parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {}
126 
127     void operator()(int tid) const {
128 
129         for ( int i = 0; i < N; i+=C ) {
130             int j_end = ( N < i + C ) ? N : i + C;
131             // dump about C values into the buffer
132             for ( int j = i; j < j_end; ++j ) {
133                 CHECK_MESSAGE( my_b.try_put( T (N*tid + j ) ) == true, "" );
134             }
135             // receiver about C values from the buffer
136             for ( int j = i; j < j_end; ++j ) {
137                 T v;
138                 spin_try_get( my_b, v );
139                 my_touches.check( v );
140             }
141         }
142     }
143 
144 };
145 
146 //
147 // Tests
148 //
149 // Item can be reserved, released, consumed ( single serial receiver )
150 //
151 template< typename T >
152 int test_reservation() {
153     tbb::flow::graph g;
154     T bogus_value(-1);
155 
156     // Simple tests
157     tbb::flow::buffer_node<T> b(g);
158 
159     b.try_put(T(1));
160     b.try_put(T(2));
161     b.try_put(T(3));
162 
163     T v, vsum;
164     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
165     CHECK_MESSAGE( b.try_release() == true, "" );
166     v = bogus_value;
167     g.wait_for_all();
168     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
169     CHECK_MESSAGE( b.try_consume() == true, "" );
170     vsum += v;
171     v = bogus_value;
172     g.wait_for_all();
173 
174     CHECK_MESSAGE( b.try_get(v) == true, "" );
175     vsum += v;
176     v = bogus_value;
177     g.wait_for_all();
178 
179     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
180     CHECK_MESSAGE( b.try_release() == true, "" );
181     v = bogus_value;
182     g.wait_for_all();
183     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
184     CHECK_MESSAGE( b.try_consume() == true, "" );
185     vsum += v;
186     CHECK_MESSAGE( vsum == T(6), "");
187     v = bogus_value;
188     g.wait_for_all();
189 
190     return 0;
191 }
192 
193 //
194 // Tests
195 //
196 // multiple parallel senders, items in arbitrary order
197 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received
198 //   * overlapped puts / gets
199 //   * all puts finished before any getS
200 //
201 template< typename T >
202 int test_parallel(int num_threads) {
203     tbb::flow::graph g;
204     tbb::flow::buffer_node<T> b(g);
205     tbb::flow::buffer_node<T> b2(g);
206     tbb::flow::buffer_node<T> b3(g);
207     T bogus_value(-1);
208     T j = bogus_value;
209 
210     NativeParallelFor( num_threads, parallel_puts<T>(b) );
211 
212     T *next_value = new T[num_threads];
213     for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
214 
215     for (int i = 0; i < num_threads * N; ++i ) {
216         spin_try_get( b, j );
217         check_item( next_value, j );
218         j = bogus_value;
219     }
220     for (int tid = 0; tid < num_threads; ++tid)  {
221         CHECK_MESSAGE( next_value[tid] == T((N*(N-1))/2), "" );
222     }
223 
224     j = bogus_value;
225     g.wait_for_all();
226     CHECK_MESSAGE( b.try_get( j ) == false, "" );
227     CHECK_MESSAGE( j == bogus_value, "" );
228 
229     NativeParallelFor( num_threads, parallel_puts<T>(b) );
230 
231     {
232         touches< T > t( num_threads );
233         NativeParallelFor( num_threads, parallel_gets<T>(b, t) );
234         g.wait_for_all();
235         CHECK_MESSAGE( t.validate_touches(), "" );
236     }
237     j = bogus_value;
238     CHECK_MESSAGE( b.try_get( j ) == false, "" );
239     CHECK_MESSAGE( j == bogus_value, "" );
240 
241     g.wait_for_all();
242     {
243         touches< T > t( num_threads );
244         NativeParallelFor( num_threads, parallel_put_get<T>(b, t) );
245         g.wait_for_all();
246         CHECK_MESSAGE( t.validate_touches(), "" );
247     }
248     j = bogus_value;
249     CHECK_MESSAGE( b.try_get( j ) == false, "" );
250     CHECK_MESSAGE( j == bogus_value, "" );
251 
252     tbb::flow::make_edge( b, b2 );
253     tbb::flow::make_edge( b2, b3 );
254 
255     NativeParallelFor( num_threads, parallel_puts<T>(b) );
256     {
257         touches< T > t( num_threads );
258         NativeParallelFor( num_threads, parallel_gets<T>(b3, t) );
259         g.wait_for_all();
260         CHECK_MESSAGE( t.validate_touches(), "" );
261     }
262     j = bogus_value;
263     g.wait_for_all();
264     CHECK_MESSAGE( b.try_get( j ) == false, "" );
265     g.wait_for_all();
266     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
267     g.wait_for_all();
268     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
269     CHECK_MESSAGE( j == bogus_value, "" );
270 
271     // test copy constructor
272     CHECK_MESSAGE( b.remove_successor( b2 ), "" );
273     // fill up b:
274     NativeParallelFor( num_threads, parallel_puts<T>(b) );
275     // copy b:
276     tbb::flow::buffer_node<T> b_copy(b);
277 
278     // b_copy should be empty
279     j = bogus_value;
280     g.wait_for_all();
281     CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
282 
283     // hook them together:
284     CHECK_MESSAGE( b.register_successor(b_copy) == true, "" );
285     // try to get content from b_copy
286     {
287         touches< T > t( num_threads );
288         NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) );
289         g.wait_for_all();
290         CHECK_MESSAGE( t.validate_touches(), "" );
291     }
292     // now both should be empty
293     j = bogus_value;
294     g.wait_for_all();
295     CHECK_MESSAGE( b.try_get( j ) == false, "" );
296     g.wait_for_all();
297     CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
298     CHECK_MESSAGE( j == bogus_value, "" );
299 
300     delete [] next_value;
301     return 0;
302 }
303 
304 //
305 // Tests
306 //
307 // Predecessors cannot be registered
308 // Empty buffer rejects item requests
309 // Single serial sender, items in arbitrary order
310 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order
311 //
312 
313 #define TBB_INTERNAL_NAMESPACE detail::d1
314 using tbb::TBB_INTERNAL_NAMESPACE::register_predecessor;
315 using tbb::TBB_INTERNAL_NAMESPACE::remove_predecessor;
316 
317 template< typename T >
318 int test_serial() {
319     tbb::flow::graph g;
320     T bogus_value(-1);
321 
322     tbb::flow::buffer_node<T> b(g);
323     tbb::flow::buffer_node<T> b2(g);
324     T j = bogus_value;
325 
326     //
327     // Rejects attempts to add / remove predecessor
328     // Rejects request from empty buffer
329     //
330     CHECK_MESSAGE( register_predecessor<T>( b, b2 ) == false, "" );
331     CHECK_MESSAGE( remove_predecessor<T>( b, b2 ) == false, "" );
332     CHECK_MESSAGE( b.try_get( j ) == false, "" );
333     CHECK_MESSAGE( j == bogus_value, "" );
334 
335     //
336     // Simple puts and gets
337     //
338 
339     for (int i = 0; i < N; ++i) {
340         bool msg = b.try_put( T(i) );
341         CHECK_MESSAGE( msg == true, "" );
342     }
343 
344     T vsum = T(0);
345     for (int i = 0; i < N; ++i) {
346         j = bogus_value;
347         spin_try_get( b, j );
348         vsum += j;
349     }
350     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
351     j = bogus_value;
352     g.wait_for_all();
353     CHECK_MESSAGE( b.try_get( j ) == false, "" );
354     CHECK_MESSAGE( j == bogus_value, "" );
355 
356     tbb::flow::make_edge(b, b2);
357 
358     vsum = T(0);
359     for (int i = 0; i < N; ++i) {
360         bool msg = b.try_put( T(i) );
361         CHECK_MESSAGE( msg == true, "" );
362     }
363 
364     for (int i = 0; i < N; ++i) {
365         j = bogus_value;
366         spin_try_get( b2, j );
367         vsum += j;
368     }
369     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
370     j = bogus_value;
371     g.wait_for_all();
372     CHECK_MESSAGE( b.try_get( j ) == false, "" );
373     g.wait_for_all();
374     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
375     CHECK_MESSAGE( j == bogus_value, "" );
376 
377     tbb::flow::remove_edge(b, b2);
378     CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
379     g.wait_for_all();
380     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
381     CHECK_MESSAGE( j == bogus_value, "" );
382     g.wait_for_all();
383     CHECK_MESSAGE( b.try_get( j ) == true, "" );
384     CHECK_MESSAGE( j == 1, "" );
385 
386     tbb::flow::buffer_node<T> b3(g);
387     tbb::flow::make_edge( b, b2 );
388     tbb::flow::make_edge( b2, b3 );
389 
390     vsum = T(0);
391     for (int i = 0; i < N; ++i) {
392         bool msg = b.try_put( T(i) );
393         CHECK_MESSAGE( msg == true, "" );
394     }
395 
396     for (int i = 0; i < N; ++i) {
397         j = bogus_value;
398         spin_try_get( b3, j );
399         vsum += j;
400     }
401     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
402     j = bogus_value;
403     g.wait_for_all();
404     CHECK_MESSAGE( b.try_get( j ) == false, "" );
405     g.wait_for_all();
406     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
407     g.wait_for_all();
408     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
409     CHECK_MESSAGE( j == bogus_value, "" );
410 
411     tbb::flow::remove_edge(b, b2);
412     CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
413     g.wait_for_all();
414     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
415     CHECK_MESSAGE( j == bogus_value, "" );
416     g.wait_for_all();
417     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
418     CHECK_MESSAGE( j == bogus_value, "" );
419     g.wait_for_all();
420     CHECK_MESSAGE( b.try_get( j ) == true, "" );
421     CHECK_MESSAGE( j == 1, "" );
422 
423     return 0;
424 }
425 
426 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
427 #include <array>
428 #include <vector>
429 void test_follows_and_precedes_api() {
430     using msg_t = tbb::flow::continue_msg;
431 
432     std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
433     std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()};
434 
435     follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
436     follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes);
437 }
438 #endif
439 
440 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
441 void test_deduction_guides() {
442     using namespace tbb::flow;
443     graph g;
444     broadcast_node<int> br(g);
445     buffer_node<int> b0(g);
446 
447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
448     buffer_node b1(follows(br));
449     static_assert(std::is_same_v<decltype(b1), buffer_node<int>>);
450 
451     buffer_node b2(precedes(br));
452     static_assert(std::is_same_v<decltype(b2), buffer_node<int>>);
453 #endif
454 
455     buffer_node b3(b0);
456     static_assert(std::is_same_v<decltype(b3), buffer_node<int>>);
457     g.wait_for_all();
458 }
459 #endif
460 
461 #include <iomanip>
462 
463 //! Test buffer_node with parallel and serial neighbours
464 //! \brief \ref requirement \ref error_guessing
465 TEST_CASE("Serial and parallel test"){
466     for (int p = 2; p <= 4; ++p) {
467         tbb::task_arena arena(p);
468         arena.execute(
469             [&]() {
470                 test_serial<int>();
471                 test_parallel<int>(p);
472             }
473         );
474     }
475 }
476 
477 //! Test reset and cancellation behavior
478 //! \brief \ref error_guessing
479 TEST_CASE("Resets"){
480     test_resets<int,tbb::flow::buffer_node<int> >();
481     test_resets<float,tbb::flow::buffer_node<float> >();
482 }
483 
484 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
485 //! Test deprecated follows and preceedes API
486 //! \brief \ref error_guessing
487 TEST_CASE("Follows and precedes API"){
488     test_follows_and_precedes_api();
489 }
490 #endif
491 
492 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
493 //! Test deduction guides
494 //! \brief requirement
495 TEST_CASE("Deduction guides"){
496     test_deduction_guides();
497 }
498 #endif
499