/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"
#include "common/utils.h"
#include "common/checktype.h"
#include "common/spin_barrier.h"
#include "common/utils_concurrency_limit.h"

#include "oneapi/tbb/parallel_pipeline.h"
#include "oneapi/tbb/global_control.h"
#include "oneapi/tbb/task_group.h"

#include <string.h>

#include <atomic>
#include <cstddef>
#include <iterator>
#include <memory>
#include <thread>
#include <tuple>
#include <type_traits>
#include <vector>

//! \file conformance_parallel_pipeline.cpp
\brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification 35 36 constexpr std::size_t n_tokens = 8; 37 38 constexpr int max_counter = 1024; 39 40 static std::atomic<int> input_counter{ max_counter }; 41 42 template<typename U> 43 class input_filter { 44 public: 45 U operator()( oneapi::tbb::flow_control& control ) const { 46 if( --input_counter < 0 ) { 47 control.stop(); 48 input_counter = max_counter; 49 } 50 return U(); 51 } 52 }; 53 54 template<typename T, typename U> 55 class middle_filter { 56 public: 57 U operator()(T) const { 58 U out{}; 59 return out; 60 } 61 }; 62 63 template<typename T> 64 class output_filter { 65 public: 66 void operator()(T ) const {} 67 }; 68 69 static const oneapi::tbb::filter_mode filter_table[] = { oneapi::tbb::filter_mode::parallel, 70 oneapi::tbb::filter_mode::serial_in_order, 71 oneapi::tbb::filter_mode::serial_out_of_order}; 72 73 template<typename Body, typename... Cotnext> 74 void TestSingleFilter(Body body, Cotnext&... context) { 75 76 for(int i =0; i <3; i++) 77 { 78 oneapi::tbb::filter_mode mode = filter_table[i]; 79 80 oneapi::tbb::filter<void, void> one_filter( mode, body ); 81 oneapi::tbb::parallel_pipeline( n_tokens, one_filter, context... ); 82 83 oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::filter<void, void>(mode, body), context... 
); 84 85 oneapi::tbb::parallel_pipeline( n_tokens, oneapi::tbb::make_filter<void, void>(mode, body), context...); 86 } 87 } 88 89 void TestSingleFilterFunctor() { 90 91 input_filter<void> i_filter; 92 93 TestSingleFilter(i_filter); 94 95 oneapi::tbb::task_group_context context; 96 TestSingleFilter(i_filter, context); 97 } 98 99 100 void TestSingleFilterLambda() { 101 102 103 TestSingleFilter([]( oneapi::tbb::flow_control& control ) { 104 if(input_counter-- == 0 ) { 105 control.stop(); 106 input_counter = max_counter; 107 } 108 } ); 109 110 oneapi::tbb::task_group_context context; 111 TestSingleFilter([]( oneapi::tbb::flow_control& control ) { 112 if(input_counter-- == 0 ) { 113 control.stop(); 114 input_counter = max_counter; 115 } 116 }, context); 117 } 118 119 template<typename I, typename O> 120 void RunPipeline(const oneapi::tbb::filter<I, O> &filter) 121 { 122 bool flag{false}; 123 124 auto f_beg = oneapi::tbb::make_filter<void, I>(oneapi::tbb::filter_mode::serial_out_of_order, 125 [&flag](oneapi::tbb::flow_control& fc) -> I{ 126 if(flag) { 127 fc.stop(); 128 } 129 flag = true; 130 return I(); 131 }); 132 133 auto f_end = oneapi::tbb::make_filter<O, void>(oneapi::tbb::filter_mode::serial_in_order, 134 [](O) {}); 135 136 oneapi::tbb::parallel_pipeline(n_tokens, f_beg & filter & f_end); 137 } 138 139 void RunPipeline(const oneapi::tbb::filter<void, void> &filter) 140 { 141 oneapi::tbb::parallel_pipeline(n_tokens, filter); 142 } 143 144 template<typename Iterator1, typename Iterator2> 145 void RootSequence( Iterator1 first, Iterator1 last, Iterator2 res) { 146 using ValueType = typename Iterator1::value_type; 147 oneapi::tbb::parallel_pipeline( n_tokens, 148 oneapi::tbb::make_filter<void,ValueType>( 149 oneapi::tbb::filter_mode::serial_in_order, 150 [&first, &last](oneapi::tbb::flow_control& fc)-> ValueType{ 151 if( first<last ) { 152 ValueType val = *first; 153 ++first; 154 return val; 155 } else { 156 fc.stop(); 157 return ValueType{}; 158 } 159 } 160 ) & 161 
oneapi::tbb::make_filter<ValueType,ValueType>( 162 oneapi::tbb::filter_mode::parallel, 163 [](ValueType p){return p*p;} 164 ) & 165 oneapi::tbb::make_filter<ValueType,void>( 166 oneapi::tbb::filter_mode::serial_in_order, 167 [&res](ValueType x) { 168 *res = x; 169 ++res; } 170 ) 171 ); 172 } 173 174 //! Testing pipeline correctness 175 //! \brief \ref interface \ref requirement 176 TEST_CASE("Testing pipeline correctness") 177 { 178 std::vector<double> input(max_counter); 179 std::vector<double> output(max_counter); 180 for(std::size_t i = 0; i < max_counter; i++) 181 input[i] = (double)i; 182 183 RootSequence(input.cbegin(), input.cend(), output.begin()); 184 for(int i = 0; i < max_counter; i++) { 185 CHECK_MESSAGE(output[i] == input[i]*input[i], "pipeline result is incorrect"); 186 } 187 } 188 189 //! Testing pipeline with single filter 190 //! \brief \ref interface \ref requirement 191 TEST_CASE("Testing pipeline with single filter") 192 { 193 TestSingleFilterFunctor(); 194 TestSingleFilterLambda(); 195 } 196 197 //! Testing single filter with different ways of creation 198 //! 
\brief \ref interface \ref requirement 199 TEST_CASE_TEMPLATE("Filter creation testing", T, std::tuple<size_t, int>, 200 std::tuple<int, double>, 201 std::tuple<unsigned int*, size_t>, 202 std::tuple<unsigned short, unsigned short*>, 203 std::tuple<double*, unsigned short*>, 204 std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >) 205 { 206 using I = typename std::tuple_element<0, T>::type; 207 using O = typename std::tuple_element<1, T>::type; 208 for(int i = 0; i < 3; i++) 209 { 210 oneapi::tbb::filter_mode mode = filter_table[i]; 211 oneapi::tbb::filter<I, O> default_filter; 212 213 auto made_filter1 = oneapi::tbb::make_filter<I,O>(mode, [](I)->O{return O();}); 214 static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter1)>::value, "make_filter wrong result type"); 215 RunPipeline(made_filter1); 216 217 auto made_filter2 = oneapi::tbb::make_filter(mode, [](I)->O{return O();}); 218 static_assert(std::is_same<oneapi::tbb::filter<I, O>, decltype(made_filter2)>::value, "make_filter wrong result type"); 219 RunPipeline(made_filter2); 220 221 oneapi::tbb::filter<I, O> one_filter(mode, [](I)->O{return O();}); 222 RunPipeline(one_filter); 223 224 oneapi::tbb::filter<I, O> copy_filter(one_filter); 225 RunPipeline(one_filter); 226 227 default_filter = copy_filter; 228 RunPipeline(default_filter); 229 default_filter.clear(); 230 } 231 } 232 233 //! Testing filters concatenation 234 //! 
\brief \ref interface \ref requirement 235 TEST_CASE_TEMPLATE("Testing filters concatenation", T, std::tuple<size_t, int>, 236 std::tuple<int, double>, 237 std::tuple<unsigned int*, size_t>, 238 std::tuple<unsigned short, unsigned short*>, 239 std::tuple<double*, unsigned short*>, 240 std::tuple<std::unique_ptr<int>, std::unique_ptr<int> >) 241 { 242 using I = typename std::tuple_element<0, T>::type; 243 using O = typename std::tuple_element<1, T>::type; 244 245 for(int fi = 0; fi< 27; fi++) 246 { 247 int i = fi%3; 248 int j = (fi/3)%3; 249 int k = (fi/9); 250 auto filter_chain = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>()) & 251 oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>()) & 252 oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>()); 253 RunPipeline(filter_chain); 254 255 oneapi::tbb::filter<void, I> filter1 = oneapi::tbb::filter<void, I>(filter_table[i], input_filter<I>()); 256 oneapi::tbb::filter<I, O> filter2 = oneapi::tbb::filter<I, O>(filter_table[j], middle_filter<I,O>()); 257 oneapi::tbb::filter<O, void> filter3 = oneapi::tbb::filter<O, void>(filter_table[k], output_filter<O>()); 258 259 auto fitler12 = filter1 & filter2; 260 auto fitler23 = filter2 & filter3; 261 auto fitler123 = filter1 & filter2 & filter3; 262 263 RunPipeline(fitler12 & filter3); 264 RunPipeline(filter1 & fitler23); 265 RunPipeline(fitler123); 266 } 267 } 268 269 void doWork() { 270 for (int i = 0; i < 10; ++i) 271 utils::yield(); 272 } 273 274 //! Testing filter modes 275 //! 
\brief \ref interface \ref requirement 276 TEST_CASE("Testing filter modes") 277 { 278 for ( auto concurrency_level : utils::concurrency_range() ) 279 { 280 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_level); 281 282 std::atomic<short> serial_checker{0}; 283 oneapi::tbb::filter<void,short> filter1(oneapi::tbb::filter_mode::serial_out_of_order, 284 [&serial_checker](oneapi::tbb::flow_control&fc) 285 { 286 auto check_value = ++serial_checker; 287 doWork(); 288 CHECK_MESSAGE(check_value == serial_checker, "a serial filter was executed concurrently"); 289 if(serial_checker>=(short)max_counter) 290 { 291 fc.stop(); 292 } 293 return check_value; 294 }); 295 296 std::atomic<short> serial_checker2{ 0 }; 297 oneapi::tbb::filter<short, short> filter2(oneapi::tbb::filter_mode::serial_in_order, 298 [&serial_checker2](int) 299 { 300 auto check_value = ++serial_checker2; 301 doWork(); 302 CHECK_MESSAGE(check_value == serial_checker2, "a serial filter was executed concurrently"); 303 return check_value; 304 }); 305 306 utils::SpinBarrier spin_barrier(utils::min(concurrency_level, n_tokens), true); 307 oneapi::tbb::filter<short,int> filter3(oneapi::tbb::filter_mode::parallel, 308 [&spin_barrier](int value) 309 { 310 spin_barrier.wait(); 311 doWork(); 312 return value; 313 }); 314 315 316 std::atomic<short> order_checker{0}; 317 oneapi::tbb::filter<int,void> filter4(oneapi::tbb::filter_mode::serial_in_order, 318 [&order_checker](int value) 319 { 320 CHECK_MESSAGE(++order_checker == value, "the order of message was broken"); 321 }); 322 323 oneapi::tbb::parallel_pipeline(n_tokens, filter1 & filter2 & filter3 & filter4); 324 } 325 } 326 327 //! Testing max tocken number 328 //! 
\brief \ref interface \ref requirement 329 TEST_CASE("Testing max token number") 330 { 331 for(unsigned int i = 1; i < n_tokens; i++) 332 { 333 std::atomic<short> active_tokens{0}; 334 int counter{0}; 335 336 oneapi::tbb::filter<void,int> filter1(oneapi::tbb::filter_mode::parallel, 337 [&active_tokens,&counter](oneapi::tbb::flow_control&fc) 338 { 339 ++active_tokens; 340 doWork(); 341 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tokens is exceed"); 342 --active_tokens; 343 if(++counter>=max_counter) 344 { 345 fc.stop(); 346 } 347 return 0; 348 }); 349 350 oneapi::tbb::filter<int,int> filter2(oneapi::tbb::filter_mode::parallel, 351 [&active_tokens](int value) 352 { 353 ++active_tokens; 354 doWork(); 355 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed"); 356 --active_tokens; 357 return value; 358 }); 359 360 oneapi::tbb::filter<int,void> filter3(oneapi::tbb::filter_mode::serial_out_of_order, 361 [&active_tokens](int) 362 { 363 ++active_tokens; 364 doWork(); 365 CHECK_MESSAGE(active_tokens <= n_tokens, "max number of tockens is exceed"); 366 --active_tokens; 367 }); 368 369 oneapi::tbb::parallel_pipeline(i, filter1 & filter2 & filter3); 370 } 371 } 372 373 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 374 375 //! Testing deduction guides 376 //! 
\brief \ref interface \ref requirement 377 TEST_CASE_TEMPLATE("Deduction guides testing", T, int, unsigned int, double) 378 { 379 input_filter<T> i_filter; 380 oneapi::tbb::filter fc1(oneapi::tbb::filter_mode::serial_in_order, i_filter); 381 static_assert(std::is_same_v<decltype(fc1), oneapi::tbb::filter<void, T>>); 382 383 oneapi::tbb::filter fc2 (fc1); 384 static_assert(std::is_same_v<decltype(fc2), oneapi::tbb::filter<void, T>>); 385 386 middle_filter<T, std::size_t> m_filter; 387 oneapi::tbb::filter fc3(oneapi::tbb::filter_mode::serial_in_order, m_filter); 388 static_assert(std::is_same_v<decltype(fc3), oneapi::tbb::filter<T, std::size_t>>); 389 390 oneapi::tbb::filter frv(oneapi::tbb::filter_mode::serial_in_order, [](int&&) -> double { return 0.0; }); 391 oneapi::tbb::filter fclv(oneapi::tbb::filter_mode::serial_in_order, [](const int&) -> double { return 0.0; }); 392 oneapi::tbb::filter fc(oneapi::tbb::filter_mode::serial_in_order, [](const int) -> double { return 0.0; }); 393 394 static_assert(std::is_same_v<decltype(frv), oneapi::tbb::filter<int, double>>); 395 static_assert(std::is_same_v<decltype(fclv), oneapi::tbb::filter<int, double>>); 396 static_assert(std::is_same_v<decltype(fc), oneapi::tbb::filter<int, double>>); 397 } 398 #endif //__TBB_CPP17_DEDUCTION_GUIDES_PRESENT 399