xref: /oneTBB/test/tbb/test_buffer_node.cpp (revision be66b3a1)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/graph_utils.h"
24 #include "common/test_follows_and_precedes_api.h"
25 
26 
27 //! \file test_buffer_node.cpp
28 //! \brief Test for [flow_graph.buffer_node] specification
29 
30 
// Number of values each sender thread puts into a buffer; also the length of
// every per-thread flag row allocated by `touches`.
31 #define N 1000
// Batch size used by parallel_put_get: it puts up to C values, then gets the
// same number back, before moving on to the next batch.
32 #define C 10
33 
34 template< typename T >
35 void spin_try_get( tbb::flow::buffer_node<T> &b, T &value ) {
36     while ( b.try_get(value) != true ) {}
37 }
38 
39 template< typename T >
40 void check_item( T* count_value, T &value ) {
41     count_value[value / N] += value % N;
42 }
43 
44 template< typename T >
45 struct parallel_puts : utils::NoAssign {
46 
47     tbb::flow::buffer_node<T> &my_b;
48 
49     parallel_puts( tbb::flow::buffer_node<T> &b ) : my_b(b) {}
50 
51     void operator()(int i) const {
52         for (int j = 0; j < N; ++j) {
53             bool msg = my_b.try_put( T(N*i + j) );
54             CHECK_MESSAGE( msg == true, "" );
55         }
56     }
57 };
58 
59 template< typename T >
60 struct touches {
61 
62     bool **my_touches;
63     int my_num_threads;
64 
65     touches( int num_threads ) : my_num_threads(num_threads) {
66         my_touches = new bool* [my_num_threads];
67         for ( int p = 0; p < my_num_threads; ++p) {
68             my_touches[p] = new bool[N];
69             for ( int n = 0; n < N; ++n)
70                 my_touches[p][n] = false;
71         }
72     }
73 
74     ~touches() {
75         for ( int p = 0; p < my_num_threads; ++p) {
76             delete [] my_touches[p];
77         }
78         delete [] my_touches;
79     }
80 
81     bool check( T v ) {
82         CHECK_MESSAGE( my_touches[v/N][v%N] == false, "" );
83         my_touches[v/N][v%N] = true;
84         return true;
85     }
86 
87     bool validate_touches() {
88         for ( int p = 0; p < my_num_threads; ++p) {
89             for ( int n = 0; n < N; ++n) {
90                 CHECK_MESSAGE( my_touches[p][n] == true, "" );
91             }
92         }
93         return true;
94     }
95 };
96 
97 template< typename T >
98 struct parallel_gets : utils::NoAssign {
99 
100     tbb::flow::buffer_node<T> &my_b;
101     touches<T> &my_touches;
102 
103     parallel_gets( tbb::flow::buffer_node<T> &b, touches<T> &t) : my_b(b), my_touches(t) {}
104 
105     void operator()(int) const {
106         for (int j = 0; j < N; ++j) {
107             T v;
108             spin_try_get( my_b, v );
109             my_touches.check( v );
110         }
111     }
112 
113 };
114 
115 template< typename T >
116 struct parallel_put_get : utils::NoAssign {
117 
118     tbb::flow::buffer_node<T> &my_b;
119     touches<T> &my_touches;
120 
121     parallel_put_get( tbb::flow::buffer_node<T> &b, touches<T> &t ) : my_b(b), my_touches(t) {}
122 
123     void operator()(int tid) const {
124 
125         for ( int i = 0; i < N; i+=C ) {
126             int j_end = ( N < i + C ) ? N : i + C;
127             // dump about C values into the buffer
128             for ( int j = i; j < j_end; ++j ) {
129                 CHECK_MESSAGE( my_b.try_put( T (N*tid + j ) ) == true, "" );
130             }
131             // receiver about C values from the buffer
132             for ( int j = i; j < j_end; ++j ) {
133                 T v;
134                 spin_try_get( my_b, v );
135                 my_touches.check( v );
136             }
137         }
138     }
139 
140 };
141 
142 //
143 // Tests
144 //
145 // Item can be reserved, released, consumed ( single serial receiver )
146 //
147 template< typename T >
148 int test_reservation() {
149     tbb::flow::graph g;
150     T bogus_value(-1);
151 
152     // Simple tests
153     tbb::flow::buffer_node<T> b(g);
154 
155     b.try_put(T(1));
156     b.try_put(T(2));
157     b.try_put(T(3));
158 
159     T v, vsum;
160     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
161     CHECK_MESSAGE( b.try_release() == true, "" );
162     v = bogus_value;
163     g.wait_for_all();
164     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
165     CHECK_MESSAGE( b.try_consume() == true, "" );
166     vsum += v;
167     v = bogus_value;
168     g.wait_for_all();
169 
170     CHECK_MESSAGE( b.try_get(v) == true, "" );
171     vsum += v;
172     v = bogus_value;
173     g.wait_for_all();
174 
175     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
176     CHECK_MESSAGE( b.try_release() == true, "" );
177     v = bogus_value;
178     g.wait_for_all();
179     CHECK_MESSAGE( b.try_reserve(v) == true, "" );
180     CHECK_MESSAGE( b.try_consume() == true, "" );
181     vsum += v;
182     CHECK_MESSAGE( vsum == T(6), "");
183     v = bogus_value;
184     g.wait_for_all();
185 
186     return 0;
187 }
188 
189 //
190 // Tests
191 //
192 // multiple parallel senders, items in arbitrary order
193 // multiple parallel senders, multiple parallel receivers, items in arbitrary order and all items received
194 //   * overlapped puts / gets
195 //   * all puts finished before any gets
196 //
197 template< typename T >
198 int test_parallel(int num_threads) {
199     tbb::flow::graph g;
200     tbb::flow::buffer_node<T> b(g);
201     tbb::flow::buffer_node<T> b2(g);
202     tbb::flow::buffer_node<T> b3(g);
203     T bogus_value(-1);
204     T j = bogus_value;
205 
206     NativeParallelFor( num_threads, parallel_puts<T>(b) );
207 
208     T *next_value = new T[num_threads];
209     for (int tid = 0; tid < num_threads; ++tid) next_value[tid] = T(0);
210 
211     for (int i = 0; i < num_threads * N; ++i ) {
212         spin_try_get( b, j );
213         check_item( next_value, j );
214         j = bogus_value;
215     }
216     for (int tid = 0; tid < num_threads; ++tid)  {
217         CHECK_MESSAGE( next_value[tid] == T((N*(N-1))/2), "" );
218     }
219 
220     j = bogus_value;
221     g.wait_for_all();
222     CHECK_MESSAGE( b.try_get( j ) == false, "" );
223     CHECK_MESSAGE( j == bogus_value, "" );
224 
225     NativeParallelFor( num_threads, parallel_puts<T>(b) );
226 
227     {
228         touches< T > t( num_threads );
229         NativeParallelFor( num_threads, parallel_gets<T>(b, t) );
230         g.wait_for_all();
231         CHECK_MESSAGE( t.validate_touches(), "" );
232     }
233     j = bogus_value;
234     CHECK_MESSAGE( b.try_get( j ) == false, "" );
235     CHECK_MESSAGE( j == bogus_value, "" );
236 
237     g.wait_for_all();
238     {
239         touches< T > t( num_threads );
240         NativeParallelFor( num_threads, parallel_put_get<T>(b, t) );
241         g.wait_for_all();
242         CHECK_MESSAGE( t.validate_touches(), "" );
243     }
244     j = bogus_value;
245     CHECK_MESSAGE( b.try_get( j ) == false, "" );
246     CHECK_MESSAGE( j == bogus_value, "" );
247 
248     tbb::flow::make_edge( b, b2 );
249     tbb::flow::make_edge( b2, b3 );
250 
251     NativeParallelFor( num_threads, parallel_puts<T>(b) );
252     {
253         touches< T > t( num_threads );
254         NativeParallelFor( num_threads, parallel_gets<T>(b3, t) );
255         g.wait_for_all();
256         CHECK_MESSAGE( t.validate_touches(), "" );
257     }
258     j = bogus_value;
259     g.wait_for_all();
260     CHECK_MESSAGE( b.try_get( j ) == false, "" );
261     g.wait_for_all();
262     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
263     g.wait_for_all();
264     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
265     CHECK_MESSAGE( j == bogus_value, "" );
266 
267     // test copy constructor
268     CHECK_MESSAGE( b.remove_successor( b2 ), "" );
269     // fill up b:
270     NativeParallelFor( num_threads, parallel_puts<T>(b) );
271     // copy b:
272     tbb::flow::buffer_node<T> b_copy(b);
273 
274     // b_copy should be empty
275     j = bogus_value;
276     g.wait_for_all();
277     CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
278 
279     // hook them together:
280     CHECK_MESSAGE( b.register_successor(b_copy) == true, "" );
281     // try to get content from b_copy
282     {
283         touches< T > t( num_threads );
284         NativeParallelFor( num_threads, parallel_gets<T>(b_copy, t) );
285         g.wait_for_all();
286         CHECK_MESSAGE( t.validate_touches(), "" );
287     }
288     // now both should be empty
289     j = bogus_value;
290     g.wait_for_all();
291     CHECK_MESSAGE( b.try_get( j ) == false, "" );
292     g.wait_for_all();
293     CHECK_MESSAGE( b_copy.try_get( j ) == false, "" );
294     CHECK_MESSAGE( j == bogus_value, "" );
295 
296     delete [] next_value;
297     return 0;
298 }
299 
300 //
301 // Tests
302 //
303 // Predecessors cannot be registered
304 // Empty buffer rejects item requests
305 // Single serial sender, items in arbitrary order
306 // Chained buffers ( 2 & 3 ), single sender, items at last buffer in arbitrary order
307 //
308 
309 #define TBB_INTERNAL_NAMESPACE detail::d1
310 using tbb::TBB_INTERNAL_NAMESPACE::register_predecessor;
311 using tbb::TBB_INTERNAL_NAMESPACE::remove_predecessor;
312 
313 template< typename T >
314 int test_serial() {
315     tbb::flow::graph g;
316     T bogus_value(-1);
317 
318     tbb::flow::buffer_node<T> b(g);
319     tbb::flow::buffer_node<T> b2(g);
320     T j = bogus_value;
321 
322     //
323     // Rejects attempts to add / remove predecessor
324     // Rejects request from empty buffer
325     //
326     CHECK_MESSAGE( register_predecessor<T>( b, b2 ) == false, "" );
327     CHECK_MESSAGE( remove_predecessor<T>( b, b2 ) == false, "" );
328     CHECK_MESSAGE( b.try_get( j ) == false, "" );
329     CHECK_MESSAGE( j == bogus_value, "" );
330 
331     //
332     // Simple puts and gets
333     //
334 
335     for (int i = 0; i < N; ++i) {
336         bool msg = b.try_put( T(i) );
337         CHECK_MESSAGE( msg == true, "" );
338     }
339 
340     T vsum = T(0);
341     for (int i = 0; i < N; ++i) {
342         j = bogus_value;
343         spin_try_get( b, j );
344         vsum += j;
345     }
346     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
347     j = bogus_value;
348     g.wait_for_all();
349     CHECK_MESSAGE( b.try_get( j ) == false, "" );
350     CHECK_MESSAGE( j == bogus_value, "" );
351 
352     tbb::flow::make_edge(b, b2);
353 
354     vsum = T(0);
355     for (int i = 0; i < N; ++i) {
356         bool msg = b.try_put( T(i) );
357         CHECK_MESSAGE( msg == true, "" );
358     }
359 
360     for (int i = 0; i < N; ++i) {
361         j = bogus_value;
362         spin_try_get( b2, j );
363         vsum += j;
364     }
365     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
366     j = bogus_value;
367     g.wait_for_all();
368     CHECK_MESSAGE( b.try_get( j ) == false, "" );
369     g.wait_for_all();
370     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
371     CHECK_MESSAGE( j == bogus_value, "" );
372 
373     tbb::flow::remove_edge(b, b2);
374     CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
375     g.wait_for_all();
376     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
377     CHECK_MESSAGE( j == bogus_value, "" );
378     g.wait_for_all();
379     CHECK_MESSAGE( b.try_get( j ) == true, "" );
380     CHECK_MESSAGE( j == 1, "" );
381 
382     tbb::flow::buffer_node<T> b3(g);
383     tbb::flow::make_edge( b, b2 );
384     tbb::flow::make_edge( b2, b3 );
385 
386     vsum = T(0);
387     for (int i = 0; i < N; ++i) {
388         bool msg = b.try_put( T(i) );
389         CHECK_MESSAGE( msg == true, "" );
390     }
391 
392     for (int i = 0; i < N; ++i) {
393         j = bogus_value;
394         spin_try_get( b3, j );
395         vsum += j;
396     }
397     CHECK_MESSAGE( vsum == (N*(N-1))/2, "");
398     j = bogus_value;
399     g.wait_for_all();
400     CHECK_MESSAGE( b.try_get( j ) == false, "" );
401     g.wait_for_all();
402     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
403     g.wait_for_all();
404     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
405     CHECK_MESSAGE( j == bogus_value, "" );
406 
407     tbb::flow::remove_edge(b, b2);
408     CHECK_MESSAGE( b.try_put( 1 ) == true, "" );
409     g.wait_for_all();
410     CHECK_MESSAGE( b2.try_get( j ) == false, "" );
411     CHECK_MESSAGE( j == bogus_value, "" );
412     g.wait_for_all();
413     CHECK_MESSAGE( b3.try_get( j ) == false, "" );
414     CHECK_MESSAGE( j == bogus_value, "" );
415     g.wait_for_all();
416     CHECK_MESSAGE( b.try_get( j ) == true, "" );
417     CHECK_MESSAGE( j == 1, "" );
418 
419     return 0;
420 }
421 
422 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
423 #include <array>
424 #include <vector>
425 void test_follows_and_precedes_api() {
426     using msg_t = tbb::flow::continue_msg;
427 
428     std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
429     std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()};
430 
431     follows_and_precedes_testing::test_follows<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_follows);
432     follows_and_precedes_testing::test_precedes<msg_t, tbb::flow::buffer_node<msg_t>>(messages_for_precedes);
433 }
434 #endif
435 
436 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
437 void test_deduction_guides() {
438     using namespace tbb::flow;
439     graph g;
440     broadcast_node<int> br(g);
441     buffer_node<int> b0(g);
442 
443 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
444     buffer_node b1(follows(br));
445     static_assert(std::is_same_v<decltype(b1), buffer_node<int>>);
446 
447     buffer_node b2(precedes(br));
448     static_assert(std::is_same_v<decltype(b2), buffer_node<int>>);
449 #endif
450 
451     buffer_node b3(b0);
452     static_assert(std::is_same_v<decltype(b3), buffer_node<int>>);
453     g.wait_for_all();
454 }
455 #endif
456 
457 #include <iomanip>
458 
459 //! Test buffer_node with parallel and serial neighbours
460 //! \brief \ref requirement \ref error_guessing
461 TEST_CASE("Serial and parallel test"){
462     for (int p = 2; p <= 4; ++p) {
463         tbb::task_arena arena(p);
464         arena.execute(
465             [&]() {
466                 test_serial<int>();
467                 test_parallel<int>(p);
468             }
469         );
470     }
471 }
472 
473 //! Test reset and cancellation behavior
474 //! \brief \ref error_guessing
475 TEST_CASE("Resets"){
476     test_resets<int,tbb::flow::buffer_node<int> >();
477     test_resets<float,tbb::flow::buffer_node<float> >();
478 }
479 
480 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
481 //! Test deprecated follows and precedes API
482 //! \brief \ref error_guessing
483 TEST_CASE("Follows and precedes API"){
484     test_follows_and_precedes_api();
485 }
486 #endif
487 
488 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
489 //! Test deduction guides
490 //! \brief \ref requirement
491 TEST_CASE("Deduction guides"){
492     test_deduction_guides();
493 }
494 #endif
495