xref: /oneTBB/test/tbb/test_buffer_node.cpp (revision 552f342b)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 #include "tbb/global_control.h"
21 
22 #include "common/test.h"
23 #include "common/utils.h"
24 #include "common/graph_utils.h"
25 #include "common/test_follows_and_precedes_api.h"
26 
27 
28 //! \file test_buffer_node.cpp
29 //! \brief Test for [flow_graph.buffer_node] specification
30 
31 
// Test sizing constants.
//
// These are typed constants rather than object-like macros: single-letter
// macros would textually rewrite any `N` / `C` token in headers included
// further down this file.

// Number of items each sender puts into a buffer. Item values are encoded as
// N * sender + offset, so value / N recovers the sender and value % N the offset.
constexpr int N = 1000;

// Number of items parallel_put_get puts before it switches to getting.
constexpr int C = 10;
34 
35 template< typename T >
36 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) {
37     while ( b.try_get(value) != true ) {}
38 }
39 
40 template< typename T >
41 void check_item( T* count_value, T &value ) {
42     count_value[value / N] += value % N;
43 }
44 
45 template< typename T >
46 struct parallel_puts : utils::NoAssign {
47 
48     tbb::flow::buffer_node<T> &my_b;
49 
50     parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {}
51 
52     void operator()(int i) const {
53         for (int j = 0; j < N; ++j) {
54             bool msg = my_b.try_put( T(N*i + j) );
55             CHECK_MESSAGE( msg == true, "" );
56         }
57     }
58 };
59 
60 template< typename T >
61 struct touches {
62 
63     bool **my_touches;
64     int my_num_threads;
65 
66     touches( int num_threads ) : my_num_threads(num_threads) {
67         my_touches = new bool* [my_num_threads];
68         for ( int p = 0; p < my_num_threads; ++p) {
69             my_touches[p] = new bool[N];
70             for ( int n = 0; n < N; ++n)
71                 my_touches[p][n] = false;
72         }
73     }
74 
75     ~touches() {
76         for ( int p = 0; p < my_num_threads; ++p) {
77             delete [] my_touches[p];
78         }
79         delete [] my_touches;
80     }
81 
82     bool check( T v ) {
83         CHECK_MESSAGE( my_touches[v/N][v%N] == false, "" );
84         my_touches[v/N][v%N] = true;
85         return true;
86     }
87 
88     bool validate_touches() {
89         for ( int p = 0; p < my_num_threads; ++p) {
90             for ( int n = 0; n < N; ++n) {
91                 CHECK_MESSAGE( my_touches[p][n] == true, "" );
92             }
93         }
94         return true;
95     }
96 };
97 
98 template< typename T >
99 struct parallel_gets : utils::NoAssign {
100 
101     tbb::flow::buffer_node<T> &my_b;
102     touches<T> &my_touches;
103 
104     parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {}
105 
106     void operator()(int) const {
107         for (int j = 0; j < N; ++j) {
108             T v;
109             spin_try_get( my_b, v );
110             my_touches.check( v );
111         }
112     }
113 
114 };
115 
116 template< typename T >
117 struct parallel_put_get : utils::NoAssign {
118 
119     tbb::flow::buffer_node<T> &my_b;
120     touches<T> &my_touches;
121 
122     parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {}
123 
124     void operator()(int tid) const {
125 
126         for ( int i = 0; i < N; i+=C ) {
127             int j_end = ( N < i + C ) ? N : i + C;
128             // dump about C values into the buffer
129             for ( int j = i; j < j_end; ++j ) {
130                 CHECK_MESSAGE( my_b.try_put( T (N*tid + j ) ) == true, "" );
131             }
132             // receiver about C values from the buffer
133             for ( int j = i; j < j_end; ++j ) {
134                 T v;
135                 spin_try_get( my_b, v );
136                 my_touches.check( v );
137             }
138         }
139     }
140 
141 };
142 
143 //
144 // Tests
145 //
146 // Item can be reserved, released, consumed ( single serial receiver )
147 //
148 template< typename T >
149 int test_reservation() {
150     tbb::flow::graph g;
151     T bogus_value(-1);
152 
153     // Simple tests
154     tbb::flow::buffer_node<T> b(g);
155 
156     b.try_put(T(1));
157     b.try_put(T(2));
158     b.try_put(T(3));
159 
160     T v, vsum;
161     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
162     CHECK_MESSAGE( b.try_release() == true, "" );
163     v = bogus_value;
164     g.wait_for_all();
165     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
166     CHECK_MESSAGE( b.try_consume() == true, "" );
167     vsum += v;
168     v = bogus_value;
169     g.wait_for_all();
170 
171     CHECK_MESSAGE( b.try_get(v) == true, "" );
172     vsum += v;
173     v = bogus_value;
174     g.wait_for_all();
175 
176     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
177     CHECK_MESSAGE( b.try_release() == true, "" );
178     v = bogus_value;
179     g.wait_for_all();
180     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
181     CHECK_MESSAGE( b.try_consume() == true, "" );
182     vsum += v;
183     CHECK_MESSAGE( vsum == T(6), "");
184     v = bogus_value;
185     g.wait_for_all();
186 
187     return 0;
188 }
189 
190 //
191 // Tests
192 //
193 // multiple parallel senders, items in arbitrary order
194 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received
195 //   * overlapped puts / gets
//   * all puts finished before any gets
197 //
198 template< typename T >
199 int test_parallel(int num_threads) {
200     tbb::flow::graph g;
201     tbb::flow::buffer_node<T> b(g);
202     tbb::flow::buffer_node<T> b2(g);
203     tbb::flow::buffer_node<T> b3(g);
204     T bogus_value(-1);
205     T j = bogus_value;
206 
207     NativeParallelFor( num_threads, parallel_puts<T>(b) );
208 
209     T *next_value = new T[num_threads];
210     for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
211 
212     for (int i = 0; i < num_threads * N; ++i ) {
213         spin_try_get( b, j );
214         check_item( next_value, j );
215         j = bogus_value;
216     }
217     for (int tid = 0; tid < num_threads; ++tid)  {
218         CHECK_MESSAGE( next_value[tid] == T((N*(N-1))/2), "" );
219     }
220 
221     j = bogus_value;
222     g.wait_for_all();
223     CHECK_MESSAGE( b.try_get( j ) == false, "" );
224     CHECK_MESSAGE( j == bogus_value, "" );
225 
226     NativeParallelFor( num_threads, parallel_puts<T>(b) );
227 
228     {
229         touches< T > t( num_threads );
230         NativeParallelFor( num_threads, parallel_gets<T>(b, t) );
231         g.wait_for_all();
232         CHECK_MESSAGE( t.validate_touches(), "" );
233     }
234     j = bogus_value;
235     CHECK_MESSAGE( b.try_get( j ) == false, "" );
236     CHECK_MESSAGE( j == bogus_value, "" );
237 
238     g.wait_for_all();
239     {
240         touches< T > t( num_threads );
241         NativeParallelFor( num_threads, parallel_put_get<T>(b, t) );
242         g.wait_for_all();
243         CHECK_MESSAGE( t.validate_touches(), "" );
244     }
245     j = bogus_value;
246     CHECK_MESSAGE( b.try_get( j ) == false, "" );
247     CHECK_MESSAGE( j == bogus_value, "" );
248 
249     tbb::flow::make_edge( b, b2 );
250     tbb::flow::make_edge( b2, b3 );
251 
252     NativeParallelFor( num_threads, parallel_puts<T>(b) );
253     {
254         touches< T > t( num_threads );
255         NativeParallelFor( num_threads, parallel_gets<T>(b3, t) );
256         g.wait_for_all();
257         CHECK_MESSAGE( t.validate_touches(), "" );
258     }
259     j = bogus_value;
260     g.wait_for_all();
261     CHECK_MESSAGE( b.try_get( j ) == false, "" );
262     g.wait_for_all();
263     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
264     g.wait_for_all();
265     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
266     CHECK_MESSAGE( j == bogus_value, "" );
267 
268     // test copy constructor
269     CHECK_MESSAGE( b.remove_successor( b2 ), "" );
270     // fill up b:
271     NativeParallelFor( num_threads, parallel_puts<T>(b) );
272     // copy b:
273     tbb::flow::buffer_node<T> b_copy(b);
274 
275     // b_copy should be empty
276     j = bogus_value;
277     g.wait_for_all();
278     CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
279 
280     // hook them together:
281     CHECK_MESSAGE( b.register_successor(b_copy) == true, "" );
282     // try to get content from b_copy
283     {
284         touches< T > t( num_threads );
285         NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) );
286         g.wait_for_all();
287         CHECK_MESSAGE( t.validate_touches(), "" );
288     }
289     // now both should be empty
290     j = bogus_value;
291     g.wait_for_all();
292     CHECK_MESSAGE( b.try_get( j ) == false, "" );
293     g.wait_for_all();
294     CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
295     CHECK_MESSAGE( j == bogus_value, "" );
296 
297     delete [] next_value;
298     return 0;
299 }
300 
301 //
302 // Tests
303 //
304 // Predecessors cannot be registered
305 // Empty buffer rejects item requests
306 // Single serial sender, items in arbitrary order
307 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order
308 //
309 
310 #define TBB_INTERNAL_NAMESPACE detail::d1
311 using tbb::TBB_INTERNAL_NAMESPACE::register_predecessor;
312 using tbb::TBB_INTERNAL_NAMESPACE::remove_predecessor;
313 
314 template< typename T >
315 int test_serial() {
316     tbb::flow::graph g;
317     T bogus_value(-1);
318 
319     tbb::flow::buffer_node<T> b(g);
320     tbb::flow::buffer_node<T> b2(g);
321     T j = bogus_value;
322 
323     //
324     // Rejects attempts to add / remove predecessor
325     // Rejects request from empty buffer
326     //
327     CHECK_MESSAGE( register_predecessor<T>( b, b2 ) == false, "" );
328     CHECK_MESSAGE( remove_predecessor<T>( b, b2 ) == false, "" );
329     CHECK_MESSAGE( b.try_get( j ) == false, "" );
330     CHECK_MESSAGE( j == bogus_value, "" );
331 
332     //
333     // Simple puts and gets
334     //
335 
336     for (int i = 0; i < N; ++i) {
337         bool msg = b.try_put( T(i) );
338         CHECK_MESSAGE( msg == true, "" );
339     }
340 
341     T vsum = T(0);
342     for (int i = 0; i < N; ++i) {
343         j = bogus_value;
344         spin_try_get( b, j );
345         vsum += j;
346     }
347     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
348     j = bogus_value;
349     g.wait_for_all();
350     CHECK_MESSAGE( b.try_get( j ) == false, "" );
351     CHECK_MESSAGE( j == bogus_value, "" );
352 
353     tbb::flow::make_edge(b, b2);
354 
355     vsum = T(0);
356     for (int i = 0; i < N; ++i) {
357         bool msg = b.try_put( T(i) );
358         CHECK_MESSAGE( msg == true, "" );
359     }
360 
361     for (int i = 0; i < N; ++i) {
362         j = bogus_value;
363         spin_try_get( b2, j );
364         vsum += j;
365     }
366     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
367     j = bogus_value;
368     g.wait_for_all();
369     CHECK_MESSAGE( b.try_get( j ) == false, "" );
370     g.wait_for_all();
371     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
372     CHECK_MESSAGE( j == bogus_value, "" );
373 
374     tbb::flow::remove_edge(b, b2);
375     CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
376     g.wait_for_all();
377     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
378     CHECK_MESSAGE( j == bogus_value, "" );
379     g.wait_for_all();
380     CHECK_MESSAGE( b.try_get( j ) == true, "" );
381     CHECK_MESSAGE( j == 1, "" );
382 
383     tbb::flow::buffer_node<T> b3(g);
384     tbb::flow::make_edge( b, b2 );
385     tbb::flow::make_edge( b2, b3 );
386 
387     vsum = T(0);
388     for (int i = 0; i < N; ++i) {
389         bool msg = b.try_put( T(i) );
390         CHECK_MESSAGE( msg == true, "" );
391     }
392 
393     for (int i = 0; i < N; ++i) {
394         j = bogus_value;
395         spin_try_get( b3, j );
396         vsum += j;
397     }
398     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
399     j = bogus_value;
400     g.wait_for_all();
401     CHECK_MESSAGE( b.try_get( j ) == false, "" );
402     g.wait_for_all();
403     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
404     g.wait_for_all();
405     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
406     CHECK_MESSAGE( j == bogus_value, "" );
407 
408     tbb::flow::remove_edge(b, b2);
409     CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
410     g.wait_for_all();
411     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
412     CHECK_MESSAGE( j == bogus_value, "" );
413     g.wait_for_all();
414     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
415     CHECK_MESSAGE( j == bogus_value, "" );
416     g.wait_for_all();
417     CHECK_MESSAGE( b.try_get( j ) == true, "" );
418     CHECK_MESSAGE( j == 1, "" );
419 
420     return 0;
421 }
422 
423 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
424 #include <array>
425 #include <vector>
426 void test_follows_and_precedes_api() {
427     using msg_t = tbb::flow::continue_msg;
428 
429     std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
430     std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()};
431 
432     follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
433     follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes);
434 }
435 #endif
436 
437 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
438 void test_deduction_guides() {
439     using namespace tbb::flow;
440     graph g;
441     broadcast_node<int> br(g);
442     buffer_node<int> b0(g);
443 
444 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
445     buffer_node b1(follows(br));
446     static_assert(std::is_same_v<decltype(b1), buffer_node<int>>);
447 
448     buffer_node b2(precedes(br));
449     static_assert(std::is_same_v<decltype(b2), buffer_node<int>>);
450 #endif
451 
452     buffer_node b3(b0);
453     static_assert(std::is_same_v<decltype(b3), buffer_node<int>>);
454     g.wait_for_all();
455 }
456 #endif
457 
458 #include <iomanip>
459 
460 //! Test buffer_node with parallel and serial neighbours
461 //! \brief \ref requirement \ref error_guessing
462 TEST_CASE("Serial and parallel test"){
463     for (int p = 2; p <= 4; ++p) {
464         tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, p);
465         tbb::task_arena arena(p);
466         arena.execute(
467             [&]() {
468                 test_serial<int>();
469                 test_parallel<int>(p);
470             }
471         );
472     }
473 }
474 
475 //! Test reset and cancellation behavior
476 //! \brief \ref error_guessing
477 TEST_CASE("Resets"){
478     test_resets<int,tbb::flow::buffer_node<int> >();
479     test_resets<float,tbb::flow::buffer_node<float> >();
480 }
481 
482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test deprecated follows and precedes API
484 //! \brief \ref error_guessing
485 TEST_CASE("Follows and precedes API"){
486     test_follows_and_precedes_api();
487 }
488 #endif
489 
490 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
491 //! Test deduction guides
//! \brief \ref requirement
493 TEST_CASE("Deduction guides"){
494     test_deduction_guides();
495 }
496 #endif
497