1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // TODO: Add overlapping put / receive tests
18 
19 #include "common/config.h"
20 
// TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having these
// parts in all of the tests might mean testing a product that is different from what is actually
// released.
24 #define __TBB_EXTRA_DEBUG 1
25 #include "tbb/flow_graph.h"
26 
27 #include "common/test.h"
28 #include "common/utils.h"
29 #include "common/utils_assert.h"
30 #include "common/checktype.h"
31 #include "common/graph_utils.h"
32 #include "common/test_follows_and_precedes_api.h"
33 
34 #include <cstdio>
35 
36 
37 //! \file test_priority_queue_node.cpp
38 //! \brief Test for [flow_graph.priority_queue_node] specification
39 
40 
// Number of items each sender thread puts into a node. It is also the stride that keeps
// the values produced by different senders (N*tid + j) distinct.
// A typed constant is used instead of a single-letter macro so that headers included
// further down in this file are not subject to textual substitution of "N".
constexpr int N = 10;
// Batch size used by parallel_put_get: how many items are put before the same number of
// items is read back. Kept as a typed constant for the same reason as N.
constexpr int C = 10;
43 
44 template< typename T >
45 void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) {
46     while ( q.try_get(value) != true ) ;
47 }
48 
49 template< typename T >
50 void check_item( T* next_value, T &value ) {
51     int tid = value / N;
52     int offset = value % N;
53     CHECK_MESSAGE( next_value[tid] == T(offset), "" );
54     ++next_value[tid];
55 }
56 
57 template< typename T >
58 struct parallel_puts : utils::NoAssign {
59     tbb::flow::priority_queue_node<T> &my_q;
60     parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
61     void operator()(int i) const {
62         for (int j = 0; j < N; ++j) {
63             bool msg = my_q.try_put( T(N*i + j) );
64             CHECK_MESSAGE( msg == true, "" );
65         }
66     }
67 };
68 
69 template< typename T >
70 struct parallel_gets : utils::NoAssign {
71     tbb::flow::priority_queue_node<T> &my_q;
72     parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {}
73     void operator()(int) const {
74         T prev;
75         spin_try_get( my_q, prev );
76         for (int j = 0; j < N-1; ++j) {
77             T v;
78             spin_try_get( my_q, v );
79             CHECK_MESSAGE(v < prev, "");
80         }
81     }
82 };
83 
84 template< typename T >
85 struct parallel_put_get : utils::NoAssign {
86     tbb::flow::priority_queue_node<T> &my_q;
87     parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
88     void operator()(int tid) const {
89         for ( int i = 0; i < N; i+=C ) {
90             int j_end = ( N < i + C ) ? N : i + C;
91             // dump about C values into the Q
92             for ( int j = i; j < j_end; ++j ) {
93                 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
94             }
95             // receive about C values from the Q
96             for ( int j = i; j < j_end; ++j ) {
97                 T v;
98                 spin_try_get( my_q, v );
99             }
100         }
101     }
102 };
103 
104 //
105 // Tests
106 //
107 // Item can be reserved, released, consumed ( single serial receiver )
108 //
109 template< typename T >
110 int test_reservation(int) {
111     tbb::flow::graph g;
112 
113     // Simple tests
114     tbb::flow::priority_queue_node<T> q(g);
115 
116     {
117 
118         T bogus_value(-1);
119 
120         q.try_put(T(1));
121         q.try_put(T(2));
122         q.try_put(T(3));
123         g.wait_for_all();
124 
125         T v=bogus_value, w=bogus_value;
126         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
127         CHECK_MESSAGE( v == T(3), "" );
128         CHECK_MESSAGE( q.try_release() == true, "" );
129         v = bogus_value;
130         g.wait_for_all();
131         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
132         CHECK_MESSAGE( v == T(3), "" );
133         CHECK_MESSAGE( q.try_consume() == true, "" );
134         v = bogus_value;
135         g.wait_for_all();
136 
137         CHECK_MESSAGE( q.try_get(v) == true, "" );
138         CHECK_MESSAGE( v == T(2), "" );
139         v = bogus_value;
140         g.wait_for_all();
141 
142         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
143         CHECK_MESSAGE( v == T(1), "" );
144         CHECK_MESSAGE( q.try_reserve(w) == false, "" );
145         CHECK_MESSAGE( w == bogus_value, "" );
146         CHECK_MESSAGE( q.try_get(w) == false, "" );
147         CHECK_MESSAGE( w == bogus_value, "" );
148         CHECK_MESSAGE( q.try_release() == true, "" );
149         v = bogus_value;
150         g.wait_for_all();
151         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
152         CHECK_MESSAGE( v == T(1), "" );
153         CHECK_MESSAGE( q.try_consume() == true, "" );
154         v = bogus_value;
155         g.wait_for_all();
156         CHECK_MESSAGE( q.try_get(v) == false, "" );
157     }
158     return 0;
159 }
160 
161 //
162 // Tests
163 //
// multiple parallel senders, items received in priority order (largest value first), not FIFO
// multiple parallel senders, multiple parallel receivers, items handed out by priority and all items received
//   * overlapped puts / gets
//   * all puts finished before any gets
168 //
169 template< typename T >
170 int test_parallel(int num_threads) {
171     tbb::flow::graph g;
172     tbb::flow::priority_queue_node<T> q(g);
173     tbb::flow::priority_queue_node<T> q2(g);
174     tbb::flow::priority_queue_node<T> q3(g);
175     T bogus_value(-1);
176     T j = bogus_value;
177 
178     NativeParallelFor( num_threads, parallel_puts<T>(q) );
179     for (int i = num_threads*N -1; i>=0; --i) {
180         spin_try_get( q, j );
181         CHECK_MESSAGE(j == i, "");
182         j = bogus_value;
183     }
184     g.wait_for_all();
185     CHECK_MESSAGE( q.try_get( j ) == false, "" );
186     CHECK_MESSAGE( j == bogus_value, "" );
187 
188     NativeParallelFor( num_threads, parallel_puts<T>(q) );
189     g.wait_for_all();
190     NativeParallelFor( num_threads, parallel_gets<T>(q) );
191     g.wait_for_all();
192     j = bogus_value;
193     CHECK_MESSAGE( q.try_get( j ) == false, "" );
194     CHECK_MESSAGE( j == bogus_value, "" );
195 
196     NativeParallelFor( num_threads, parallel_put_get<T>(q) );
197     g.wait_for_all();
198     j = bogus_value;
199     CHECK_MESSAGE( q.try_get( j ) == false, "" );
200     CHECK_MESSAGE( j == bogus_value, "" );
201 
202     tbb::flow::make_edge( q, q2 );
203     tbb::flow::make_edge( q2, q3 );
204     NativeParallelFor( num_threads, parallel_puts<T>(q) );
205     g.wait_for_all();
206     NativeParallelFor( num_threads, parallel_gets<T>(q3) );
207     g.wait_for_all();
208     j = bogus_value;
209     CHECK_MESSAGE( q.try_get( j ) == false, "" );
210     CHECK_MESSAGE( j == bogus_value, "" );
211     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
212     CHECK_MESSAGE( j == bogus_value, "" );
213     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
214     CHECK_MESSAGE( j == bogus_value, "" );
215 
216     // test copy constructor
217     CHECK_MESSAGE( remove_successor(q, q2) == true, "" );
218     NativeParallelFor( num_threads, parallel_puts<T>(q) );
219     tbb::flow::priority_queue_node<T> q_copy(q);
220     g.wait_for_all();
221     j = bogus_value;
222     CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
223     CHECK_MESSAGE( register_successor(q, q_copy) == true, "" );
224     for (int i = num_threads*N -1; i>=0; --i) {
225         spin_try_get( q_copy, j );
226         CHECK_MESSAGE(j == i, "");
227         j = bogus_value;
228     }
229     g.wait_for_all();
230     CHECK_MESSAGE( q.try_get( j ) == false, "" );
231     CHECK_MESSAGE( j == bogus_value, "" );
232     CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
233     CHECK_MESSAGE( j == bogus_value, "" );
234 
235     return 0;
236 }
237 
238 //
239 // Tests
240 //
241 // Predecessors cannot be registered
242 // Empty Q rejects item requests
// Single serial sender, items received in priority order (largest value first)
// Chained Qs ( 2 & 3 ), single sender, items at last Q in priority order (largest value first)
245 //
246 
247 template< typename T >
248 int test_serial() {
249     tbb::flow::graph g;
250     T bogus_value(-1);
251 
252     tbb::flow::priority_queue_node<T> q(g);
253     tbb::flow::priority_queue_node<T> q2(g);
254     T j = bogus_value;
255 
256     //
257     // Rejects attempts to add / remove predecessor
258     // Rejects request from empty Q
259     //
260     CHECK_MESSAGE( register_predecessor(q, q2) == false, "" );
261     CHECK_MESSAGE( remove_predecessor(q, q2) == false, "" );
262     CHECK_MESSAGE( q.try_get( j ) == false, "" );
263     CHECK_MESSAGE( j == bogus_value, "" );
264 
265     //
266     // Simple puts and gets
267     //
268 
269     for (int i = 0; i < N; ++i)
270         CHECK_MESSAGE( q.try_put( T(i) ), "" );
271     for (int i = N-1; i >=0; --i) {
272         j = bogus_value;
273         spin_try_get( q, j );
274         CHECK_MESSAGE( i == j, "" );
275     }
276     j = bogus_value;
277     g.wait_for_all();
278     CHECK_MESSAGE( q.try_get( j ) == false, "" );
279     CHECK_MESSAGE( j == bogus_value, "" );
280 
281     tbb::flow::make_edge( q, q2 );
282 
283     for (int i = 0; i < N; ++i)
284         CHECK_MESSAGE( q.try_put( T(i) ), "" );
285     g.wait_for_all();
286     for (int i = N-1; i >= 0; --i) {
287         j = bogus_value;
288         spin_try_get( q2, j );
289         CHECK_MESSAGE( i == j, "" );
290     }
291     j = bogus_value;
292     g.wait_for_all();
293     CHECK_MESSAGE( q.try_get( j ) == false, "" );
294     g.wait_for_all();
295     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
296     CHECK_MESSAGE( j == bogus_value, "" );
297 
298     tbb::flow::remove_edge( q, q2 );
299     CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
300     g.wait_for_all();
301     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
302     CHECK_MESSAGE( j == bogus_value, "" );
303     g.wait_for_all();
304     CHECK_MESSAGE( q.try_get( j ) == true, "" );
305     CHECK_MESSAGE( j == 1, "" );
306 
307     tbb::flow::priority_queue_node<T> q3(g);
308     tbb::flow::make_edge( q, q2 );
309     tbb::flow::make_edge( q2, q3 );
310 
311     for (int i = 0; i < N; ++i)
312         CHECK_MESSAGE(  q.try_put( T(i) ), "" );
313     g.wait_for_all();
314     for (int i = N-1; i >= 0; --i) {
315         j = bogus_value;
316         spin_try_get( q3, j );
317         CHECK_MESSAGE( i == j, "" );
318     }
319     j = bogus_value;
320     g.wait_for_all();
321     CHECK_MESSAGE( q.try_get( j ) == false, "" );
322     g.wait_for_all();
323     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
324     g.wait_for_all();
325     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
326     CHECK_MESSAGE( j == bogus_value, "" );
327 
328     tbb::flow::remove_edge( q,  q2 );
329     CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
330     g.wait_for_all();
331     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
332     CHECK_MESSAGE( j == bogus_value, "" );
333     g.wait_for_all();
334     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
335     CHECK_MESSAGE( j == bogus_value, "" );
336     g.wait_for_all();
337     CHECK_MESSAGE( q.try_get( j ) == true, "" );
338     CHECK_MESSAGE( j == 1, "" );
339 
340     return 0;
341 }
342 
343 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
344 #include <array>
345 #include <vector>
346 void test_follows_and_precedes_api() {
347     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
348     std::vector<int> messages_for_precedes = {0, 1, 2};
349 
350     follows_and_precedes_testing::test_follows <int, tbb::flow::priority_queue_node<int>>(messages_for_follows);
351     follows_and_precedes_testing::test_precedes <int, tbb::flow::priority_queue_node<int>>(messages_for_precedes);
352 }
353 #endif
354 
355 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
356 void test_deduction_guides() {
357     using namespace tbb::flow;
358 
359     graph g;
360     broadcast_node<int> br(g);
361     priority_queue_node<int> pq0(g);
362 
363 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
364     using compare_type = std::greater<void>;
365     priority_queue_node pq1(follows(br));
366     static_assert(std::is_same_v<decltype(pq1), priority_queue_node<int>>);
367 
368     priority_queue_node pq2(follows(br), compare_type());
369     static_assert(std::is_same_v<decltype(pq2), priority_queue_node<int, compare_type>>);
370 
371     priority_queue_node pq3(precedes(br));
372     static_assert(std::is_same_v<decltype(pq3), priority_queue_node<int>>);
373 
374     priority_queue_node pq4(precedes(br), compare_type());
375     static_assert(std::is_same_v<decltype(pq4), priority_queue_node<int, compare_type>>);
376 #endif
377 
378     priority_queue_node pq5(pq0);
379     static_assert(std::is_same_v<decltype(pq5), priority_queue_node<int>>);
380     g.wait_for_all();
381 }
382 #endif
383 
384 //! Test serial, parallel behavior and reservation under parallelism
385 //! \brief \ref requirement \ref error_guessing
386 TEST_CASE("Serial, parallel and reservation tests"){
387     for (int p = 2; p <= 4; ++p) {
388         tbb::task_arena arena(p);
389         arena.execute(
390             [&]() {
391                 test_serial<int>();
392                 test_reservation<int>(p);
393                 test_reservation<CheckType<int> >(p);
394                 test_parallel<int>(p);
395             }
396         );
397 	}
398 }
399 
400 //! Test reset and cancellation
401 //! \brief \ref error_guessing
402 TEST_CASE("Reset tests"){
403     INFO("Testing resets\n");
404     test_resets<int,tbb::flow::priority_queue_node<int> >();
405     test_resets<float,tbb::flow::priority_queue_node<float> >();
406 }
407 
408 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
409 //! Test follows and precedes API
410 //! \brief \ref error_guessing
TEST_CASE("Test follows and precedes API"){
    // Thin wrapper: the checks themselves live in test_follows_and_precedes_api(), which
    // runs the follows/precedes helpers against priority_queue_node<int>.
    test_follows_and_precedes_api();
}
414 #endif
415 
416 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
418 //! \brief \ref requirement
TEST_CASE("Test deduction guides"){
    // Thin wrapper: test_deduction_guides() consists of static_asserts on deduced node
    // types, so the real verification happens at compile time.
    test_deduction_guides();
}
422 #endif
423 
424