151c0b2f7Stbbdev /*
2*c21e688aSSergey Zheltov Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev // Before including parallel_pipeline.h, set up the variable to count heap allocated
1851c0b2f7Stbbdev // filter_node objects, and make it known for the header.
1951c0b2f7Stbbdev #include "common/test.h"
2051c0b2f7Stbbdev #include "common/utils.h"
2151c0b2f7Stbbdev #include "common/checktype.h"
2251c0b2f7Stbbdev
2351c0b2f7Stbbdev int filter_node_count = 0;
2451c0b2f7Stbbdev #define __TBB_TEST_FILTER_NODE_COUNT filter_node_count
2551c0b2f7Stbbdev #include "tbb/parallel_pipeline.h"
2651c0b2f7Stbbdev #include "tbb/global_control.h"
2751c0b2f7Stbbdev #include "tbb/spin_mutex.h"
2851c0b2f7Stbbdev #include "tbb/task_group.h"
2951c0b2f7Stbbdev
3051c0b2f7Stbbdev #include <atomic>
3151c0b2f7Stbbdev #include <string.h>
3251c0b2f7Stbbdev #include <memory> // std::unique_ptr
3351c0b2f7Stbbdev
3451c0b2f7Stbbdev //! \file test_parallel_pipeline.cpp
3551c0b2f7Stbbdev //! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification
3651c0b2f7Stbbdev
3751c0b2f7Stbbdev const unsigned n_tokens = 8;
3851c0b2f7Stbbdev // we can conceivably have two buffers used in the middle filter for every token in flight, so
3951c0b2f7Stbbdev // we must allocate two buffers for every token. Unlikely, but possible.
4051c0b2f7Stbbdev const unsigned n_buffers = 2*n_tokens;
4151c0b2f7Stbbdev const int max_counter = 16;
4251c0b2f7Stbbdev
4351c0b2f7Stbbdev static std::size_t concurrency = 0;
4451c0b2f7Stbbdev
4551c0b2f7Stbbdev static std::atomic<int> output_counter;
4651c0b2f7Stbbdev static std::atomic<int> input_counter;
4751c0b2f7Stbbdev static std::atomic<int> non_pointer_specialized_calls;
4851c0b2f7Stbbdev static std::atomic<int> pointer_specialized_calls;
4951c0b2f7Stbbdev static std::atomic<int> first_pointer_specialized_calls;
5051c0b2f7Stbbdev static std::atomic<int> second_pointer_specialized_calls;
5151c0b2f7Stbbdev
5251c0b2f7Stbbdev static int intbuffer[max_counter]; // store results for <int,int> parallel pipeline test
5351c0b2f7Stbbdev static bool check_intbuffer;
5451c0b2f7Stbbdev
5551c0b2f7Stbbdev static void* buffers[n_buffers];
5651c0b2f7Stbbdev static std::atomic_flag buf_in_use[n_buffers] = {ATOMIC_FLAG_INIT};
5751c0b2f7Stbbdev
fetchNextBuffer()5851c0b2f7Stbbdev void *fetchNextBuffer() {
5951c0b2f7Stbbdev for(size_t icnt = 0; icnt < n_buffers; ++icnt) {
6051c0b2f7Stbbdev if(!buf_in_use[icnt].test_and_set()) {
6151c0b2f7Stbbdev return buffers[icnt];
6251c0b2f7Stbbdev }
6351c0b2f7Stbbdev }
6451c0b2f7Stbbdev CHECK_MESSAGE(false, "Ran out of buffers, p:"<< concurrency);
6557f524caSIlya Isaev return nullptr;
6651c0b2f7Stbbdev }
freeBuffer(void * buf)6751c0b2f7Stbbdev void freeBuffer(void *buf) {
6851c0b2f7Stbbdev for(size_t i=0; i < n_buffers;++i) {
6951c0b2f7Stbbdev if(buffers[i] == buf) {
7051c0b2f7Stbbdev buf_in_use[i].clear();
7151c0b2f7Stbbdev return;
7251c0b2f7Stbbdev }
7351c0b2f7Stbbdev }
7451c0b2f7Stbbdev CHECK_MESSAGE(false, "Tried to free a buffer not in our list, p:" << concurrency);
7551c0b2f7Stbbdev }
7651c0b2f7Stbbdev
7751c0b2f7Stbbdev template<typename T>
7851c0b2f7Stbbdev class free_on_scope_exit {
7951c0b2f7Stbbdev public:
free_on_scope_exit(T * p)8051c0b2f7Stbbdev free_on_scope_exit(T *p) : my_p(p) {}
~free_on_scope_exit()8151c0b2f7Stbbdev ~free_on_scope_exit() { if(!my_p) return; my_p->~T(); freeBuffer(my_p); }
8251c0b2f7Stbbdev private:
8351c0b2f7Stbbdev T *my_p;
8451c0b2f7Stbbdev };
8551c0b2f7Stbbdev
8651c0b2f7Stbbdev // methods for testing CheckType< >, that return okay values for other types.
8751c0b2f7Stbbdev template<typename T>
middle_is_ready(T &)8851c0b2f7Stbbdev bool middle_is_ready(T &/*p*/) { return false; }
8951c0b2f7Stbbdev
9051c0b2f7Stbbdev template<typename U>
middle_is_ready(CheckType<U> & p)9151c0b2f7Stbbdev bool middle_is_ready(CheckType<U> &p) { return p.is_ready(); }
9251c0b2f7Stbbdev
9351c0b2f7Stbbdev template<typename T>
output_is_ready(T &)9451c0b2f7Stbbdev bool output_is_ready(T &/*p*/) { return true; }
9551c0b2f7Stbbdev
9651c0b2f7Stbbdev template<typename U>
output_is_ready(CheckType<U> & p)9751c0b2f7Stbbdev bool output_is_ready(CheckType<U> &p) { return p.is_ready(); }
9851c0b2f7Stbbdev
9951c0b2f7Stbbdev template<typename T>
middle_my_id(T &)10051c0b2f7Stbbdev int middle_my_id( T &/*p*/) { return 0; }
10151c0b2f7Stbbdev
10251c0b2f7Stbbdev template<typename U>
middle_my_id(CheckType<U> & p)10351c0b2f7Stbbdev int middle_my_id(CheckType<U> &p) { return p.id(); }
10451c0b2f7Stbbdev
10551c0b2f7Stbbdev template<typename T>
output_my_id(T &)10651c0b2f7Stbbdev int output_my_id( T &/*p*/) { return 1; }
10751c0b2f7Stbbdev
10851c0b2f7Stbbdev template<typename U>
output_my_id(CheckType<U> & p)10951c0b2f7Stbbdev int output_my_id(CheckType<U> &p) { return p.id(); }
11051c0b2f7Stbbdev
11151c0b2f7Stbbdev template<typename T>
my_function(T & p)11251c0b2f7Stbbdev void my_function(T &p) { p = 0; }
11351c0b2f7Stbbdev
11451c0b2f7Stbbdev template<typename U>
my_function(CheckType<U> & p)11551c0b2f7Stbbdev void my_function(CheckType<U> &p) { p.get_ready(); }
11651c0b2f7Stbbdev
11751c0b2f7Stbbdev // Filters must be copy-constructible, and be const-qualifiable.
11851c0b2f7Stbbdev template<typename U>
11951c0b2f7Stbbdev class input_filter : DestroyedTracker {
12051c0b2f7Stbbdev public:
operator ()(tbb::flow_control & control) const12151c0b2f7Stbbdev U operator()( tbb::flow_control& control ) const {
12251c0b2f7Stbbdev CHECK(is_alive());
12351c0b2f7Stbbdev if( --input_counter < 0 ) {
12451c0b2f7Stbbdev control.stop();
12551c0b2f7Stbbdev }
12651c0b2f7Stbbdev else // only count successful reads
12751c0b2f7Stbbdev ++non_pointer_specialized_calls;
12851c0b2f7Stbbdev return U(); // default constructed
12951c0b2f7Stbbdev }
13051c0b2f7Stbbdev
13151c0b2f7Stbbdev };
13251c0b2f7Stbbdev
13351c0b2f7Stbbdev // specialization for pointer
13451c0b2f7Stbbdev template<typename U>
13551c0b2f7Stbbdev class input_filter<U*> : DestroyedTracker {
13651c0b2f7Stbbdev public:
operator ()(tbb::flow_control & control) const13751c0b2f7Stbbdev U* operator()(tbb::flow_control& control) const {
13851c0b2f7Stbbdev CHECK(is_alive());
13951c0b2f7Stbbdev int ival = --input_counter;
14051c0b2f7Stbbdev if(ival < 0) {
14151c0b2f7Stbbdev control.stop();
14251c0b2f7Stbbdev return nullptr;
14351c0b2f7Stbbdev }
14451c0b2f7Stbbdev ++pointer_specialized_calls;
14551c0b2f7Stbbdev if(ival == max_counter / 2) {
14657f524caSIlya Isaev return nullptr; // non-stop nullptr
14751c0b2f7Stbbdev }
14851c0b2f7Stbbdev U* myReturn = new(fetchNextBuffer()) U();
14957f524caSIlya Isaev if (myReturn) { // may have been passed in a nullptr
15051c0b2f7Stbbdev CHECK_MESSAGE(!middle_my_id(*myReturn), "bad id value, p:" << concurrency);
15151c0b2f7Stbbdev CHECK_MESSAGE(!middle_is_ready(*myReturn), "Already ready, p:" << concurrency);
15251c0b2f7Stbbdev }
15351c0b2f7Stbbdev return myReturn;
15451c0b2f7Stbbdev }
15551c0b2f7Stbbdev };
15651c0b2f7Stbbdev
15751c0b2f7Stbbdev template<>
15851c0b2f7Stbbdev class input_filter<void> : DestroyedTracker {
15951c0b2f7Stbbdev public:
operator ()(tbb::flow_control & control) const16051c0b2f7Stbbdev void operator()( tbb::flow_control& control ) const {
16151c0b2f7Stbbdev CHECK(is_alive());
16251c0b2f7Stbbdev if( --input_counter < 0 ) {
16351c0b2f7Stbbdev control.stop();
16451c0b2f7Stbbdev }
16551c0b2f7Stbbdev else
16651c0b2f7Stbbdev ++non_pointer_specialized_calls;
16751c0b2f7Stbbdev }
16851c0b2f7Stbbdev
16951c0b2f7Stbbdev };
17051c0b2f7Stbbdev
17151c0b2f7Stbbdev // specialization for int that passes back a sequence of integers
17251c0b2f7Stbbdev template<>
17351c0b2f7Stbbdev class input_filter<int> : DestroyedTracker {
17451c0b2f7Stbbdev public:
17551c0b2f7Stbbdev int
operator ()(tbb::flow_control & control) const17651c0b2f7Stbbdev operator()(tbb::flow_control& control ) const {
17751c0b2f7Stbbdev CHECK(is_alive());
17851c0b2f7Stbbdev int oldval = --input_counter;
17951c0b2f7Stbbdev if( oldval < 0 ) {
18051c0b2f7Stbbdev control.stop();
18151c0b2f7Stbbdev }
18251c0b2f7Stbbdev else
18351c0b2f7Stbbdev ++non_pointer_specialized_calls;
18451c0b2f7Stbbdev return oldval+1;
18551c0b2f7Stbbdev }
18651c0b2f7Stbbdev };
18751c0b2f7Stbbdev
18851c0b2f7Stbbdev template<typename T, typename U>
18951c0b2f7Stbbdev class middle_filter : DestroyedTracker {
19051c0b2f7Stbbdev public:
operator ()(T t) const19151c0b2f7Stbbdev U operator()(T t) const {
19251c0b2f7Stbbdev CHECK(is_alive());
19351c0b2f7Stbbdev CHECK_MESSAGE(!middle_my_id(t), "bad id value, p:" << concurrency);
19451c0b2f7Stbbdev CHECK_MESSAGE(!middle_is_ready(t), "Already ready, p:" << concurrency );
19551c0b2f7Stbbdev U out;
19651c0b2f7Stbbdev my_function(out);
19751c0b2f7Stbbdev ++non_pointer_specialized_calls;
19851c0b2f7Stbbdev return out;
19951c0b2f7Stbbdev }
20051c0b2f7Stbbdev };
20151c0b2f7Stbbdev
20251c0b2f7Stbbdev template<typename T, typename U>
20351c0b2f7Stbbdev class middle_filter<T*,U> : DestroyedTracker {
20451c0b2f7Stbbdev public:
operator ()(T * my_storage) const20551c0b2f7Stbbdev U operator()(T* my_storage) const {
20651c0b2f7Stbbdev free_on_scope_exit<T> my_ptr(my_storage); // free_on_scope_exit marks the buffer available
20751c0b2f7Stbbdev CHECK(is_alive());
20857f524caSIlya Isaev if(my_storage) { // may have been passed in a nullptr
20951c0b2f7Stbbdev CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
21051c0b2f7Stbbdev CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
21151c0b2f7Stbbdev }
21251c0b2f7Stbbdev ++first_pointer_specialized_calls;
21351c0b2f7Stbbdev U out;
21451c0b2f7Stbbdev my_function(out);
21551c0b2f7Stbbdev return out;
21651c0b2f7Stbbdev }
21751c0b2f7Stbbdev };
21851c0b2f7Stbbdev
21951c0b2f7Stbbdev template<typename T, typename U>
22051c0b2f7Stbbdev class middle_filter<T,U*> : DestroyedTracker {
22151c0b2f7Stbbdev public:
operator ()(T my_storage) const22251c0b2f7Stbbdev U* operator()(T my_storage) const {
22351c0b2f7Stbbdev CHECK(is_alive());
22451c0b2f7Stbbdev CHECK_MESSAGE(!middle_my_id(my_storage), "bad id value, p:" << concurrency);
22551c0b2f7Stbbdev CHECK_MESSAGE(!middle_is_ready(my_storage), "Already ready, p:" << concurrency );
22651c0b2f7Stbbdev // allocate new space from buffers
22751c0b2f7Stbbdev U* my_return = new(fetchNextBuffer()) U();
22851c0b2f7Stbbdev my_function(*my_return);
22951c0b2f7Stbbdev ++second_pointer_specialized_calls;
23051c0b2f7Stbbdev return my_return;
23151c0b2f7Stbbdev }
23251c0b2f7Stbbdev };
23351c0b2f7Stbbdev
23451c0b2f7Stbbdev template<typename T, typename U>
23551c0b2f7Stbbdev class middle_filter<T*,U*> : DestroyedTracker {
23651c0b2f7Stbbdev public:
operator ()(T * my_storage) const23751c0b2f7Stbbdev U* operator()(T* my_storage) const {
23851c0b2f7Stbbdev free_on_scope_exit<T> my_ptr(my_storage); // free_on_scope_exit marks the buffer available
23951c0b2f7Stbbdev CHECK(is_alive());
24051c0b2f7Stbbdev if(my_storage) {
24151c0b2f7Stbbdev CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
24251c0b2f7Stbbdev CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
24351c0b2f7Stbbdev }
24457f524caSIlya Isaev // may have been passed a nullptr
24551c0b2f7Stbbdev ++pointer_specialized_calls;
24651c0b2f7Stbbdev if(!my_storage) return nullptr;
24751c0b2f7Stbbdev CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
24851c0b2f7Stbbdev CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
24951c0b2f7Stbbdev U* my_return = new(fetchNextBuffer()) U();
25051c0b2f7Stbbdev my_function(*my_return);
25151c0b2f7Stbbdev return my_return;
25251c0b2f7Stbbdev }
25351c0b2f7Stbbdev };
25451c0b2f7Stbbdev
25551c0b2f7Stbbdev // specialization for int that squares the input and returns that.
25651c0b2f7Stbbdev template<>
25751c0b2f7Stbbdev class middle_filter<int,int> : DestroyedTracker {
25851c0b2f7Stbbdev public:
operator ()(int my_input) const25951c0b2f7Stbbdev int operator()(int my_input) const {
26051c0b2f7Stbbdev CHECK(is_alive());
26151c0b2f7Stbbdev ++non_pointer_specialized_calls;
26251c0b2f7Stbbdev return my_input*my_input;
26351c0b2f7Stbbdev }
26451c0b2f7Stbbdev };
26551c0b2f7Stbbdev
26651c0b2f7Stbbdev // ---------------------------------
26751c0b2f7Stbbdev template<typename T>
26851c0b2f7Stbbdev class output_filter : DestroyedTracker {
26951c0b2f7Stbbdev public:
operator ()(T c) const27051c0b2f7Stbbdev void operator()(T c) const {
27151c0b2f7Stbbdev CHECK(is_alive());
27251c0b2f7Stbbdev CHECK_MESSAGE(output_my_id(c), "unset id value, p:" << concurrency);
27351c0b2f7Stbbdev CHECK_MESSAGE(output_is_ready(c), "not yet ready, p:" << concurrency);
27451c0b2f7Stbbdev ++non_pointer_specialized_calls;
27551c0b2f7Stbbdev output_counter++;
27651c0b2f7Stbbdev }
27751c0b2f7Stbbdev };
27851c0b2f7Stbbdev
27951c0b2f7Stbbdev // specialization for int that puts the received value in an array
28051c0b2f7Stbbdev template<>
28151c0b2f7Stbbdev class output_filter<int> : DestroyedTracker {
28251c0b2f7Stbbdev public:
operator ()(int my_input) const28351c0b2f7Stbbdev void operator()(int my_input) const {
28451c0b2f7Stbbdev CHECK(is_alive());
28551c0b2f7Stbbdev ++non_pointer_specialized_calls;
28651c0b2f7Stbbdev int myindx = output_counter++;
28751c0b2f7Stbbdev intbuffer[myindx] = my_input;
28851c0b2f7Stbbdev }
28951c0b2f7Stbbdev };
29051c0b2f7Stbbdev
29151c0b2f7Stbbdev template<typename T>
29251c0b2f7Stbbdev class output_filter<T*> : DestroyedTracker {
29351c0b2f7Stbbdev public:
operator ()(T * c) const29451c0b2f7Stbbdev void operator()(T* c) const {
29551c0b2f7Stbbdev free_on_scope_exit<T> my_ptr(c);
29651c0b2f7Stbbdev CHECK(is_alive());
29751c0b2f7Stbbdev if(c) {
29851c0b2f7Stbbdev CHECK_MESSAGE(output_my_id(*c), "unset id value, p:" << concurrency);
29951c0b2f7Stbbdev CHECK_MESSAGE(output_is_ready(*c), "not yet ready, p:" << concurrency);
30051c0b2f7Stbbdev }
30151c0b2f7Stbbdev output_counter++;
30251c0b2f7Stbbdev ++pointer_specialized_calls;
30351c0b2f7Stbbdev }
30451c0b2f7Stbbdev };
30551c0b2f7Stbbdev
30651c0b2f7Stbbdev typedef enum {
30751c0b2f7Stbbdev no_pointer_counts,
30851c0b2f7Stbbdev assert_nonpointer,
30951c0b2f7Stbbdev assert_firstpointer,
31051c0b2f7Stbbdev assert_secondpointer,
31151c0b2f7Stbbdev assert_allpointer
31251c0b2f7Stbbdev } final_assert_type;
31351c0b2f7Stbbdev
resetCounters()31451c0b2f7Stbbdev void resetCounters() {
31551c0b2f7Stbbdev output_counter = 0;
31651c0b2f7Stbbdev input_counter = max_counter;
31751c0b2f7Stbbdev non_pointer_specialized_calls = 0;
31851c0b2f7Stbbdev pointer_specialized_calls = 0;
31951c0b2f7Stbbdev first_pointer_specialized_calls = 0;
32051c0b2f7Stbbdev second_pointer_specialized_calls = 0;
32151c0b2f7Stbbdev // we have to reset the buffer flags because our input filters return allocated space on end-of-input,
32251c0b2f7Stbbdev // (on eof a default-constructed object is returned) and they do not pass through the filter further.
32351c0b2f7Stbbdev for(size_t i = 0; i < n_buffers; ++i)
32451c0b2f7Stbbdev buf_in_use[i].clear();
32551c0b2f7Stbbdev }
32651c0b2f7Stbbdev
checkCounters(final_assert_type my_t)32751c0b2f7Stbbdev void checkCounters(final_assert_type my_t) {
32851c0b2f7Stbbdev CHECK_MESSAGE(output_counter == max_counter, "Ran out of buffers, p:" << concurrency);
32951c0b2f7Stbbdev switch(my_t) {
33051c0b2f7Stbbdev case assert_nonpointer:
33151c0b2f7Stbbdev CHECK_MESSAGE(pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0, "non-pointer filters specialized to pointer, p:" << concurrency);
33251c0b2f7Stbbdev CHECK_MESSAGE(non_pointer_specialized_calls == 3*max_counter, "bad count for non-pointer filters, p:" << concurrency);
33351c0b2f7Stbbdev if(check_intbuffer) {
33451c0b2f7Stbbdev for(int i = 1; i <= max_counter; ++i) {
33551c0b2f7Stbbdev int j = i*i;
33651c0b2f7Stbbdev bool found_val = false;
33751c0b2f7Stbbdev for(int k = 0; k < max_counter; ++k) {
33851c0b2f7Stbbdev if(intbuffer[k] == j) {
33951c0b2f7Stbbdev found_val = true;
34051c0b2f7Stbbdev break;
34151c0b2f7Stbbdev }
34251c0b2f7Stbbdev }
34351c0b2f7Stbbdev CHECK_MESSAGE(found_val, "Missing value in output array, p:" << concurrency );
34451c0b2f7Stbbdev }
34551c0b2f7Stbbdev }
34651c0b2f7Stbbdev break;
34751c0b2f7Stbbdev case assert_firstpointer:
34851c0b2f7Stbbdev {
34951c0b2f7Stbbdev bool check = pointer_specialized_calls == max_counter && // input filter extra invocation
35051c0b2f7Stbbdev first_pointer_specialized_calls == max_counter &&
35151c0b2f7Stbbdev non_pointer_specialized_calls == max_counter &&
35251c0b2f7Stbbdev second_pointer_specialized_calls == 0;
35351c0b2f7Stbbdev CHECK_MESSAGE(check, "incorrect specialization for firstpointer, p:" << concurrency);
35451c0b2f7Stbbdev }
35551c0b2f7Stbbdev break;
35651c0b2f7Stbbdev case assert_secondpointer:
35751c0b2f7Stbbdev {
35851c0b2f7Stbbdev bool check = pointer_specialized_calls == max_counter &&
35951c0b2f7Stbbdev first_pointer_specialized_calls == 0 &&
36051c0b2f7Stbbdev non_pointer_specialized_calls == max_counter && // input filter
36151c0b2f7Stbbdev second_pointer_specialized_calls == max_counter;
36251c0b2f7Stbbdev CHECK_MESSAGE(check, "incorrect specialization for firstpointer, p:" << concurrency);
36351c0b2f7Stbbdev }
36451c0b2f7Stbbdev break;
36551c0b2f7Stbbdev case assert_allpointer:
36651c0b2f7Stbbdev CHECK_MESSAGE(non_pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0, "pointer filters specialized to non-pointer, p:" << concurrency);
36751c0b2f7Stbbdev CHECK_MESSAGE(pointer_specialized_calls == 3*max_counter, "bad count for pointer filters, p:" << concurrency);
36851c0b2f7Stbbdev break;
36951c0b2f7Stbbdev case no_pointer_counts:
37051c0b2f7Stbbdev break;
37151c0b2f7Stbbdev }
37251c0b2f7Stbbdev }
37351c0b2f7Stbbdev
37451c0b2f7Stbbdev static const tbb::filter_mode filter_table[] = { tbb::filter_mode::parallel, tbb::filter_mode::serial_in_order, tbb::filter_mode::serial_out_of_order};
37551c0b2f7Stbbdev const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
37651c0b2f7Stbbdev
37751c0b2f7Stbbdev using filter_chain = tbb::filter<void, void>;
37851c0b2f7Stbbdev using mode_array =tbb::filter_mode;
37951c0b2f7Stbbdev
38051c0b2f7Stbbdev // The filters are passed by value, which forces a temporary copy to be created. This is
38151c0b2f7Stbbdev // to reproduce the bug where a filter_chain uses refs to filters, which after a call
38251c0b2f7Stbbdev // would be references to destructed temporaries.
38351c0b2f7Stbbdev template<typename type1, typename type2>
fill_chain(filter_chain & my_chain,mode_array * filter_type,input_filter<type1> i_filter,middle_filter<type1,type2> m_filter,output_filter<type2> o_filter)38451c0b2f7Stbbdev void fill_chain( filter_chain &my_chain, mode_array *filter_type, input_filter<type1> i_filter,
38551c0b2f7Stbbdev middle_filter<type1, type2> m_filter, output_filter<type2> o_filter ) {
38651c0b2f7Stbbdev my_chain = tbb::filter<void, type1>(filter_type[0], i_filter) &
38751c0b2f7Stbbdev tbb::filter<type1, type2>(filter_type[1], m_filter) &
38851c0b2f7Stbbdev tbb::filter<type2, void>(filter_type[2], o_filter);
38951c0b2f7Stbbdev }
39051c0b2f7Stbbdev
39151c0b2f7Stbbdev template<typename... Context>
run_function_spec(Context &...context)39251c0b2f7Stbbdev void run_function_spec(Context&... context) {
39351c0b2f7Stbbdev CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");
39451c0b2f7Stbbdev input_filter<void> i_filter;
39551c0b2f7Stbbdev // Test pipeline that contains only one filter
39651c0b2f7Stbbdev for( unsigned i = 0; i<number_of_filter_types; i++) {
39751c0b2f7Stbbdev tbb::filter<void, void> one_filter( filter_table[i], i_filter );
39851c0b2f7Stbbdev CHECK_MESSAGE(filter_node_count==1, "some filter nodes left after previous iteration?");
39951c0b2f7Stbbdev resetCounters();
40051c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, one_filter, context... );
40151c0b2f7Stbbdev // no need to check counters
40251c0b2f7Stbbdev std::atomic<int> counter;
40351c0b2f7Stbbdev counter = max_counter;
40451c0b2f7Stbbdev // Construct filter using lambda-syntax when parallel_pipeline() is being run;
40551c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens,
40651c0b2f7Stbbdev tbb::filter<void, void>(filter_table[i], [&counter]( tbb::flow_control& control ) {
40751c0b2f7Stbbdev if( counter-- == 0 )
40851c0b2f7Stbbdev control.stop();
40951c0b2f7Stbbdev }
41051c0b2f7Stbbdev ),
41151c0b2f7Stbbdev context...
41251c0b2f7Stbbdev );
41351c0b2f7Stbbdev }
41451c0b2f7Stbbdev CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");
41551c0b2f7Stbbdev }
41651c0b2f7Stbbdev
41751c0b2f7Stbbdev template<typename t1, typename t2, typename... Context>
run_filter_set(input_filter<t1> & i_filter,middle_filter<t1,t2> & m_filter,output_filter<t2> & o_filter,mode_array * filter_type,final_assert_type my_t,Context &...context)41851c0b2f7Stbbdev void run_filter_set(
41951c0b2f7Stbbdev input_filter<t1>& i_filter,
42051c0b2f7Stbbdev middle_filter<t1,t2>& m_filter,
42151c0b2f7Stbbdev output_filter<t2>& o_filter,
42251c0b2f7Stbbdev mode_array *filter_type,
42351c0b2f7Stbbdev final_assert_type my_t,
42451c0b2f7Stbbdev Context&... context) {
42551c0b2f7Stbbdev tbb::filter<void, t1> filter1( filter_type[0], i_filter );
42651c0b2f7Stbbdev tbb::filter<t1, t2> filter2( filter_type[1], m_filter );
42751c0b2f7Stbbdev tbb::filter<t2, void> filter3( filter_type[2], o_filter );
42851c0b2f7Stbbdev
42951c0b2f7Stbbdev CHECK_MESSAGE(filter_node_count==3, "some filter nodes left after previous iteration?");
43051c0b2f7Stbbdev resetCounters();
43151c0b2f7Stbbdev // Create filters sequence when parallel_pipeline() is being run
43251c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, filter1, filter2, filter3, context... );
43351c0b2f7Stbbdev checkCounters(my_t);
43451c0b2f7Stbbdev
43551c0b2f7Stbbdev // Create filters sequence partially outside parallel_pipeline() and also when parallel_pipeline() is being run
43651c0b2f7Stbbdev tbb::filter<void, t2> filter12;
43751c0b2f7Stbbdev filter12 = filter1 & filter2;
43851c0b2f7Stbbdev for( int i = 0; i<3; i++)
43951c0b2f7Stbbdev {
44051c0b2f7Stbbdev filter12 &= tbb::filter<t2,t2>(filter_type[i], [](t2 x) -> t2 { return x;});
44151c0b2f7Stbbdev }
44251c0b2f7Stbbdev resetCounters();
44351c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, filter12, filter3, context... );
44451c0b2f7Stbbdev checkCounters(my_t);
44551c0b2f7Stbbdev
44651c0b2f7Stbbdev tbb::filter<void, void> filter123 = filter12 & filter3;
44751c0b2f7Stbbdev // Run pipeline twice with the same filter sequence
44851c0b2f7Stbbdev for( unsigned i = 0; i<2; i++ ) {
44951c0b2f7Stbbdev resetCounters();
45051c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, filter123, context... );
45151c0b2f7Stbbdev checkCounters(my_t);
45251c0b2f7Stbbdev }
45351c0b2f7Stbbdev
45451c0b2f7Stbbdev // Now copy-and-move-construct another filter instance, and use it to run pipeline
45551c0b2f7Stbbdev {
45651c0b2f7Stbbdev tbb::filter<void, void> copy123( filter123 );
45751c0b2f7Stbbdev resetCounters();
45851c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, copy123, context... );
45951c0b2f7Stbbdev checkCounters(my_t);
46051c0b2f7Stbbdev tbb::filter<void, void> move123( std::move(filter123) );
46151c0b2f7Stbbdev resetCounters();
46251c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, move123, context... );
46351c0b2f7Stbbdev checkCounters(my_t);
46451c0b2f7Stbbdev }
46551c0b2f7Stbbdev
46651c0b2f7Stbbdev // Construct filters and create the sequence when parallel_pipeline() is being run
46751c0b2f7Stbbdev resetCounters();
46851c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens,
46951c0b2f7Stbbdev tbb::filter<void, t1>(filter_type[0], i_filter),
47051c0b2f7Stbbdev tbb::filter<t1, t2>(filter_type[1], m_filter),
47151c0b2f7Stbbdev tbb::filter<t2, void>(filter_type[2], o_filter),
47251c0b2f7Stbbdev context... );
47351c0b2f7Stbbdev checkCounters(my_t);
47451c0b2f7Stbbdev
47551c0b2f7Stbbdev // Construct filters, make a copy, destroy the original filters, and run with the copy
47651c0b2f7Stbbdev int cnt = filter_node_count;
47751c0b2f7Stbbdev {
47851c0b2f7Stbbdev tbb::filter<void, void>* p123 = new tbb::filter<void,void> (
47951c0b2f7Stbbdev tbb::filter<void, t1>(filter_type[0], i_filter)&
48051c0b2f7Stbbdev tbb::filter<t1, t2>(filter_type[1], m_filter)&
48151c0b2f7Stbbdev tbb::filter<t2, void>(filter_type[2], o_filter) );
48251c0b2f7Stbbdev CHECK_MESSAGE(filter_node_count==cnt+5, "filter node accounting error?");
48351c0b2f7Stbbdev tbb::filter<void, void> copy123( *p123 );
48451c0b2f7Stbbdev delete p123;
48551c0b2f7Stbbdev CHECK_MESSAGE(filter_node_count==cnt+5, "filter nodes deleted prematurely?");
48651c0b2f7Stbbdev resetCounters();
48751c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, copy123, context... );
48851c0b2f7Stbbdev checkCounters(my_t);
48951c0b2f7Stbbdev }
49051c0b2f7Stbbdev
49151c0b2f7Stbbdev // construct a filter with temporaries
49251c0b2f7Stbbdev {
49351c0b2f7Stbbdev tbb::filter<void, void> my_filter;
49451c0b2f7Stbbdev fill_chain<t1,t2>( my_filter, filter_type, i_filter, m_filter, o_filter );
49551c0b2f7Stbbdev resetCounters();
49651c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens, my_filter, context... );
49751c0b2f7Stbbdev checkCounters(my_t);
49851c0b2f7Stbbdev }
49951c0b2f7Stbbdev CHECK_MESSAGE(filter_node_count==cnt, "scope ended but filter nodes not deleted?");
50051c0b2f7Stbbdev }
50151c0b2f7Stbbdev
50251c0b2f7Stbbdev template <typename t1, typename t2, typename... Context>
run_lambdas_test(mode_array * filter_type,Context &...context)50351c0b2f7Stbbdev void run_lambdas_test( mode_array *filter_type, Context&... context ) {
50451c0b2f7Stbbdev std::atomic<int> counter;
50551c0b2f7Stbbdev counter = max_counter;
50651c0b2f7Stbbdev // Construct filters using lambda-syntax and create the sequence when parallel_pipeline() is being run;
50751c0b2f7Stbbdev resetCounters(); // only need the output_counter reset.
50851c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens,
50951c0b2f7Stbbdev tbb::make_filter<void, t1>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
51051c0b2f7Stbbdev if( --counter < 0 )
51151c0b2f7Stbbdev control.stop();
51251c0b2f7Stbbdev return t1(); }
51351c0b2f7Stbbdev ),
51451c0b2f7Stbbdev tbb::make_filter<t1, t2>(filter_type[1], []( t1 /*my_storage*/ ) -> t2 {
51551c0b2f7Stbbdev return t2(); }
51651c0b2f7Stbbdev ),
51751c0b2f7Stbbdev tbb::make_filter<t2,void>(filter_type[2], [] ( t2 ) -> void {
51851c0b2f7Stbbdev output_counter++; }
51951c0b2f7Stbbdev ),
52051c0b2f7Stbbdev context...
52151c0b2f7Stbbdev );
52251c0b2f7Stbbdev checkCounters(no_pointer_counts); // don't have to worry about specializations
52351c0b2f7Stbbdev counter = max_counter;
52451c0b2f7Stbbdev // pointer filters
52551c0b2f7Stbbdev resetCounters();
52651c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens,
52751c0b2f7Stbbdev tbb::filter<void,t1*>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
52851c0b2f7Stbbdev if( --counter < 0 ) {
52951c0b2f7Stbbdev control.stop();
53051c0b2f7Stbbdev return nullptr;
53151c0b2f7Stbbdev }
53251c0b2f7Stbbdev return new(fetchNextBuffer()) t1(); }
53351c0b2f7Stbbdev ),
53451c0b2f7Stbbdev tbb::filter<t1*, t2*>(filter_type[1], []( t1* my_storage ) -> t2* {
53551c0b2f7Stbbdev my_storage->~t1();
53651c0b2f7Stbbdev return new(my_storage) t2(); }
53751c0b2f7Stbbdev ),
53851c0b2f7Stbbdev tbb::filter<t2*, void>(filter_type[2], [] ( t2* my_storage ) -> void {
53951c0b2f7Stbbdev my_storage->~t2();
54051c0b2f7Stbbdev freeBuffer(my_storage);
54151c0b2f7Stbbdev output_counter++; }
54251c0b2f7Stbbdev ),
54351c0b2f7Stbbdev context...
54451c0b2f7Stbbdev );
54551c0b2f7Stbbdev checkCounters(no_pointer_counts);
54651c0b2f7Stbbdev // first filter outputs pointer
54751c0b2f7Stbbdev counter = max_counter;
54851c0b2f7Stbbdev resetCounters();
54951c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens,
55051c0b2f7Stbbdev tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
55151c0b2f7Stbbdev if( --counter < 0 ) {
55251c0b2f7Stbbdev control.stop();
55351c0b2f7Stbbdev return nullptr;
55451c0b2f7Stbbdev }
55551c0b2f7Stbbdev return new(fetchNextBuffer()) t1(); }
55651c0b2f7Stbbdev )&
55751c0b2f7Stbbdev tbb::make_filter(filter_type[1], []( t1* my_storage ) -> t2 {
55851c0b2f7Stbbdev my_storage->~t1();
55951c0b2f7Stbbdev freeBuffer(my_storage);
56051c0b2f7Stbbdev return t2(); }
56151c0b2f7Stbbdev ),
56251c0b2f7Stbbdev tbb::make_filter(filter_type[2], [] ( t2 /*my_storage*/) -> void {
56351c0b2f7Stbbdev output_counter++; }
56451c0b2f7Stbbdev ),
56551c0b2f7Stbbdev context...
56651c0b2f7Stbbdev );
56751c0b2f7Stbbdev checkCounters(no_pointer_counts);
56851c0b2f7Stbbdev // second filter outputs pointer
56951c0b2f7Stbbdev counter = max_counter;
57051c0b2f7Stbbdev resetCounters();
57151c0b2f7Stbbdev tbb::parallel_pipeline( n_tokens,
57251c0b2f7Stbbdev tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
57351c0b2f7Stbbdev if( --counter < 0 ) {
57451c0b2f7Stbbdev control.stop();
57551c0b2f7Stbbdev }
57651c0b2f7Stbbdev return t1(); }
57751c0b2f7Stbbdev ),
57851c0b2f7Stbbdev tbb::filter<t1, t2*>(filter_type[1], []( t1 /*my_storage*/ ) -> t2* {
57951c0b2f7Stbbdev return new(fetchNextBuffer()) t2(); }
58051c0b2f7Stbbdev )&
58151c0b2f7Stbbdev tbb::make_filter<t2*, void>(filter_type[2], [] ( t2* my_storage) -> void {
58251c0b2f7Stbbdev my_storage->~t2();
58351c0b2f7Stbbdev freeBuffer(my_storage);
58451c0b2f7Stbbdev output_counter++; }
58551c0b2f7Stbbdev ),
58651c0b2f7Stbbdev context...
58751c0b2f7Stbbdev );
58851c0b2f7Stbbdev checkCounters(no_pointer_counts);
58951c0b2f7Stbbdev }
59051c0b2f7Stbbdev
59151c0b2f7Stbbdev template<typename type1, typename type2>
run_function(const char * l1,const char * l2)59251c0b2f7Stbbdev void run_function(const char *l1, const char *l2) {
59351c0b2f7Stbbdev CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");
59451c0b2f7Stbbdev
59551c0b2f7Stbbdev check_intbuffer = (!strcmp(l1,"int") && !strcmp(l2,"int"));
59651c0b2f7Stbbdev
59751c0b2f7Stbbdev Checker<type1> check1; // check constructions/destructions
59851c0b2f7Stbbdev Checker<type2> check2; // for type1 or type2 === CheckType<T>
59951c0b2f7Stbbdev
60051c0b2f7Stbbdev const size_t number_of_filters = 3;
60151c0b2f7Stbbdev
60251c0b2f7Stbbdev input_filter<type1> i_filter;
60351c0b2f7Stbbdev input_filter<type1*> p_i_filter;
60451c0b2f7Stbbdev
60551c0b2f7Stbbdev middle_filter<type1, type2> m_filter;
60651c0b2f7Stbbdev middle_filter<type1*, type2> pr_m_filter;
60751c0b2f7Stbbdev middle_filter<type1, type2*> rp_m_filter;
60851c0b2f7Stbbdev middle_filter<type1*, type2*> pp_m_filter;
60951c0b2f7Stbbdev
61051c0b2f7Stbbdev output_filter<type2> o_filter;
61151c0b2f7Stbbdev output_filter<type2*> p_o_filter;
61251c0b2f7Stbbdev
61351c0b2f7Stbbdev // allocate the buffers for the filters
61451c0b2f7Stbbdev unsigned max_size = (sizeof(type1) > sizeof(type2) ) ? sizeof(type1) : sizeof(type2);
61551c0b2f7Stbbdev for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
61651c0b2f7Stbbdev buffers[i] = malloc(max_size);
61751c0b2f7Stbbdev buf_in_use[i].clear();
61851c0b2f7Stbbdev }
61951c0b2f7Stbbdev
62051c0b2f7Stbbdev unsigned limit = 1;
62151c0b2f7Stbbdev // Test pipeline that contains number_of_filters filters
62251c0b2f7Stbbdev for( unsigned i=0; i<number_of_filters; ++i)
62351c0b2f7Stbbdev limit *= number_of_filter_types;
62451c0b2f7Stbbdev // Iterate over possible filter sequences
62551c0b2f7Stbbdev for( unsigned numeral=0; numeral<limit; ++numeral ) {
62651c0b2f7Stbbdev unsigned temp = numeral;
62751c0b2f7Stbbdev tbb::filter_mode filter_type[number_of_filter_types];
62851c0b2f7Stbbdev for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types )
62951c0b2f7Stbbdev filter_type[i] = filter_table[temp%number_of_filter_types];
63051c0b2f7Stbbdev
63151c0b2f7Stbbdev tbb::task_group_context context;
63251c0b2f7Stbbdev run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer);
63351c0b2f7Stbbdev run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer, context);
63451c0b2f7Stbbdev run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer);
63551c0b2f7Stbbdev run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer, context);
63651c0b2f7Stbbdev run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer);
63751c0b2f7Stbbdev run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer, context);
63851c0b2f7Stbbdev run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer);
63951c0b2f7Stbbdev run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer, context);
64051c0b2f7Stbbdev
64151c0b2f7Stbbdev run_lambdas_test<type1,type2>(filter_type);
64251c0b2f7Stbbdev run_lambdas_test<type1,type2>(filter_type, context);
64351c0b2f7Stbbdev }
64451c0b2f7Stbbdev CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");
64551c0b2f7Stbbdev
64651c0b2f7Stbbdev for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
64751c0b2f7Stbbdev free(buffers[i]);
64851c0b2f7Stbbdev }
64951c0b2f7Stbbdev }
65051c0b2f7Stbbdev
65151c0b2f7Stbbdev //! Testing single filter pipeline
65251c0b2f7Stbbdev //! \brief \ref error_guessing
65351c0b2f7Stbbdev TEST_CASE("Pipeline testing for single filter") {
65451c0b2f7Stbbdev run_function_spec();
65551c0b2f7Stbbdev tbb::task_group_context context;
65651c0b2f7Stbbdev run_function_spec(context);
65751c0b2f7Stbbdev }
65851c0b2f7Stbbdev
65951c0b2f7Stbbdev #define RUN_TYPED_TEST_CASE(type1, type2) TEST_CASE("Pipeline testing with "#type1" and "#type2) { \
66051c0b2f7Stbbdev for ( std::size_t concurrency_level : {1, 2, 4, 5, 7, 8} ) { \
66151c0b2f7Stbbdev if ( concurrency_level > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) ) \
66251c0b2f7Stbbdev break; \
66351c0b2f7Stbbdev concurrency = concurrency_level; \
66451c0b2f7Stbbdev tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); \
66551c0b2f7Stbbdev run_function<type1, type2>(#type1, #type2); \
66651c0b2f7Stbbdev } \
66751c0b2f7Stbbdev }
66851c0b2f7Stbbdev // Run test several times with different types
66951c0b2f7Stbbdev RUN_TYPED_TEST_CASE(std::size_t, int)
67051c0b2f7Stbbdev RUN_TYPED_TEST_CASE(int, double)
67151c0b2f7Stbbdev RUN_TYPED_TEST_CASE(std::size_t, double)
67251c0b2f7Stbbdev RUN_TYPED_TEST_CASE(std::size_t, bool)
67351c0b2f7Stbbdev RUN_TYPED_TEST_CASE(int, int)
67451c0b2f7Stbbdev RUN_TYPED_TEST_CASE(CheckType<unsigned int>, std::size_t)
67551c0b2f7Stbbdev RUN_TYPED_TEST_CASE(CheckType<unsigned short>, std::size_t)
67651c0b2f7Stbbdev RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned int>)
67751c0b2f7Stbbdev RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned short>)
67851c0b2f7Stbbdev RUN_TYPED_TEST_CASE(CheckType<unsigned short>, CheckType<unsigned short>)
67951c0b2f7Stbbdev RUN_TYPED_TEST_CASE(double, CheckType<unsigned short>)
68051c0b2f7Stbbdev RUN_TYPED_TEST_CASE(std::unique_ptr<int>, std::unique_ptr<int>) // move-only type
68151c0b2f7Stbbdev
68251c0b2f7Stbbdev #undef RUN_TYPED_TEST_CASE
683