1 /* 2 Copyright (c) 2020-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "common/checktype.h" 20 #include "common/spin_barrier.h" 21 #include "common/utils_concurrency_limit.h" 22 #include "common/test_invoke.h" 23 24 #include "oneapi/tbb/parallel_pipeline.h" 25 #include "oneapi/tbb/global_control.h" 26 #include "oneapi/tbb/task_group.h" 27 28 #include <atomic> 29 #include <thread> 30 #include <string.h> 31 #include <memory> 32 #include <tuple> 33 34 //! \file conformance_parallel_pipeline.cpp 35 //! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification 36 37 constexpr std::size_t n_tokens = 8; 38 39 constexpr int max_counter = 1024; 40 41 static std::atomic<int> input_counter{ max_counter }; 42 43 template<typename U> 44 class input_filter { 45 public: 46 U operator()( oneapi::tbb::flow_control& control ) const { 47 if( --input_counter < 0 ) { 48 control.stop(); 49 input_counter = max_counter; 50 } 51 return U(); 52 } 53 }; 54 55 template<typename T, typename U> 56 class middle_filter { 57 public: 58 U operator()(T) const { 59 U out{}; 60 return out; 61 } 62 }; 63 64 template<typename T> 65 class output_filter { 66 public: 67 void operator()(T ) const {} 68 }; 69 70 static const oneapi::tbb::filter_mode filter_table[] = { oneapi::tbb::filter_mode::parallel, 71 oneapi::tbb::filter_mode::serial_in_order, 72 oneapi::tbb::filter_mode::serial_out_of_order}; 73 74 template<typename Body, typename... Cotnext> 75 void TestSingleFilter(Body body, Cotnext&... context) { 76 77 for(int i =0; i <3; i++) 78 { 79 oneapi::tbb::filter_mode mode = filter_table[i]; 80 81 oneapi::tbb::filter<void, void> one_filter( mode, body ); 82 oneapi::tbb::parallel_pipeline( n_tokens, one_filter, context... ); 83 84 oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::filter<void, void>(mode, body), context... ); 85 86 oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::make_filter<void, void>(mode, body), context...); 87 } 88 } 89 90 void TestSingleFilterFunctor() { 91 92 input_filter<void> i_filter; 93 94 TestSingleFilter(i_filter); 95 96 oneapi::tbb::task_group_context context; 97 TestSingleFilter(i_filter, context); 98 } 99 100 101 void TestSingleFilterLambda() { 102 103 104 TestSingleFilter([]( oneapi::tbb::flow_control& control ) { 105 if(input_counter-- == 0 ) { 106 control.stop(); 107 input_counter = max_counter; 108 } 109 } ); 110 111 oneapi::tbb::task_group_context context; 112 TestSingleFilter([]( oneapi::tbb::flow_control& control ) { 113 if(input_counter-- == 0 ) { 114 control.stop(); 115 input_counter = max_counter; 116 } 117 }, context); 118 } 119 120 template<typename I, typename O> 121 void RunPipeline(const oneapi::tbb::filter<I, O> &filter) 122 { 123 bool flag{false}; 124 125 auto f_beg = oneapi::tbb::make_filter<void, I>(oneapi::tbb::filter_mode::serial_out_of_order, 126 [&flag](oneapi::tbb::flow_control& fc) -> I{ 127 if(flag) { 128 fc.stop(); 129 } 130 flag = true; 131 return I(); 132 }); 133 134 auto f_end = oneapi::tbb::make_filter<O, void>(oneapi::tbb::filter_mode::serial_in_order, 135 [](O) {}); 136 137 oneapi::tbb::parallel_pipeline(n_tokens, f_beg & filter & f_end); 138 } 139 140 void RunPipeline(const oneapi::tbb::filter<void, void> &filter) 141 { 142 oneapi::tbb::parallel_pipeline(n_tokens, filter); 143 } 144 145 template<typename Iterator1, typename Iterator2> 146 void RootSequence( Iterator1 first, Iterator1 last, Iterator2 res) { 147 using ValueType = typename Iterator1::value_type; 148 oneapi::tbb::parallel_pipeline( n_tokens, 149 oneapi::tbb::make_filter<void,ValueType>( 150 oneapi::tbb::filter_mode::serial_in_order, 151 [&first, &last](oneapi::tbb::flow_control& fc)-> ValueType{ 152 if( first<last ) { 153 ValueType val = *first; 154 ++first; 155 return val; 156 } else { 157 fc.stop(); 158 return ValueType{}; 159 } 160 } 161 ) & 162 oneapi::tbb::make_filter<ValueType,ValueType>( 163 oneapi::tbb::filter_mode::parallel, 164 [](ValueType p){return p*p;} 165 ) & 166 oneapi::tbb::make_filter<ValueType,void>( 167 oneapi::tbb::filter_mode::serial_in_order, 168 [&res](ValueType x) { 169 *res = x; 170 ++res; } 171 ) 172 ); 173 } 174 175 //! Testing pipeline correctness 176 //! \brief \ref interface \ref requirement 177 TEST_CASE("Testing pipeline correctness") 178 { 179 std::vector<double> input(max_counter); 180 std::vector<double> output(max_counter); 181 for(std::size_t i = 0; i < max_counter; i++) 182 input[i] = (double)i; 183 184 RootSequence(input.cbegin(), input.cend(), output.begin()); 185 for(int i = 0; i < max_counter; i++) { 186 CHECK_MESSAGE(output[i] == input[i]*input[i], "pipeline result is incorrect"); 187 } 188 } 189 190 //! Testing pipeline with single filter 191 //! \brief \ref interface \ref requirement 192 TEST_CASE("Testing pipeline with single filter") 193 { 194 TestSingleFilterFunctor(); 195 TestSingleFilterLambda(); 196 } 197 198 //! Testing single filter with different ways of creation 199 //! \brief \ref interface \ref requirement 200 TEST_CASE_TEMPLATE("Filter creation testing", T, std::tuple<size_t, int>, 201 std::tuple<int, double>, 202 std::tuple<unsigned int*, size_t>, 203 std::tuple<unsigned short, unsigned short*>, 204 std::tuple<double*, unsigned short*>, 205 std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >) 206 { 207 using I = typename std::tuple_element<0, T>::type; 208 using O = typename std::tuple_element<1, T>::type; 209 for(int i = 0; i < 3; i++) 210 { 211 oneapi::tbb::filter_mode mode = filter_table[i]; 212 oneapi::tbb::filter<I, O> default_filter; 213 214 auto made_filter1 = oneapi::tbb::make_filter<I,O>(mode, [](I)->O{return O();}); 215 static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter1)>::value, "make_filter wrong result type"); 216 RunPipeline(made_filter1); 217 218 auto made_filter2 = oneapi::tbb::make_filter(mode, [](I)->O{return O();}); 219 static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter2)>::value, "make_filter wrong result type"); 220 RunPipeline(made_filter2); 221 222 oneapi::tbb::filter<I, O> one_filter(mode, [](I)->O{return O();}); 223 RunPipeline(one_filter); 224 225 oneapi::tbb::filter<I, O> copy_filter(one_filter); 226 RunPipeline(one_filter); 227 228 default_filter = copy_filter; 229 RunPipeline(default_filter); 230 default_filter.clear(); 231 } 232 } 233 234 //! Testing filters concatenation 235 //! \brief \ref interface \ref requirement 236 TEST_CASE_TEMPLATE("Testing filters concatenation", T, std::tuple<size_t, int>, 237 std::tuple<int, double>, 238 std::tuple<unsigned int*, size_t>, 239 std::tuple<unsigned short, unsigned short*>, 240 std::tuple<double*, unsigned short*>, 241 std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >) 242 { 243 using I = typename std::tuple_element<0, T>::type; 244 using O = typename std::tuple_element<1, T>::type; 245 246 for(int fi = 0; fi< 27; fi++) 247 { 248 int i = fi%3; 249 int j = (fi/3)%3; 250 int k = (fi/9); 251 auto filter_chain = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>()) & 252 oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>()) & 253 oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>()); 254 RunPipeline(filter_chain); 255 256 oneapi::tbb::filter<void, I> filter1 = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>()); 257 oneapi::tbb::filter<I, O> filter2 = oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>()); 258 oneapi::tbb::filter<O, void> filter3 = oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>()); 259 260 auto fitler12 = filter1 & filter2; 261 auto fitler23 = filter2 & filter3; 262 auto fitler123 = filter1 & filter2 & filter3; 263 264 RunPipeline(fitler12 & filter3); 265 RunPipeline(filter1 & fitler23); 266 RunPipeline(fitler123); 267 } 268 } 269 270 void doWork() { 271 for (int i = 0; i < 10; ++i) 272 utils::yield(); 273 } 274 275 //! Testing filter modes 276 //! \brief \ref interface \ref requirement 277 TEST_CASE("Testing filter modes") 278 { 279 for ( auto concurrency_level : utils::concurrency_range() ) 280 { 281 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_level); 282 283 short serial_checker{0}; 284 oneapi::tbb::filter<void,short> filter1(oneapi::tbb::filter_mode::serial_out_of_order, 285 [&serial_checker](oneapi::tbb::flow_control&fc) 286 { 287 auto check_value = ++serial_checker; 288 doWork(); 289 CHECK_MESSAGE(check_value == serial_checker, "a serial filter was executed concurrently"); 290 if(serial_checker>=(short)max_counter) 291 { 292 fc.stop(); 293 } 294 return check_value; 295 }); 296 297 short serial_checker2{ 0 }; 298 oneapi::tbb::filter<short, short> filter2(oneapi::tbb::filter_mode::serial_in_order, 299 [&serial_checker2](int) 300 { 301 auto check_value = ++serial_checker2; 302 doWork(); 303 CHECK_MESSAGE(check_value == serial_checker2, "a serial filter was executed concurrently"); 304 return check_value; 305 }); 306 307 utils::SpinBarrier spin_barrier(utils::min(concurrency_level, n_tokens), true); 308 oneapi::tbb::filter<short,int> filter3(oneapi::tbb::filter_mode::parallel, 309 [&spin_barrier](int value) 310 { 311 spin_barrier.wait(); 312 doWork(); 313 return value; 314 }); 315 316 317 short order_checker{0}; 318 oneapi::tbb::filter<int,void> filter4(oneapi::tbb::filter_mode::serial_in_order, 319 [&order_checker](int value) 320 { 321 CHECK_MESSAGE(++order_checker == value, "the order of message was broken"); 322 }); 323 324 oneapi::tbb::parallel_pipeline(n_tokens, filter1 & filter2 & filter3 & filter4); 325 } 326 } 327 328 //! Testing max tocken number 329 //! \brief \ref interface \ref requirement 330 TEST_CASE("Testing max token number") 331 { 332 for(unsigned int i = 1; i < n_tokens; i++) 333 { 334 std::atomic<short> active_tokens{0}; 335 336 oneapi::tbb::filter<void,int> filter1(oneapi::tbb::filter_mode::parallel, 337 [&active_tokens](oneapi::tbb::flow_control&fc) 338 { 339 ++active_tokens; 340 doWork(); 341 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tokens is exceed"); 342 --active_tokens; 343 if (--input_counter < 0) { 344 fc.stop(); 345 input_counter = max_counter; 346 } 347 return 0; 348 }); 349 350 oneapi::tbb::filter<int,int> filter2(oneapi::tbb::filter_mode::parallel, 351 [&active_tokens](int value) 352 { 353 ++active_tokens; 354 doWork(); 355 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed"); 356 --active_tokens; 357 return value; 358 }); 359 360 oneapi::tbb::filter<int,void> filter3(oneapi::tbb::filter_mode::serial_out_of_order, 361 [&active_tokens](int) 362 { 363 ++active_tokens; 364 doWork(); 365 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed"); 366 --active_tokens; 367 }); 368 369 oneapi::tbb::parallel_pipeline(i, filter1 & filter2 & filter3); 370 } 371 } 372 373 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 374 375 template <typename... T> struct print; 376 377 //! Testing deduction guides 378 //! \brief \ref interface \ref requirement 379 TEST_CASE_TEMPLATE("Deduction guides testing", T, int, unsigned int, double) 380 { 381 input_filter<T> i_filter; 382 383 oneapi::tbb::filter fc1(oneapi::tbb::filter_mode::serial_in_order, i_filter); 384 static_assert(std::is_same_v<decltype(fc1), oneapi::tbb::filter<void, T>>); 385 386 oneapi::tbb::filter fc2 (fc1); 387 static_assert(std::is_same_v<decltype(fc2), oneapi::tbb::filter<void, T>>); 388 389 middle_filter<T, std::size_t> m_filter; 390 oneapi::tbb::filter fc3(oneapi::tbb::filter_mode::serial_in_order, m_filter); 391 static_assert(std::is_same_v<decltype(fc3), oneapi::tbb::filter<T, std::size_t>>); 392 393 oneapi::tbb::filter frv(oneapi::tbb::filter_mode::serial_in_order, [](int&&) -> double { return 0.0; }); 394 oneapi::tbb::filter fclv(oneapi::tbb::filter_mode::serial_in_order, [](const int&) -> double { return 0.0; }); 395 oneapi::tbb::filter fc(oneapi::tbb::filter_mode::serial_in_order, [](const int) -> double { return 0.0; }); 396 397 static_assert(std::is_same_v<decltype(frv), oneapi::tbb::filter<int, double>>); 398 static_assert(std::is_same_v<decltype(fclv), oneapi::tbb::filter<int, double>>); 399 static_assert(std::is_same_v<decltype(fc), oneapi::tbb::filter<int, double>>); 400 } 401 #endif //__TBB_CPP17_DEDUCTION_GUIDES_PRESENT 402 403 #if __TBB_CPP17_INVOKE_PRESENT 404 405 template <typename MiddleFilterBody, typename LastFilterBody> 406 void test_pipeline_invoke_basic(const MiddleFilterBody& middle_body, const LastFilterBody& last_body) { 407 using output_filter_type = test_invoke::SmartID<std::size_t>; 408 using middle_filter_type = test_invoke::SmartID<output_filter_type>; 409 410 const std::size_t input_count = 10; 411 std::size_t signal_point = 0; 412 std::size_t counter = 0; 413 414 auto first_body = [&](oneapi::tbb::flow_control& fc) -> middle_filter_type { 415 if (++counter > input_count) { 416 fc.stop(); 417 } 418 return middle_filter_type{output_filter_type{&signal_point}}; 419 }; 420 421 auto first_filter = oneapi::tbb::make_filter<void, middle_filter_type>(oneapi::tbb::filter_mode::serial_in_order, first_body); 422 auto middle_filter = oneapi::tbb::make_filter<middle_filter_type, output_filter_type>(oneapi::tbb::filter_mode::serial_in_order, middle_body); 423 auto last_filter = oneapi::tbb::make_filter<output_filter_type, void>(oneapi::tbb::filter_mode::serial_in_order, last_body); 424 425 oneapi::tbb::parallel_pipeline(16, first_filter & middle_filter & last_filter); 426 427 CHECK(signal_point == input_count); 428 } 429 430 //! Test that parallel_pipeline uses std::invoke to run the filter body 431 //! \brief \ref requirement 432 TEST_CASE("parallel_pipeline and std::invoke") { 433 using output_filter_type = test_invoke::SmartID<std::size_t>; 434 using middle_filter_type = test_invoke::SmartID<output_filter_type>; 435 436 test_pipeline_invoke_basic(&middle_filter_type::get_id, &output_filter_type::operate); // Pointer to non-static function as middle filter 437 test_pipeline_invoke_basic(&middle_filter_type::id, &output_filter_type::operate); // Pointer to non-static member as middle filter 438 } 439 440 #endif // __TBB_CPP17_INVOKE_PRESENT 441