1 /*
2     Copyright (c) 2020-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "common/checktype.h"
20 #include "common/spin_barrier.h"
21 #include "common/utils_concurrency_limit.h"
22 #include "common/test_invoke.h"
23 
24 #include "oneapi/tbb/parallel_pipeline.h"
25 #include "oneapi/tbb/global_control.h"
26 #include "oneapi/tbb/task_group.h"
27 
28 #include <atomic>
29 #include <thread>
30 #include <string.h>
31 #include <memory>
32 #include <tuple>
33 
34 //! \file conformance_parallel_pipeline.cpp
35 //! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification
36 
37 constexpr std::size_t n_tokens = 8;
38 
39 constexpr int max_counter = 1024;
40 
41 static std::atomic<int> input_counter{ max_counter };
42 
43 template<typename U>
44 class input_filter {
45 public:
operator ()(oneapi::tbb::flow_control & control) const46     U operator()( oneapi::tbb::flow_control& control ) const {
47         if( --input_counter < 0 ) {
48             control.stop();
49             input_counter = max_counter;
50         }
51         return U();
52     }
53 };
54 
55 template<typename T, typename U>
56 class middle_filter {
57 public:
operator ()(T) const58     U operator()(T) const {
59         U out{};
60         return out;
61     }
62 };
63 
64 template<typename T>
65 class output_filter {
66 public:
operator ()(T) const67     void operator()(T ) const {}
68 };
69 
70 static const oneapi::tbb::filter_mode filter_table[] = { oneapi::tbb::filter_mode::parallel,
71                                                  oneapi::tbb::filter_mode::serial_in_order,
72                                                  oneapi::tbb::filter_mode::serial_out_of_order};
73 
74 template<typename Body, typename... Cotnext>
TestSingleFilter(Body body,Cotnext &...context)75 void TestSingleFilter(Body body, Cotnext&... context) {
76 
77     for(int i =0; i <3; i++)
78     {
79         oneapi::tbb::filter_mode mode = filter_table[i];
80 
81         oneapi::tbb::filter<void, void> one_filter( mode, body );
82         oneapi::tbb::parallel_pipeline( n_tokens, one_filter, context...   );
83 
84         oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::filter<void, void>(mode, body), context... );
85 
86         oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::make_filter<void, void>(mode, body), context...);
87     }
88 }
89 
TestSingleFilterFunctor()90 void TestSingleFilterFunctor() {
91 
92     input_filter<void> i_filter;
93 
94     TestSingleFilter(i_filter);
95 
96     oneapi::tbb::task_group_context context;
97     TestSingleFilter(i_filter,  context);
98 }
99 
100 
TestSingleFilterLambda()101 void TestSingleFilterLambda() {
102 
103 
104     TestSingleFilter([]( oneapi::tbb::flow_control& control ) {
105                     if(input_counter-- == 0 ) {
106                         control.stop();
107                         input_counter = max_counter;
108                         }
109                     } );
110 
111     oneapi::tbb::task_group_context context;
112     TestSingleFilter([]( oneapi::tbb::flow_control& control ) {
113                      if(input_counter-- == 0 ) {
114                         control.stop();
115                         input_counter = max_counter;
116                         }
117                     },  context);
118 }
119 
120 template<typename I, typename O>
RunPipeline(const oneapi::tbb::filter<I,O> & filter)121 void RunPipeline(const oneapi::tbb::filter<I, O> &filter)
122 {
123     bool flag{false};
124 
125     auto f_beg = oneapi::tbb::make_filter<void, I>(oneapi::tbb::filter_mode::serial_out_of_order,
126                                         [&flag](oneapi::tbb::flow_control& fc) -> I{
127                                             if(flag) {
128                                                 fc.stop();
129                                             }
130                                             flag = true;
131                                             return I();
132                                         });
133 
134     auto f_end = oneapi::tbb::make_filter<O, void>(oneapi::tbb::filter_mode::serial_in_order,
135                                             [](O) {});
136 
137     oneapi::tbb::parallel_pipeline(n_tokens, f_beg & filter & f_end);
138 }
139 
RunPipeline(const oneapi::tbb::filter<void,void> & filter)140 void RunPipeline(const oneapi::tbb::filter<void, void> &filter)
141 {
142     oneapi::tbb::parallel_pipeline(n_tokens, filter);
143 }
144 
145 template<typename Iterator1, typename Iterator2>
RootSequence(Iterator1 first,Iterator1 last,Iterator2 res)146 void RootSequence( Iterator1 first, Iterator1 last, Iterator2 res) {
147     using ValueType = typename Iterator1::value_type;
148     oneapi::tbb::parallel_pipeline( n_tokens,
149         oneapi::tbb::make_filter<void,ValueType>(
150             oneapi::tbb::filter_mode::serial_in_order,
151             [&first, &last](oneapi::tbb::flow_control& fc)-> ValueType{
152                 if( first<last ) {
153                     ValueType val  = *first;
154                     ++first;
155                     return val;
156                  } else {
157                     fc.stop();
158                     return ValueType{};
159                 }
160             }
161         ) &
162         oneapi::tbb::make_filter<ValueType,ValueType>(
163             oneapi::tbb::filter_mode::parallel,
164             [](ValueType p){return p*p;}
165         ) &
166         oneapi::tbb::make_filter<ValueType,void>(
167             oneapi::tbb::filter_mode::serial_in_order,
168             [&res](ValueType x) {
169                 *res = x;
170                 ++res; }
171         )
172     );
173 }
174 
175 //! Testing pipeline correctness
176 //! \brief \ref interface \ref requirement
177 TEST_CASE("Testing pipeline correctness")
178 {
179     std::vector<double> input(max_counter);
180     std::vector<double> output(max_counter);
181     for(std::size_t i = 0; i < max_counter; i++)
182         input[i] = (double)i;
183 
184     RootSequence(input.cbegin(), input.cend(), output.begin());
185     for(int  i = 0; i < max_counter; i++) {
186         CHECK_MESSAGE(output[i] == input[i]*input[i], "pipeline result is incorrect");
187     }
188 }
189 
190 //! Testing pipeline with single filter
191 //! \brief \ref interface \ref requirement
192 TEST_CASE("Testing pipeline with single filter")
193 {
194     TestSingleFilterFunctor();
195     TestSingleFilterLambda();
196 }
197 
198 //! Testing single filter with different ways of creation
199 //! \brief \ref interface \ref requirement
200 TEST_CASE_TEMPLATE("Filter creation testing", T, std::tuple<size_t, int>,
201                                                  std::tuple<int, double>,
202                                                  std::tuple<unsigned int*, size_t>,
203                                                  std::tuple<unsigned short, unsigned short*>,
204                                                  std::tuple<double*, unsigned short*>,
205                                                  std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >)
206 {
207     using I = typename std::tuple_element<0, T>::type;
208     using O = typename std::tuple_element<1, T>::type;
209     for(int i = 0; i < 3; i++)
210     {
211         oneapi::tbb::filter_mode mode = filter_table[i];
212         oneapi::tbb::filter<I, O> default_filter;
213 
__anon5895f2ca0802(I)214         auto made_filter1 = oneapi::tbb::make_filter<I,O>(mode, [](I)->O{return O();});
215         static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter1)>::value, "make_filter wrong result type");
216         RunPipeline(made_filter1);
217 
__anon5895f2ca0902(I)218         auto made_filter2 = oneapi::tbb::make_filter(mode, [](I)->O{return O();});
219         static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter2)>::value, "make_filter wrong result type");
220         RunPipeline(made_filter2);
221 
__anon5895f2ca0a02(I)222         oneapi::tbb::filter<I, O> one_filter(mode, [](I)->O{return O();});
223         RunPipeline(one_filter);
224 
225         oneapi::tbb::filter<I, O> copy_filter(one_filter);
226         RunPipeline(one_filter);
227 
228         default_filter = copy_filter;
229         RunPipeline(default_filter);
230         default_filter.clear();
231     }
232 }
233 
234 //! Testing filters concatenation
235 //! \brief \ref interface \ref requirement
236 TEST_CASE_TEMPLATE("Testing filters concatenation", T, std::tuple<size_t, int>,
237                                                        std::tuple<int, double>,
238                                                        std::tuple<unsigned int*, size_t>,
239                                                        std::tuple<unsigned short, unsigned short*>,
240                                                        std::tuple<double*, unsigned short*>,
241                                                        std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >)
242 {
243     using I = typename std::tuple_element<0, T>::type;
244     using O = typename std::tuple_element<1, T>::type;
245 
246     for(int fi = 0; fi< 27; fi++)
247     {
248         int i = fi%3;
249         int j = (fi/3)%3;
250         int k = (fi/9);
251         auto filter_chain = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>()) &
252                             oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>()) &
253                             oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>());
254         RunPipeline(filter_chain);
255 
256         oneapi::tbb::filter<void, I> filter1 = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>());
257         oneapi::tbb::filter<I, O> filter2 = oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>());
258         oneapi::tbb::filter<O, void> filter3 = oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>());
259 
260         auto fitler12 = filter1 & filter2;
261         auto fitler23 = filter2 & filter3;
262         auto fitler123 = filter1 & filter2 & filter3;
263 
264         RunPipeline(fitler12 & filter3);
265         RunPipeline(filter1 & fitler23);
266         RunPipeline(fitler123);
267     }
268 }
269 
doWork()270 void doWork() {
271     for (int i = 0; i < 10; ++i)
272         utils::yield();
273 }
274 
275 //! Testing filter modes
276 //! \brief \ref interface \ref requirement
277 TEST_CASE("Testing filter modes")
278 {
279     for ( auto concurrency_level : utils::concurrency_range() )
280     {
281         oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_level);
282 
283         short serial_checker{0};
284         oneapi::tbb::filter<void,short> filter1(oneapi::tbb::filter_mode::serial_out_of_order,
285                                 [&serial_checker](oneapi::tbb::flow_control&fc)
__anon5895f2ca0b02(oneapi::tbb::flow_control&fc) 286                                 {
287                                     auto check_value = ++serial_checker;
288                                     doWork();
289                                     CHECK_MESSAGE(check_value == serial_checker, "a serial filter was executed concurrently");
290                                     if(serial_checker>=(short)max_counter)
291                                     {
292                                         fc.stop();
293                                     }
294                                     return check_value;
295                                 });
296 
297         short serial_checker2{ 0 };
298         oneapi::tbb::filter<short, short> filter2(oneapi::tbb::filter_mode::serial_in_order,
299             [&serial_checker2](int)
__anon5895f2ca0c02(int) 300             {
301                 auto check_value = ++serial_checker2;
302                 doWork();
303                 CHECK_MESSAGE(check_value == serial_checker2, "a serial filter was executed concurrently");
304                 return check_value;
305             });
306 
307         utils::SpinBarrier spin_barrier(utils::min(concurrency_level, n_tokens), true);
308         oneapi::tbb::filter<short,int> filter3(oneapi::tbb::filter_mode::parallel,
309                                 [&spin_barrier](int value)
__anon5895f2ca0d02(int value) 310                                 {
311                                     spin_barrier.wait();
312                                     doWork();
313                                     return value;
314                                 });
315 
316 
317         short order_checker{0};
318         oneapi::tbb::filter<int,void> filter4(oneapi::tbb::filter_mode::serial_in_order,
319                                 [&order_checker](int value)
__anon5895f2ca0e02(int value) 320                                 {
321                                     CHECK_MESSAGE(++order_checker == value, "the order of message was broken");
322                                 });
323 
324         oneapi::tbb::parallel_pipeline(n_tokens, filter1 & filter2 & filter3 & filter4);
325     }
326 }
327 
328 //! Testing max tocken number
329 //! \brief \ref interface \ref requirement
330 TEST_CASE("Testing max token number")
331 {
332     for(unsigned int i = 1; i < n_tokens; i++)
333     {
334         std::atomic<short> active_tokens{0};
335 
336         oneapi::tbb::filter<void,int> filter1(oneapi::tbb::filter_mode::parallel,
337                                 [&active_tokens](oneapi::tbb::flow_control&fc)
__anon5895f2ca0f02(oneapi::tbb::flow_control&fc) 338                                 {
339                                     ++active_tokens;
340                                     doWork();
341                                     CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tokens is exceed");
342                                     --active_tokens;
343                                     if (--input_counter < 0) {
344                                         fc.stop();
345                                         input_counter = max_counter;
346                                     }
347                                     return 0;
348                                 });
349 
350         oneapi::tbb::filter<int,int> filter2(oneapi::tbb::filter_mode::parallel,
351                                 [&active_tokens](int value)
__anon5895f2ca1002(int value) 352                                 {
353                                     ++active_tokens;
354                                     doWork();
355                                     CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed");
356                                     --active_tokens;
357                                     return value;
358                                 });
359 
360         oneapi::tbb::filter<int,void> filter3(oneapi::tbb::filter_mode::serial_out_of_order,
361                                 [&active_tokens](int)
__anon5895f2ca1102(int) 362                                 {
363                                     ++active_tokens;
364                                     doWork();
365                                     CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed");
366                                     --active_tokens;
367                                 });
368 
369         oneapi::tbb::parallel_pipeline(i, filter1 & filter2 & filter3);
370     }
371 }
372 
373 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
374 
375 template <typename... T> struct print;
376 
377 //! Testing deduction guides
378 //! \brief \ref interface \ref requirement
379 TEST_CASE_TEMPLATE("Deduction guides testing", T, int, unsigned int, double)
380 {
381     input_filter<T> i_filter;
382 
383     oneapi::tbb::filter  fc1(oneapi::tbb::filter_mode::serial_in_order, i_filter);
384     static_assert(std::is_same_v<decltype(fc1), oneapi::tbb::filter<void, T>>);
385 
386     oneapi::tbb::filter fc2 (fc1);
387     static_assert(std::is_same_v<decltype(fc2), oneapi::tbb::filter<void, T>>);
388 
389     middle_filter<T, std::size_t> m_filter;
390     oneapi::tbb::filter  fc3(oneapi::tbb::filter_mode::serial_in_order, m_filter);
391     static_assert(std::is_same_v<decltype(fc3), oneapi::tbb::filter<T, std::size_t>>);
392 
__anon5895f2ca1202(int&&) 393     oneapi::tbb::filter frv(oneapi::tbb::filter_mode::serial_in_order, [](int&&) -> double { return 0.0; });
__anon5895f2ca1302(const int&) 394     oneapi::tbb::filter fclv(oneapi::tbb::filter_mode::serial_in_order, [](const int&) -> double { return 0.0; });
__anon5895f2ca1402(const int) 395     oneapi::tbb::filter fc(oneapi::tbb::filter_mode::serial_in_order, [](const int) -> double { return 0.0; });
396 
397     static_assert(std::is_same_v<decltype(frv), oneapi::tbb::filter<int, double>>);
398     static_assert(std::is_same_v<decltype(fclv), oneapi::tbb::filter<int, double>>);
399     static_assert(std::is_same_v<decltype(fc), oneapi::tbb::filter<int, double>>);
400 }
401 #endif  //__TBB_CPP17_DEDUCTION_GUIDES_PRESENT
402 
403 #if __TBB_CPP17_INVOKE_PRESENT
404 
405 template <typename MiddleFilterBody, typename LastFilterBody>
test_pipeline_invoke_basic(const MiddleFilterBody & middle_body,const LastFilterBody & last_body)406 void test_pipeline_invoke_basic(const MiddleFilterBody& middle_body, const LastFilterBody& last_body) {
407     using output_filter_type = test_invoke::SmartID<std::size_t>;
408     using middle_filter_type = test_invoke::SmartID<output_filter_type>;
409 
410     const std::size_t input_count = 10;
411     std::size_t signal_point = 0;
412     std::size_t counter = 0;
413 
414     auto first_body = [&](oneapi::tbb::flow_control& fc) -> middle_filter_type {
415         if (++counter > input_count) {
416             fc.stop();
417         }
418         return middle_filter_type{output_filter_type{&signal_point}};
419     };
420 
421     auto first_filter = oneapi::tbb::make_filter<void, middle_filter_type>(oneapi::tbb::filter_mode::serial_in_order, first_body);
422     auto middle_filter = oneapi::tbb::make_filter<middle_filter_type, output_filter_type>(oneapi::tbb::filter_mode::serial_in_order, middle_body);
423     auto last_filter = oneapi::tbb::make_filter<output_filter_type, void>(oneapi::tbb::filter_mode::serial_in_order, last_body);
424 
425     oneapi::tbb::parallel_pipeline(16, first_filter & middle_filter & last_filter);
426 
427     CHECK(signal_point == input_count);
428 }
429 
430 //! Test that parallel_pipeline uses std::invoke to run the filter body
431 //! \brief \ref requirement
432 TEST_CASE("parallel_pipeline and std::invoke") {
433     using output_filter_type = test_invoke::SmartID<std::size_t>;
434     using middle_filter_type = test_invoke::SmartID<output_filter_type>;
435 
436     test_pipeline_invoke_basic(&middle_filter_type::get_id, &output_filter_type::operate); // Pointer to non-static function as middle filter
437     test_pipeline_invoke_basic(&middle_filter_type::id, &output_filter_type::operate); // Pointer to non-static member as middle filter
438 }
439 
440 #endif // __TBB_CPP17_INVOKE_PRESENT
441