/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

//! \file test_semaphore.cpp
//! \brief Test for [internal] functionality
1951c0b2f7Stbbdev
20478de5b1Stbbdev #if _WIN32 || _WIN64
21478de5b1Stbbdev #define _CRT_SECURE_NO_WARNINGS
22478de5b1Stbbdev #endif
23478de5b1Stbbdev
2451c0b2f7Stbbdev // Test for counting semaphore
2551c0b2f7Stbbdev #include "common/test.h"
2651c0b2f7Stbbdev #include "common/utils.h"
2751c0b2f7Stbbdev #include "common/spin_barrier.h"
2851c0b2f7Stbbdev #include "tbb/blocked_range.h"
2951c0b2f7Stbbdev #include "tbb/tick_count.h"
3051c0b2f7Stbbdev #include "../../src/tbb/semaphore.h"
3151c0b2f7Stbbdev #include <atomic>
3251c0b2f7Stbbdev #include <vector>
3351c0b2f7Stbbdev
3451c0b2f7Stbbdev using tbb::detail::r1::semaphore;
3551c0b2f7Stbbdev
3651c0b2f7Stbbdev std::atomic<int> pCount;
3751c0b2f7Stbbdev utils::SpinBarrier sBarrier;
3851c0b2f7Stbbdev
3951c0b2f7Stbbdev // Semaphore basis function:
4051c0b2f7Stbbdev // set semaphore to initial value
4151c0b2f7Stbbdev // see that semaphore only allows that number of threads to be active
4251c0b2f7Stbbdev class Body : utils::NoAssign {
4351c0b2f7Stbbdev const int nIters;
4451c0b2f7Stbbdev semaphore& mySem;
4551c0b2f7Stbbdev std::vector<int>& ourCounts;
4651c0b2f7Stbbdev std::vector<double>& tottime;
4751c0b2f7Stbbdev
4851c0b2f7Stbbdev static constexpr int tickCounts = 1; // millisecond
4951c0b2f7Stbbdev static constexpr int innerWait = 5; // millisecond
5051c0b2f7Stbbdev public:
Body(int nThread,int nIter,semaphore & sem,std::vector<int> & our_counts,std::vector<double> & tot_time)5151c0b2f7Stbbdev Body( int nThread, int nIter, semaphore& sem,
5251c0b2f7Stbbdev std::vector<int>& our_counts, std::vector<double>& tot_time )
5351c0b2f7Stbbdev : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time)
5451c0b2f7Stbbdev {
5551c0b2f7Stbbdev sBarrier.initialize(nThread);
5651c0b2f7Stbbdev pCount = 0;
5751c0b2f7Stbbdev }
5851c0b2f7Stbbdev
operator ()(const int tid) const5951c0b2f7Stbbdev void operator()( const int tid ) const {
6051c0b2f7Stbbdev sBarrier.wait();
6151c0b2f7Stbbdev
6251c0b2f7Stbbdev for (int i = 0; i < nIters; ++i) {
6351c0b2f7Stbbdev utils::Sleep(tid * tickCounts);
6451c0b2f7Stbbdev tbb::tick_count t0 = tbb::tick_count::now();
6551c0b2f7Stbbdev mySem.P();
6651c0b2f7Stbbdev tbb::tick_count t1 = tbb::tick_count::now();
6751c0b2f7Stbbdev tottime[tid] += (t1 - t0).seconds();
6851c0b2f7Stbbdev
6951c0b2f7Stbbdev int curval = ++pCount;
7051c0b2f7Stbbdev if (curval > ourCounts[tid]) {
7151c0b2f7Stbbdev ourCounts[tid] = curval;
7251c0b2f7Stbbdev }
7351c0b2f7Stbbdev utils::Sleep(innerWait);
7451c0b2f7Stbbdev --pCount;
7551c0b2f7Stbbdev REQUIRE(int(pCount) >= 0);
7651c0b2f7Stbbdev mySem.V();
7751c0b2f7Stbbdev }
7851c0b2f7Stbbdev }
7951c0b2f7Stbbdev }; // class Body
8051c0b2f7Stbbdev
test_semaphore(int sem_init_cnt,int extra_threads)8151c0b2f7Stbbdev void test_semaphore( int sem_init_cnt, int extra_threads ) {
8251c0b2f7Stbbdev semaphore my_sem(sem_init_cnt);
8351c0b2f7Stbbdev int n_threads = sem_init_cnt + extra_threads;
8451c0b2f7Stbbdev
8551c0b2f7Stbbdev std::vector<int> max_vals(n_threads);
8651c0b2f7Stbbdev std::vector<double> tot_times(n_threads);
8751c0b2f7Stbbdev
8851c0b2f7Stbbdev int n_iters = 10;
8951c0b2f7Stbbdev Body body(n_threads, n_iters, my_sem, max_vals, tot_times);
9051c0b2f7Stbbdev
9151c0b2f7Stbbdev pCount = 0;
9251c0b2f7Stbbdev utils::NativeParallelFor(n_threads, body);
9351c0b2f7Stbbdev REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount");
9451c0b2f7Stbbdev
9551c0b2f7Stbbdev int max_count = -1;
9651c0b2f7Stbbdev for (auto item : max_vals) {
9751c0b2f7Stbbdev max_count = utils::max(max_count, item);
9851c0b2f7Stbbdev }
9951c0b2f7Stbbdev REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment");
10051c0b2f7Stbbdev }
10151c0b2f7Stbbdev
10251c0b2f7Stbbdev #include "../../src/tbb/semaphore.cpp"
10351c0b2f7Stbbdev #if _WIN32 || _WIN64
10451c0b2f7Stbbdev #include "../../src/tbb/dynamic_link.cpp"
10551c0b2f7Stbbdev #endif
10651c0b2f7Stbbdev
// Number of P()/increment/V() rounds each thread performs in the binary semaphore test.
constexpr std::size_t N_TIMES{1000};
10851c0b2f7Stbbdev
// Shared state for the binary semaphore test: a counter starting at zero and
// a default-constructed semaphore of type S stored alongside it.
template <typename S>
struct Counter {
    std::atomic<long> value;
    S my_sem;

