1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
20 #include "detail/_namespace_injection.h"
21 #include "detail/_concurrent_queue_base.h"
22 #include "detail/_allocator_traits.h"
23 #include "detail/_exception.h"
24 #include "detail/_containers_helpers.h"
25 #include "cache_aligned_allocator.h"
26 
27 namespace tbb {
28 namespace detail {
29 namespace d1 {
30 
31 // A high-performance thread-safe non-blocking concurrent queue.
32 // Multiple threads may each push and pop concurrently.
33 // Assignment construction is not allowed.
// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Allocator rebound to the internal representation type; used only to
    // construct/destroy the representation object (storage itself comes from
    // r1::cache_aligned_allocate below).
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    concurrent_queue() : concurrent_queue(allocator_type()) {}

    // Constructs an empty queue. The shared representation is placed in
    // cache-aligned storage; the assertions verify that the hot atomic
    // counters ended up on their own cache lines.
    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Constructs a queue holding copies of [begin, end), pushed in order.
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicit allocator. Copying is not
    // thread-safe with respect to concurrent modification of `src`.
    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals `src`'s representation via swap, leaving `src` owning the
    // freshly constructed empty one. Not thread-safe with respect to `src`.
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Construct an item in place at the tail of the queue from `args`.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        // Concurrent activity can make the computed difference transiently
        // negative; clamp to zero rather than wrap to a huge unsigned value.
        return size < 0 ? 0 :  size_type(size);
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        // Drains the queue by popping; each popped value is a default-
        // constructed T overwritten by the pop, then discarded.
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging.  They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swaps only the representation pointers; callers ensure the allocators
    // are interchangeable before invoking this.
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Claims the next tail ticket and constructs the item in the slot the
    // representation selects for that ticket.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, std::forward<Args>(args)...);
    }

    // Attempts to claim the head ticket and extract the corresponding item
    // into `dst` (which must point to storage suitable for a T).
    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
                // (On CAS failure `k` is refreshed with the current head value.)
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
            // The outer loop restarts with a new ticket if pop() reports the
            // claimed item could not be taken — exact failure semantics are
            // defined by the micro-queue in _concurrent_queue_base.h.
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    // Item-construction callbacks handed to queue_representation_type::assign.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue
217 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators.
// The enable_if constraints keep this guide out of overload resolution
// unless `It` is a genuine input iterator and `Alloc` models an allocator.
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_queue( It, It, Alloc = Alloc() )
-> concurrent_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
227 
228 class concurrent_monitor;
229 
230 template <typename FuncType>
231 class delegated_function : public delegate_base {
232 public:
233     delegated_function(FuncType& f) : my_func(f) {}
234 
235     bool operator()() const override {
236         return my_func();
237     }
238 
239 private:
240     FuncType &my_func;
241 }; // class delegated_function
242 
// The concurrent monitor tags for concurrent_bounded_queue.
// Tag 0 parks producers waiting for a free slot (blocking push on a full
// queue); tag 1 parks consumers waiting for an item (blocking pop on an
// empty queue).
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
246 } // namespace d1
247 
248 
namespace r1 {
    class concurrent_monitor;

    // Entry points implemented in the TBB binary. The allocation returns one
    // contiguous block holding the queue representation followed by its
    // monitors (callers locate the monitors just past the representation).
    std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
    void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
    // Wakes every thread blocked on the queue's monitors (used by abort()).
    void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
    // Wakes waiters on the monitor selected by `monitor_tag` for `ticket`.
    void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                            , std::size_t ticket );
    // Blocks the caller on the selected monitor; `predicate` decides whether
    // waiting should continue and may throw (e.g. user_abort).
    void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                            std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1
260 
261 
262 namespace d1 {
263 // A high-performance thread-safe blocking concurrent bounded queue.
264 // Supports boundedness and blocking semantics.
265 // Multiple threads may each push and pop concurrently.
266 // Assignment construction is not allowed.
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Allocator rebound to the internal representation type; used only to
    // construct/destroy the representation (storage comes from
    // r1::allocate_bounded_queue_rep below).
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    // Blocks the calling thread on the monitor selected by `monitor_tag`,
    // wrapping `pred` so the out-of-line runtime can call it through the
    // polymorphic delegate_base interface. `pred` may throw to abandon the
    // wait (see the user_abort checks in internal_push/internal_pop).
    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    // Signed on purpose: size() may report a negative value when pending
    // pops outnumber pushes — TODO confirm against concurrent_queue_rep::size.
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;

    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    // Constructs an empty queue. One contiguous allocation holds the
    // representation followed by the monitors; the default capacity is
    // effectively unbounded (limited only by address space per item size).
    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        // The monitors live immediately after the representation in the same block.
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Constructs a queue holding copies of [begin, end), pushed in order.
    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicit allocator. Copying is not
    // thread-safe with respect to concurrent modification of `src`.
    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals `src`'s representation and monitors via swap. Not thread-safe
    // with respect to `src`.
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue; blocks while the queue is full.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    // Construct an item in place at the tail; blocks while the queue is full.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Non-blocking emplace; returns false if the queue was full.
    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue; blocks until an item is available
    // (or the wait is aborted, in which case user_abort propagates).
    bool pop( T& result ) {
        return internal_pop(&result);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    // Wake all threads blocked in push()/pop(); each throws user_abort.
    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    // Set the maximum number of items. A negative value means unbounded.
    // Not synchronized with in-flight pushes.
    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    void clear() {
        // Drains via non-blocking pops; each popped value is discarded.
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging.  They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swaps representation and monitor pointers; callers ensure the
    // allocators are interchangeable first. Capacity/abort counter are not
    // swapped here — the move constructors rely on the fresh defaults.
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    // Half the signed range, so capacity arithmetic cannot overflow.
    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    // Blocking push: claims a tail ticket unconditionally, then waits on the
    // slots-available monitor while the queue is over capacity.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        // Snapshot the abort counter once; a later change means abort() was
        // called during this operation and the wait predicate throws.
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        // Our slot is free once head has advanced past `target`.
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                // The ticket was already claimed; tell the representation to
                // abandon it so consumers do not wait on it forever.
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        // Wake a consumer that may be blocked waiting for this item.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    // Non-blocking push: claims a tail ticket via CAS only while the queue
    // is below capacity.
    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
            // (On CAS failure `ticket` is refreshed with the current tail value.)
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        // Wake a consumer that may be blocked waiting for this item.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    // Blocking pop: claims a head ticket unconditionally, then waits on the
    // items-available monitor until a producer has filled that slot.
    bool internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                            throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    // Give the claimed ticket back before propagating.
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
            // Retry with a new ticket if the claimed item could not be taken.
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation));

        // A slot was freed; wake a producer that may be blocked on capacity.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
        return true;
    }

    // Non-blocking pop: claims a head ticket via CAS only while an item
    // is known to exist.
    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
                    // Queue is empty
                    return false;
                }
                // Queue had item with ticket k when we looked.  Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation));

        // A slot was freed; wake a producer that may be blocked on capacity.
        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    // Bumps the abort epoch, then wakes all waiters; each waiter's predicate
    // observes the changed counter and throws user_abort.
    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    // Item-construction callbacks handed to queue_representation_type::assign.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;
    std::ptrdiff_t my_capacity;          // current bound; infinite_capacity when unbounded
    std::atomic<unsigned> my_abort_counter; // incremented by abort() to cancel waits
    queue_representation_type* my_queue_representation;

    r1::concurrent_monitor* my_monitors; // array inside the same allocation as the representation
}; // class concurrent_bounded_queue
570 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators.
// Constrained the same way as the concurrent_queue guide above: without the
// enable_if checks this guide would participate in overload resolution for
// arbitrary (non-iterator) argument types and for non-allocator third
// arguments, producing confusing hard errors instead of being discarded.
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_bounded_queue( It, It, Alloc = Alloc() )
-> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
578 
579 } //namespace d1
} // namespace detail
581 
582 inline namespace v1 {
583 
584 using detail::d1::concurrent_queue;
585 using detail::d1::concurrent_bounded_queue;
586 using detail::r1::user_abort;
587 using detail::r1::bad_last_alloc;
588 
589 } // inline namespace v1
590 } // namespace tbb
591 
592 #endif // __TBB_concurrent_queue_H
593