xref: /oneTBB/test/tbb/test_semaphore.cpp (revision 7196bb4f)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //! \file test_semaphore.cpp
18 //! \brief Test for [internal] functionality
19 
20 #if _WIN32 || _WIN64
21 #define _CRT_SECURE_NO_WARNINGS
22 #endif
23 
24 // Test for counting semaphore
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/spin_barrier.h"
28 #include "tbb/blocked_range.h"
29 #include "tbb/tick_count.h"
30 #include "../../src/tbb/semaphore.h"
31 #include <atomic>
32 #include <vector>
33 
34 using tbb::detail::r1::semaphore;
35 
// Number of threads currently between mySem.P() and mySem.V() in Body::operator()
std::atomic<int> pCount;
// Start barrier shared by the test bodies; re-initialized before each parallel run
utils::SpinBarrier sBarrier;
38 
39 // Semaphore basis function:
40 //  set semaphore to initial value
41 // see that semaphore only allows that number of threads to be active
42 class Body : utils::NoAssign {
43     const int nIters;
44     semaphore& mySem;
45     std::vector<int>& ourCounts;
46     std::vector<double>& tottime;
47 
48     static constexpr int tickCounts = 1; // millisecond
49     static constexpr int innerWait = 5; // millisecond
50 public:
51     Body( int nThread, int nIter, semaphore& sem,
52           std::vector<int>& our_counts, std::vector<double>& tot_time )
53         : nIters(nIter), mySem(sem), ourCounts(our_counts), tottime(tot_time)
54     {
55         sBarrier.initialize(nThread);
56         pCount = 0;
57     }
58 
59     void operator()( const int tid ) const {
60         sBarrier.wait();
61 
62         for (int i = 0; i < nIters; ++i) {
63             utils::Sleep(tid * tickCounts);
64             tbb::tick_count t0 = tbb::tick_count::now();
65             mySem.P();
66             tbb::tick_count t1 = tbb::tick_count::now();
67             tottime[tid] += (t1 - t0).seconds();
68 
69             int curval = ++pCount;
70             if (curval > ourCounts[tid]) {
71                 ourCounts[tid] = curval;
72             }
73             utils::Sleep(innerWait);
74             --pCount;
75             REQUIRE(int(pCount) >= 0);
76             mySem.V();
77         }
78     }
79 }; // class Body
80 
81 void test_semaphore( int sem_init_cnt, int extra_threads ) {
82     semaphore my_sem(sem_init_cnt);
83     int n_threads = sem_init_cnt + extra_threads;
84 
85     std::vector<int> max_vals(n_threads);
86     std::vector<double> tot_times(n_threads);
87 
88     int n_iters = 10;
89     Body body(n_threads, n_iters, my_sem, max_vals, tot_times);
90 
91     pCount = 0;
92     utils::NativeParallelFor(n_threads, body);
93     REQUIRE_MESSAGE(!pCount, "not all threads decremented pCount");
94 
95     int max_count = -1;
96     for (auto item : max_vals) {
97         max_count = utils::max(max_count, item);
98     }
99     REQUIRE_MESSAGE(max_count <= sem_init_cnt, "Too many threads in semaphore-protected increment");
100 }
101 
102 #include "../../src/tbb/semaphore.cpp"
103 #if _WIN32 || _WIN64
104 #include "../../src/tbb/dynamic_link.cpp"
105 #endif
106 
// Number of P()/increment/V() rounds each thread performs in AddOne::operator()
constexpr std::size_t N_TIMES = 1000;
108 
// Shared state for the binary semaphore test: a counter that starts at zero
// plus the semaphore (of type S) used to guard its increments.
template <typename S>
struct Counter {
    std::atomic<long> value{0};
    S my_sem;
    Counter() {}
}; // struct Counter
115 
116 // Function object for use with parallel_for.h
117 template <typename C>
118 struct AddOne : utils::NoAssign {
119     C& my_counter;
120 
121     // Increments counter once for each iteration in the iteration space
122     void operator()( int ) const {
123         for (std::size_t i = 0; i < N_TIMES; ++i) {
124             my_counter.my_sem.P();
125             ++my_counter.value;
126             my_counter.my_sem.V();
127         }
128     }
129 
130     AddOne( C& c ) : my_counter(c) {
131         my_counter.my_sem.V();
132     }
133 }; // struct AddOne
134 
135 void test_binary_semaphore( int n_threads ) {
136     Counter<tbb::detail::r1::binary_semaphore> counter;
137     AddOne<decltype(counter)> AddOneBody(counter);
138     utils::NativeParallelFor(n_threads, AddOneBody);
139     REQUIRE_MESSAGE(n_threads * N_TIMES == counter.value, "Binary semaphore operations P()/V() have a race");
140 }
141 
// The most tokens that can be in flight, and the size of the token buffer.
// Must be a power of 2: buffer slots are selected with `index & (MAX_TOKENS - 1)`.
constexpr std::size_t MAX_TOKENS = 32;
// Role a FilterBase instance plays: selects Produce() or Consume() in its operator()
enum FilterType { imaProducer, imaConsumer };
145 
146 class FilterBase : utils::NoAssign {
147 protected:
148     FilterType ima;
149     unsigned totTokens; // total number of tokens to be emitted, only used by producer
150     std::atomic<unsigned>& myTokens;
151     std::atomic<unsigned>& otherTokens;
152 
153     unsigned myWait;
154     semaphore& my_sem;
155     semaphore& next_sem;
156 
157     unsigned* myBuffer;
158     unsigned* nextBuffer;
159     unsigned curToken;
160 public:
161     FilterBase( FilterType filter,
162                 unsigned tot_tokens,
163                 std::atomic<unsigned>& my_tokens,
164                 std::atomic<unsigned>& other_tokens,
165                 unsigned my_wait,
166                 semaphore& m_sem,
167                 semaphore& n_sem,
168                 unsigned* buf,
169                 unsigned* n_buf )
170         : ima(filter), totTokens(tot_tokens), myTokens(my_tokens),
171           otherTokens(other_tokens), myWait(my_wait), my_sem(m_sem),
172           next_sem(n_sem), myBuffer(buf), nextBuffer(n_buf)
173     {
174         curToken = 0;
175     }
176 
177     void Produce( const int );
178     void Consume( const int );
179     void operator()( const int tid ) {
180         if (ima == imaConsumer) {
181             Consume(tid);
182         } else {
183             Produce(tid);
184         }
185     }
186 }; // class FilterBase
187 
188 class ProduceConsumeBody {
189     FilterBase** my_filters;
190 public:
191     ProduceConsumeBody( FilterBase** filters ) : my_filters(filters) {}
192 
193     void operator()( const int tid ) const {
194         my_filters[tid]->operator()(tid);
195     }
196 }; // class ProduceConsumeBody
197 
198 // send a bunch of non-null "tokens" to consumer, then a nullptr
199 void FilterBase::Produce( const int ) {
200     nextBuffer[0] = 0; // just in case we provide no tokens
201     sBarrier.wait();
202     while(totTokens) {
203         while(!myTokens) {
204             my_sem.P();
205         }
206         // we have a slot available
207         --myTokens; // moving this down reduces spurious wakeups
208         --totTokens;
209         if (totTokens) {
210             nextBuffer[curToken & (MAX_TOKENS - 1)] = curToken * 3 + 1;
211         } else {
212             nextBuffer[curToken & (MAX_TOKENS - 1)] = 0;
213         }
214         ++curToken;
215 
216         utils::Sleep(myWait);
217         unsigned temp = ++otherTokens;
218         if (temp == 1) {
219             next_sem.V();
220         }
221     }
222     next_sem.V(); // final wakeup
223 }
224 
225 void FilterBase::Consume( const int ) {
226     unsigned myToken;
227     sBarrier.wait();
228     do {
229         while( !myTokens ) {
230             my_sem.P();
231         }
232         // we have a slot available
233         --myTokens;
234         myToken = myBuffer[curToken & (MAX_TOKENS - 1)];
235         if (myToken) {
236             REQUIRE_MESSAGE(myToken == curToken * 3 + 1, "Error in received token");
237             ++curToken;
238             utils::Sleep(myWait);
239             unsigned temp = ++otherTokens;
240             if (temp == 1) {
241                 next_sem.V();
242             }
243         }
244     } while(myToken);
245     // end of processing
246     REQUIRE_MESSAGE(curToken + 1 == totTokens, "Didn't receive enough tokens");
247 }
248 
249 // test of producer/consumer with atomic buffer cnt and semaphore
250 // nTokens are total number of tokens through the pipe
251 // pWait is the wait time for the producer
252 // cWait is the wait time for the consumer
253 void test_producer_consumer( unsigned totTokens, unsigned nTokens, unsigned pWait, unsigned cWait ) {
254     semaphore p_sem;
255     semaphore c_sem;
256     std::atomic<unsigned> p_tokens;
257     std::atomic<unsigned> c_tokens(0);
258 
259     unsigned c_buffer[MAX_TOKENS];
260     FilterBase* my_filters[2]; // one producer, one concumer
261 
262     REQUIRE_MESSAGE(nTokens <= MAX_TOKENS, "Not enough slots for tokens");
263 
264     my_filters[0] = new FilterBase(imaProducer, totTokens, p_tokens, c_tokens, pWait, c_sem, p_sem, nullptr, &(c_buffer[0]));
265     my_filters[1] = new FilterBase(imaConsumer, totTokens, c_tokens, p_tokens, cWait, p_sem, c_sem, c_buffer, nullptr);
266 
267     p_tokens = nTokens;
268     ProduceConsumeBody body(my_filters);
269     sBarrier.initialize(2);
270     utils::NativeParallelFor(2, body);
271     delete my_filters[0];
272     delete my_filters[1];
273 }
274 
275 //! \brief \ref error_guessing
276 TEST_CASE("test binary semaphore") {
277     test_binary_semaphore(utils::MaxThread);
278 }
279 
280 //! \brief \ref error_guessing
281 TEST_CASE("test semaphore") {
282     for (int sem_size = 1; sem_size <= int(utils::MaxThread); ++sem_size) {
283         for (int ex_threads = 0; ex_threads <= int(utils::MaxThread) - sem_size; ++ex_threads) {
284             test_semaphore(sem_size, ex_threads);
285         }
286     }
287 }
288 
289 //! \brief \ref error_guessing
290 TEST_CASE("test producer-consumer") {
291     test_producer_consumer(10, 2, 5, 5);
292     test_producer_consumer(10, 2, 20, 5);
293     test_producer_consumer(10, 2, 5, 20);
294 
295     test_producer_consumer(10, 1, 5, 5);
296     test_producer_consumer(20, 10, 5, 20);
297     test_producer_consumer(64, 32, 1, 20);
298 }
299