/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

// Before including parallel_pipeline.h, set up the variable to count heap allocated
// filter_node objects, and make it known for the header.
#include "common/test.h"
#include "common/utils.h"
#include "common/checktype.h"

int filter_node_count = 0;
#define __TBB_TEST_FILTER_NODE_COUNT filter_node_count
#include "tbb/parallel_pipeline.h"
#include "tbb/global_control.h"
#include "tbb/spin_mutex.h"
#include "tbb/task_group.h"

#include <atomic>
#include <string.h>
#include <memory> // std::unique_ptr

//! \file test_parallel_pipeline.cpp
//! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification

const unsigned n_tokens = 8;
// we can conceivably have two buffers used in the middle filter for every token in flight, so
// we must allocate two buffers for every token. Unlikely, but possible.
const unsigned n_buffers = 2*n_tokens;
const int max_counter = 16;

static std::size_t concurrency = 0;

static std::atomic<int> output_counter;
static std::atomic<int> input_counter;
static std::atomic<int> non_pointer_specialized_calls;
static std::atomic<int> pointer_specialized_calls;
static std::atomic<int> first_pointer_specialized_calls;
static std::atomic<int> second_pointer_specialized_calls;

static int intbuffer[max_counter]; // store results for <int,int> parallel pipeline test
static bool check_intbuffer;

static void* buffers[n_buffers];
static std::atomic_flag buf_in_use[n_buffers] = {ATOMIC_FLAG_INIT};

void* fetchNextBuffer() {
    for(size_t icnt = 0; icnt < n_buffers; ++icnt) {
        if(!buf_in_use[icnt].test_and_set()) {
            return buffers[icnt];
        }
    }
    CHECK_MESSAGE(false, "Ran out of buffers, p:" << concurrency);
    return nullptr;
}

void freeBuffer(void* buf) {
    for(size_t i = 0; i < n_buffers; ++i) {
        if(buffers[i] == buf) {
            buf_in_use[i].clear();
            return;
        }
    }
    CHECK_MESSAGE(false, "Tried to free a buffer not in our list, p:" << concurrency);
}

template<typename T>
class free_on_scope_exit {
public:
    free_on_scope_exit(T* p) : my_p(p) {}
    ~free_on_scope_exit() { if(!my_p) return; my_p->~T(); freeBuffer(my_p); }
private:
    T* my_p;
};

// methods for testing CheckType< >, that return okay values for other types.
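// (Overload resolution picks the CheckType<U> versions below whenever the pipeline element
// type is a CheckType; the unconstrained templates are the neutral fallback for plain types,
// so the same filter bodies can be instantiated with tracked and untracked element types.)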
template<typename T>
bool middle_is_ready(T &/*p*/) { return false; }

template<typename U>
bool middle_is_ready(CheckType<U> &p) { return p.is_ready(); }

template<typename T>
bool output_is_ready(T &/*p*/) { return true; }

template<typename U>
bool output_is_ready(CheckType<U> &p) { return p.is_ready(); }

template<typename T>
int middle_my_id( T &/*p*/) { return 0; }

template<typename U>
int middle_my_id(CheckType<U> &p) { return p.id(); }

template<typename T>
int output_my_id( T &/*p*/) { return 1; }

template<typename U>
int output_my_id(CheckType<U> &p) { return p.id(); }

template<typename T>
void my_function(T &p) { p = 0; }

template<typename U>
void my_function(CheckType<U> &p) { p.get_ready(); }

// Filters must be copy-constructible, and be const-qualifiable.
template<typename U>
class input_filter : DestroyedTracker {
public:
    U operator()( tbb::flow_control& control ) const {
        CHECK(is_alive());
        if( --input_counter < 0 ) {
            control.stop();
        }
        else  // only count successful reads
            ++non_pointer_specialized_calls;
        return U();  // default constructed
    }
};

// specialization for pointer
template<typename U>
class input_filter<U*> : DestroyedTracker {
public:
    U* operator()(tbb::flow_control& control) const {
        CHECK(is_alive());
        int ival = --input_counter;
        if(ival < 0) {
            control.stop();
            return nullptr;
        }
        ++pointer_specialized_calls;
        if(ival == max_counter / 2) {
            return nullptr;  // non-stop nullptr
        }
        U* myReturn = new(fetchNextBuffer()) U();
        if (myReturn) {  // may have been passed in a nullptr
            CHECK_MESSAGE(!middle_my_id(*myReturn), "bad id value, p:" << concurrency);
            CHECK_MESSAGE(!middle_is_ready(*myReturn), "Already ready, p:" << concurrency);
        }
        return myReturn;
    }
};

template<>
class input_filter<void> : DestroyedTracker {
public:
    void operator()( tbb::flow_control& control ) const {
        CHECK(is_alive());
        if( --input_counter < 0 ) {
            control.stop();
        }
        else
            ++non_pointer_specialized_calls;
    }
};

// specialization for int that passes back a sequence of integers
template<>
class input_filter<int> : DestroyedTracker {
public:
    int operator()( tbb::flow_control& control ) const {
        CHECK(is_alive());
        int oldval = --input_counter;
        if( oldval < 0 ) {
            control.stop();
        }
        else
            ++non_pointer_specialized_calls;
        return oldval+1;
    }
};

template<typename T, typename U>
class middle_filter : DestroyedTracker {
public:
    U operator()(T t) const {
        CHECK(is_alive());
        CHECK_MESSAGE(!middle_my_id(t), "bad id value, p:" << concurrency);
        CHECK_MESSAGE(!middle_is_ready(t), "Already ready, p:" << concurrency );
        U out;
        my_function(out);
        ++non_pointer_specialized_calls;
        return out;
    }
};

template<typename T, typename U>
class middle_filter<T*,U> : DestroyedTracker {
public:
    U operator()(T* my_storage) const {
        free_on_scope_exit<T> my_ptr(my_storage);  // free_on_scope_exit marks the buffer available
        CHECK(is_alive());
        if(my_storage) {  // may have been passed in a nullptr
            CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
            CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
        }
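        // Note: this counter is incremented even for the nullptr token injected by
        // input_filter<U*>, so checkCounters() sees exactly max_counter calls for this filter.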
        ++first_pointer_specialized_calls;
        U out;
        my_function(out);
        return out;
    }
};

template<typename T, typename U>
class middle_filter<T,U*> : DestroyedTracker {
public:
    U* operator()(T my_storage) const {
        CHECK(is_alive());
        CHECK_MESSAGE(!middle_my_id(my_storage), "bad id value, p:" << concurrency);
        CHECK_MESSAGE(!middle_is_ready(my_storage), "Already ready, p:" << concurrency );
        // allocate new space from buffers
        U* my_return = new(fetchNextBuffer()) U();
        my_function(*my_return);
        ++second_pointer_specialized_calls;
        return my_return;
    }
};

template<typename T, typename U>
class middle_filter<T*,U*> : DestroyedTracker {
public:
    U* operator()(T* my_storage) const {
        free_on_scope_exit<T> my_ptr(my_storage);  // free_on_scope_exit marks the buffer available
        CHECK(is_alive());
        if(my_storage) {
            CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
            CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
        }
        // may have been passed a nullptr
        ++pointer_specialized_calls;
        if(!my_storage) return nullptr;
        CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
        CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
        U* my_return = new(fetchNextBuffer()) U();
        my_function(*my_return);
        return my_return;
    }
};

// specialization for int that squares the input and returns that.
template<>
class middle_filter<int,int> : DestroyedTracker {
public:
    int operator()(int my_input) const {
        CHECK(is_alive());
        ++non_pointer_specialized_calls;
        return my_input*my_input;
    }
};

// ---------------------------------
template<typename T>
class output_filter : DestroyedTracker {
public:
    void operator()(T c) const {
        CHECK(is_alive());
        CHECK_MESSAGE(output_my_id(c), "unset id value, p:" << concurrency);
        CHECK_MESSAGE(output_is_ready(c), "not yet ready, p:" << concurrency);
        ++non_pointer_specialized_calls;
        output_counter++;
    }
};

// specialization for int that puts the received value in an array
template<>
class output_filter<int> : DestroyedTracker {
public:
    void operator()(int my_input) const {
        CHECK(is_alive());
        ++non_pointer_specialized_calls;
        int myindx = output_counter++;
        intbuffer[myindx] = my_input;
    }
};

template<typename T>
class output_filter<T*> : DestroyedTracker {
public:
    void operator()(T* c) const {
        free_on_scope_exit<T> my_ptr(c);
        CHECK(is_alive());
        if(c) {
            CHECK_MESSAGE(output_my_id(*c), "unset id value, p:" << concurrency);
            CHECK_MESSAGE(output_is_ready(*c), "not yet ready, p:" << concurrency);
        }
        output_counter++;
        ++pointer_specialized_calls;
    }
};

typedef enum {
    no_pointer_counts,
    assert_nonpointer,
    assert_firstpointer,
    assert_secondpointer,
    assert_allpointer
} final_assert_type;

void resetCounters() {
    output_counter = 0;
    input_counter = max_counter;
    non_pointer_specialized_calls = 0;
    pointer_specialized_calls = 0;
    first_pointer_specialized_calls = 0;
    second_pointer_specialized_calls = 0;
    // we have to reset the buffer flags because our input filters return allocated space on end-of-input,
    // (on eof a default-constructed object is returned) and they do not pass through the filter further.
    for(size_t i = 0; i < n_buffers; ++i)
        buf_in_use[i].clear();
}

void checkCounters(final_assert_type my_t) {
    CHECK_MESSAGE(output_counter == max_counter, "not all tokens reached the output filter, p:" << concurrency);
    switch(my_t) {
        case assert_nonpointer:
            CHECK_MESSAGE(pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0,
                          "non-pointer filters specialized to pointer, p:" << concurrency);
            CHECK_MESSAGE(non_pointer_specialized_calls == 3*max_counter, "bad count for non-pointer filters, p:" << concurrency);
            if(check_intbuffer) {
                for(int i = 1; i <= max_counter; ++i) {
                    int j = i*i;
                    bool found_val = false;
                    for(int k = 0; k < max_counter; ++k) {
                        if(intbuffer[k] == j) {
                            found_val = true;
                            break;
                        }
                    }
                    CHECK_MESSAGE(found_val, "Missing value in output array, p:" << concurrency );
                }
            }
            break;
        case assert_firstpointer:
            {
                bool check = pointer_specialized_calls == max_counter &&  // input filter extra invocation
                             first_pointer_specialized_calls == max_counter &&
                             non_pointer_specialized_calls == max_counter &&
                             second_pointer_specialized_calls == 0;
                CHECK_MESSAGE(check, "incorrect specialization for firstpointer, p:" << concurrency);
            }
            break;
        case assert_secondpointer:
            {
                bool check = pointer_specialized_calls == max_counter &&
                             first_pointer_specialized_calls == 0 &&
                             non_pointer_specialized_calls == max_counter &&  // input filter
                             second_pointer_specialized_calls == max_counter;
                CHECK_MESSAGE(check, "incorrect specialization for secondpointer, p:" << concurrency);
            }
            break;
        case assert_allpointer:
            CHECK_MESSAGE(non_pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0,
                          "pointer filters specialized to non-pointer, p:" << concurrency);
            CHECK_MESSAGE(pointer_specialized_calls == 3*max_counter, "bad count for pointer filters, p:" << concurrency);
            break;
        case no_pointer_counts:
            break;
    }
}

static const tbb::filter_mode filter_table[] = { tbb::filter_mode::parallel,
                                                 tbb::filter_mode::serial_in_order,
                                                 tbb::filter_mode::serial_out_of_order };
const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);

using filter_chain = tbb::filter<void, void>;
using mode_array = tbb::filter_mode;

// The filters are passed by value, which forces a temporary copy to be created. This is
// to reproduce the bug where a filter_chain uses refs to filters, which after a call
// would be references to destructed temporaries.
template<typename type1, typename type2>
void fill_chain( filter_chain &my_chain, mode_array *filter_type, input_filter<type1> i_filter,
                 middle_filter<type1, type2> m_filter, output_filter<type2> o_filter ) {
    my_chain = tbb::filter<void, type1>(filter_type[0], i_filter) &
               tbb::filter<type1, type2>(filter_type[1], m_filter) &
               tbb::filter<type2, void>(filter_type[2], o_filter);
}

template<typename... Context>
void run_function_spec(Context&... context) {
    CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");
    input_filter<void> i_filter;
    // Test pipeline that contains only one filter
    for( unsigned i = 0; i < number_of_filter_types; i++ ) {
        tbb::filter<void, void> one_filter( filter_table[i], i_filter );
        CHECK_MESSAGE(filter_node_count==1, "some filter nodes left after previous iteration?");
        resetCounters();
        tbb::parallel_pipeline( n_tokens, one_filter, context... );
        // no need to check counters
        std::atomic<int> counter;
        counter = max_counter;
        // Construct filter using lambda-syntax when parallel_pipeline() is being run;
        tbb::parallel_pipeline( n_tokens,
            tbb::filter<void, void>(filter_table[i], [&counter]( tbb::flow_control& control ) {
                    if( counter-- == 0 )
                        control.stop();
                }
            ),
            context...
        );
    }
    CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");
}

template<typename t1, typename t2, typename... Context>
void run_filter_set(
        input_filter<t1>& i_filter,
        middle_filter<t1,t2>& m_filter,
        output_filter<t2>& o_filter,
        mode_array *filter_type,
        final_assert_type my_t,
        Context&... context) {
    tbb::filter<void, t1> filter1( filter_type[0], i_filter );
    tbb::filter<t1, t2> filter2( filter_type[1], m_filter );
    tbb::filter<t2, void> filter3( filter_type[2], o_filter );

    CHECK_MESSAGE(filter_node_count==3, "some filter nodes left after previous iteration?");
    resetCounters();
    // Create filters sequence when parallel_pipeline() is being run
    tbb::parallel_pipeline( n_tokens, filter1, filter2, filter3, context... );
    checkCounters(my_t);

    // Create filters sequence partially outside parallel_pipeline() and also when parallel_pipeline() is being run
    tbb::filter<void, t2> filter12;
    filter12 = filter1 & filter2;
    for( int i = 0; i < 3; i++ ) {
        filter12 &= tbb::filter<t2,t2>(filter_type[i], [](t2 x) -> t2 { return x; });
    }
    resetCounters();
    tbb::parallel_pipeline( n_tokens, filter12, filter3, context... );
    checkCounters(my_t);

    tbb::filter<void, void> filter123 = filter12 & filter3;
    // Run pipeline twice with the same filter sequence
    for( unsigned i = 0; i < 2; i++ ) {
        resetCounters();
        tbb::parallel_pipeline( n_tokens, filter123, context... );
        checkCounters(my_t);
    }

    // Now copy-and-move-construct another filter instance, and use it to run pipeline
    {
        tbb::filter<void, void> copy123( filter123 );
        resetCounters();
        tbb::parallel_pipeline( n_tokens, copy123, context... );
        checkCounters(my_t);
        tbb::filter<void, void> move123( std::move(filter123) );
        resetCounters();
        tbb::parallel_pipeline( n_tokens, move123, context... );
        checkCounters(my_t);
    }

    // Construct filters and create the sequence when parallel_pipeline() is being run
    resetCounters();
    tbb::parallel_pipeline( n_tokens,
                            tbb::filter<void, t1>(filter_type[0], i_filter),
                            tbb::filter<t1, t2>(filter_type[1], m_filter),
                            tbb::filter<t2, void>(filter_type[2], o_filter),
                            context... );
    checkCounters(my_t);

    // Construct filters, make a copy, destroy the original filters, and run with the copy
    int cnt = filter_node_count;
    {
        tbb::filter<void, void>* p123 = new tbb::filter<void,void>(
                tbb::filter<void, t1>(filter_type[0], i_filter) &
                tbb::filter<t1, t2>(filter_type[1], m_filter) &
                tbb::filter<t2, void>(filter_type[2], o_filter) );
        CHECK_MESSAGE(filter_node_count==cnt+5, "filter node accounting error?");
        tbb::filter<void, void> copy123( *p123 );
        delete p123;
        CHECK_MESSAGE(filter_node_count==cnt+5, "filter nodes deleted prematurely?");
        resetCounters();
        tbb::parallel_pipeline( n_tokens, copy123, context... );
        checkCounters(my_t);
    }

    // construct a filter with temporaries
    {
        tbb::filter<void, void> my_filter;
        fill_chain<t1,t2>( my_filter, filter_type, i_filter, m_filter, o_filter );
        resetCounters();
        tbb::parallel_pipeline( n_tokens, my_filter, context... );
        checkCounters(my_t);
    }
    CHECK_MESSAGE(filter_node_count==cnt, "scope ended but filter nodes not deleted?");
}

template <typename t1, typename t2, typename... Context>
void run_lambdas_test( mode_array *filter_type, Context&... context ) {
    std::atomic<int> counter;
    counter = max_counter;
    // Construct filters using lambda-syntax and create the sequence when parallel_pipeline() is being run;
    resetCounters();  // only need the output_counter reset.
    tbb::parallel_pipeline( n_tokens,
        tbb::make_filter<void, t1>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
                if( --counter < 0 )
                    control.stop();
                return t1(); }
        ),
        tbb::make_filter<t1, t2>(filter_type[1], []( t1 /*my_storage*/ ) -> t2 {
                return t2(); }
        ),
        tbb::make_filter<t2, void>(filter_type[2], []( t2 ) -> void {
                output_counter++; }
        ),
        context...
    );
    checkCounters(no_pointer_counts);  // don't have to worry about specializations
    counter = max_counter;
    // pointer filters
    resetCounters();
    tbb::parallel_pipeline( n_tokens,
        tbb::filter<void, t1*>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
                if( --counter < 0 ) {
                    control.stop();
                    return nullptr;
                }
                return new(fetchNextBuffer()) t1(); }
        ),
        tbb::filter<t1*, t2*>(filter_type[1], []( t1* my_storage ) -> t2* {
                my_storage->~t1();
                return new(my_storage) t2(); }
        ),
        tbb::filter<t2*, void>(filter_type[2], []( t2* my_storage ) -> void {
                my_storage->~t2();
                freeBuffer(my_storage);
                output_counter++; }
        ),
        context...
    );
    checkCounters(no_pointer_counts);
    // first filter outputs pointer
    counter = max_counter;
    resetCounters();
    tbb::parallel_pipeline( n_tokens,
        tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
                if( --counter < 0 ) {
                    control.stop();
                    return nullptr;
                }
                return new(fetchNextBuffer()) t1(); }
        ) &
        tbb::make_filter(filter_type[1], []( t1* my_storage ) -> t2 {
                my_storage->~t1();
                freeBuffer(my_storage);
                return t2(); }
        ),
        tbb::make_filter(filter_type[2], []( t2 /*my_storage*/ ) -> void {
                output_counter++; }
        ),
        context...
    );
    checkCounters(no_pointer_counts);
    // second filter outputs pointer
    counter = max_counter;
    resetCounters();
    tbb::parallel_pipeline( n_tokens,
        tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
                if( --counter < 0 ) {
                    control.stop();
                }
                return t1(); }
        ),
        tbb::filter<t1, t2*>(filter_type[1], []( t1 /*my_storage*/ ) -> t2* {
                return new(fetchNextBuffer()) t2(); }
        ) &
        tbb::make_filter<t2*, void>(filter_type[2], []( t2* my_storage ) -> void {
                my_storage->~t2();
                freeBuffer(my_storage);
                output_counter++; }
        ),
        context...
    );
    checkCounters(no_pointer_counts);
}

template<typename type1, typename type2>
void run_function(const char *l1, const char *l2) {
    CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");

    check_intbuffer = (!strcmp(l1,"int") && !strcmp(l2,"int"));

    Checker<type1> check1;  // check constructions/destructions
    Checker<type2> check2;  // for type1 or type2 == CheckType<T>

    const size_t number_of_filters = 3;

    input_filter<type1> i_filter;
    input_filter<type1*> p_i_filter;

    middle_filter<type1, type2> m_filter;
    middle_filter<type1*, type2> pr_m_filter;
    middle_filter<type1, type2*> rp_m_filter;
    middle_filter<type1*, type2*> pp_m_filter;

    output_filter<type2> o_filter;
    output_filter<type2*> p_o_filter;

    // allocate the buffers for the filters
    unsigned max_size = (sizeof(type1) > sizeof(type2)) ? sizeof(type1) : sizeof(type2);
    for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
        buffers[i] = malloc(max_size);
        buf_in_use[i].clear();
    }

    unsigned limit = 1;
    // Test pipeline that contains number_of_filters filters
    for( unsigned i = 0; i < number_of_filters; ++i )
        limit *= number_of_filter_types;
    // Iterate over all possible filter-mode sequences: decode numeral digit by digit
    // as a base-number_of_filter_types number, one digit per filter.
    for( unsigned numeral = 0; numeral < limit; ++numeral ) {
        unsigned temp = numeral;
        tbb::filter_mode filter_type[number_of_filter_types];
        for( unsigned i = 0; i < number_of_filters; ++i, temp /= number_of_filter_types )
            filter_type[i] = filter_table[temp % number_of_filter_types];

        tbb::task_group_context context;
        run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer);
        run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer, context);
        run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer);
        run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer, context);
        run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer);
        run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer, context);
        run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer);
        run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer, context);

        run_lambdas_test<type1,type2>(filter_type);
        run_lambdas_test<type1,type2>(filter_type, context);
    }
    CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");

    for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
        free(buffers[i]);
    }
}

//! Testing single filter pipeline
//! \brief \ref error_guessing
TEST_CASE("Pipeline testing for single filter") {
    run_function_spec();
    tbb::task_group_context context;
    run_function_spec(context);
}

#define RUN_TYPED_TEST_CASE(type1, type2)                                                   \
    TEST_CASE("Pipeline testing with " #type1 " and " #type2) {                             \
        for ( std::size_t concurrency_level : {1, 2, 4, 5, 7, 8} ) {                        \
            if ( concurrency_level > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) ) \
                break;                                                                      \
            concurrency = concurrency_level;                                                \
            tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); \
            run_function<type1, type2>(#type1, #type2);                                     \
        }                                                                                   \
    }

// Run test several times with different types
RUN_TYPED_TEST_CASE(std::size_t, int)
RUN_TYPED_TEST_CASE(int, double)
RUN_TYPED_TEST_CASE(std::size_t, double)
RUN_TYPED_TEST_CASE(std::size_t, bool)
RUN_TYPED_TEST_CASE(int, int)
RUN_TYPED_TEST_CASE(CheckType<unsigned int>, std::size_t)
RUN_TYPED_TEST_CASE(CheckType<unsigned short>, std::size_t)
RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned int>)
RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned short>)
RUN_TYPED_TEST_CASE(CheckType<unsigned short>, CheckType<unsigned short>)
RUN_TYPED_TEST_CASE(double, CheckType<unsigned short>)
RUN_TYPED_TEST_CASE(std::unique_ptr<int>, std::unique_ptr<int>)  // move-only type

#undef RUN_TYPED_TEST_CASE