1 /*
2 Copyright (c) 2020-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "common/checktype.h"
20 #include "common/spin_barrier.h"
21 #include "common/utils_concurrency_limit.h"
22 #include "common/test_invoke.h"
23
24 #include "oneapi/tbb/parallel_pipeline.h"
25 #include "oneapi/tbb/global_control.h"
26 #include "oneapi/tbb/task_group.h"
27
#include <string.h>

#include <atomic>
#include <iterator>
#include <memory>
#include <thread>
#include <tuple>
33
34 //! \file conformance_parallel_pipeline.cpp
35 //! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification
36
37 constexpr std::size_t n_tokens = 8;
38
39 constexpr int max_counter = 1024;
40
41 static std::atomic<int> input_counter{ max_counter };
42
43 template<typename U>
44 class input_filter {
45 public:
operator ()(oneapi::tbb::flow_control & control) const46 U operator()( oneapi::tbb::flow_control& control ) const {
47 if( --input_counter < 0 ) {
48 control.stop();
49 input_counter = max_counter;
50 }
51 return U();
52 }
53 };
54
// Middle-stage body: ignores the incoming T and produces a value-initialized U.
template<typename T, typename U>
class middle_filter {
public:
    U operator()( T /*ignored*/ ) const {
        return U{};
    }
};
63
// Final-stage body: accepts a value of type T and discards it.
template<typename T>
class output_filter {
public:
    void operator()( T /*discarded*/ ) const {
    }
};
69
70 static const oneapi::tbb::filter_mode filter_table[] = { oneapi::tbb::filter_mode::parallel,
71 oneapi::tbb::filter_mode::serial_in_order,
72 oneapi::tbb::filter_mode::serial_out_of_order};
73
74 template<typename Body, typename... Cotnext>
TestSingleFilter(Body body,Cotnext &...context)75 void TestSingleFilter(Body body, Cotnext&... context) {
76
77 for(int i =0; i <3; i++)
78 {
79 oneapi::tbb::filter_mode mode = filter_table[i];
80
81 oneapi::tbb::filter<void, void> one_filter( mode, body );
82 oneapi::tbb::parallel_pipeline( n_tokens, one_filter, context... );
83
84 oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::filter<void, void>(mode, body), context... );
85
86 oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::make_filter<void, void>(mode, body), context...);
87 }
88 }
89
TestSingleFilterFunctor()90 void TestSingleFilterFunctor() {
91
92 input_filter<void> i_filter;
93
94 TestSingleFilter(i_filter);
95
96 oneapi::tbb::task_group_context context;
97 TestSingleFilter(i_filter, context);
98 }
99
100
TestSingleFilterLambda()101 void TestSingleFilterLambda() {
102
103
104 TestSingleFilter([]( oneapi::tbb::flow_control& control ) {
105 if(input_counter-- == 0 ) {
106 control.stop();
107 input_counter = max_counter;
108 }
109 } );
110
111 oneapi::tbb::task_group_context context;
112 TestSingleFilter([]( oneapi::tbb::flow_control& control ) {
113 if(input_counter-- == 0 ) {
114 control.stop();
115 input_counter = max_counter;
116 }
117 }, context);
118 }
119
120 template<typename I, typename O>
RunPipeline(const oneapi::tbb::filter<I,O> & filter)121 void RunPipeline(const oneapi::tbb::filter<I, O> &filter)
122 {
123 bool flag{false};
124
125 auto f_beg = oneapi::tbb::make_filter<void, I>(oneapi::tbb::filter_mode::serial_out_of_order,
126 [&flag](oneapi::tbb::flow_control& fc) -> I{
127 if(flag) {
128 fc.stop();
129 }
130 flag = true;
131 return I();
132 });
133
134 auto f_end = oneapi::tbb::make_filter<O, void>(oneapi::tbb::filter_mode::serial_in_order,
135 [](O) {});
136
137 oneapi::tbb::parallel_pipeline(n_tokens, f_beg & filter & f_end);
138 }
139
RunPipeline(const oneapi::tbb::filter<void,void> & filter)140 void RunPipeline(const oneapi::tbb::filter<void, void> &filter)
141 {
142 oneapi::tbb::parallel_pipeline(n_tokens, filter);
143 }
144
145 template<typename Iterator1, typename Iterator2>
RootSequence(Iterator1 first,Iterator1 last,Iterator2 res)146 void RootSequence( Iterator1 first, Iterator1 last, Iterator2 res) {
147 using ValueType = typename Iterator1::value_type;
148 oneapi::tbb::parallel_pipeline( n_tokens,
149 oneapi::tbb::make_filter<void,ValueType>(
150 oneapi::tbb::filter_mode::serial_in_order,
151 [&first, &last](oneapi::tbb::flow_control& fc)-> ValueType{
152 if( first<last ) {
153 ValueType val = *first;
154 ++first;
155 return val;
156 } else {
157 fc.stop();
158 return ValueType{};
159 }
160 }
161 ) &
162 oneapi::tbb::make_filter<ValueType,ValueType>(
163 oneapi::tbb::filter_mode::parallel,
164 [](ValueType p){return p*p;}
165 ) &
166 oneapi::tbb::make_filter<ValueType,void>(
167 oneapi::tbb::filter_mode::serial_in_order,
168 [&res](ValueType x) {
169 *res = x;
170 ++res; }
171 )
172 );
173 }
174
175 //! Testing pipeline correctness
176 //! \brief \ref interface \ref requirement
177 TEST_CASE("Testing pipeline correctness")
178 {
179 std::vector<double> input(max_counter);
180 std::vector<double> output(max_counter);
181 for(std::size_t i = 0; i < max_counter; i++)
182 input[i] = (double)i;
183
184 RootSequence(input.cbegin(), input.cend(), output.begin());
185 for(int i = 0; i < max_counter; i++) {
186 CHECK_MESSAGE(output[i] == input[i]*input[i], "pipeline result is incorrect");
187 }
188 }
189
190 //! Testing pipeline with single filter
191 //! \brief \ref interface \ref requirement
192 TEST_CASE("Testing pipeline with single filter")
193 {
194 TestSingleFilterFunctor();
195 TestSingleFilterLambda();
196 }
197
198 //! Testing single filter with different ways of creation
199 //! \brief \ref interface \ref requirement
200 TEST_CASE_TEMPLATE("Filter creation testing", T, std::tuple<size_t, int>,
201 std::tuple<int, double>,
202 std::tuple<unsigned int*, size_t>,
203 std::tuple<unsigned short, unsigned short*>,
204 std::tuple<double*, unsigned short*>,
205 std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >)
206 {
207 using I = typename std::tuple_element<0, T>::type;
208 using O = typename std::tuple_element<1, T>::type;
209 for(int i = 0; i < 3; i++)
210 {
211 oneapi::tbb::filter_mode mode = filter_table[i];
212 oneapi::tbb::filter<I, O> default_filter;
213
__anon5895f2ca0802(I)214 auto made_filter1 = oneapi::tbb::make_filter<I,O>(mode, [](I)->O{return O();});
215 static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter1)>::value, "make_filter wrong result type");
216 RunPipeline(made_filter1);
217
__anon5895f2ca0902(I)218 auto made_filter2 = oneapi::tbb::make_filter(mode, [](I)->O{return O();});
219 static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter2)>::value, "make_filter wrong result type");
220 RunPipeline(made_filter2);
221
__anon5895f2ca0a02(I)222 oneapi::tbb::filter<I, O> one_filter(mode, [](I)->O{return O();});
223 RunPipeline(one_filter);
224
225 oneapi::tbb::filter<I, O> copy_filter(one_filter);
226 RunPipeline(one_filter);
227
228 default_filter = copy_filter;
229 RunPipeline(default_filter);
230 default_filter.clear();
231 }
232 }
233
234 //! Testing filters concatenation
235 //! \brief \ref interface \ref requirement
236 TEST_CASE_TEMPLATE("Testing filters concatenation", T, std::tuple<size_t, int>,
237 std::tuple<int, double>,
238 std::tuple<unsigned int*, size_t>,
239 std::tuple<unsigned short, unsigned short*>,
240 std::tuple<double*, unsigned short*>,
241 std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >)
242 {
243 using I = typename std::tuple_element<0, T>::type;
244 using O = typename std::tuple_element<1, T>::type;
245
246 for(int fi = 0; fi< 27; fi++)
247 {
248 int i = fi%3;
249 int j = (fi/3)%3;
250 int k = (fi/9);
251 auto filter_chain = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>()) &
252 oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>()) &
253 oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>());
254 RunPipeline(filter_chain);
255
256 oneapi::tbb::filter<void, I> filter1 = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>());
257 oneapi::tbb::filter<I, O> filter2 = oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>());
258 oneapi::tbb::filter<O, void> filter3 = oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>());
259
260 auto fitler12 = filter1 & filter2;
261 auto fitler23 = filter2 & filter3;
262 auto fitler123 = filter1 & filter2 & filter3;
263
264 RunPipeline(fitler12 & filter3);
265 RunPipeline(filter1 & fitler23);
266 RunPipeline(fitler123);
267 }
268 }
269
doWork()270 void doWork() {
271 for (int i = 0; i < 10; ++i)
272 utils::yield();
273 }
274
275 //! Testing filter modes
276 //! \brief \ref interface \ref requirement
277 TEST_CASE("Testing filter modes")
278 {
279 for ( auto concurrency_level : utils::concurrency_range() )
280 {
281 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_level);
282
283 short serial_checker{0};
284 oneapi::tbb::filter<void,short> filter1(oneapi::tbb::filter_mode::serial_out_of_order,
285 [&serial_checker](oneapi::tbb::flow_control&fc)
__anon5895f2ca0b02(oneapi::tbb::flow_control&fc) 286 {
287 auto check_value = ++serial_checker;
288 doWork();
289 CHECK_MESSAGE(check_value == serial_checker, "a serial filter was executed concurrently");
290 if(serial_checker>=(short)max_counter)
291 {
292 fc.stop();
293 }
294 return check_value;
295 });
296
297 short serial_checker2{ 0 };
298 oneapi::tbb::filter<short, short> filter2(oneapi::tbb::filter_mode::serial_in_order,
299 [&serial_checker2](int)
__anon5895f2ca0c02(int) 300 {
301 auto check_value = ++serial_checker2;
302 doWork();
303 CHECK_MESSAGE(check_value == serial_checker2, "a serial filter was executed concurrently");
304 return check_value;
305 });
306
307 utils::SpinBarrier spin_barrier(utils::min(concurrency_level, n_tokens), true);
308 oneapi::tbb::filter<short,int> filter3(oneapi::tbb::filter_mode::parallel,
309 [&spin_barrier](int value)
__anon5895f2ca0d02(int value) 310 {
311 spin_barrier.wait();
312 doWork();
313 return value;
314 });
315
316
317 short order_checker{0};
318 oneapi::tbb::filter<int,void> filter4(oneapi::tbb::filter_mode::serial_in_order,
319 [&order_checker](int value)
__anon5895f2ca0e02(int value) 320 {
321 CHECK_MESSAGE(++order_checker == value, "the order of message was broken");
322 });
323
324 oneapi::tbb::parallel_pipeline(n_tokens, filter1 & filter2 & filter3 & filter4);
325 }
326 }
327
//! Testing max token number
329 //! \brief \ref interface \ref requirement
330 TEST_CASE("Testing max token number")
331 {
332 for(unsigned int i = 1; i < n_tokens; i++)
333 {
334 std::atomic<short> active_tokens{0};
335
336 oneapi::tbb::filter<void,int> filter1(oneapi::tbb::filter_mode::parallel,
337 [&active_tokens](oneapi::tbb::flow_control&fc)
__anon5895f2ca0f02(oneapi::tbb::flow_control&fc) 338 {
339 ++active_tokens;
340 doWork();
341 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tokens is exceed");
342 --active_tokens;
343 if (--input_counter < 0) {
344 fc.stop();
345 input_counter = max_counter;
346 }
347 return 0;
348 });
349
350 oneapi::tbb::filter<int,int> filter2(oneapi::tbb::filter_mode::parallel,
351 [&active_tokens](int value)
__anon5895f2ca1002(int value) 352 {
353 ++active_tokens;
354 doWork();
355 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed");
356 --active_tokens;
357 return value;
358 });
359
360 oneapi::tbb::filter<int,void> filter3(oneapi::tbb::filter_mode::serial_out_of_order,
361 [&active_tokens](int)
__anon5895f2ca1102(int) 362 {
363 ++active_tokens;
364 doWork();
365 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed");
366 --active_tokens;
367 });
368
369 oneapi::tbb::parallel_pipeline(i, filter1 & filter2 & filter3);
370 }
371 }
372
373 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
374
// Declared but never defined in this file, and not referenced by the tests below.
// NOTE(review): presumably a compile-time aid for displaying deduced types - confirm before removing.
template <typename... Ts> struct print;
376
377 //! Testing deduction guides
378 //! \brief \ref interface \ref requirement
379 TEST_CASE_TEMPLATE("Deduction guides testing", T, int, unsigned int, double)
380 {
381 input_filter<T> i_filter;
382
383 oneapi::tbb::filter fc1(oneapi::tbb::filter_mode::serial_in_order, i_filter);
384 static_assert(std::is_same_v<decltype(fc1), oneapi::tbb::filter<void, T>>);
385
386 oneapi::tbb::filter fc2 (fc1);
387 static_assert(std::is_same_v<decltype(fc2), oneapi::tbb::filter<void, T>>);
388
389 middle_filter<T, std::size_t> m_filter;
390 oneapi::tbb::filter fc3(oneapi::tbb::filter_mode::serial_in_order, m_filter);
391 static_assert(std::is_same_v<decltype(fc3), oneapi::tbb::filter<T, std::size_t>>);
392
__anon5895f2ca1202(int&&) 393 oneapi::tbb::filter frv(oneapi::tbb::filter_mode::serial_in_order, [](int&&) -> double { return 0.0; });
__anon5895f2ca1302(const int&) 394 oneapi::tbb::filter fclv(oneapi::tbb::filter_mode::serial_in_order, [](const int&) -> double { return 0.0; });
__anon5895f2ca1402(const int) 395 oneapi::tbb::filter fc(oneapi::tbb::filter_mode::serial_in_order, [](const int) -> double { return 0.0; });
396
397 static_assert(std::is_same_v<decltype(frv), oneapi::tbb::filter<int, double>>);
398 static_assert(std::is_same_v<decltype(fclv), oneapi::tbb::filter<int, double>>);
399 static_assert(std::is_same_v<decltype(fc), oneapi::tbb::filter<int, double>>);
400 }
401 #endif //__TBB_CPP17_DEDUCTION_GUIDES_PRESENT
402
403 #if __TBB_CPP17_INVOKE_PRESENT
404
405 template <typename MiddleFilterBody, typename LastFilterBody>
test_pipeline_invoke_basic(const MiddleFilterBody & middle_body,const LastFilterBody & last_body)406 void test_pipeline_invoke_basic(const MiddleFilterBody& middle_body, const LastFilterBody& last_body) {
407 using output_filter_type = test_invoke::SmartID<std::size_t>;
408 using middle_filter_type = test_invoke::SmartID<output_filter_type>;
409
410 const std::size_t input_count = 10;
411 std::size_t signal_point = 0;
412 std::size_t counter = 0;
413
414 auto first_body = [&](oneapi::tbb::flow_control& fc) -> middle_filter_type {
415 if (++counter > input_count) {
416 fc.stop();
417 }
418 return middle_filter_type{output_filter_type{&signal_point}};
419 };
420
421 auto first_filter = oneapi::tbb::make_filter<void, middle_filter_type>(oneapi::tbb::filter_mode::serial_in_order, first_body);
422 auto middle_filter = oneapi::tbb::make_filter<middle_filter_type, output_filter_type>(oneapi::tbb::filter_mode::serial_in_order, middle_body);
423 auto last_filter = oneapi::tbb::make_filter<output_filter_type, void>(oneapi::tbb::filter_mode::serial_in_order, last_body);
424
425 oneapi::tbb::parallel_pipeline(16, first_filter & middle_filter & last_filter);
426
427 CHECK(signal_point == input_count);
428 }
429
430 //! Test that parallel_pipeline uses std::invoke to run the filter body
431 //! \brief \ref requirement
432 TEST_CASE("parallel_pipeline and std::invoke") {
433 using output_filter_type = test_invoke::SmartID<std::size_t>;
434 using middle_filter_type = test_invoke::SmartID<output_filter_type>;
435
436 test_pipeline_invoke_basic(&middle_filter_type::get_id, &output_filter_type::operate); // Pointer to non-static function as middle filter
437 test_pipeline_invoke_basic(&middle_filter_type::id, &output_filter_type::operate); // Pointer to non-static member as middle filter
438 }
439
440 #endif // __TBB_CPP17_INVOKE_PRESENT
441