1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // TODO: Add overlapping put / receive tests
18 
19 #include "common/config.h"
20 
21 #include "tbb/flow_graph.h"
22 
23 #include "common/test.h"
24 #include "common/utils.h"
25 #include "common/utils_assert.h"
26 #include "common/checktype.h"
27 #include "common/graph_utils.h"
28 #include "common/test_follows_and_precedes_api.h"
29 
30 #include <cstdio>
31 
32 
33 //! \file test_priority_queue_node.cpp
34 //! \brief Test for [flow_graph.priority_queue_node] specification
35 
36 
// Number of items each putting thread inserts; thread i generates the values N*i .. N*i + N-1.
#define N 10
// Chunk size used by parallel_put_get: that many items are put before the same number is taken back.
#define C 10
39 
40 template< typename T >
41 void spin_try_get( tbb::flow::priority_queue_node<T> &q, T &value ) {
42     while ( q.try_get(value) != true ) ;
43 }
44 
45 template< typename T >
46 void check_item( T* next_value, T &value ) {
47     int tid = value / N;
48     int offset = value % N;
49     CHECK_MESSAGE( next_value[tid] == T(offset), "" );
50     ++next_value[tid];
51 }
52 
53 template< typename T >
54 struct parallel_puts : utils::NoAssign {
55     tbb::flow::priority_queue_node<T> &my_q;
56     parallel_puts( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
57     void operator()(int i) const {
58         for (int j = 0; j < N; ++j) {
59             bool msg = my_q.try_put( T(N*i + j) );
60             CHECK_MESSAGE( msg == true, "" );
61         }
62     }
63 };
64 
65 template< typename T >
66 struct parallel_gets : utils::NoAssign {
67     tbb::flow::priority_queue_node<T> &my_q;
68     parallel_gets( tbb::flow::priority_queue_node<T> &q) : my_q(q) {}
69     void operator()(int) const {
70         T prev;
71         spin_try_get( my_q, prev );
72         for (int j = 0; j < N-1; ++j) {
73             T v;
74             spin_try_get( my_q, v );
75             CHECK_MESSAGE(v < prev, "");
76         }
77     }
78 };
79 
80 template< typename T >
81 struct parallel_put_get : utils::NoAssign {
82     tbb::flow::priority_queue_node<T> &my_q;
83     parallel_put_get( tbb::flow::priority_queue_node<T> &q ) : my_q(q) {}
84     void operator()(int tid) const {
85         for ( int i = 0; i < N; i+=C ) {
86             int j_end = ( N < i + C ) ? N : i + C;
87             // dump about C values into the Q
88             for ( int j = i; j < j_end; ++j ) {
89                 CHECK_MESSAGE( my_q.try_put( T (N*tid + j ) ) == true, "" );
90             }
91             // receive about C values from the Q
92             for ( int j = i; j < j_end; ++j ) {
93                 T v;
94                 spin_try_get( my_q, v );
95             }
96         }
97     }
98 };
99 
100 //
101 // Tests
102 //
103 // Item can be reserved, released, consumed ( single serial receiver )
104 //
105 template< typename T >
106 int test_reservation(int) {
107     tbb::flow::graph g;
108 
109     // Simple tests
110     tbb::flow::priority_queue_node<T> q(g);
111 
112     {
113 
114         T bogus_value(-1);
115 
116         q.try_put(T(1));
117         q.try_put(T(2));
118         q.try_put(T(3));
119         g.wait_for_all();
120 
121         T v=bogus_value, w=bogus_value;
122         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
123         CHECK_MESSAGE( v == T(3), "" );
124         CHECK_MESSAGE( q.try_release() == true, "" );
125         v = bogus_value;
126         g.wait_for_all();
127         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
128         CHECK_MESSAGE( v == T(3), "" );
129         CHECK_MESSAGE( q.try_consume() == true, "" );
130         v = bogus_value;
131         g.wait_for_all();
132 
133         CHECK_MESSAGE( q.try_get(v) == true, "" );
134         CHECK_MESSAGE( v == T(2), "" );
135         v = bogus_value;
136         g.wait_for_all();
137 
138         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
139         CHECK_MESSAGE( v == T(1), "" );
140         CHECK_MESSAGE( q.try_reserve(w) == false, "" );
141         CHECK_MESSAGE( w == bogus_value, "" );
142         CHECK_MESSAGE( q.try_get(w) == false, "" );
143         CHECK_MESSAGE( w == bogus_value, "" );
144         CHECK_MESSAGE( q.try_release() == true, "" );
145         v = bogus_value;
146         g.wait_for_all();
147         CHECK_MESSAGE( q.try_reserve(v) == true, "" );
148         CHECK_MESSAGE( v == T(1), "" );
149         CHECK_MESSAGE( q.try_consume() == true, "" );
150         v = bogus_value;
151         g.wait_for_all();
152         CHECK_MESSAGE( q.try_get(v) == false, "" );
153     }
154     return 0;
155 }
156 
157 //
158 // Tests
159 //
// multiple parallel senders, single receiver, items received in priority (largest first) order
// multiple parallel senders, multiple parallel receivers, items leave the node in priority (largest first) order and all items received
//   * overlapped puts / gets
//   * all puts finished before any gets
164 //
165 template< typename T >
166 int test_parallel(int num_threads) {
167     tbb::flow::graph g;
168     tbb::flow::priority_queue_node<T> q(g);
169     tbb::flow::priority_queue_node<T> q2(g);
170     tbb::flow::priority_queue_node<T> q3(g);
171     T bogus_value(-1);
172     T j = bogus_value;
173 
174     NativeParallelFor( num_threads, parallel_puts<T>(q) );
175     for (int i = num_threads*N -1; i>=0; --i) {
176         spin_try_get( q, j );
177         CHECK_MESSAGE(j == i, "");
178         j = bogus_value;
179     }
180     g.wait_for_all();
181     CHECK_MESSAGE( q.try_get( j ) == false, "" );
182     CHECK_MESSAGE( j == bogus_value, "" );
183 
184     NativeParallelFor( num_threads, parallel_puts<T>(q) );
185     g.wait_for_all();
186     NativeParallelFor( num_threads, parallel_gets<T>(q) );
187     g.wait_for_all();
188     j = bogus_value;
189     CHECK_MESSAGE( q.try_get( j ) == false, "" );
190     CHECK_MESSAGE( j == bogus_value, "" );
191 
192     NativeParallelFor( num_threads, parallel_put_get<T>(q) );
193     g.wait_for_all();
194     j = bogus_value;
195     CHECK_MESSAGE( q.try_get( j ) == false, "" );
196     CHECK_MESSAGE( j == bogus_value, "" );
197 
198     tbb::flow::make_edge( q, q2 );
199     tbb::flow::make_edge( q2, q3 );
200     NativeParallelFor( num_threads, parallel_puts<T>(q) );
201     g.wait_for_all();
202     NativeParallelFor( num_threads, parallel_gets<T>(q3) );
203     g.wait_for_all();
204     j = bogus_value;
205     CHECK_MESSAGE( q.try_get( j ) == false, "" );
206     CHECK_MESSAGE( j == bogus_value, "" );
207     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
208     CHECK_MESSAGE( j == bogus_value, "" );
209     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
210     CHECK_MESSAGE( j == bogus_value, "" );
211 
212     // test copy constructor
213     CHECK_MESSAGE( remove_successor(q, q2) == true, "" );
214     NativeParallelFor( num_threads, parallel_puts<T>(q) );
215     tbb::flow::priority_queue_node<T> q_copy(q);
216     g.wait_for_all();
217     j = bogus_value;
218     CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
219     CHECK_MESSAGE( register_successor(q, q_copy) == true, "" );
220     for (int i = num_threads*N -1; i>=0; --i) {
221         spin_try_get( q_copy, j );
222         CHECK_MESSAGE(j == i, "");
223         j = bogus_value;
224     }
225     g.wait_for_all();
226     CHECK_MESSAGE( q.try_get( j ) == false, "" );
227     CHECK_MESSAGE( j == bogus_value, "" );
228     CHECK_MESSAGE( q_copy.try_get( j ) == false, "" );
229     CHECK_MESSAGE( j == bogus_value, "" );
230 
231     return 0;
232 }
233 
234 //
235 // Tests
236 //
237 // Predecessors cannot be registered
238 // Empty Q rejects item requests
// Single serial sender, items received in priority (largest first) order
// Chained Qs ( 2 & 3 ), single sender, items at last Q in priority (largest first) order
241 //
242 
243 template< typename T >
244 int test_serial() {
245     tbb::flow::graph g;
246     T bogus_value(-1);
247 
248     tbb::flow::priority_queue_node<T> q(g);
249     tbb::flow::priority_queue_node<T> q2(g);
250     T j = bogus_value;
251 
252     //
253     // Rejects attempts to add / remove predecessor
254     // Rejects request from empty Q
255     //
256     CHECK_MESSAGE( register_predecessor(q, q2) == false, "" );
257     CHECK_MESSAGE( remove_predecessor(q, q2) == false, "" );
258     CHECK_MESSAGE( q.try_get( j ) == false, "" );
259     CHECK_MESSAGE( j == bogus_value, "" );
260 
261     //
262     // Simple puts and gets
263     //
264 
265     for (int i = 0; i < N; ++i)
266         CHECK_MESSAGE( q.try_put( T(i) ), "" );
267     for (int i = N-1; i >=0; --i) {
268         j = bogus_value;
269         spin_try_get( q, j );
270         CHECK_MESSAGE( i == j, "" );
271     }
272     j = bogus_value;
273     g.wait_for_all();
274     CHECK_MESSAGE( q.try_get( j ) == false, "" );
275     CHECK_MESSAGE( j == bogus_value, "" );
276 
277     tbb::flow::make_edge( q, q2 );
278 
279     for (int i = 0; i < N; ++i)
280         CHECK_MESSAGE( q.try_put( T(i) ), "" );
281     g.wait_for_all();
282     for (int i = N-1; i >= 0; --i) {
283         j = bogus_value;
284         spin_try_get( q2, j );
285         CHECK_MESSAGE( i == j, "" );
286     }
287     j = bogus_value;
288     g.wait_for_all();
289     CHECK_MESSAGE( q.try_get( j ) == false, "" );
290     g.wait_for_all();
291     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
292     CHECK_MESSAGE( j == bogus_value, "" );
293 
294     tbb::flow::remove_edge( q, q2 );
295     CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
296     g.wait_for_all();
297     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
298     CHECK_MESSAGE( j == bogus_value, "" );
299     g.wait_for_all();
300     CHECK_MESSAGE( q.try_get( j ) == true, "" );
301     CHECK_MESSAGE( j == 1, "" );
302 
303     tbb::flow::priority_queue_node<T> q3(g);
304     tbb::flow::make_edge( q, q2 );
305     tbb::flow::make_edge( q2, q3 );
306 
307     for (int i = 0; i < N; ++i)
308         CHECK_MESSAGE(  q.try_put( T(i) ), "" );
309     g.wait_for_all();
310     for (int i = N-1; i >= 0; --i) {
311         j = bogus_value;
312         spin_try_get( q3, j );
313         CHECK_MESSAGE( i == j, "" );
314     }
315     j = bogus_value;
316     g.wait_for_all();
317     CHECK_MESSAGE( q.try_get( j ) == false, "" );
318     g.wait_for_all();
319     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
320     g.wait_for_all();
321     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
322     CHECK_MESSAGE( j == bogus_value, "" );
323 
324     tbb::flow::remove_edge( q,  q2 );
325     CHECK_MESSAGE( q.try_put( 1 ) == true, "" );
326     g.wait_for_all();
327     CHECK_MESSAGE( q2.try_get( j ) == false, "" );
328     CHECK_MESSAGE( j == bogus_value, "" );
329     g.wait_for_all();
330     CHECK_MESSAGE( q3.try_get( j ) == false, "" );
331     CHECK_MESSAGE( j == bogus_value, "" );
332     g.wait_for_all();
333     CHECK_MESSAGE( q.try_get( j ) == true, "" );
334     CHECK_MESSAGE( j == 1, "" );
335 
336     return 0;
337 }
338 
339 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
340 #include <array>
341 #include <vector>
342 void test_follows_and_precedes_api() {
343     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
344     std::vector<int> messages_for_precedes = {0, 1, 2};
345 
346     follows_and_precedes_testing::test_follows <int, tbb::flow::priority_queue_node<int>>(messages_for_follows);
347     follows_and_precedes_testing::test_precedes <int, tbb::flow::priority_queue_node<int>>(messages_for_precedes);
348 }
349 #endif
350 
351 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
352 void test_deduction_guides() {
353     using namespace tbb::flow;
354 
355     graph g;
356     broadcast_node<int> br(g);
357     priority_queue_node<int> pq0(g);
358 
359 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
360     using compare_type = std::greater<void>;
361     priority_queue_node pq1(follows(br));
362     static_assert(std::is_same_v<decltype(pq1), priority_queue_node<int>>);
363 
364     priority_queue_node pq2(follows(br), compare_type());
365     static_assert(std::is_same_v<decltype(pq2), priority_queue_node<int, compare_type>>);
366 
367     priority_queue_node pq3(precedes(br));
368     static_assert(std::is_same_v<decltype(pq3), priority_queue_node<int>>);
369 
370     priority_queue_node pq4(precedes(br), compare_type());
371     static_assert(std::is_same_v<decltype(pq4), priority_queue_node<int, compare_type>>);
372 #endif
373 
374     priority_queue_node pq5(pq0);
375     static_assert(std::is_same_v<decltype(pq5), priority_queue_node<int>>);
376     g.wait_for_all();
377 }
378 #endif
379 
380 //! Test serial, parallel behavior and reservation under parallelism
381 //! \brief \ref requirement \ref error_guessing
382 TEST_CASE("Serial, parallel and reservation tests"){
383     for (int p = 2; p <= 4; ++p) {
384         tbb::task_arena arena(p);
385         arena.execute(
386             [&]() {
387                 test_serial<int>();
388                 test_reservation<int>(p);
389                 test_reservation<CheckType<int> >(p);
390                 test_parallel<int>(p);
391             }
392         );
393 	}
394 }
395 
396 //! Test reset and cancellation
397 //! \brief \ref error_guessing
398 TEST_CASE("Reset tests"){
399     INFO("Testing resets\n");
400     test_resets<int,tbb::flow::priority_queue_node<int> >();
401     test_resets<float,tbb::flow::priority_queue_node<float> >();
402 }
403 
404 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
405 //! Test follows and precedes API
406 //! \brief \ref error_guessing
407 TEST_CASE("Test follows and precedes API"){
408     test_follows_and_precedes_api();
409 }
410 #endif
411 
412 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
414 //! \brief \ref requirement
415 TEST_CASE("Test deduction guides"){
416     test_deduction_guides();
417 }
418 #endif
419 
420