1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
20 #include "detail/_namespace_injection.h"
21 #include "detail/_concurrent_queue_base.h"
22 #include "detail/_allocator_traits.h"
23 #include "detail/_exception.h"
24 #include "cache_aligned_allocator.h"
25 
26 namespace tbb {
27 namespace detail {
28 namespace d1 {
29 
30 // A high-performance thread-safe non-blocking concurrent queue.
31 // Multiple threads may each push and pop concurrently.
32 // Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // The shared representation is allocated through a rebound allocator so its
    // bookkeeping stays consistent with the element allocator type.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    concurrent_queue() : concurrent_queue(allocator_type()) {}

    // Constructs an empty queue. Allocates the cache-aligned shared
    // representation holding the head/tail ticket counters and item storage.
    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);

        // Counters and the micro-queue array must each sit on their own cache
        // line (max_nfs_size) so producers and consumers do not false-share.
        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Constructs a queue containing copies of [begin, end), pushed in order.
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicit allocator. Not safe against
    // concurrent modification of src.
    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals src's representation; src is left owning the freshly constructed
    // empty representation.
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue. Pops remaining items, tears down the representation, and
    // releases its cache-aligned storage. Not thread-safe.
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Constructs the item in place at the tail from the given arguments.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        // A concurrent snapshot of the counters can yield a transiently
        // negative difference; clamp it to zero.
        return size < 0 ? 0 :  size_type(size);
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    // NOTE(review): pops element-by-element, so T must be default-constructible here.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging.  They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Exchanges only the representation pointers; allocators are left in place.
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Claims the next tail ticket and pushes into the micro-queue that owns it.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, std::forward<Args>(args)...);
    }

    // Attempts to claim the current head ticket via CAS and pop the item it
    // designates into *dst. Returns false if the queue appears empty.
    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // On CAS failure another thread snatched it; k is reloaded and we retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
            // The outer loop retries if the claimed slot's pop fails (the
            // producer for ticket k has not finished, or the push was aborted).
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;
    queue_representation_type* my_queue_representation;
}; // class concurrent_queue
216 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators:
// deduces the element type from the iterator's value_type and defaults the
// allocator to tbb::cache_aligned_allocator of that type.
template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename A = tbb::cache_aligned_allocator<T>>
concurrent_queue(InputIterator, InputIterator, const A& = A())
->concurrent_queue<T, A>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
225 
226 class concurrent_monitor;
227 
228 template <typename FuncType>
229 class delegated_function : public delegate_base {
230 public:
231     delegated_function(FuncType& f) : my_func(f) {}
232 
233     bool operator()() const override {
234         return my_func();
235     }
236 
237 private:
238     FuncType &my_func;
239 }; // class delegated_function
240 
// The concurrent monitor tags for concurrent_bounded_queue.
// cbq_slots_avail_tag indexes the monitor that producers wait on for a free
// slot; cbq_items_avail_tag indexes the monitor that consumers wait on for an
// available item (see internal_push / internal_pop below).
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
244 } // namespace d1
245 
246 
namespace r1 {
    class concurrent_monitor;

    // Exported entry points implemented in the TBB runtime library.
    // allocate_bounded_queue_rep returns a block that holds the queue
    // representation followed by its monitors (the bounded-queue constructor
    // places the monitors at rep + 1 inside the same allocation).
    std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
    void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
    // Called by abort(); presumably wakes blocked waiters so their predicates
    // can re-check the abort counter — behavior lives in the r1 library.
    void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
    void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                            , std::size_t ticket );
    void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                            std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1
258 
259 
260 namespace d1 {
261 // A high-performance thread-safe blocking concurrent bounded queue.
262 // Supports boundedness and blocking semantics.
263 // Multiple threads may each push and pop concurrently.
264 // Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    // Wraps pred in a delegate and forwards it to the runtime's blocking wait
    // on the monitor selected by monitor_tag.
    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;

    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    // Constructs an empty queue with effectively unbounded capacity.
    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        // The monitors live immediately after the representation in the single
        // block returned by allocate_bounded_queue_rep.
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
        // Default capacity: as large as possible without letting
        // capacity * item_size overflow std::size_t.
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        // Counters and the micro-queue array must each sit on their own cache
        // line (max_nfs_size) so producers and consumers do not false-share.
        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Constructs a queue containing copies of [begin, end), pushed in order.
    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicit allocator. Not safe against
    // concurrent modification of src.
    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals src's representation and monitors; src is left owning the
    // freshly constructed empty state.
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue. Pops remaining items, tears down the representation, and
    // returns the combined representation+monitor block. Not thread-safe.
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue.
    // Blocks while the queue is full; throws user_abort if abort() is called.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    // Constructs the item in place at the tail; blocks while the queue is full.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Non-blocking in-place construction; returns false if the queue was full.
    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue, blocking until one is available.
    // Throws user_abort if abort() is called while waiting.
    bool pop( T& result ) {
        return internal_pop(&result);
    }

    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    // Wakes all blocked push/pop calls; they throw user_abort.
    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    // Sets the capacity; a negative value means effectively unbounded.
    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    // NOTE(review): pops element-by-element, so T must be default-constructible here.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging.  They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Exchanges the representation and monitor pointers; allocators and
    // capacity are left in place.
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    // Blocking push: claims a tail ticket, waits while the queue is full, then
    // constructs the item and notifies a waiting consumer.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        // Snapshot the abort counter so an abort() issued during the wait below
        // is detected by the predicate.
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        // The slot for `ticket` becomes free once head_counter exceeds `target`.
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                // Roll back the claimed ticket so consumers are not left
                // waiting on a slot that will never be filled.
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    // Non-blocking push: claims a tail ticket via CAS only if the queue is not
    // full. Returns false without modifying the queue when it is full.
    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    // Blocking pop: claims a head ticket, waits until a matching item is
    // available, pops it into *dst, then notifies a waiting producer.
    bool internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                            throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    // Return the claimed ticket so other consumers can take it.
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
            // Retry if the slot's pop fails (producer not finished, or aborted push).
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
        return true;
    }

    // Non-blocking pop: claims the head ticket via CAS if an item is present.
    // Returns false without modifying the queue when it is empty.
    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
                    // Queue is empty
                    return false;
                }
                // Queue had item with this ticket when we looked.  Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    // Bumps the abort epoch and wakes the monitors; blocked operations whose
    // snapshot differs throw user_abort from their predicates.
    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;
    std::ptrdiff_t my_capacity;
    // Epoch counter for abort(); compared against per-operation snapshots.
    std::atomic<unsigned> my_abort_counter;
    queue_representation_type* my_queue_representation;

    // Points into the allocation owned by my_queue_representation (rep + 1).
    r1::concurrent_monitor* my_monitors;
}; // class concurrent_bounded_queue
568 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators:
// deduces the element type from the iterator's value_type and defaults the
// allocator to tbb::cache_aligned_allocator of that type.
template <typename InputIterator,
          typename T = typename std::iterator_traits<InputIterator>::value_type,
          typename A = tbb::cache_aligned_allocator<T>>
concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
->concurrent_bounded_queue<T, A>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
577 
578 } //namespace d1
} // namespace detail
580 
581 inline namespace v1 {
582 
583 using detail::d1::concurrent_queue;
584 using detail::d1::concurrent_bounded_queue;
585 using detail::r1::user_abort;
586 using detail::r1::bad_last_alloc;
587 
588 } // inline namespace v1
589 } // namespace tbb
590 
591 #endif // __TBB_concurrent_queue_H
592