xref: /oneTBB/include/tbb/concurrent_queue.h (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
20 #include "detail/_concurrent_queue_base.h"
21 #include "detail/_allocator_traits.h"
22 #include "detail/_exception.h"
23 #include "cache_aligned_allocator.h"
24 
25 namespace tbb {
26 namespace detail {
27 namespace d1 {
28 
// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Rebind the user allocator so the shared representation object itself
    // can be constructed/destroyed through allocator_traits.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;

    // Construct an empty queue with a default-constructed allocator.
    concurrent_queue() : concurrent_queue(allocator_type()) {}

    // Construct an empty queue. The shared representation lives in
    // cache-aligned storage; the asserts verify that its hot fields
    // (head/tail counters, page array) each start on a cache-line boundary.
    explicit concurrent_queue(const allocator_type& a) :
        my_allocator(a), my_queue_representation(nullptr)
    {
        my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue containing copies of the elements in [begin, end).
    template <typename InputIterator>
    concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        concurrent_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicitly supplied allocator.
    concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Copy constructor; the allocator is obtained via
    // select_on_container_copy_construction, per the usual container protocol.
    concurrent_queue(const concurrent_queue& src) :
        concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals src's representation by swapping it with this queue's freshly
    // constructed empty one, leaving src valid and empty.
    concurrent_queue(concurrent_queue&& src) :
        concurrent_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
        concurrent_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue: drains remaining elements, then destroys and frees the
    // shared representation.
    ~concurrent_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::cache_aligned_deallocate(my_queue_representation);
    }

    // Enqueue an item at tail of queue.
    void push(const T& value) {
        internal_push(value);
    }

    // Enqueue an item at tail of queue by moving it.
    void push(T&& value) {
        internal_push(std::move(value));
    }

    // Construct an item in place at tail of queue from the given arguments.
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_try_pop(&result);
    }

    // Return the number of items in the queue; thread unsafe.
    // The raw head/tail difference can be transiently negative while
    // concurrent operations are in flight, so it is clamped to zero.
    size_type unsafe_size() const {
        std::ptrdiff_t size = my_queue_representation->size();
        return size < 0 ? 0 :  size_type(size);
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    // NOTE(review): pops into a temporary, so this path additionally requires
    // T to be default-constructible.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging.  They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swap only the representation pointers; allocators stay in place
    // (callers have already ensured allocator compatibility where required).
    void internal_swap(concurrent_queue& src) {
        using std::swap;
        swap(my_queue_representation, src.my_queue_representation);
    }

    // Reserve the next tail ticket with an atomic fetch-and-increment, then
    // construct the item into the sub-queue selected by that ticket.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        ticket_type k = my_queue_representation->tail_counter++;
        my_queue_representation->choose(k).push(k, *my_queue_representation, std::forward<Args>(args)...);
    }

    // Non-blocking pop: claim a head ticket via CAS, then try to extract the
    // corresponding item. Returns false only when the queue is observed empty.
    bool internal_try_pop( void* dst ) {
        ticket_type k;
        do {
            k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
                    // Queue is empty
                    return false;
                }

                // Queue had item with ticket k when we looked. Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
            // Outer retry: start over with a fresh ticket when pop() reports the
            // claimed slot could not deliver an item (presumably invalidated by
            // an exception during its push — confirm in _concurrent_queue_base.h).
        } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation));
        return true;
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    // Copy-construct a T at 'location' from a type-erased source element.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
        // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
    }

    // Move-construct a T at 'location' from a type-erased source element.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    queue_allocator_type my_allocator;                  // rebound allocator for the representation
    queue_representation_type* my_queue_representation; // shared lock-free state
}; // class concurrent_queue
215 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide: `concurrent_queue q(first, last[, alloc]);` deduces the
// element type from the iterator's value_type and defaults the allocator to
// a cache_aligned_allocator of that type.
template <typename It,
          typename Value = typename std::iterator_traits<It>::value_type,
          typename Alloc = tbb::cache_aligned_allocator<Value>>
