xref: /oneTBB/test/tbb/test_concurrent_queue.cpp (revision 2eccd5f9)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include <common/test.h>
18 #include <common/utils.h>
19 #include <common/vector_types.h>
20 #include <common/custom_allocators.h>
21 
22 #include <tbb/concurrent_queue.h>
23 #include <unordered_set>
24 
25 //! \file test_concurrent_queue.cpp
26 //! \brief Test for [containers.concurrent_queue containers.concurrent_bounded_queue] specification
27 
//! Thread count handed to every TestPrimitiveTypes invocation in this file
static constexpr std::size_t MaxThread{4};
29 
//! Per-thread body: alternates between pushing the thread index and popping an item
/** Called with the thread index k. Even iterations check T(k) < T(nthread) and push T(k);
    odd iterations attempt a pop and check that the value (still 0 when nothing was popped)
    does not exceed T(nthread). */
template<typename CQ, typename T>
struct TestQueueElements {
    CQ& queue;
    const std::size_t nthread;

    TestQueueElements( CQ& q, std::size_t n ) : queue(q), nthread(n) {}

    void operator()( std::size_t k ) const {
        constexpr std::size_t iterations = 1000;
        for (std::size_t step = 0; step != iterations; ++step) {
            const bool pop_turn = (step % 2) != 0;
            if (pop_turn) {
                // The result of try_pop is deliberately not inspected: the queue may be empty
                T popped = 0;
                queue.try_pop(popped);
                CHECK(popped <= T(nthread));
                continue;
            }
            CHECK(T(k) < T(nthread));
            queue.push(T(k));
        }
    }
};
49 
50 //! Test concurrent queue with primitive data type
51 template<typename CQ, typename T>
52 void TestPrimitiveTypes(std::size_t nthread, T exemplar) {
53     CQ queue;
54     for (std::size_t i = 0; i < 100; ++i) {
55         queue.push(exemplar);
56     }
57     utils::NativeParallelFor(nthread, TestQueueElements<CQ, T>(queue, nthread));
58 }
59 
60 void TestQueueWorksWithPrimitiveTypes() {
61     TestPrimitiveTypes<tbb::concurrent_queue<char>, char>(MaxThread, (char)1);
62     TestPrimitiveTypes<tbb::concurrent_queue<int>, int>(MaxThread, (int)-12);
63     TestPrimitiveTypes<tbb::concurrent_queue<float>, float>(MaxThread, (float)-1.2f);
64     TestPrimitiveTypes<tbb::concurrent_queue<double>, double>(MaxThread, (double)-4.3);
65     TestPrimitiveTypes<tbb::concurrent_bounded_queue<char>, char>(MaxThread, (char)1);
66     TestPrimitiveTypes<tbb::concurrent_bounded_queue<int>, int>(MaxThread, (int)-12);
67     TestPrimitiveTypes<tbb::concurrent_bounded_queue<float>, float>(MaxThread, (float)-1.2f);
68     TestPrimitiveTypes<tbb::concurrent_bounded_queue<double>, double>(MaxThread, (double)-4.3);
69 }
70 
71 #if HAVE_m128 || HAVE_m256
72 //! Test concurrent queue with vector types
73 /** Type Queue should be a queue of ClassWithSSE/ClassWithAVX. */
74 template<typename ClassWithVectorType, typename Queue>
75 void TestVectorTypes() {
76     Queue q1;
77     for (int i = 0; i < 100; ++i) {
78         // VC8 does not properly align a temporary value; to work around, use explicit variable
79         ClassWithVectorType bar(i);
80         q1.push(bar);
81     }
82 
83     // Copy the queue
84     Queue q2 = q1;
85     // Check that elements of the copy are correct
86     typename Queue::const_iterator ci = q2.unsafe_begin();
87     for (int i=0; i < 100; ++i ) {
88         CHECK((ci != q2.unsafe_end()));
89         ClassWithVectorType foo = *ci;
90         ClassWithVectorType bar(i);
91         CHECK((*ci == bar));
92         ++ci;
93     }
94 
95     for (int i = 0; i < 101; ++i) {
96         ClassWithVectorType tmp;
97         bool b = q1.try_pop(tmp);
98         CHECK((b == (i < 100)));
99         ClassWithVectorType bar(i);
100         CHECK((!b || tmp==bar));
101     }
102 }
103 #endif /* HAVE_m128 || HAVE_m256 */
104 
//! Run TestVectorTypes for each vector element class enabled by HAVE_m128 / HAVE_m256
void TestQueueWorksWithSSE() {

#if HAVE_m128
    using sse_queue = tbb::concurrent_queue<ClassWithSSE>;
    using sse_bounded_queue = tbb::concurrent_bounded_queue<ClassWithSSE>;
    TestVectorTypes<ClassWithSSE, sse_queue>();
    TestVectorTypes<ClassWithSSE, sse_bounded_queue>();
#endif /* HAVE_m128 */
#if HAVE_m256
    // The AVX variants run only when have_AVX() returns true
    if( have_AVX() ) {
        using avx_queue = tbb::concurrent_queue<ClassWithAVX>;
        using avx_bounded_queue = tbb::concurrent_bounded_queue<ClassWithAVX>;
        TestVectorTypes<ClassWithAVX, avx_queue>();
        TestVectorTypes<ClassWithAVX, avx_bounded_queue>();
    }
#endif /* HAVE_m256 */
}
118 #if TBB_USE_EXCEPTIONS
119     int rnd_elem = -1;
120     int global_counter = -1;
121 
122 struct throw_element {
123     throw_element() = default;
124     throw_element(const throw_element&) {
125         if (global_counter++ == rnd_elem) {
126             throw std::exception{};
127         }
128     }
129 
130     throw_element& operator= (const throw_element&) = default;
131 };
132 
133 template <typename Queue>
134 void CopyWithThrowElement() {
135     utils::FastRandom<> rnd(42);
136 
137     Queue source;
138 
139     constexpr size_t queue_size = 100000;
140     for (std::size_t i = 0; i < queue_size; ++i) {
141         source.emplace();
142     }
143 
144     for (std::size_t i = 0; i < 100; ++i) {
145         global_counter = 0;
146         rnd_elem = rnd.get() % queue_size;
147 
148         REQUIRE_THROWS_AS( [&] {
149             Queue copy(source);
150             utils::suppress_unused_warning(copy);
151         }(), std::exception);
152     }
153 }
154 #endif // TBB_USE_EXCEPTIONS
155 
156 //! Test work with different fypes
157 //! \brief \ref error_guessing
158 TEST_CASE("testing work with different fypes") {
159     TestQueueWorksWithPrimitiveTypes();
160 }
161 
162 //! Test work with vector types
163 //! \brief \ref error_guessing
164 TEST_CASE("testing vector types") {
165     TestQueueWorksWithSSE();
166 }
167 
168 #if TBB_USE_EXCEPTIONS
169 //! \brief \ref regression \ref error_guessing
170 TEST_CASE("Test exception in allocation") {
171     using allocator_type = StaticSharedCountingAllocator<std::allocator<int>>;
172     using queue_type = tbb::concurrent_queue<int, allocator_type>;
173 
174     queue_type src_queue;
175     for (int i = 0; i < 100000; ++i) {
176         src_queue.push(i);
177     }
178 
179     allocator_type::set_limits(1);
180 
181     REQUIRE_THROWS_AS( [] {
182         queue_type queue1;
183         queue1.push(1);
184     }(), const std::bad_alloc);
185 
186     for (std::size_t i = 1; i < 1000; ++i) {
187         allocator_type::init_counters();
188         allocator_type::set_limits(1);
189         REQUIRE_THROWS_AS( [&] {
190             queue_type queue2(src_queue);
191             utils::suppress_unused_warning(queue2);
192         }(), const std::bad_alloc);
193     }
194 }
195 
196 //! \brief \ref regression \ref error_guessing
197 TEST_CASE("Test exception in allocation") {
198     CopyWithThrowElement<tbb::concurrent_queue<throw_element>>();
199     CopyWithThrowElement<tbb::concurrent_bounded_queue<throw_element>>();
200 }
201 
202 #endif // TBB_USE_EXCEPTIONS
203 
204 struct TrackableItem {
205     static std::unordered_set<TrackableItem*> object_addresses;
206 #if TBB_USE_EXCEPTIONS
207     static std::size_t global_count_for_exceptions;
208 #endif
209 
210     TrackableItem() {
211 #if TBB_USE_EXCEPTIONS
212         if (global_count_for_exceptions++ % 3 == 0) throw 1;
213 #endif
214         bool res = object_addresses.emplace(this).second;
215         CHECK(res);
216     }
217 
218     ~TrackableItem() {
219         auto it = object_addresses.find(this);
220         CHECK(it != object_addresses.end());
221         object_addresses.erase(it);
222     }
223 };
224 
225 template <typename Container>
226 void fill_and_catch(Container& q, std::size_t elements_count) {
227     CHECK(TrackableItem::object_addresses.size() == 0);
228     for (std::size_t i = 0; i < elements_count; ++i) {
229 #if TBB_USE_EXCEPTIONS
230         try {
231 #endif
232             q.emplace();
233 #if TBB_USE_EXCEPTIONS
234         } catch (int exception) {
235             CHECK(exception == 1);
236         }
237 #endif
238     }
239 #if TBB_USE_EXCEPTIONS
240     CHECK(TrackableItem::object_addresses.size() == 2 * elements_count / 3);
241 #else
242     CHECK(TrackableItem::object_addresses.size() == elements_count);
243 #endif
244 }
245 
246 std::unordered_set<TrackableItem*> TrackableItem::object_addresses;
247 #if TBB_USE_EXCEPTIONS
248 std::size_t TrackableItem::global_count_for_exceptions = 0;
249 #endif
250 
251 template <typename Container>
252 void test_tracking_dtors_on_clear() {
253     static_assert(std::is_same<typename Container::value_type, TrackableItem>::value, "Incorrect test setup");
254     const std::size_t elements_count = 100000;
255     {
256         Container q;
257         fill_and_catch(q, elements_count);
258 
259         q.clear();
260 
261         CHECK(q.empty());
262         CHECK(TrackableItem::object_addresses.empty());
263 #if TBB_USE_EXCEPTIONS
264         TrackableItem::global_count_for_exceptions = 0;
265 #endif
266     }
267     {
268         {
269             Container q;
270             fill_and_catch(q, elements_count);
271         } // Dtor of q would be called here
272         CHECK(TrackableItem::object_addresses.empty());
273 #if TBB_USE_EXCEPTIONS
274         TrackableItem::global_count_for_exceptions = 0;
275 #endif
276     }
277 }
278 
279 //! \brief \ref regression \ref error_guessing
280 TEST_CASE("Test clear and dtor with TrackableItem") {
281     test_tracking_dtors_on_clear<oneapi::tbb::concurrent_queue<TrackableItem>>();
282     test_tracking_dtors_on_clear<oneapi::tbb::concurrent_bounded_queue<TrackableItem>>();
283 }
284