xref: /oneTBB/test/tbb/test_semaphore.cpp (revision c21e688a)
151c0b2f7Stbbdev /*
2*c21e688aSSergey Zheltov     Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev //! \file test_semaphore.cpp
1851c0b2f7Stbbdev //! \brief Test for [internal] functionality
1951c0b2f7Stbbdev 
20478de5b1Stbbdev #if _WIN32 || _WIN64
21478de5b1Stbbdev #define _CRT_SECURE_NO_WARNINGS
22478de5b1Stbbdev #endif
23478de5b1Stbbdev 
2451c0b2f7Stbbdev // Test for counting semaphore
2551c0b2f7Stbbdev #include "common/test.h"
2651c0b2f7Stbbdev #include "common/utils.h"
2751c0b2f7Stbbdev #include "common/spin_barrier.h"
2851c0b2f7Stbbdev #include "tbb/blocked_range.h"
2951c0b2f7Stbbdev #include "tbb/tick_count.h"
3051c0b2f7Stbbdev #include "../../src/tbb/semaphore.h"
3151c0b2f7Stbbdev #include <atomic>
3251c0b2f7Stbbdev #include <vector>
3351c0b2f7Stbbdev 
3451c0b2f7Stbbdev using tbb::detail::r1::semaphore;
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev std::atomic<int> pCount;
3751c0b2f7Stbbdev utils::SpinBarrier sBarrier;
3851c0b2f7Stbbdev 
// Semaphore basic functionality:
//  set the semaphore to an initial count and
//  check that it lets at most that many threads be active at once.
4251c0b2f7Stbbdev class Body : utils::NoAssign {
4351c0b2f7Stbbdev     const int nIters;
4451c0b2f7Stbbdev     semaphore& mySem;
4551c0b2f7Stbbdev     std::vector<int>& ourCounts;
4651c0b2f7Stbbdev     std::vector<double>& tottime;
4751c0b2f7Stbbdev 
4851c0b2f7Stbbdev     static constexpr int tickCounts = 1; // millisecond
4951c0b2f7Stbbdev     static constexpr int innerWait = 5; // millisecond
5051c0b2f7Stbbdev public:
Body(int nThread,int nIter,semaphore & sem,std::vector<int> & our_counts,std::vector<double> & tot_time)5151c0b2f7Stbbdev     Body( int nThread, int nIter, semaphore& sem,
5251c0b2f7Stbbdev           std::vector<int>& our_counts, std::vector<double>& tot_time )
5351c0b2f7Stbbdev         : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time)
5451c0b2f7Stbbdev     {
5551c0b2f7Stbbdev         sBarrier.initialize(nThread);
5651c0b2f7Stbbdev         pCount = 0;
5751c0b2f7Stbbdev     }
5851c0b2f7Stbbdev 
operator ()(const int tid) const5951c0b2f7Stbbdev     void operator()( const int tid ) const {
6051c0b2f7Stbbdev         sBarrier.wait();
6151c0b2f7Stbbdev 
6251c0b2f7Stbbdev         for (int i = 0; i < nIters; ++i) {
6351c0b2f7Stbbdev             utils::Sleep(tid * tickCounts);
6451c0b2f7Stbbdev             tbb::tick_count t0 = tbb::tick_count::now();
6551c0b2f7Stbbdev             mySem.P();
6651c0b2f7Stbbdev             tbb::tick_count t1 = tbb::tick_count::now();
6751c0b2f7Stbbdev             tottime[tid] += (t1 - t0).seconds();
6851c0b2f7Stbbdev 
6951c0b2f7Stbbdev             int curval = ++pCount;
7051c0b2f7Stbbdev             if (curval > ourCounts[tid]) {
7151c0b2f7Stbbdev                 ourCounts[tid] = curval;
7251c0b2f7Stbbdev             }
7351c0b2f7Stbbdev             utils::Sleep(innerWait);
7451c0b2f7Stbbdev             --pCount;
7551c0b2f7Stbbdev             REQUIRE(int(pCount) >= 0);
7651c0b2f7Stbbdev             mySem.V();
7751c0b2f7Stbbdev         }
7851c0b2f7Stbbdev     }
7951c0b2f7Stbbdev }; // class Body
8051c0b2f7Stbbdev 
test_semaphore(int sem_init_cnt,int extra_threads)8151c0b2f7Stbbdev void test_semaphore( int sem_init_cnt, int extra_threads ) {
8251c0b2f7Stbbdev     semaphore my_sem(sem_init_cnt);
8351c0b2f7Stbbdev     int n_threads = sem_init_cnt + extra_threads;
8451c0b2f7Stbbdev 
8551c0b2f7Stbbdev     std::vector<int> max_vals(n_threads);
8651c0b2f7Stbbdev     std::vector<double> tot_times(n_threads);
8751c0b2f7Stbbdev 
8851c0b2f7Stbbdev     int n_iters = 10;
8951c0b2f7Stbbdev     Body body(n_threads, n_iters, my_sem, max_vals, tot_times);
9051c0b2f7Stbbdev 
9151c0b2f7Stbbdev     pCount = 0;
9251c0b2f7Stbbdev     utils::NativeParallelFor(n_threads, body);
9351c0b2f7Stbbdev     REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount");
9451c0b2f7Stbbdev 
9551c0b2f7Stbbdev     int max_count = -1;
9651c0b2f7Stbbdev     for (auto item : max_vals) {
9751c0b2f7Stbbdev         max_count = utils::max(max_count, item);
9851c0b2f7Stbbdev     }
9951c0b2f7Stbbdev     REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment");
10051c0b2f7Stbbdev }
10151c0b2f7Stbbdev 
10251c0b2f7Stbbdev #include "../../src/tbb/semaphore.cpp"
10351c0b2f7Stbbdev #if _WIN32 || _WIN64
10451c0b2f7Stbbdev #include "../../src/tbb/dynamic_link.cpp"
10551c0b2f7Stbbdev #endif
10651c0b2f7Stbbdev 
// Number of P()/V() rounds each thread performs in test_binary_semaphore.
constexpr std::size_t N_TIMES = 1000;

// A shared counter bundled with the semaphore of type S that guards it.
template <typename S>
struct Counter {
    std::atomic<long> value{0};  // starts at zero
    S my_sem;                    // default-constructed guard

    Counter() {}
}; // struct Counter
11551c0b2f7Stbbdev 
// Function object for use with utils::NativeParallelFor
11751c0b2f7Stbbdev template <typename C>
11851c0b2f7Stbbdev struct AddOne : utils::NoAssign {
11951c0b2f7Stbbdev     C& my_counter;
12051c0b2f7Stbbdev 
12151c0b2f7Stbbdev     // Increments counter once for each iteration in the iteration space
operator ()AddOne12251c0b2f7Stbbdev     void operator()( int ) const {
12351c0b2f7Stbbdev         for (std::size_t i = 0; i < N_TIMES; ++i) {
12451c0b2f7Stbbdev             my_counter.my_sem.P();
12551c0b2f7Stbbdev             ++my_counter.value;
12651c0b2f7Stbbdev             my_counter.my_sem.V();
12751c0b2f7Stbbdev         }
12851c0b2f7Stbbdev     }
12951c0b2f7Stbbdev 
AddOneAddOne13051c0b2f7Stbbdev     AddOne( C& c ) : my_counter(c) {
13151c0b2f7Stbbdev         my_counter.my_sem.V();
13251c0b2f7Stbbdev     }
13351c0b2f7Stbbdev }; // struct AddOne
13451c0b2f7Stbbdev 
test_binary_semaphore(int n_threads)13551c0b2f7Stbbdev void test_binary_semaphore( int n_threads ) {
13651c0b2f7Stbbdev     Counter<tbb::detail::r1::binary_semaphore> counter;
13751c0b2f7Stbbdev     AddOne<decltype(counter)> AddOneBody(counter);
13851c0b2f7Stbbdev     utils::NativeParallelFor(n_threads, AddOneBody);
13951c0b2f7Stbbdev     REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race");
14051c0b2f7Stbbdev }
14151c0b2f7Stbbdev 
// Capacity of the producer->consumer ring buffer, i.e. the most tokens that can
// be in flight at once. Must be a power of 2 because slots are selected with
// `index & (MAX_TOKENS - 1)`.
constexpr std::size_t MAX_TOKENS = 32;
static_assert(MAX_TOKENS != 0 && (MAX_TOKENS & (MAX_TOKENS - 1)) == 0,
              "MAX_TOKENS must be a power of 2 for the slot mask to work");

// Role played by a FilterBase instance in the producer/consumer test.
enum FilterType { imaProducer, imaConsumer };
14551c0b2f7Stbbdev 
14651c0b2f7Stbbdev class FilterBase : utils::NoAssign {
14751c0b2f7Stbbdev protected:
14851c0b2f7Stbbdev     FilterType ima;
14951c0b2f7Stbbdev     unsigned totTokens; // total number of tokens to be emitted, only used by producer
15051c0b2f7Stbbdev     std::atomic<unsigned>& myTokens;
15151c0b2f7Stbbdev     std::atomic<unsigned>& otherTokens;
15251c0b2f7Stbbdev 
15351c0b2f7Stbbdev     unsigned myWait;
15451c0b2f7Stbbdev     semaphore& my_sem;
15551c0b2f7Stbbdev     semaphore& next_sem;
15651c0b2f7Stbbdev 
15751c0b2f7Stbbdev     unsigned* myBuffer;
15851c0b2f7Stbbdev     unsigned* nextBuffer;
15951c0b2f7Stbbdev     unsigned curToken;
16051c0b2f7Stbbdev public:
FilterBase(FilterType filter,unsigned tot_tokens,std::atomic<unsigned> & my_tokens,std::atomic<unsigned> & other_tokens,unsigned my_wait,semaphore & m_sem,semaphore & n_sem,unsigned * buf,unsigned * n_buf)16151c0b2f7Stbbdev     FilterBase( FilterType filter,
16251c0b2f7Stbbdev                 unsigned tot_tokens,
16351c0b2f7Stbbdev                 std::atomic<unsigned>& my_tokens,
16451c0b2f7Stbbdev                 std::atomic<unsigned>& other_tokens,
16551c0b2f7Stbbdev                 unsigned my_wait,
16651c0b2f7Stbbdev                 semaphore& m_sem,
16751c0b2f7Stbbdev                 semaphore& n_sem,
16851c0b2f7Stbbdev                 unsigned* buf,
16951c0b2f7Stbbdev                 unsigned* n_buf )
17051c0b2f7Stbbdev         : ima(filter), totTokens(tot_tokens), myTokens(my_tokens),
17151c0b2f7Stbbdev           otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem),
17251c0b2f7Stbbdev           next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf)
17351c0b2f7Stbbdev     {
17451c0b2f7Stbbdev         curToken = 0;
17551c0b2f7Stbbdev     }
17651c0b2f7Stbbdev 
17751c0b2f7Stbbdev     void Produce( const int );
17851c0b2f7Stbbdev     void Consume( const int );
operator ()(const int tid)17951c0b2f7Stbbdev     void operator()( const int tid ) {
18051c0b2f7Stbbdev         if (ima == imaConsumer) {
18151c0b2f7Stbbdev             Consume(tid);
18251c0b2f7Stbbdev         } else {
18351c0b2f7Stbbdev             Produce(tid);
18451c0b2f7Stbbdev         }
18551c0b2f7Stbbdev     }
18651c0b2f7Stbbdev }; // class FilterBase
18751c0b2f7Stbbdev 
18851c0b2f7Stbbdev class ProduceConsumeBody {
18951c0b2f7Stbbdev     FilterBase** my_filters;
19051c0b2f7Stbbdev public:
ProduceConsumeBody(FilterBase ** filters)19151c0b2f7Stbbdev     ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {}
19251c0b2f7Stbbdev 
operator ()(const int tid) const19351c0b2f7Stbbdev     void operator()( const int tid ) const {
19451c0b2f7Stbbdev         my_filters[tid]->operator()(tid);
19551c0b2f7Stbbdev     }
19651c0b2f7Stbbdev }; // class ProduceConsumeBody
19751c0b2f7Stbbdev 
19857f524caSIlya Isaev // send a bunch of non-null "tokens" to consumer, then a nullptr
Produce(const int)19951c0b2f7Stbbdev void FilterBase::Produce( const int ) {
20051c0b2f7Stbbdev     nextBuffer[0] = 0; // just in case we provide no tokens
20151c0b2f7Stbbdev     sBarrier.wait();
20251c0b2f7Stbbdev     while(totTokens) {
20351c0b2f7Stbbdev         while(!myTokens) {
20451c0b2f7Stbbdev             my_sem.P();
20551c0b2f7Stbbdev         }
20651c0b2f7Stbbdev         // we have a slot available
20751c0b2f7Stbbdev         --myTokens; // moving this down reduces spurious wakeups
20851c0b2f7Stbbdev         --totTokens;
20951c0b2f7Stbbdev         if (totTokens) {
21051c0b2f7Stbbdev             nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1;
21151c0b2f7Stbbdev         } else {
21251c0b2f7Stbbdev             nextBuffer[curToken & (MAX_TOKENS - 1)] = 0;
21351c0b2f7Stbbdev         }
21451c0b2f7Stbbdev         ++curToken;
21551c0b2f7Stbbdev 
21651c0b2f7Stbbdev         utils::Sleep(myWait);
21751c0b2f7Stbbdev         unsigned temp = ++otherTokens;
21851c0b2f7Stbbdev         if (temp == 1) {
21951c0b2f7Stbbdev             next_sem.V();
22051c0b2f7Stbbdev         }
22151c0b2f7Stbbdev     }
22251c0b2f7Stbbdev     next_sem.V(); // final wakeup
22351c0b2f7Stbbdev }
22451c0b2f7Stbbdev 
Consume(const int)22551c0b2f7Stbbdev void FilterBase::Consume( const int ) {
22651c0b2f7Stbbdev     unsigned myToken;
22751c0b2f7Stbbdev     sBarrier.wait();
22851c0b2f7Stbbdev     do {
22951c0b2f7Stbbdev         while( !myTokens ) {
23051c0b2f7Stbbdev             my_sem.P();
23151c0b2f7Stbbdev         }
23251c0b2f7Stbbdev         // we have a slot available
23351c0b2f7Stbbdev         --myTokens;
23451c0b2f7Stbbdev         myToken = myBuffer[curToken & (MAX_TOKENS - 1)];
23551c0b2f7Stbbdev         if (myToken) {
23651c0b2f7Stbbdev             REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token");
23751c0b2f7Stbbdev             ++curToken;
23851c0b2f7Stbbdev             utils::Sleep(myWait);
23951c0b2f7Stbbdev             unsigned temp = ++otherTokens;
24051c0b2f7Stbbdev             if (temp == 1) {
24151c0b2f7Stbbdev                 next_sem.V();
24251c0b2f7Stbbdev             }
24351c0b2f7Stbbdev         }
24451c0b2f7Stbbdev     } while(myToken);
24551c0b2f7Stbbdev     // end of processing
24651c0b2f7Stbbdev     REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens");
24751c0b2f7Stbbdev }
24851c0b2f7Stbbdev 
24951c0b2f7Stbbdev // test of producer/consumer with atomic buffer cnt and semaphore
25051c0b2f7Stbbdev // nTokens are total number of tokens through the pipe
25151c0b2f7Stbbdev // pWait is the wait time for the producer
25251c0b2f7Stbbdev // cWait is the wait time for the consumer
test_producer_consumer(unsigned totTokens,unsigned nTokens,unsigned pWait,unsigned cWait)25351c0b2f7Stbbdev void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) {
25451c0b2f7Stbbdev     semaphore p_sem;
25551c0b2f7Stbbdev     semaphore c_sem;
25651c0b2f7Stbbdev     std::atomic<unsigned> p_tokens;
25751c0b2f7Stbbdev     std::atomic<unsigned> c_tokens(0);
25851c0b2f7Stbbdev 
25951c0b2f7Stbbdev     unsigned c_buffer[MAX_TOKENS];
26051c0b2f7Stbbdev     FilterBase* my_filters[2]; // one producer, one concumer
26151c0b2f7Stbbdev 
26251c0b2f7Stbbdev     REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
26351c0b2f7Stbbdev 
26451c0b2f7Stbbdev     my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0]));
26551c0b2f7Stbbdev     my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr);
26651c0b2f7Stbbdev 
26751c0b2f7Stbbdev     p_tokens = nTokens;
26851c0b2f7Stbbdev     ProduceConsumeBody body(my_filters);
26951c0b2f7Stbbdev     sBarrier.initialize(2);
27051c0b2f7Stbbdev     utils::NativeParallelFor(2, body);
27151c0b2f7Stbbdev     delete my_filters[0];
27251c0b2f7Stbbdev     delete my_filters[1];
27351c0b2f7Stbbdev }
27451c0b2f7Stbbdev 
27551c0b2f7Stbbdev //! \brief \ref error_guessing
27651c0b2f7Stbbdev TEST_CASE("test binary semaphore") {
27751c0b2f7Stbbdev     test_binary_semaphore(utils::MaxThread);
27851c0b2f7Stbbdev }
27951c0b2f7Stbbdev 
28051c0b2f7Stbbdev //! \brief \ref error_guessing
28151c0b2f7Stbbdev TEST_CASE("test semaphore") {
28251c0b2f7Stbbdev     for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) {
28351c0b2f7Stbbdev         for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) {
28451c0b2f7Stbbdev             test_semaphore(sem_size, ex_threads);
28551c0b2f7Stbbdev         }
28651c0b2f7Stbbdev     }
28751c0b2f7Stbbdev }
28851c0b2f7Stbbdev 
28951c0b2f7Stbbdev //! \brief \ref error_guessing
29051c0b2f7Stbbdev TEST_CASE("test producer-consumer") {
29151c0b2f7Stbbdev     test_producer_consumer(10, 2, 5, 5);
29251c0b2f7Stbbdev     test_producer_consumer(10, 2, 20, 5);
29351c0b2f7Stbbdev     test_producer_consumer(10, 2, 5, 20);
29451c0b2f7Stbbdev 
29551c0b2f7Stbbdev     test_producer_consumer(10, 1, 5, 5);
29651c0b2f7Stbbdev     test_producer_consumer(20, 10, 5, 20);
29751c0b2f7Stbbdev     test_producer_consumer(64, 32, 1, 20);
29851c0b2f7Stbbdev }
299