1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 //! \file test_semaphore.cpp 18 //! \brief Test for [internal] functionality 19 20 #if _WIN32 || _WIN64 21 #define _CRT_SECURE_NO_WARNINGS 22 #endif 23 24 // Test for counting semaphore 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/spin_barrier.h" 28 #include "tbb/blocked_range.h" 29 #include "tbb/tick_count.h" 30 #include "../../src/tbb/semaphore.h" 31 #include <atomic> 32 #include <vector> 33 34 using tbb::detail::r1::semaphore; 35 36 std::atomic<int> pCount; 37 utils::SpinBarrier sBarrier; 38 39 // Semaphore basis function: 40 // set semaphore to initial value 41 // see that semaphore only allows that number of threads to be active 42 class Body : utils::NoAssign { 43 const int nIters; 44 semaphore& mySem; 45 std::vector<int>& ourCounts; 46 std::vector<double>& tottime; 47 48 static constexpr int tickCounts = 1; // millisecond 49 static constexpr int innerWait = 5; // millisecond 50 public: 51 Body( int nThread, int nIter, semaphore& sem, 52 std::vector<int>& our_counts, std::vector<double>& tot_time ) 53 : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time) 54 { 55 sBarrier.initialize(nThread); 56 pCount = 0; 57 } 58 59 void operator()( const int tid ) const { 60 sBarrier.wait(); 61 62 for (int i = 0; i < nIters; ++i) { 63 utils::Sleep(tid * tickCounts); 64 tbb::tick_count t0 = tbb::tick_count::now(); 65 mySem.P(); 66 tbb::tick_count t1 = tbb::tick_count::now(); 67 tottime[tid] += (t1 - t0).seconds(); 68 69 int curval = ++pCount; 70 if (curval > ourCounts[tid]) { 71 ourCounts[tid] = curval; 72 } 73 utils::Sleep(innerWait); 74 --pCount; 75 REQUIRE(int(pCount) >= 0); 76 mySem.V(); 77 } 78 } 79 }; // class Body 80 81 void test_semaphore( int sem_init_cnt, int extra_threads ) { 82 semaphore my_sem(sem_init_cnt); 83 int n_threads = sem_init_cnt + extra_threads; 84 85 std::vector<int> max_vals(n_threads); 86 std::vector<double> tot_times(n_threads); 87 88 int n_iters = 10; 89 Body body(n_threads, n_iters, my_sem, max_vals, tot_times); 90 91 pCount = 0; 92 utils::NativeParallelFor(n_threads, body); 93 REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount"); 94 95 int max_count = -1; 96 for (auto item : max_vals) { 97 max_count = utils::max(max_count, item); 98 } 99 REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment"); 100 } 101 102 #include "../../src/tbb/semaphore.cpp" 103 #if _WIN32 || _WIN64 104 #include "../../src/tbb/dynamic_link.cpp" 105 #endif 106 107 constexpr std::size_t N_TIMES = 1000; 108 109 template <typename S> 110 struct Counter { 111 std::atomic<long> value; 112 S my_sem; 113 Counter() : value(0) {} 114 }; // struct Counter 115 116 // Function object for use with parallel_for.h 117 template <typename C> 118 struct AddOne : utils::NoAssign { 119 C& my_counter; 120 121 // Increments counter once for each iteration in the iteration space 122 void operator()( int ) const { 123 for (std::size_t i = 0; i < N_TIMES; ++i) { 124 my_counter.my_sem.P(); 125 ++my_counter.value; 126 my_counter.my_sem.V(); 127 } 128 } 129 130 AddOne( C& c ) : my_counter(c) { 131 my_counter.my_sem.V(); 132 } 133 }; // struct AddOne 134 135 void test_binary_semaphore( int n_threads ) { 136 Counter<tbb::detail::r1::binary_semaphore> counter; 137 AddOne<decltype(counter)> AddOneBody(counter); 138 utils::NativeParallelFor(n_threads, AddOneBody); 139 REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race"); 140 } 141 142 // Power of 2, the most tokens that can be in flight 143 constexpr std::size_t MAX_TOKENS = 32; 144 enum FilterType { imaProducer, imaConsumer }; 145 146 class FilterBase : utils::NoAssign { 147 protected: 148 FilterType ima; 149 unsigned totTokens; // total number of tokens to be emitted, only used by producer 150 std::atomic<unsigned>& myTokens; 151 std::atomic<unsigned>& otherTokens; 152 153 unsigned myWait; 154 semaphore& my_sem; 155 semaphore& next_sem; 156 157 unsigned* myBuffer; 158 unsigned* nextBuffer; 159 unsigned curToken; 160 public: 161 FilterBase( FilterType filter, 162 unsigned tot_tokens, 163 std::atomic<unsigned>& my_tokens, 164 std::atomic<unsigned>& other_tokens, 165 unsigned my_wait, 166 semaphore& m_sem, 167 semaphore& n_sem, 168 unsigned* buf, 169 unsigned* n_buf ) 170 : ima(filter), totTokens(tot_tokens), myTokens(my_tokens), 171 otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem), 172 next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf) 173 { 174 curToken = 0; 175 } 176 177 void Produce( const int ); 178 void Consume( const int ); 179 void operator()( const int tid ) { 180 if (ima == imaConsumer) { 181 Consume(tid); 182 } else { 183 Produce(tid); 184 } 185 } 186 }; // class FilterBase 187 188 class ProduceConsumeBody { 189 FilterBase** my_filters; 190 public: 191 ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {} 192 193 void operator()( const int tid ) const { 194 my_filters[tid]->operator()(tid); 195 } 196 }; // class ProduceConsumeBody 197 198 // send a bunch of non-null "tokens" to consumer, then a NULL 199 void FilterBase::Produce( const int ) { 200 nextBuffer[0] = 0; // just in case we provide no tokens 201 sBarrier.wait(); 202 while(totTokens) { 203 while(!myTokens) { 204 my_sem.P(); 205 } 206 // we have a slot available 207 --myTokens; // moving this down reduces spurious wakeups 208 --totTokens; 209 if (totTokens) { 210 nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1; 211 } else { 212 nextBuffer[curToken & (MAX_TOKENS - 1)] = 0; 213 } 214 ++curToken; 215 216 utils::Sleep(myWait); 217 unsigned temp = ++otherTokens; 218 if (temp == 1) { 219 next_sem.V(); 220 } 221 } 222 next_sem.V(); // final wakeup 223 } 224 225 void FilterBase::Consume( const int ) { 226 unsigned myToken; 227 sBarrier.wait(); 228 do { 229 while( !myTokens ) { 230 my_sem.P(); 231 } 232 // we have a slot available 233 --myTokens; 234 myToken = myBuffer[curToken & (MAX_TOKENS - 1)]; 235 if (myToken) { 236 REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token"); 237 ++curToken; 238 utils::Sleep(myWait); 239 unsigned temp = ++otherTokens; 240 if (temp == 1) { 241 next_sem.V(); 242 } 243 } 244 } while(myToken); 245 // end of processing 246 REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens"); 247 } 248 249 // test of producer/consumer with atomic buffer cnt and semaphore 250 // nTokens are total number of tokens through the pipe 251 // pWait is the wait time for the producer 252 // cWait is the wait time for the consumer 253 void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) { 254 semaphore p_sem; 255 semaphore c_sem; 256 std::atomic<unsigned> p_tokens; 257 std::atomic<unsigned> c_tokens(0); 258 259 unsigned c_buffer[MAX_TOKENS]; 260 FilterBase* my_filters[2]; // one producer, one concumer 261 262 REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens"); 263 264 my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0])); 265 my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr); 266 267 p_tokens = nTokens; 268 ProduceConsumeBody body(my_filters); 269 sBarrier.initialize(2); 270 utils::NativeParallelFor(2, body); 271 delete my_filters[0]; 272 delete my_filters[1]; 273 } 274 275 //! \brief \ref error_guessing 276 TEST_CASE("test binary semaphore") { 277 test_binary_semaphore(utils::MaxThread); 278 } 279 280 //! \brief \ref error_guessing 281 TEST_CASE("test semaphore") { 282 for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) { 283 for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) { 284 test_semaphore(sem_size, ex_threads); 285 } 286 } 287 } 288 289 //! \brief \ref error_guessing 290 TEST_CASE("test producer-consumer") { 291 test_producer_consumer(10, 2, 5, 5); 292 test_producer_consumer(10, 2, 20, 5); 293 test_producer_consumer(10, 2, 5, 20); 294 295 test_producer_consumer(10, 1, 5, 5); 296 test_producer_consumer(20, 10, 5, 20); 297 test_producer_consumer(64, 32, 1, 20); 298 } 299