    Counter() : value{0} {}
}; // struct Counter
11551c0b2f7Stbbdev
11651c0b2f7Stbbdev // Function object for use with parallel_for.h
11751c0b2f7Stbbdev template <typename C>
11851c0b2f7Stbbdev struct AddOne : utils::NoAssign {
11951c0b2f7Stbbdev C& my_counter;
12051c0b2f7Stbbdev
12151c0b2f7Stbbdev // Increments counter once for each iteration in the iteration space
operator ()AddOne12251c0b2f7Stbbdev void operator()( int ) const {
12351c0b2f7Stbbdev for (std::size_t i = 0; i < N_TIMES; ++i) {
12451c0b2f7Stbbdev my_counter.my_sem.P();
12551c0b2f7Stbbdev ++my_counter.value;
12651c0b2f7Stbbdev my_counter.my_sem.V();
12751c0b2f7Stbbdev }
12851c0b2f7Stbbdev }
12951c0b2f7Stbbdev
AddOneAddOne13051c0b2f7Stbbdev AddOne( C& c ) : my_counter(c) {
13151c0b2f7Stbbdev my_counter.my_sem.V();
13251c0b2f7Stbbdev }
13351c0b2f7Stbbdev }; // struct AddOne
13451c0b2f7Stbbdev
test_binary_semaphore(int n_threads)13551c0b2f7Stbbdev void test_binary_semaphore( int n_threads ) {
13651c0b2f7Stbbdev Counter<tbb::detail::r1::binary_semaphore> counter;
13751c0b2f7Stbbdev AddOne<decltype(counter)> AddOneBody(counter);
13851c0b2f7Stbbdev utils::NativeParallelFor(n_threads, AddOneBody);
13951c0b2f7Stbbdev REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race");
14051c0b2f7Stbbdev }
14151c0b2f7Stbbdev
// Capacity of the token ring buffer, i.e. the most tokens that can be in flight.
// Must be a power of 2 because buffer slots are selected with
// `index & (MAX_TOKENS - 1)`.
constexpr std::size_t MAX_TOKENS = 32;
static_assert(MAX_TOKENS != 0 && (MAX_TOKENS & (MAX_TOKENS - 1)) == 0,
              "MAX_TOKENS must be a power of 2: ring-buffer slots are selected by masking");

