1 /*
2     Copyright (c) 2020-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "common/checktype.h"
20 #include "common/spin_barrier.h"
21 #include "common/utils_concurrency_limit.h"
22 
23 #include "oneapi/tbb/parallel_pipeline.h"
24 #include "oneapi/tbb/global_control.h"
25 #include "oneapi/tbb/task_group.h"
26 
27 #include <atomic>
28 #include <thread>
29 #include <string.h>
30 #include <memory>
31 #include <tuple>
32 
33 //! \file conformance_parallel_pipeline.cpp
34 //! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification
35 
36 constexpr std::size_t n_tokens = 8;
37 
38 constexpr int max_counter = 1024;
39 
40 static std::atomic<int> input_counter{ max_counter };
41 
42 template<typename U>
43 class input_filter {
44 public:
45     U operator()( oneapi::tbb::flow_control& control ) const {
46         if( --input_counter < 0 ) {
47             control.stop();
48             input_counter = max_counter;
49         }
50         return U();
51     }
52 };
53 
54 template<typename T, typename U>
55 class middle_filter {
56 public:
57     U operator()(T) const {
58         U out{};
59         return out;
60     }
61 };
62 
63 template<typename T>
64 class output_filter {
65 public:
66     void operator()(T ) const {}
67 };
68 
69 static const oneapi::tbb::filter_mode filter_table[] = { oneapi::tbb::filter_mode::parallel,
70                                                  oneapi::tbb::filter_mode::serial_in_order,
71                                                  oneapi::tbb::filter_mode::serial_out_of_order};
72 
73 template<typename Body, typename... Cotnext>
74 void TestSingleFilter(Body body, Cotnext&... context) {
75 
76     for(int i =0; i <3; i++)
77     {
78         oneapi::tbb::filter_mode mode = filter_table[i];
79 
80         oneapi::tbb::filter<void, void> one_filter( mode, body );
81         oneapi::tbb::parallel_pipeline( n_tokens, one_filter, context...   );
82 
83         oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::filter<void, void>(mode, body), context... );
84 
85         oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::make_filter<void, void>(mode, body), context...);
86     }
87 }
88 
89 void TestSingleFilterFunctor() {
90 
91     input_filter<void> i_filter;
92 
93     TestSingleFilter(i_filter);
94 
95     oneapi::tbb::task_group_context context;
96     TestSingleFilter(i_filter,  context);
97 }
98 
99 
100 void TestSingleFilterLambda() {
101 
102 
103     TestSingleFilter([]( oneapi::tbb::flow_control& control ) {
104                     if(input_counter-- == 0 ) {
105                         control.stop();
106                         input_counter = max_counter;
107                         }
108                     } );
109 
110     oneapi::tbb::task_group_context context;
111     TestSingleFilter([]( oneapi::tbb::flow_control& control ) {
112                      if(input_counter-- == 0 ) {
113                         control.stop();
114                         input_counter = max_counter;
115                         }
116                     },  context);
117 }
118 
119 template<typename I, typename O>
120 void RunPipeline(const oneapi::tbb::filter<I, O> &filter)
121 {
122     bool flag{false};
123 
124     auto f_beg = oneapi::tbb::make_filter<void, I>(oneapi::tbb::filter_mode::serial_out_of_order,
125                                         [&flag](oneapi::tbb::flow_control& fc) -> I{
126                                             if(flag) {
127                                                 fc.stop();
128                                             }
129                                             flag = true;
130                                             return I();
131                                         });
132 
133     auto f_end = oneapi::tbb::make_filter<O, void>(oneapi::tbb::filter_mode::serial_in_order,
134                                             [](O) {});
135 
136     oneapi::tbb::parallel_pipeline(n_tokens, f_beg & filter & f_end);
137 }
138 
139 void RunPipeline(const oneapi::tbb::filter<void, void> &filter)
140 {
141     oneapi::tbb::parallel_pipeline(n_tokens, filter);
142 }
143 
144 template<typename Iterator1, typename Iterator2>
145 void RootSequence( Iterator1 first, Iterator1 last, Iterator2 res) {
146     using ValueType = typename Iterator1::value_type;
147     oneapi::tbb::parallel_pipeline( n_tokens,
148         oneapi::tbb::make_filter<void,ValueType>(
149             oneapi::tbb::filter_mode::serial_in_order,
150             [&first, &last](oneapi::tbb::flow_control& fc)-> ValueType{
151                 if( first<last ) {
152                     ValueType val  = *first;
153                     ++first;
154                     return val;
155                  } else {
156                     fc.stop();
157                     return ValueType{};
158                 }
159             }
160         ) &
161         oneapi::tbb::make_filter<ValueType,ValueType>(
162             oneapi::tbb::filter_mode::parallel,
163             [](ValueType p){return p*p;}
164         ) &
165         oneapi::tbb::make_filter<ValueType,void>(
166             oneapi::tbb::filter_mode::serial_in_order,
167             [&res](ValueType x) {
168                 *res = x;
169                 ++res; }
170         )
171     );
172 }
173 
174 //! Testing pipeline correctness
175 //! \brief \ref interface \ref requirement
176 TEST_CASE("Testing pipeline correctness")
177 {
178     std::vector<double> input(max_counter);
179     std::vector<double> output(max_counter);
180     for(std::size_t i = 0; i < max_counter; i++)
181         input[i] = (double)i;
182 
183     RootSequence(input.cbegin(), input.cend(), output.begin());
184     for(int  i = 0; i < max_counter; i++) {
185         CHECK_MESSAGE(output[i] == input[i]*input[i], "pipeline result is incorrect");
186     }
187 }
188 
189 //! Testing pipeline with single filter
190 //! \brief \ref interface \ref requirement
191 TEST_CASE("Testing pipeline with single filter")
192 {
193     TestSingleFilterFunctor();
194     TestSingleFilterLambda();
195 }
196 
197 //! Testing single filter with different ways of creation
198 //! \brief \ref interface \ref requirement
199 TEST_CASE_TEMPLATE("Filter creation testing", T, std::tuple<size_t, int>,
200                                                  std::tuple<int, double>,
201                                                  std::tuple<unsigned int*, size_t>,
202                                                  std::tuple<unsigned short, unsigned short*>,
203                                                  std::tuple<double*, unsigned short*>,
204                                                  std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >)
205 {
206     using I = typename std::tuple_element<0, T>::type;
207     using O = typename std::tuple_element<1, T>::type;
208     for(int i = 0; i < 3; i++)
209     {
210         oneapi::tbb::filter_mode mode = filter_table[i];
211         oneapi::tbb::filter<I, O> default_filter;
212 
213         auto made_filter1 = oneapi::tbb::make_filter<I,O>(mode, [](I)->O{return O();});
214         static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter1)>::value, "make_filter wrong result type");
215         RunPipeline(made_filter1);
216 
217         auto made_filter2 = oneapi::tbb::make_filter(mode, [](I)->O{return O();});
218         static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter2)>::value, "make_filter wrong result type");
219         RunPipeline(made_filter2);
220 
221         oneapi::tbb::filter<I, O> one_filter(mode, [](I)->O{return O();});
222         RunPipeline(one_filter);
223 
224         oneapi::tbb::filter<I, O> copy_filter(one_filter);
225         RunPipeline(one_filter);
226 
227         default_filter = copy_filter;
228         RunPipeline(default_filter);
229         default_filter.clear();
230     }
231 }
232 
233 //! Testing filters concatenation
234 //! \brief \ref interface \ref requirement
235 TEST_CASE_TEMPLATE("Testing filters concatenation", T, std::tuple<size_t, int>,
236                                                        std::tuple<int, double>,
237                                                        std::tuple<unsigned int*, size_t>,
238                                                        std::tuple<unsigned short, unsigned short*>,
239                                                        std::tuple<double*, unsigned short*>,
240                                                        std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >)
241 {
242     using I = typename std::tuple_element<0, T>::type;
243     using O = typename std::tuple_element<1, T>::type;
244 
245     for(int fi = 0; fi< 27; fi++)
246     {
247         int i = fi%3;
248         int j = (fi/3)%3;
249         int k = (fi/9);
250         auto filter_chain = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>()) &
251                             oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>()) &
252                             oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>());
253         RunPipeline(filter_chain);
254 
255         oneapi::tbb::filter<void, I> filter1 = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>());
256         oneapi::tbb::filter<I, O> filter2 = oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>());
257         oneapi::tbb::filter<O, void> filter3 = oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>());
258 
259         auto fitler12 = filter1 & filter2;
260         auto fitler23 = filter2 & filter3;
261         auto fitler123 = filter1 & filter2 & filter3;
262 
263         RunPipeline(fitler12 & filter3);
264         RunPipeline(filter1 & fitler23);
265         RunPipeline(fitler123);
266     }
267 }
268 
269 void doWork() {
270     for (int i = 0; i < 10; ++i)
271         utils::yield();
272 }
273 
274 //! Testing filter modes
275 //! \brief \ref interface \ref requirement
276 TEST_CASE("Testing filter modes")
277 {
278     for ( auto concurrency_level : utils::concurrency_range() )
279     {
280         oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_level);
281 
282         std::atomic<short> serial_checker{0};
283         oneapi::tbb::filter<void,short> filter1(oneapi::tbb::filter_mode::serial_out_of_order,
284                                 [&serial_checker](oneapi::tbb::flow_control&fc)
285                                 {
286                                     auto check_value = ++serial_checker;
287                                     doWork();
288                                     CHECK_MESSAGE(check_value == serial_checker, "a serial filter was executed concurrently");
289                                     if(serial_checker>=(short)max_counter)
290                                     {
291                                         fc.stop();
292                                     }
293                                     return check_value;
294                                 });
295 
296         std::atomic<short> serial_checker2{ 0 };
297         oneapi::tbb::filter<short, short> filter2(oneapi::tbb::filter_mode::serial_in_order,
298             [&serial_checker2](int)
299             {
300                 auto check_value = ++serial_checker2;
301                 doWork();
302                 CHECK_MESSAGE(check_value == serial_checker2, "a serial filter was executed concurrently");
303                 return check_value;
304             });
305 
306         utils::SpinBarrier spin_barrier(utils::min(concurrency_level, n_tokens), true);
307         oneapi::tbb::filter<short,int> filter3(oneapi::tbb::filter_mode::parallel,
308                                 [&spin_barrier](int value)
309                                 {
310                                     spin_barrier.wait();
311                                     doWork();
312                                     return value;
313                                 });
314 
315 
316         std::atomic<short> order_checker{0};
317         oneapi::tbb::filter<int,void> filter4(oneapi::tbb::filter_mode::serial_in_order,
318                                 [&order_checker](int value)
319                                 {
320                                     CHECK_MESSAGE(++order_checker == value, "the order of message was broken");
321                                 });
322 
323         oneapi::tbb::parallel_pipeline(n_tokens, filter1 & filter2 & filter3 & filter4);
324     }
325 }
326 
327 //! Testing max tocken number
328 //! \brief \ref interface \ref requirement
329 TEST_CASE("Testing max token number")
330 {
331     for(unsigned int i = 1; i < n_tokens; i++)
332     {
333         std::atomic<short> active_tokens{0};
334         int counter{0};
335 
336         oneapi::tbb::filter<void,int> filter1(oneapi::tbb::filter_mode::parallel,
337                                 [&active_tokens,&counter](oneapi::tbb::flow_control&fc)
338                                 {
339                                     ++active_tokens;
340                                     doWork();
341                                     CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tokens is exceed");
342                                     --active_tokens;
343                                     if(++counter>=max_counter)
344                                     {
345                                         fc.stop();
346                                     }
347                                     return 0;
348                                 });
349 
350         oneapi::tbb::filter<int,int> filter2(oneapi::tbb::filter_mode::parallel,
351                                 [&active_tokens](int value)
352                                 {
353                                     ++active_tokens;
354                                     doWork();
355                                     CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed");
356                                     --active_tokens;
357                                     return value;
358                                 });
359 
360         oneapi::tbb::filter<int,void> filter3(oneapi::tbb::filter_mode::serial_out_of_order,
361                                 [&active_tokens](int)
362                                 {
363                                     ++active_tokens;
364                                     doWork();
365                                     CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed");
366                                     --active_tokens;
367                                 });
368 
369         oneapi::tbb::parallel_pipeline(i, filter1 & filter2 & filter3);
370     }
371 }
372 
373 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
374 
375 //! Testing deduction guides
376 //! \brief \ref interface \ref requirement
377 TEST_CASE_TEMPLATE("Deduction guides testing", T, int, unsigned int, double)
378 {
379     input_filter<T> i_filter;
380     oneapi::tbb::filter  fc1(oneapi::tbb::filter_mode::serial_in_order, i_filter);
381     static_assert(std::is_same_v<decltype(fc1), oneapi::tbb::filter<void, T>>);
382 
383     oneapi::tbb::filter fc2 (fc1);
384     static_assert(std::is_same_v<decltype(fc2), oneapi::tbb::filter<void, T>>);
385 
386     middle_filter<T, std::size_t> m_filter;
387     oneapi::tbb::filter  fc3(oneapi::tbb::filter_mode::serial_in_order, m_filter);
388     static_assert(std::is_same_v<decltype(fc3), oneapi::tbb::filter<T, std::size_t>>);
389 
390     oneapi::tbb::filter frv(oneapi::tbb::filter_mode::serial_in_order, [](int&&) -> double { return 0.0; });
391     oneapi::tbb::filter fclv(oneapi::tbb::filter_mode::serial_in_order, [](const int&) -> double { return 0.0; });
392     oneapi::tbb::filter fc(oneapi::tbb::filter_mode::serial_in_order, [](const int) -> double { return 0.0; });
393 
394     static_assert(std::is_same_v<decltype(frv), oneapi::tbb::filter<int, double>>);
395     static_assert(std::is_same_v<decltype(fclv), oneapi::tbb::filter<int, double>>);
396     static_assert(std::is_same_v<decltype(fc), oneapi::tbb::filter<int, double>>);
397 }
398 #endif  //__TBB_CPP17_DEDUCTION_GUIDES_PRESENT
399