1 /*
2 Copyright (c) 2005-2022 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 //! \file test_semaphore.cpp
18 //! \brief Test for [internal] functionality
19
20 #if _WIN32 || _WIN64
21 #define _CRT_SECURE_NO_WARNINGS
22 #endif
23
24 // Test for counting semaphore
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/spin_barrier.h"
28 #include "tbb/blocked_range.h"
29 #include "tbb/tick_count.h"
30 #include "../../src/tbb/semaphore.h"
31 #include <atomic>
32 #include <vector>
33
34 using tbb::detail::r1::semaphore;
35
36 std::atomic<int> pCount;
37 utils::SpinBarrier sBarrier;
38
39 // Semaphore basis function:
40 // set semaphore to initial value
41 // see that semaphore only allows that number of threads to be active
42 class Body : utils::NoAssign {
43 const int nIters;
44 semaphore& mySem;
45 std::vector<int>& ourCounts;
46 std::vector<double>& tottime;
47
48 static constexpr int tickCounts = 1; // millisecond
49 static constexpr int innerWait = 5; // millisecond
50 public:
Body(int nThread,int nIter,semaphore & sem,std::vector<int> & our_counts,std::vector<double> & tot_time)51 Body( int nThread, int nIter, semaphore& sem,
52 std::vector<int>& our_counts, std::vector<double>& tot_time )
53 : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time)
54 {
55 sBarrier.initialize(nThread);
56 pCount = 0;
57 }
58
operator ()(const int tid) const59 void operator()( const int tid ) const {
60 sBarrier.wait();
61
62 for (int i = 0; i < nIters; ++i) {
63 utils::Sleep(tid * tickCounts);
64 tbb::tick_count t0 = tbb::tick_count::now();
65 mySem.P();
66 tbb::tick_count t1 = tbb::tick_count::now();
67 tottime[tid] += (t1 - t0).seconds();
68
69 int curval = ++pCount;
70 if (curval > ourCounts[tid]) {
71 ourCounts[tid] = curval;
72 }
73 utils::Sleep(innerWait);
74 --pCount;
75 REQUIRE(int(pCount) >= 0);
76 mySem.V();
77 }
78 }
79 }; // class Body
80
test_semaphore(int sem_init_cnt,int extra_threads)81 void test_semaphore( int sem_init_cnt, int extra_threads ) {
82 semaphore my_sem(sem_init_cnt);
83 int n_threads = sem_init_cnt + extra_threads;
84
85 std::vector<int> max_vals(n_threads);
86 std::vector<double> tot_times(n_threads);
87
88 int n_iters = 10;
89 Body body(n_threads, n_iters, my_sem, max_vals, tot_times);
90
91 pCount = 0;
92 utils::NativeParallelFor(n_threads, body);
93 REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount");
94
95 int max_count = -1;
96 for (auto item : max_vals) {
97 max_count = utils::max(max_count, item);
98 }
99 REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment");
100 }
101
102 #include "../../src/tbb/semaphore.cpp"
103 #if _WIN32 || _WIN64
104 #include "../../src/tbb/dynamic_link.cpp"
105 #endif
106
// Number of semaphore-guarded increments each AddOne invocation performs.
constexpr std::size_t N_TIMES{1000};
108
//! Shared state for the binary semaphore test: an atomic counter (starting at 0)
//! together with the semaphore of type S that guards its increments.
template <typename S>
struct Counter {
    std::atomic<long> value{0};
    S my_sem;
    Counter() {}
}; // struct Counter
115
116 // Function object for use with parallel_for.h
117 template <typename C>
118 struct AddOne : utils::NoAssign {
119 C& my_counter;
120
121 // Increments counter once for each iteration in the iteration space
operator ()AddOne122 void operator()( int ) const {
123 for (std::size_t i = 0; i < N_TIMES; ++i) {
124 my_counter.my_sem.P();
125 ++my_counter.value;
126 my_counter.my_sem.V();
127 }
128 }
129
AddOneAddOne130 AddOne( C& c ) : my_counter(c) {
131 my_counter.my_sem.V();
132 }
133 }; // struct AddOne
134
test_binary_semaphore(int n_threads)135 void test_binary_semaphore( int n_threads ) {
136 Counter<tbb::detail::r1::binary_semaphore> counter;
137 AddOne<decltype(counter)> AddOneBody(counter);
138 utils::NativeParallelFor(n_threads, AddOneBody);
139 REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race");
140 }
141
// Power of 2, the most tokens that can be in flight
constexpr std::size_t MAX_TOKENS = 32;
// The producer/consumer ring buffer picks slots with `token & (MAX_TOKENS - 1)`,
// which equals `token % MAX_TOKENS` only when MAX_TOKENS is a power of two.
static_assert(MAX_TOKENS != 0 && (MAX_TOKENS & (MAX_TOKENS - 1)) == 0,
              "MAX_TOKENS must be a power of two");
// Role played by a FilterBase stage in the two-thread pipeline.
enum FilterType { imaProducer, imaConsumer };
145
146 class FilterBase : utils::NoAssign {
147 protected:
148 FilterType ima;
149 unsigned totTokens; // total number of tokens to be emitted, only used by producer
150 std::atomic<unsigned>& myTokens;
151 std::atomic<unsigned>& otherTokens;
152
153 unsigned myWait;
154 semaphore& my_sem;
155 semaphore& next_sem;
156
157 unsigned* myBuffer;
158 unsigned* nextBuffer;
159 unsigned curToken;
160 public:
FilterBase(FilterType filter,unsigned tot_tokens,std::atomic<unsigned> & my_tokens,std::atomic<unsigned> & other_tokens,unsigned my_wait,semaphore & m_sem,semaphore & n_sem,unsigned * buf,unsigned * n_buf)161 FilterBase( FilterType filter,
162 unsigned tot_tokens,
163 std::atomic<unsigned>& my_tokens,
164 std::atomic<unsigned>& other_tokens,
165 unsigned my_wait,
166 semaphore& m_sem,
167 semaphore& n_sem,
168 unsigned* buf,
169 unsigned* n_buf )
170 : ima(filter), totTokens(tot_tokens), myTokens(my_tokens),
171 otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem),
172 next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf)
173 {
174 curToken = 0;
175 }
176
177 void Produce( const int );
178 void Consume( const int );
operator ()(const int tid)179 void operator()( const int tid ) {
180 if (ima == imaConsumer) {
181 Consume(tid);
182 } else {
183 Produce(tid);
184 }
185 }
186 }; // class FilterBase
187
188 class ProduceConsumeBody {
189 FilterBase** my_filters;
190 public:
ProduceConsumeBody(FilterBase ** filters)191 ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {}
192
operator ()(const int tid) const193 void operator()( const int tid ) const {
194 my_filters[tid]->operator()(tid);
195 }
196 }; // class ProduceConsumeBody
197
// Producer loop: sends totTokens tokens into nextBuffer, a ring of MAX_TOKENS slots.
// Every token except the last carries curToken * 3 + 1 (never 0). The last one is
// written as 0 and serves as the consumer's end-of-stream marker.
// myTokens counts free slots; otherTokens counts slots filled for the consumer.
void FilterBase::Produce( const int ) {
    nextBuffer[0] = 0; // just in case we provide no tokens
    // NOTE(review): with totTokens == 0 the loop below never increments otherTokens,
    // so the consumer's `while (!myTokens) P()` loop never exits. The zero-token case
    // looks unsupported despite the comment above; confirm before relying on it.
    sBarrier.wait();
    while(totTokens) {
        // Wait for a free slot. This is a loop because a wakeup does not by itself
        // guarantee that a slot is available.
        while(!myTokens) {
            my_sem.P();
        }
        // we have a slot available
        --myTokens; // moving this down reduces spurious wakeups
        --totTokens;
        // Non-final tokens get a checkable non-zero value; the final one is the 0 marker.
        if (totTokens) {
            nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1;
        } else {
            nextBuffer[curToken & (MAX_TOKENS - 1)] = 0;
        }
        ++curToken;

        utils::Sleep(myWait);
        // Publish the filled slot. Signal only on the 0 -> 1 transition, since that
        // is the only state in which the consumer may be blocked in P().
        unsigned temp = ++otherTokens;
        if (temp == 1) {
            next_sem.V();
        }
    }
    next_sem.V(); // final wakeup
}
224
// Consumer loop: reads tokens from myBuffer in ring order until it reads the 0
// end-of-stream marker. Each non-zero token is checked against the value the
// producer is expected to write (curToken * 3 + 1), and its slot is handed back.
// myTokens counts filled slots; otherTokens counts free slots for the producer.
void FilterBase::Consume( const int ) {
    unsigned myToken;
    sBarrier.wait();
    do {
        // Wait for a filled slot. This is a loop because a wakeup does not by itself
        // guarantee that a token is available.
        while( !myTokens ) {
            my_sem.P();
        }
        // we have a slot available
        --myTokens;
        myToken = myBuffer[curToken & (MAX_TOKENS - 1)];
        if (myToken) {
            REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token");
            ++curToken;
            utils::Sleep(myWait);
            // Return the slot. Signal only on the 0 -> 1 transition of the
            // producer's free-slot count.
            unsigned temp = ++otherTokens;
            if (temp == 1) {
                next_sem.V();
            }
        }
    } while(myToken);
    // end of processing
    // The producer emits totTokens - 1 non-zero tokens followed by the 0 marker,
    // and curToken counts only the non-zero ones.
    REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens");
}
248
249 // test of producer/consumer with atomic buffer cnt and semaphore
250 // nTokens are total number of tokens through the pipe
251 // pWait is the wait time for the producer
252 // cWait is the wait time for the consumer
test_producer_consumer(unsigned totTokens,unsigned nTokens,unsigned pWait,unsigned cWait)253 void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) {
254 semaphore p_sem;
255 semaphore c_sem;
256 std::atomic<unsigned> p_tokens;
257 std::atomic<unsigned> c_tokens(0);
258
259 unsigned c_buffer[MAX_TOKENS];
260 FilterBase* my_filters[2]; // one producer, one concumer
261
262 REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
263
264 my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0]));
265 my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr);
266
267 p_tokens = nTokens;
268 ProduceConsumeBody body(my_filters);
269 sBarrier.initialize(2);
270 utils::NativeParallelFor(2, body);
271 delete my_filters[0];
272 delete my_filters[1];
273 }
274
275 //! \brief \ref error_guessing
276 TEST_CASE("test binary semaphore") {
277 test_binary_semaphore(utils::MaxThread);
278 }
279
280 //! \brief \ref error_guessing
281 TEST_CASE("test semaphore") {
282 for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) {
283 for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) {
284 test_semaphore(sem_size, ex_threads);
285 }
286 }
287 }
288
289 //! \brief \ref error_guessing
290 TEST_CASE("test producer-consumer") {
291 test_producer_consumer(10, 2, 5, 5);
292 test_producer_consumer(10, 2, 20, 5);
293 test_producer_consumer(10, 2, 5, 20);
294
295 test_producer_consumer(10, 1, 5, 5);
296 test_producer_consumer(20, 10, 5, 20);
297 test_producer_consumer(64, 32, 1, 20);
298 }
299