// Role a filter plays in the producer/consumer test.
enum FilterType { imaProducer, imaConsumer };
14551c0b2f7Stbbdev
14651c0b2f7Stbbdev class FilterBase : utils::NoAssign {
14751c0b2f7Stbbdev protected:
14851c0b2f7Stbbdev FilterType ima;
14951c0b2f7Stbbdev unsigned totTokens; // total number of tokens to be emitted, only used by producer
15051c0b2f7Stbbdev std::atomic<unsigned>& myTokens;
15151c0b2f7Stbbdev std::atomic<unsigned>& otherTokens;
15251c0b2f7Stbbdev
15351c0b2f7Stbbdev unsigned myWait;
15451c0b2f7Stbbdev semaphore& my_sem;
15551c0b2f7Stbbdev semaphore& next_sem;
15651c0b2f7Stbbdev
15751c0b2f7Stbbdev unsigned* myBuffer;
15851c0b2f7Stbbdev unsigned* nextBuffer;
15951c0b2f7Stbbdev unsigned curToken;
16051c0b2f7Stbbdev public:
FilterBase(FilterType filter,unsigned tot_tokens,std::atomic<unsigned> & my_tokens,std::atomic<unsigned> & other_tokens,unsigned my_wait,semaphore & m_sem,semaphore & n_sem,unsigned * buf,unsigned * n_buf)16151c0b2f7Stbbdev FilterBase( FilterType filter,
16251c0b2f7Stbbdev unsigned tot_tokens,
16351c0b2f7Stbbdev std::atomic<unsigned>& my_tokens,
16451c0b2f7Stbbdev std::atomic<unsigned>& other_tokens,
16551c0b2f7Stbbdev unsigned my_wait,
16651c0b2f7Stbbdev semaphore& m_sem,
16751c0b2f7Stbbdev semaphore& n_sem,
16851c0b2f7Stbbdev unsigned* buf,
16951c0b2f7Stbbdev unsigned* n_buf )
17051c0b2f7Stbbdev : ima(filter), totTokens(tot_tokens), myTokens(my_tokens),
17151c0b2f7Stbbdev otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem),
17251c0b2f7Stbbdev next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf)
17351c0b2f7Stbbdev {
17451c0b2f7Stbbdev curToken = 0;
17551c0b2f7Stbbdev }
17651c0b2f7Stbbdev
17751c0b2f7Stbbdev void Produce( const int );
17851c0b2f7Stbbdev void Consume( const int );
operator ()(const int tid)17951c0b2f7Stbbdev void operator()( const int tid ) {
18051c0b2f7Stbbdev if (ima == imaConsumer) {
18151c0b2f7Stbbdev Consume(tid);
18251c0b2f7Stbbdev } else {
18351c0b2f7Stbbdev Produce(tid);
18451c0b2f7Stbbdev }
18551c0b2f7Stbbdev }
18651c0b2f7Stbbdev }; // class FilterBase
18751c0b2f7Stbbdev
18851c0b2f7Stbbdev class ProduceConsumeBody {
18951c0b2f7Stbbdev FilterBase** my_filters;
19051c0b2f7Stbbdev public:
ProduceConsumeBody(FilterBase ** filters)19151c0b2f7Stbbdev ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {}
19251c0b2f7Stbbdev
operator ()(const int tid) const19351c0b2f7Stbbdev void operator()( const int tid ) const {
19451c0b2f7Stbbdev my_filters[tid]->operator()(tid);
19551c0b2f7Stbbdev }
19651c0b2f7Stbbdev }; // class ProduceConsumeBody
19751c0b2f7Stbbdev
19857f524caSIlya Isaev // send a bunch of non-null "tokens" to consumer, then a nullptr
Produce(const int)19951c0b2f7Stbbdev void FilterBase::Produce( const int ) {
20051c0b2f7Stbbdev nextBuffer[0] = 0; // just in case we provide no tokens
20151c0b2f7Stbbdev sBarrier.wait();
20251c0b2f7Stbbdev while(totTokens) {
20351c0b2f7Stbbdev while(!myTokens) {
20451c0b2f7Stbbdev my_sem.P();
20551c0b2f7Stbbdev }
20651c0b2f7Stbbdev // we have a slot available
20751c0b2f7Stbbdev --myTokens; // moving this down reduces spurious wakeups
20851c0b2f7Stbbdev --totTokens;
20951c0b2f7Stbbdev if (totTokens) {
21051c0b2f7Stbbdev nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1;
21151c0b2f7Stbbdev } else {
21251c0b2f7Stbbdev nextBuffer[curToken & (MAX_TOKENS - 1)] = 0;
21351c0b2f7Stbbdev }
21451c0b2f7Stbbdev ++curToken;
21551c0b2f7Stbbdev
21651c0b2f7Stbbdev utils::Sleep(myWait);
21751c0b2f7Stbbdev unsigned temp = ++otherTokens;
21851c0b2f7Stbbdev if (temp == 1) {
21951c0b2f7Stbbdev next_sem.V();
22051c0b2f7Stbbdev }
22151c0b2f7Stbbdev }
22251c0b2f7Stbbdev next_sem.V(); // final wakeup
22351c0b2f7Stbbdev }
22451c0b2f7Stbbdev
Consume(const int)22551c0b2f7Stbbdev void FilterBase::Consume( const int ) {
22651c0b2f7Stbbdev unsigned myToken;
22751c0b2f7Stbbdev sBarrier.wait();
22851c0b2f7Stbbdev do {
22951c0b2f7Stbbdev while( !myTokens ) {
23051c0b2f7Stbbdev my_sem.P();
23151c0b2f7Stbbdev }
23251c0b2f7Stbbdev // we have a slot available
23351c0b2f7Stbbdev --myTokens;
23451c0b2f7Stbbdev myToken = myBuffer[curToken & (MAX_TOKENS - 1)];
23551c0b2f7Stbbdev if (myToken) {
23651c0b2f7Stbbdev REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token");
23751c0b2f7Stbbdev ++curToken;
23851c0b2f7Stbbdev utils::Sleep(myWait);
23951c0b2f7Stbbdev unsigned temp = ++otherTokens;
24051c0b2f7Stbbdev if (temp == 1) {
24151c0b2f7Stbbdev next_sem.V();
24251c0b2f7Stbbdev }
24351c0b2f7Stbbdev }
24451c0b2f7Stbbdev } while(myToken);
24551c0b2f7Stbbdev // end of processing
24651c0b2f7Stbbdev REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens");
24751c0b2f7Stbbdev }
24851c0b2f7Stbbdev
24951c0b2f7Stbbdev // test of producer/consumer with atomic buffer cnt and semaphore
25051c0b2f7Stbbdev // nTokens are total number of tokens through the pipe
25151c0b2f7Stbbdev // pWait is the wait time for the producer
25251c0b2f7Stbbdev // cWait is the wait time for the consumer
test_producer_consumer(unsigned totTokens,unsigned nTokens,unsigned pWait,unsigned cWait)25351c0b2f7Stbbdev void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) {
25451c0b2f7Stbbdev semaphore p_sem;
25551c0b2f7Stbbdev semaphore c_sem;
25651c0b2f7Stbbdev std::atomic<unsigned> p_tokens;
25751c0b2f7Stbbdev std::atomic<unsigned> c_tokens(0);
25851c0b2f7Stbbdev
25951c0b2f7Stbbdev unsigned c_buffer[MAX_TOKENS];
26051c0b2f7Stbbdev FilterBase* my_filters[2]; // one producer, one concumer
26151c0b2f7Stbbdev
26251c0b2f7Stbbdev REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
26351c0b2f7Stbbdev
26451c0b2f7Stbbdev my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0]));
26551c0b2f7Stbbdev my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr);
26651c0b2f7Stbbdev
26751c0b2f7Stbbdev p_tokens = nTokens;
26851c0b2f7Stbbdev ProduceConsumeBody body(my_filters);
26951c0b2f7Stbbdev sBarrier.initialize(2);
27051c0b2f7Stbbdev utils::NativeParallelFor(2, body);
27151c0b2f7Stbbdev delete my_filters[0];
27251c0b2f7Stbbdev delete my_filters[1];
27351c0b2f7Stbbdev }
27451c0b2f7Stbbdev
27551c0b2f7Stbbdev //! \brief \ref error_guessing
27651c0b2f7Stbbdev TEST_CASE("test binary semaphore") {
27751c0b2f7Stbbdev test_binary_semaphore(utils::MaxThread);
27851c0b2f7Stbbdev }
27951c0b2f7Stbbdev
28051c0b2f7Stbbdev //! \brief \ref error_guessing
28151c0b2f7Stbbdev TEST_CASE("test semaphore") {
28251c0b2f7Stbbdev for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) {
28351c0b2f7Stbbdev for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) {
28451c0b2f7Stbbdev test_semaphore(sem_size, ex_threads);
28551c0b2f7Stbbdev }
28651c0b2f7Stbbdev }
28751c0b2f7Stbbdev }
28851c0b2f7Stbbdev
28951c0b2f7Stbbdev //! \brief \ref error_guessing
29051c0b2f7Stbbdev TEST_CASE("test producer-consumer") {
29151c0b2f7Stbbdev test_producer_consumer(10, 2, 5, 5);
29251c0b2f7Stbbdev test_producer_consumer(10, 2, 20, 5);
29351c0b2f7Stbbdev test_producer_consumer(10, 2, 5, 20);
29451c0b2f7Stbbdev
29551c0b2f7Stbbdev test_producer_consumer(10, 1, 5, 5);
29651c0b2f7Stbbdev test_producer_consumer(20, 10, 5, 20);
29751c0b2f7Stbbdev test_producer_consumer(64, 32, 1, 20);
29851c0b2f7Stbbdev }
299