concurrent_queue( It, It, const Alloc& = Alloc() )
-> concurrent_queue<Value, Alloc>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
224 
225 class concurrent_monitor;
226 
227 template <typename FuncType>
228 class delegated_function : public delegate_base {
229 public:
230     delegated_function(FuncType& f) : my_func(f) {}
231 
232     bool operator()() const override {
233         return my_func();
234     }
235 
236 private:
237     FuncType &my_func;
238 }; // class delegated_function
239 
// The concurrent monitor tags for concurrent_bounded_queue.
// Tag 0 indexes the monitor that threads blocked in push() wait on (a slot
// became free); tag 1 indexes the monitor that threads blocked in pop() wait
// on (an item became available).
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
243 } // namespace d1
244 
245 
namespace r1 {
    class concurrent_monitor;

    // Entry points exported by the TBB runtime library for the bounded queue.
    // allocate_bounded_queue_rep returns a single block holding the queue
    // representation followed by its monitors (see the concurrent_bounded_queue
    // constructor, which computes the monitor pointer as rep + 1).
    std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
    void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
    // Used by abort(): releases blocked waiters so their predicates can observe
    // the bumped abort counter and throw user_abort.
    void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
    // Notify/wait on the monitor selected by monitor_tag (cbq_*_avail_tag).
    void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
                                                            , std::size_t ticket );
    void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
                                                            std::ptrdiff_t target, d1::delegate_base& predicate );
} // namespace r1
257 
258 
259 namespace d1 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
    using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
    using queue_representation_type = concurrent_queue_rep<T, Allocator>;
    // Rebind the user allocator so the shared representation object itself
    // can be constructed/destroyed through allocator_traits.
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
    using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;

    // Wrap pred in a delegate and block on the monitor selected by monitor_tag
    // (the blocking semantics live in r1::wait_bounded_queue_monitor).
    template <typename FuncType>
    void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
        delegated_function<FuncType> func(pred);
        r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
    }
