1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
20 #include "detail/_namespace_injection.h"
21 #include "detail/_concurrent_queue_base.h"
22 #include "detail/_allocator_traits.h"
23 #include "detail/_exception.h"
24 #include "detail/_containers_helpers.h"
25 #include "cache_aligned_allocator.h"
26 
27 namespace tbb {
28 namespace detail {
29 namespace d2 {
30 
// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    // Shared queue state (head/tail ticket counters plus the storage array);
    // defined in detail/_concurrent_queue_base.h.
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Allocator rebound to the representation type; also passed down to the
    // representation for item construction and destruction.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    // Construct an empty queue with a default-constructed allocator.
    concurrent_queue() : concurrent_queue(allocator_type()) {}

    // Construct an empty queue. The representation is allocated cache-aligned
    // so the head and tail counters do not share a cache line (checked by the
    // assertions below).
    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue containing a copy of each element in [begin, end),
    // pushed in iteration order.
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy construction with an explicitly provided allocator.
    // Not safe to run concurrently with modifications of src.
    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Copy constructor; the allocator is obtained via
    // select_on_container_copy_construction, per the usual container protocol.
    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
    }

    // Move constructors
    // Steals src's representation by swapping it with this queue's freshly
    // constructed empty one; src is left holding the empty representation.
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_queue() {
        // NOTE(review): clear() already forwards to
        // my_queue_representation->clear(my_allocator); the second call below is
        // presumably a harmless repeat on an already-empty queue -- confirm intent.
        clear();
        my_queue_representation->clear(my_allocator);
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Construct an item in place at the tail of the queue.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        // The counter difference can be transiently negative while concurrent
        // operations are in flight; clamp to zero.
        return size < 0 ? 0 :  size_type(size);
    }

    // Equivalent to size()==0.
    __TBB_nodiscard bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        my_queue_representation->clear(my_allocator);
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging.  They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swaps only the representation pointers; callers guarantee the two
    // allocators are interchangeable.
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Claim a unique ticket by bumping the tail counter, then construct the
    // item in the sub-queue selected by choose(k).
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
    }

    // Non-blocking pop. Claims a head ticket via CAS, then pops from the
    // sub-queue owning that ticket. If that pop fails (failure semantics are
    // defined in detail/_concurrent_queue_base.h), the claim is retried with a
    // fresh ticket.
    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    // Copy callback handed to queue_representation_type::assign.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    // Move callback handed to queue_representation_type::assign.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue
214 
215 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators.
// Constrained so the guide participates in overload resolution only when It
// satisfies the input-iterator requirements and Alloc looks like an allocator.
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_queue( It, It, Alloc = Alloc() )
-> concurrent_queue<iterator_value_t<It>, Alloc>;
222 
223 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
224 
class concurrent_monitor;

// The concurrent monitor tags for concurrent_bounded_queue.
// Tag 0 indexes the monitor notified when a slot is freed (after a pop);
// tag 1 indexes the monitor notified when an item is published (after a push).
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
230 } // namespace d2
231 
232 
namespace r1 {
    class concurrent_monitor;

    // Entry points implemented in the TBB runtime library.
    // allocate_bounded_queue_rep returns storage sized for the queue
    // representation plus its monitors (callers place the monitor array
    // immediately after the representation).
    TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
    TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
    // Wakes every thread blocked on any of the queue's monitors.
    TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
    // Notifies the monitor selected by monitor_tag about the given ticket.
    TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                            , std::size_t ticket );
    // Blocks on the monitor selected by monitor_tag; predicate is re-evaluated
    // by the runtime to decide whether to keep waiting.
    TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                            std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1
