1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 //! \file test_semaphore.cpp 18 //! \brief Test for [internal] functionality 19 20 #if _WIN32 || _WIN64 21 #define _CRT_SECURE_NO_WARNINGS 22 #endif 23 24 // Test for counting semaphore 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/spin_barrier.h" 28 #include "tbb/blocked_range.h" 29 #include "tbb/tick_count.h" 30 #include "../../src/tbb/semaphore.h" 31 #include <atomic> 32 #include <vector> 33 34 using tbb::detail::r1::semaphore; 35 36 std::atomic<int> pCount; 37 utils::SpinBarrier sBarrier; 38 39 // Semaphore basis function: 40 // set semaphore to initial value 41 // see that semaphore only allows that number of threads to be active 42 class Body : utils::NoAssign { 43 const int nIters; 44 semaphore& mySem; 45 std::vector<int>& ourCounts; 46 std::vector<double>& tottime; 47 48 static constexpr int tickCounts = 1; // millisecond 49 static constexpr int innerWait = 5; // millisecond 50 public: 51 Body( int nThread, int nIter, semaphore& sem, 52 std::vector<int>& our_counts, std::vector<double>& tot_time ) 53 : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time) 54 { 55 sBarrier.initialize(nThread); 56 pCount = 0; 57 } 58 59 void operator()( const int tid ) const { 60 sBarrier.wait(); 61 62 for (int i = 0; i < nIters; ++i) { 63 utils::Sleep(tid * tickCounts); 64 tbb::tick_count t0 = tbb::tick_count::now(); 65 mySem.P(); 66 tbb::tick_count t1 = tbb::tick_count::now(); 67 tottime[tid] += (t1 - t0).seconds(); 68 69 int curval = ++pCount; 70 if (curval > ourCounts[tid]) { 71 ourCounts[tid] = curval; 72 } 73 utils::Sleep(innerWait); 74 --pCount; 75 REQUIRE(int(pCount) >= 0); 76 mySem.V(); 77 } 78 } 79 }; // class Body 80 81 void test_semaphore( int sem_init_cnt, int extra_threads ) { 82 semaphore my_sem(sem_init_cnt); 83 int n_threads = sem_init_cnt + extra_threads; 84 85 std::vector<int> max_vals(n_threads); 86 std::vector<double> tot_times(n_threads); 87 88 int n_iters = 10; 89 Body body(n_threads, n_iters, my_sem, max_vals, tot_times); 90 91 pCount = 0; 92 utils::NativeParallelFor(n_threads, body); 93 94 if (extra_threads == 0) { 95 double allPWaits = 0; 96 for (auto item : tot_times) { 97 allPWaits += item; 98 } 99 allPWaits /= static_cast<double>(n_threads * n_iters); 100 } 101 REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount"); 102 103 int max_count = -1; 104 for (auto item : max_vals) { 105 max_count = utils::max(max_count, item); 106 } 107 REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment"); 108 } 109 110 #include "../../src/tbb/semaphore.cpp" 111 #if _WIN32 || _WIN64 112 #include "../../src/tbb/dynamic_link.cpp" 113 #endif 114 115 constexpr std::size_t N_TIMES = 1000; 116 117 template <typename S> 118 struct Counter { 119 std::atomic<long> value; 120 S my_sem; 121 Counter() : value(0) {} 122 }; // struct Counter 123 124 // Function object for use with parallel_for.h 125 template <typename C> 126 struct AddOne : utils::NoAssign { 127 C& my_counter; 128 129 // Increments counter once for each iteration in the iteration space 130 void operator()( int ) const { 131 for (std::size_t i = 0; i < N_TIMES; ++i) { 132 my_counter.my_sem.P(); 133 ++my_counter.value; 134 my_counter.my_sem.V(); 135 } 136 } 137 138 AddOne( C& c ) : my_counter(c) { 139 my_counter.my_sem.V(); 140 } 141 }; // struct AddOne 142 143 void test_binary_semaphore( int n_threads ) { 144 Counter<tbb::detail::r1::binary_semaphore> counter; 145 AddOne<decltype(counter)> AddOneBody(counter); 146 utils::NativeParallelFor(n_threads, AddOneBody); 147 REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race"); 148 } 149 150 // Power of 2, the most tokens that can be in flight 151 constexpr std::size_t MAX_TOKENS = 32; 152 enum FilterType { imaProducer, imaConsumer }; 153 154 class FilterBase : utils::NoAssign { 155 protected: 156 FilterType ima; 157 unsigned totTokens; // total number of tokens to be emitted, only used by producer 158 std::atomic<unsigned>& myTokens; 159 std::atomic<unsigned>& otherTokens; 160 161 unsigned myWait; 162 semaphore& my_sem; 163 semaphore& next_sem; 164 165 unsigned* myBuffer; 166 unsigned* nextBuffer; 167 unsigned curToken; 168 public: 169 FilterBase( FilterType filter, 170 unsigned tot_tokens, 171 std::atomic<unsigned>& my_tokens, 172 std::atomic<unsigned>& other_tokens, 173 unsigned my_wait, 174 semaphore& m_sem, 175 semaphore& n_sem, 176 unsigned* buf, 177 unsigned* n_buf ) 178 : ima(filter), totTokens(tot_tokens), myTokens(my_tokens), 179 otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem), 180 next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf) 181 { 182 curToken = 0; 183 } 184 185 void Produce( const int ); 186 void Consume( const int ); 187 void operator()( const int tid ) { 188 if (ima == imaConsumer) { 189 Consume(tid); 190 } else { 191 Produce(tid); 192 } 193 } 194 }; // class FilterBase 195 196 class ProduceConsumeBody { 197 FilterBase** my_filters; 198 public: 199 ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {} 200 201 void operator()( const int tid ) const { 202 my_filters[tid]->operator()(tid); 203 } 204 }; // class ProduceConsumeBody 205 206 // send a bunch of non-null "tokens" to consumer, then a NULL 207 void FilterBase::Produce( const int ) { 208 nextBuffer[0] = 0; // just in case we provide no tokens 209 sBarrier.wait(); 210 while(totTokens) { 211 while(!myTokens) { 212 my_sem.P(); 213 } 214 // we have a slot available 215 --myTokens; // moving this down reduces spurious wakeups 216 --totTokens; 217 if (totTokens) { 218 nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1; 219 } else { 220 nextBuffer[curToken & (MAX_TOKENS - 1)] = 0; 221 } 222 ++curToken; 223 224 utils::Sleep(myWait); 225 unsigned temp = ++otherTokens; 226 if (temp == 1) { 227 next_sem.V(); 228 } 229 } 230 next_sem.V(); // final wakeup 231 } 232 233 void FilterBase::Consume( const int ) { 234 unsigned myToken; 235 sBarrier.wait(); 236 do { 237 while( !myTokens ) { 238 my_sem.P(); 239 } 240 // we have a slot available 241 --myTokens; 242 myToken = myBuffer[curToken & (MAX_TOKENS - 1)]; 243 if (myToken) { 244 REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token"); 245 ++curToken; 246 utils::Sleep(myWait); 247 unsigned temp = ++otherTokens; 248 if (temp == 1) { 249 next_sem.V(); 250 } 251 } 252 } while(myToken); 253 // end of processing 254 REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens"); 255 } 256 257 // test of producer/consumer with atomic buffer cnt and semaphore 258 // nTokens are total number of tokens through the pipe 259 // pWait is the wait time for the producer 260 // cWait is the wait time for the consumer 261 void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) { 262 semaphore p_sem; 263 semaphore c_sem; 264 std::atomic<unsigned> p_tokens; 265 std::atomic<unsigned> c_tokens(0); 266 267 unsigned c_buffer[MAX_TOKENS]; 268 FilterBase* my_filters[2]; // one producer, one concumer 269 270 REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens"); 271 272 my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0])); 273 my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr); 274 275 p_tokens = nTokens; 276 ProduceConsumeBody body(my_filters); 277 sBarrier.initialize(2); 278 utils::NativeParallelFor(2, body); 279 delete my_filters[0]; 280 delete my_filters[1]; 281 } 282 283 //! \brief \ref error_guessing 284 TEST_CASE("test binary semaphore") { 285 test_binary_semaphore(utils::MaxThread); 286 } 287 288 //! \brief \ref error_guessing 289 TEST_CASE("test semaphore") { 290 for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) { 291 for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) { 292 test_semaphore(sem_size, ex_threads); 293 } 294 } 295 } 296 297 //! \brief \ref error_guessing 298 TEST_CASE("test producer-consumer") { 299 test_producer_consumer(10, 2, 5, 5); 300 test_producer_consumer(10, 2, 20, 5); 301 test_producer_consumer(10, 2, 5, 20); 302 303 test_producer_consumer(10, 1, 5, 5); 304 test_producer_consumer(20, 10, 5, 20); 305 test_producer_consumer(64, 32, 1, 20); 306 } 307