public:
    using size_type = std::ptrdiff_t;
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;
    using pointer = typename allocator_traits_type::pointer;
    using const_pointer = typename allocator_traits_type::const_pointer;

    using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
    using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;

    // Construct an empty queue with a default-constructed allocator.
    concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}

    // Construct an empty queue. The runtime allocates one block holding the
    // representation immediately followed by the monitor array, which is why
    // my_monitors is computed as my_queue_representation + 1.
    explicit concurrent_bounded_queue( const allocator_type& a ) :
        my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
    {
        my_queue_representation = reinterpret_cast<queue_representation_type*>(
            r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
        my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
        queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
        // Default capacity: effectively unbounded (limited only by address space).
        my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);

        __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
        __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
    }

    // Construct a queue containing copies of the elements in [begin, end).
    template <typename InputIterator>
    concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
        concurrent_bounded_queue(a)
    {
        for (; begin != end; ++begin)
            push(*begin);
    }

    // Copy constructor with an explicitly supplied allocator.
    concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Copy constructor; the allocator is obtained via
    // select_on_container_copy_construction, per the usual container protocol.
    concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
        concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
    {
        my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
    }

    // Move constructors
    // Steals src's representation and monitors by swapping them with this
    // queue's freshly constructed empty ones.
    concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
        concurrent_bounded_queue(std::move(src.my_allocator))
    {
        internal_swap(src);
    }

    concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
        concurrent_bounded_queue(a)
    {
        // checking that memory allocated by one instance of allocator can be deallocated
        // with another
        if (my_allocator == src.my_allocator) {
            internal_swap(src);
        } else {
            // allocators are different => performing per-element move
            my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
            src.clear();
        }
    }

    // Destroy queue: drains remaining elements, then destroys the
    // representation and returns the combined block to the runtime.
    ~concurrent_bounded_queue() {
        clear();
        my_queue_representation->clear();
        queue_allocator_traits::destroy(my_allocator, my_queue_representation);
        r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
                                         sizeof(queue_representation_type));
    }

    // Enqueue an item at tail of queue. Blocks while the queue is at
    // capacity; throws user_abort if abort() is called during the wait.
    void push( const T& value ) {
        internal_push(value);
    }

    void push( T&& value ) {
        internal_push(std::move(value));
    }

    // Enqueue an item at tail of queue if queue is not already full.
    // Does not wait for queue to become not full.
    // Returns true if item is pushed; false if queue was already full.
    bool try_push( const T& value ) {
        return internal_push_if_not_full(value);
    }

    bool try_push( T&& value ) {
        return internal_push_if_not_full(std::move(value));
    }

    // Construct an item in place at tail of queue; blocks like push().
    template <typename... Args>
    void emplace( Args&&... args ) {
        internal_push(std::forward<Args>(args)...);
    }

    // In-place counterpart of try_push(); returns false if queue was full.
    template <typename... Args>
    bool try_emplace( Args&&... args ) {
        return internal_push_if_not_full(std::forward<Args>(args)...);
    }

    // Dequeue an item from head of queue.
    /** Blocks until an item becomes available; throws user_abort if abort()
        is called during the wait. Returns true on success. */
    bool pop( T& result ) {
        return internal_pop(&result);
    }

    // Attempt to dequeue an item from head of queue.
    /** Does not wait for item to become available.
        Returns true if successful; false otherwise. */
    bool try_pop( T& result ) {
        return internal_pop_if_present(&result);
    }

    // Release every thread currently blocked in push()/pop(); each such call
    // exits by throwing user_abort (see the wait predicates below).
    void abort() {
        internal_abort();
    }

    // Return the number of items in the queue; thread unsafe
    std::ptrdiff_t size() const {
        return my_queue_representation->size();
    }

    // Set the maximum number of items the queue may hold.
    // Any negative value selects the (practically) infinite capacity.
    // NOTE(review): my_capacity is a plain ptrdiff_t read concurrently by the
    // push paths; calling this while producers are active is a data race —
    // confirm the intended usage contract.
    void set_capacity( size_type new_capacity ) {
        std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
        my_capacity = c;
    }

    size_type capacity() const {
        return my_capacity;
    }

    // Equivalent to size()==0.
    bool empty() const {
        return my_queue_representation->empty();
    }

    // Clear the queue. not thread-safe.
    // NOTE(review): pops into a temporary, so this path additionally requires
    // T to be default-constructible.
    void clear() {
        while (!empty()) {
            T value;
            try_pop(value);
        }
    }

    // Return allocator object
    allocator_type get_allocator() const { return my_allocator; }

    //------------------------------------------------------------------------
    // The iterators are intended only for debugging.  They are slow and not thread safe.
    //------------------------------------------------------------------------

    iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
    iterator unsafe_end() { return iterator(); }
    const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_end() const { return const_iterator(); }
    const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
    const_iterator unsafe_cend() const { return const_iterator(); }