244 
245 
246 namespace d2 {
247 // A high-performance thread-safe blocking concurrent bounded queue.
248 // Supports boundedness and blocking semantics.
249 // Multiple threads may each push and pop concurrently.
250 // Assignment construction is not allowed.
251 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
252 class concurrent_bounded_queue {
253     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
254     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
255     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
256     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
257 
258     template <typename FuncType>
259     void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
260         d1::delegated_function<FuncType> func(pred);
261         r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
262     }
263 public:
264     using size_type = std::ptrdiff_t;
265     using value_type = T;
266     using reference = T&;
267     using const_reference = const T&;
268     using difference_type = std::ptrdiff_t;
269 
270     using allocator_type = Allocator;
271     using pointer = typename allocator_traits_type::pointer;
272     using const_pointer = typename allocator_traits_type::const_pointer;
273 
274     using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
275     using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;
276 
277     concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}
278 
279     explicit concurrent_bounded_queue( const allocator_type& a ) :
280         my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
281     {
282         my_queue_representation = reinterpret_cast<queue_representation_type*>(
283             r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
284         my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
285         queue_allocator_traits::construct(my_allocator, my_queue_representation);
286         my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
287 
288         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
289         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
290         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
291         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
292     }
293 
294     template <typename InputIterator>
295     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
296         concurrent_bounded_queue(a)
297     {
298         for (; begin != end; ++begin)
299             push(*begin);
300     }
301 
302     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
303         concurrent_bounded_queue(a)
304     {
305         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
306     }
307 
308     concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
309         concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
310     {
311         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
312     }
313 
314     // Move constructors
315     concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
316         concurrent_bounded_queue(std::move(src.my_allocator))
317     {
318         internal_swap(src);
319     }
320 
321     concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
322         concurrent_bounded_queue(a)
323     {
324         // checking that memory allocated by one instance of allocator can be deallocated
325         // with another
326         if (my_allocator == src.my_allocator) {
327             internal_swap(src);
328         } else {
329             // allocators are different => performing per-element move
330             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
331             src.clear();
332         }
333     }
334 
335     // Destroy queue
336     ~concurrent_bounded_queue() {
337         clear();
338         my_queue_representation->clear(my_allocator);
339         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
340         r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
341                                          sizeof(queue_representation_type));
342     }
343 
344     // Enqueue an item at tail of queue.
345     void push( const T& value ) {
346         internal_push(value);
347     }
348 
349     void push( T&& value ) {
350         internal_push(std::move(value));
351     }
352 
353     // Enqueue an item at tail of queue if queue is not already full.
354     // Does not wait for queue to become not full.
355     // Returns true if item is pushed; false if queue was already full.
356     bool try_push( const T& value ) {
357         return internal_push_if_not_full(value);
358     }
359 
360     bool try_push( T&& value ) {
361         return internal_push_if_not_full(std::move(value));
362     }
363 
364     template <typename... Args>
365     void emplace( Args&&... args ) {
366         internal_push(std::forward<Args>(args)...);
367     }
368 
369     template <typename... Args>
370     bool try_emplace( Args&&... args ) {
371         return internal_push_if_not_full(std::forward<Args>(args)...);
372     }
373 
374     // Attempt to dequeue an item from head of queue.
375     void pop( T& result ) {
376         internal_pop(&result);
377     }
378 
379     /** Does not wait for item to become available.
380         Returns true if successful; false otherwise. */
381     bool try_pop( T& result ) {
382         return internal_pop_if_present(&result);
383     }
384 
385     void abort() {
386         internal_abort();
387     }
388 
389     // Return the number of items in the queue; thread unsafe
390     std::ptrdiff_t size() const {
391         return my_queue_representation->size();
392     }
393 
394     void set_capacity( size_type new_capacity ) {
395         std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
396         my_capacity = c;
397     }
398 
399     size_type capacity() const {
400         return my_capacity;
401     }
402 
403     // Equivalent to size()==0.
404     __TBB_nodiscard bool empty() const {
405         return my_queue_representation->empty();
406     }
407 
408     // Clear the queue. not thread-safe.
409     void clear() {
410         my_queue_representation->clear(my_allocator);
411     }
412 
413     // Return allocator object
414     allocator_type get_allocator() const { return my_allocator; }
415 
416     //------------------------------------------------------------------------
417     // The iterators are intended only for debugging.  They are slow and not thread safe.
418     //------------------------------------------------------------------------
419 
420     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
421     iterator unsafe_end() { return iterator(); }
422     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
423     const_iterator unsafe_end() const { return const_iterator(); }
424     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
425     const_iterator unsafe_cend() const { return const_iterator(); }
426 
427 private:
428     void internal_swap( concurrent_bounded_queue& src ) {
429         std::swap(my_queue_representation, src.my_queue_representation);
430         std::swap(my_monitors, src.my_monitors);
431     }
432 
433     static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
434 
435     template <typename... Args>
436     void internal_push( Args&&... args ) {
437         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
438         ticket_type ticket = my_queue_representation->tail_counter++;
439         std::ptrdiff_t target = ticket - my_capacity;
440 
441         if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
442             auto pred = [&] {
443                 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
444                     throw_exception(exception_id::user_abort);
445                 }
446 
447                 return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
448             };
449 
450             try_call( [&] {
451                 internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
452             }).on_exception( [&] {
453                 my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
454             });
455 
456         }
457         __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
458         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
459         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
460     }
461 
462     template <typename... Args>
463     bool internal_push_if_not_full( Args&&... args ) {
464         ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
465         do {
466             if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
467                 // Queue is full
468                 return false;
469             }
470             // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
471             // Another thread claimed the slot, so retry.
472         } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
473 
474         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
475         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
476         return true;
477     }
478 
479     void internal_pop( void* dst ) {
480         std::ptrdiff_t target;
481         // This loop is a single pop operation; abort_counter should not be re-read inside
482         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
483 
484         do {
485             target = my_queue_representation->head_counter++;
486             if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
487                 auto pred = [&] {
488                     if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
489                             throw_exception(exception_id::user_abort);
490                     }
491 
492                     return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
493                 };
494 
495                 try_call( [&] {
496                     internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
497                 }).on_exception( [&] {
498                     my_queue_representation->head_counter--;
499                 });
500             }
501             __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
502         } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
503 
504         r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
505     }
506 
507     bool internal_pop_if_present( void* dst ) {
508         ticket_type ticket;
509         do {
510             ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
511             do {
512                 if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
513                     // Queue is empty
514                     return false;
515                 }
516                 // Queue had item with ticket k when we looked.  Attempt to get that item.
517                 // Another thread snatched the item, retry.
518             } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
519         } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator));
520 
521         r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
522         return true;
523     }
524 
525     void internal_abort() {
526         ++my_abort_counter;
527         r1::abort_bounded_queue_monitors(my_monitors);
528     }
529 
530     static void copy_construct_item(T* location, const void* src) {
531         // TODO: use allocator_traits for copy construction
532         new (location) value_type(*static_cast<const value_type*>(src));
533     }
534 
535     static void move_construct_item(T* location, const void* src) {
536         // TODO: use allocator_traits for move construction
537         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
538     }
539 
540     template <typename Container, typename Value, typename A>
541     friend class concurrent_queue_iterator;
542 
543     queue_allocator_type my_allocator;
544     std::ptrdiff_t my_capacity;
545     std::atomic<unsigned> my_abort_counter;
546     queue_representation_type* my_queue_representation;
547 
548     r1::concurrent_monitor* my_monitors;
549 }; // class concurrent_bounded_queue
550 
551 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
552 // Deduction guide for the constructor from two iterators
553 template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
554 concurrent_bounded_queue( It, It, Alloc = Alloc() )
555 -> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;
556 
557 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
558 
559 } //namespace d2
} // namespace detail
561 
// Promote the public names into the tbb:: namespace.
inline namespace v1 {

using detail::d2::concurrent_queue;
using detail::d2::concurrent_bounded_queue;
// Exception types declared in detail/_exception.h (thrown by the runtime).
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;

} // inline namespace v1
570 } // namespace tbb
571 
572 #endif // __TBB_concurrent_queue_H
573