1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
20 #include "detail/_namespace_injection.h"
21 #include "detail/_concurrent_queue_base.h"
22 #include "detail/_allocator_traits.h"
23 #include "detail/_exception.h"
24 #include "detail/_containers_helpers.h"
25 #include "cache_aligned_allocator.h"
26 
27 namespace tbb {
28 namespace detail {
29 namespace d2 {
30 
31 template <typename QueueRep, typename Allocator>
32 std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
33     ticket_type ticket{};
34     do {
        // `head_counter` must be read before `tail_counter`; the acquire load on `head_counter` establishes the required happens-before ordering.
36         ticket = queue.head_counter.load(std::memory_order_acquire);
37         do {
            if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) {
                // Queue is empty
                return { false, ticket };
            }
            // The queue had an item with this ticket when we looked. Attempt to claim it;
            // if the CAS fails, another thread took the item first, so retry with the updated ticket.
        } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
45     } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
46     return { true, ticket };
47 }
48 
// A high-performance, thread-safe, non-blocking concurrent queue.
// Multiple threads may push and pop concurrently.
// Copy and move construction are allowed; assignment is not.
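//
// Illustrative usage sketch (assumes the public tbb:: aliases exported at the bottom of this header):
//
//     tbb::concurrent_queue<int> queue;
//     queue.push(1);            // safe to call concurrently with other push/try_pop calls
//     queue.emplace(2);
//     int value;
//     if (queue.try_pop(value)) {
//         // value now holds the oldest available item
//     }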
52 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
53 class concurrent_queue {
54     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
55     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
56     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
57     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
58 public:
59     using size_type = std::size_t;
60     using value_type = T;
61     using reference = T&;
62     using const_reference = const T&;
63     using difference_type = std::ptrdiff_t;
64 
65     using allocator_type = Allocator;
66     using pointer = typename allocator_traits_type::pointer;
67     using const_pointer = typename allocator_traits_type::const_pointer;
68 
69     using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
70     using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;
71 
72     concurrent_queue() : concurrent_queue(allocator_type()) {}
73 
74     explicit concurrent_queue(const allocator_type& a) :
75         my_allocator(a), my_queue_representation(nullptr)
76     {
77         my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
78         queue_allocator_traits::construct(my_allocator, my_queue_representation);
79 
80         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
81         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
82         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
83         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
84     }
85 
86     template <typename InputIterator>
87     concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
88         concurrent_queue(a)
89     {
90         for (; begin != end; ++begin)
91             push(*begin);
92     }
93 
94     concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
95         concurrent_queue(a)
96     {
97         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
98     }
99 
100     concurrent_queue(const concurrent_queue& src) :
101         concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
102     {
103         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
104     }
105 
106     // Move constructors
107     concurrent_queue(concurrent_queue&& src) :
108         concurrent_queue(std::move(src.my_allocator))
109     {
110         internal_swap(src);
111     }
112 
113     concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
114         concurrent_queue(a)
115     {
        // Check whether memory allocated by one allocator instance can be deallocated
        // by the other; if so, the internal representations can simply be swapped.
118         if (my_allocator == src.my_allocator) {
119             internal_swap(src);
120         } else {
            // The allocators differ, so perform a per-element move.
122             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
123             src.clear();
124         }
125     }
126 
127     // Destroy queue
128     ~concurrent_queue() {
129         clear();
130         my_queue_representation->clear(my_allocator);
131         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
132         r1::cache_aligned_deallocate(my_queue_representation);
133     }
134 
135     // Enqueue an item at tail of queue.
136     void push(const T& value) {
137         internal_push(value);
138     }
139 
140     void push(T&& value) {
141         internal_push(std::move(value));
142     }
143 
144     template <typename... Args>
145     void emplace( Args&&... args ) {
146         internal_push(std::forward<Args>(args)...);
147     }
148 
149     // Attempt to dequeue an item from head of queue.
150     /** Does not wait for item to become available.
151         Returns true if successful; false otherwise. */
152     bool try_pop( T& result ) {
153         return internal_try_pop(&result);
154     }
155 
    // Return the number of items in the queue; not thread-safe.
157     size_type unsafe_size() const {
158         std::ptrdiff_t size = my_queue_representation->size();
        return size < 0 ? 0 : size_type(size);
160     }
161 
    // Equivalent to unsafe_size()==0.
163     __TBB_nodiscard bool empty() const {
164         return my_queue_representation->empty();
165     }
166 
    // Clear the queue; not thread-safe.
168     void clear() {
169         my_queue_representation->clear(my_allocator);
170     }
171 
172     // Return allocator object
173     allocator_type get_allocator() const { return my_allocator; }
174 
175     //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread-safe.
177     //------------------------------------------------------------------------
178 
179     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
180     iterator unsafe_end() { return iterator(); }
181     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
182     const_iterator unsafe_end() const { return const_iterator(); }
183     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
184     const_iterator unsafe_cend() const { return const_iterator(); }
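
    // Debug-only traversal sketch (illustrative; no concurrent pushes or pops may run while iterating):
    //
    //     for (auto it = queue.unsafe_begin(); it != queue.unsafe_end(); ++it) {
    //         inspect(*it);     // inspect() is a hypothetical user function
    //     }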
185 
186 private:
187     void internal_swap(concurrent_queue& src) {
188         using std::swap;
189         swap(my_queue_representation, src.my_queue_representation);
190     }
191 
192     template <typename... Args>
193     void internal_push( Args&&... args ) {
194         ticket_type k = my_queue_representation->tail_counter++;
195         my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
196     }
197 
198     bool internal_try_pop( void* dst ) {
199         return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
200     }
201 
202     template <typename Container, typename Value, typename A>
203     friend class concurrent_queue_iterator;
204 
205     static void copy_construct_item(T* location, const void* src) {
206         // TODO: use allocator_traits for copy construction
207         new (location) value_type(*static_cast<const value_type*>(src));
208         // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
209     }
210 
211     static void move_construct_item(T* location, const void* src) {
212         // TODO: use allocator_traits for move construction
213         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
214     }
215 
216     queue_allocator_type my_allocator;
217     queue_representation_type* my_queue_representation;
218 }; // class concurrent_queue
219 
220 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
221 // Deduction guide for the constructor from two iterators
222 template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
223           typename = std::enable_if_t<is_input_iterator_v<It>>,
224           typename = std::enable_if_t<is_allocator_v<Alloc>>>
225 concurrent_queue( It, It, Alloc = Alloc() )
226 -> concurrent_queue<iterator_value_t<It>, Alloc>;
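//
// For example (illustrative, requires C++17 class template argument deduction):
//
//     std::vector<int> source{1, 2, 3};
//     tbb::concurrent_queue queue(source.begin(), source.end()); // deduces concurrent_queue<int>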
227 
228 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
229 
230 class concurrent_monitor;
231 
232 // The concurrent monitor tags for concurrent_bounded_queue.
233 static constexpr std::size_t cbq_slots_avail_tag = 0;
234 static constexpr std::size_t cbq_items_avail_tag = 1;
235 } // namespace d2
236 
237 
238 namespace r1 {
239     class concurrent_monitor;
240 
241     TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
242     TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
243     TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
244     TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
245                                                             , std::size_t ticket );
246     TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
247                                                             std::ptrdiff_t target, d1::delegate_base& predicate );
248 } // namespace r1
249 
250 
251 namespace d2 {
// A high-performance, thread-safe, bounded concurrent queue with blocking semantics:
// push blocks while the queue is full, and pop blocks while it is empty.
// Multiple threads may push and pop concurrently.
// Copy and move construction are allowed; assignment is not.
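//
// Illustrative usage sketch (assumes the public tbb:: aliases exported at the bottom of this header):
//
//     tbb::concurrent_bounded_queue<int> queue;
//     queue.set_capacity(4);    // push() blocks while 4 items are already enqueued
//     queue.push(1);
//     int value;
//     queue.pop(value);         // blocks until an item becomes available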
256 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
257 class concurrent_bounded_queue {
258     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
259     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
260     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
261     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
262 
263     template <typename FuncType>
264     void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
265         d1::delegated_function<FuncType> func(pred);
266         r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
267     }
268 public:
269     using size_type = std::ptrdiff_t;
270     using value_type = T;
271     using reference = T&;
272     using const_reference = const T&;
273     using difference_type = std::ptrdiff_t;
274 
275     using allocator_type = Allocator;
276     using pointer = typename allocator_traits_type::pointer;
277     using const_pointer = typename allocator_traits_type::const_pointer;
278 
279     using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator>;
281 
282     concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}
283 
284     explicit concurrent_bounded_queue( const allocator_type& a ) :
285         my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
286     {
287         my_queue_representation = reinterpret_cast<queue_representation_type*>(
288             r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
289         my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
290         queue_allocator_traits::construct(my_allocator, my_queue_representation);
291         my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
292 
293         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
294         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
295         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
296         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
297     }
298 
299     template <typename InputIterator>
300     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
301         concurrent_bounded_queue(a)
302     {
303         for (; begin != end; ++begin)
304             push(*begin);
305     }
306 
307     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
308         concurrent_bounded_queue(a)
309     {
310         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
311     }
312 
313     concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
314         concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
315     {
316         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
317     }
318 
319     // Move constructors
320     concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
321         concurrent_bounded_queue(std::move(src.my_allocator))
322     {
323         internal_swap(src);
324     }
325 
326     concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
327         concurrent_bounded_queue(a)
328     {
        // Check whether memory allocated by one allocator instance can be deallocated
        // by the other; if so, the internal representations can simply be swapped.
331         if (my_allocator == src.my_allocator) {
332             internal_swap(src);
333         } else {
            // The allocators differ, so perform a per-element move.
335             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
336             src.clear();
337         }
338     }
339 
340     // Destroy queue
341     ~concurrent_bounded_queue() {
342         clear();
343         my_queue_representation->clear(my_allocator);
344         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
345         r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
346                                          sizeof(queue_representation_type));
347     }
348 
349     // Enqueue an item at tail of queue.
350     void push( const T& value ) {
351         internal_push(value);
352     }
353 
354     void push( T&& value ) {
355         internal_push(std::move(value));
356     }
357 
358     // Enqueue an item at tail of queue if queue is not already full.
359     // Does not wait for queue to become not full.
360     // Returns true if item is pushed; false if queue was already full.
361     bool try_push( const T& value ) {
362         return internal_push_if_not_full(value);
363     }
364 
365     bool try_push( T&& value ) {
366         return internal_push_if_not_full(std::move(value));
367     }
368 
369     template <typename... Args>
370     void emplace( Args&&... args ) {
371         internal_push(std::forward<Args>(args)...);
372     }
373 
374     template <typename... Args>
375     bool try_emplace( Args&&... args ) {
376         return internal_push_if_not_full(std::forward<Args>(args)...);
377     }
378 
    // Dequeue an item from the head of the queue; block until an item becomes available.
380     void pop( T& result ) {
381         internal_pop(&result);
382     }
383 
    // Attempt to dequeue an item from the head of the queue.
    /** Does not wait for an item to become available.
        Returns true if successful; false otherwise. */
386     bool try_pop( T& result ) {
387         return internal_pop_if_present(&result);
388     }
389 
390     void abort() {
391         internal_abort();
392     }
393 
    // Return the number of items in the queue; not thread-safe.
395     std::ptrdiff_t size() const {
396         return my_queue_representation->size();
397     }
398 
399     void set_capacity( size_type new_capacity ) {
400         std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
401         my_capacity = c;
402     }
403 
404     size_type capacity() const {
405         return my_capacity;
406     }
407 
408     // Equivalent to size()==0.
409     __TBB_nodiscard bool empty() const {
410         return my_queue_representation->empty();
411     }
412 
    // Clear the queue; not thread-safe.
414     void clear() {
415         my_queue_representation->clear(my_allocator);
416     }
417 
418     // Return allocator object
419     allocator_type get_allocator() const { return my_allocator; }
420 
421     //------------------------------------------------------------------------
    // The iterators are intended only for debugging. They are slow and not thread-safe.
423     //------------------------------------------------------------------------
424 
425     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
426     iterator unsafe_end() { return iterator(); }
427     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
428     const_iterator unsafe_end() const { return const_iterator(); }
429     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
430     const_iterator unsafe_cend() const { return const_iterator(); }
431 
432 private:
433     void internal_swap( concurrent_bounded_queue& src ) {
434         std::swap(my_queue_representation, src.my_queue_representation);
435         std::swap(my_monitors, src.my_monitors);
436     }
437 
438     static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
439 
440     template <typename... Args>
441     void internal_push( Args&&... args ) {
442         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
443         ticket_type ticket = my_queue_representation->tail_counter++;
444         std::ptrdiff_t target = ticket - my_capacity;
445 
446         if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
447             auto pred = [&] {
448                 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
449                     throw_exception(exception_id::user_abort);
450                 }
451 
452                 return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
453             };
454 
455             try_call( [&] {
456                 internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
457             }).on_exception( [&] {
458                 my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
459             });
460 
461         }
462         __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
463         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
464         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
465     }
466 
467     template <typename... Args>
468     bool internal_push_if_not_full( Args&&... args ) {
469         ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
470         do {
471             if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
472                 // Queue is full
473                 return false;
474             }
            // The queue had an empty slot with this ticket when we looked. Attempt to claim it;
            // if the CAS fails, another thread claimed the slot first, so retry with the updated ticket.
477         } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
478 
479         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
480         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
481         return true;
482     }
483 
484     void internal_pop( void* dst ) {
485         std::ptrdiff_t target;
        // The entire loop implements a single pop operation; my_abort_counter must not be re-read inside it.
487         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
488 
489         do {
490             target = my_queue_representation->head_counter++;
491             if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
492                 auto pred = [&] {
493                     if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                        throw_exception(exception_id::user_abort);
495                     }
496 
497                     return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
498                 };
499 
500                 try_call( [&] {
501                     internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
502                 }).on_exception( [&] {
503                     my_queue_representation->head_counter--;
504                 });
505             }
506             __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
507         } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
508 
509         r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
510     }
511 
512     bool internal_pop_if_present( void* dst ) {
513         bool present{};
514         ticket_type ticket{};
515         std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);
516 
517         if (present) {
518             r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
519         }
520         return present;
521     }
522 
523     void internal_abort() {
524         ++my_abort_counter;
525         r1::abort_bounded_queue_monitors(my_monitors);
526     }
527 
528     static void copy_construct_item(T* location, const void* src) {
529         // TODO: use allocator_traits for copy construction
530         new (location) value_type(*static_cast<const value_type*>(src));
531     }
532 
533     static void move_construct_item(T* location, const void* src) {
534         // TODO: use allocator_traits for move construction
535         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
536     }
537 
538     template <typename Container, typename Value, typename A>
539     friend class concurrent_queue_iterator;
540 
541     queue_allocator_type my_allocator;
542     std::ptrdiff_t my_capacity;
543     std::atomic<unsigned> my_abort_counter;
544     queue_representation_type* my_queue_representation;
545 
546     r1::concurrent_monitor* my_monitors;
547 }; // class concurrent_bounded_queue
548 
549 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
550 // Deduction guide for the constructor from two iterators
551 template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>>
552 concurrent_bounded_queue( It, It, Alloc = Alloc() )
553 -> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;
554 
555 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
556 
} // namespace d2
} // namespace detail
559 
560 inline namespace v1 {
561 
562 using detail::d2::concurrent_queue;
563 using detail::d2::concurrent_bounded_queue;
564 using detail::r1::user_abort;
565 using detail::r1::bad_last_alloc;
566 
567 } // inline namespace v1
568 } // namespace tbb
569 
570 #endif // __TBB_concurrent_queue_H
571