private:
    // Swap representation and monitor pointers; allocators stay in place
    // (callers have already ensured allocator compatibility where required).
    void internal_swap( concurrent_bounded_queue& src ) {
        std::swap(my_queue_representation, src.my_queue_representation);
        std::swap(my_monitors, src.my_monitors);
    }

    // Sentinel capacity meaning "unbounded": half the maximum size_type value.
    static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);

    // Blocking push:
    //  1. Snapshot the abort counter so an abort() issued after this call
    //     started is detectable inside the wait predicate.
    //  2. Reserve a tail ticket; if that slot lies beyond capacity relative to
    //     the head, wait on the slots-available monitor.
    //  3. The predicate throws user_abort when the counter changed; on any
    //     exception the reserved slot is cancelled via abort_push.
    //  4. Construct the item, then notify the items-available monitor.
    template <typename... Args>
    void internal_push( Args&&... args ) {
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
        ticket_type ticket = my_queue_representation->tail_counter++;
        std::ptrdiff_t target = ticket - my_capacity;

        if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
            auto pred = [&] {
                if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                    throw_exception(exception_id::user_abort);
                }

                return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
            };

            try_call( [&] {
                internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
            }).on_exception( [&] {
                my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation);
            });

        }
        __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
    }

    // Non-blocking push: claim a tail ticket via CAS only while the queue is
    // below capacity; returns false if it was observed full.
    template <typename... Args>
    bool internal_push_if_not_full( Args&&... args ) {
        ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
        do {
            if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
                // Queue is full
                return false;
            }
            // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
            // Another thread claimed the slot, so retry.
        } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));

        my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
        r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
        return true;
    }

    // Blocking pop: mirror image of internal_push(). Reserves a head ticket,
    // waits on the items-available monitor while the queue is empty, then
    // extracts the item and notifies the slots-available monitor.
    bool internal_pop( void* dst ) {
        std::ptrdiff_t target;
        // This loop is a single pop operation; abort_counter should not be re-read inside
        unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);

        do {
            target = my_queue_representation->head_counter++;
            if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
                auto pred = [&] {
                    if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
                            throw_exception(exception_id::user_abort);
                    }

                    return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
                };

                try_call( [&] {
                    internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
                }).on_exception( [&] {
                    // Roll back the ticket reservation so the slot is not lost.
                    my_queue_representation->head_counter--;
                });
            }
            __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
            // Outer retry: take a fresh ticket when pop() reports the claimed
            // slot could not deliver an item (presumably invalidated by an
            // exception during its push — confirm in _concurrent_queue_base.h).
        } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
        return true;
    }

    // Non-blocking pop: claim a head ticket via CAS only while items exist;
    // returns false if the queue was observed empty.
    bool internal_pop_if_present( void* dst ) {
        ticket_type ticket;
        do {
            ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
            do {
                if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
                    // Queue is empty
                    return false;
                }
                // Queue had item with ticket k when we looked.  Attempt to get that item.
                // Another thread snatched the item, retry.
            } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
        } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation));

        r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
        return true;
    }

    // Bump the abort epoch, then ask the runtime to release monitor waiters;
    // their predicates observe the changed counter and throw user_abort.
    void internal_abort() {
        ++my_abort_counter;
        r1::abort_bounded_queue_monitors(my_monitors);
    }

    // Copy-construct a T at 'location' from a type-erased source element.
    static void copy_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for copy construction
        new (location) value_type(*static_cast<const value_type*>(src));
    }

    // Move-construct a T at 'location' from a type-erased source element.
    static void move_construct_item(T* location, const void* src) {
        // TODO: use allocator_traits for move construction
        new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
    }

    template <typename Container, typename Value, typename A>
    friend class concurrent_queue_iterator;

    queue_allocator_type my_allocator;                  // rebound allocator for the representation
    std::ptrdiff_t my_capacity;                         // current bound; infinite_capacity when "unbounded"
    std::atomic<unsigned> my_abort_counter;             // epoch bumped by abort()
    queue_representation_type* my_queue_representation; // shared lock-free state

    r1::concurrent_monitor* my_monitors;                // indexed by cbq_*_avail_tag
}; // class concurrent_bounded_queue
567 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide: `concurrent_bounded_queue q(first, last[, alloc]);` deduces
// the element type from the iterator's value_type and defaults the allocator
// to a cache_aligned_allocator of that type.
template <typename It,
          typename Value = typename std::iterator_traits<It>::value_type,
          typename Alloc = tbb::cache_aligned_allocator<Value>>
concurrent_bounded_queue( It, It, const Alloc& = Alloc() )
-> concurrent_bounded_queue<Value, Alloc>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
576 
577 } //namespace d1
} // namespace detail
579 
580 inline namespace v1 {
581 
582 using detail::d1::concurrent_queue;
583 using detail::d1::concurrent_bounded_queue;
584 using detail::r1::user_abort;
585 using detail::r1::bad_last_alloc;
586 
587 } // inline namespace v1
588 } // namespace tbb
589 
590 #endif // __TBB_concurrent_queue_H
591