/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_detail__concurrent_queue_base_H
#define __TBB_detail__concurrent_queue_base_H

#include "_utils.h"
#include "_exception.h"
#include "_machine.h"
#include "_allocator_traits.h"

#include "../profiling.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d2 {

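// Every push and pop claims a monotonically increasing ticket from the shared
// head/tail counters; the ticket both selects a micro queue and orders the
// operations within it.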
using ticket_type = std::size_t;

template <typename Page>
inline bool is_valid_page(const Page p) {
    return reinterpret_cast<std::uintptr_t>(p) > 1;
}

template <typename T, typename Allocator>
struct concurrent_queue_rep;

template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

// A queue using simple locking.
// For efficiency, this class has no constructor.
// The caller is expected to zero-initialize it.
template <typename T, typename Allocator>
class micro_queue {
private:
    using queue_rep_type = concurrent_queue_rep<T, Allocator>;
    using self_type = micro_queue<T, Allocator>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = value_type&;
    using const_reference = const value_type&;

    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>;

    static constexpr size_type item_size = sizeof(T);
    static constexpr size_type items_per_page = item_size <=   8 ? 32 :
                                                item_size <=  16 ? 16 :
                                                item_size <=  32 ?  8 :
                                                item_size <=  64 ?  4 :
                                                item_size <= 128 ?  2 : 1;
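    // With this ladder a page carries at most 32 items and, for item sizes up
    // to 128 bytes, at most 256 bytes of payload, so page allocations stay
    // roughly uniform in size.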

    struct padded_page {
        padded_page() {}
        ~padded_page() {}

        reference operator[] (std::size_t index) {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        const_reference operator[] (std::size_t index) const {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        padded_page* next{ nullptr };
        std::atomic<std::uintptr_t> mask{};

        union {
            value_type items[items_per_page];
        };
    }; // struct padded_page

    using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>;
protected:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;

public:
    using item_constructor_type = void (*)(value_type* location, const void* src);
    micro_queue() = default;
    micro_queue( const micro_queue& ) = delete;
    micro_queue& operator=( const micro_queue& ) = delete;

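    // Waits until ticket k reaches the tail of this micro queue and ensures a
    // page exists for its slot; returns the slot index within that page. If the
    // page allocation throws, the entry is counted as invalid and the page list
    // is poisoned via invalidate_page() before the exception propagates.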
    size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator,
                            padded_page*& p ) {
        __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page");
        k &= -queue_rep_type::n_queue;
        size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page);
        if (!index) {
            try_call( [&] {
                p = page_allocator_traits::allocate(page_allocator, 1);
            }).on_exception( [&] {
                ++base.n_invalid_entries;
                invalidate_page( k );
            });
            page_allocator_traits::construct(page_allocator, p);
        }

        spin_wait_until_my_turn(tail_counter, k, base);
        d1::call_itt_notify(d1::acquired, &tail_counter);

        if (p) {
            spin_mutex::scoped_lock lock( page_mutex );
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = p;
            } else {
                head_page.store(p, std::memory_order_relaxed);
            }
            tail_page.store(p, std::memory_order_relaxed);
        } else {
            p = tail_page.load(std::memory_order_relaxed);
        }
        return index;
    }

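    // Constructs a new item in the slot reserved for ticket k. The mask bit is
    // set only after construction succeeds, so a throwing constructor leaves
    // the slot marked invalid while value_guard still advances tail_counter to
    // unblock later producers.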
    template<typename... Args>
    void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args )
    {
        padded_page* p = nullptr;
        page_allocator_type page_allocator(allocator);
        size_type index = prepare_page(k, base, page_allocator, p);
        __TBB_ASSERT(p != nullptr, "Page was not prepared");

        // try_call API is not convenient here due to broken
        // variadic capture on GCC 4.8.5
        auto value_guard = make_raii_guard([&] {
            ++base.n_invalid_entries;
            d1::call_itt_notify(d1::releasing, &tail_counter);
            tail_counter.fetch_add(queue_rep_type::n_queue);
        });

        page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...);
        // If no exception was thrown, mark item as present.
        p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed);
        d1::call_itt_notify(d1::releasing, &tail_counter);

        value_guard.dismiss();
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

    void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        padded_page* p = nullptr;
        prepare_page(k, base, allocator, p);
        ++base.n_invalid_entries;
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

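    // Attempts to consume the item for ticket k: waits until k is at the head
    // of this micro queue and the matching push has finished, then moves the
    // item into *dst. Returns false if the slot was invalidated (for example
    // by a failed push).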
    bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        k &= -queue_rep_type::n_queue;
        spin_wait_until_eq(head_counter, k);
        d1::call_itt_notify(d1::acquired, &head_counter);
        spin_wait_while_eq(tail_counter, k);
        d1::call_itt_notify(d1::acquired, &tail_counter);
        padded_page *p = head_page.load(std::memory_order_relaxed);
        __TBB_ASSERT( p, nullptr );
        size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
        bool success = false;
        {
            page_allocator_type page_allocator(allocator);
            micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator,
                k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
            if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                success = true;
                assign_and_destroy_item(dst, *p, index);
            } else {
                --base.n_invalid_entries;
            }
        }
        return success;
    }

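    // Rebuilds this micro queue as a copy of src, duplicating only the pages
    // (and within them only the slots whose mask bit is set) that lie between
    // src's head and tail counters.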
    micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator,
        item_constructor_type construct_item )
    {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);

        const padded_page* srcp = src.head_page.load(std::memory_order_relaxed);
        if( is_valid_page(srcp) ) {
            ticket_type g_index = head_counter.load(std::memory_order_relaxed);
            size_type n_items  = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed))
                / queue_rep_type::n_queue;
            size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
            size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page;

            try_call( [&] {
                head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
            }).on_exception( [&] {
                head_counter.store(0, std::memory_order_relaxed);
                tail_counter.store(0, std::memory_order_relaxed);
            });
            padded_page* cur_page = head_page.load(std::memory_order_relaxed);

            try_call( [&] {
                if (srcp != src.tail_page.load(std::memory_order_relaxed)) {
                    for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) {
                        cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item );
                        cur_page = cur_page->next;
                    }

                    __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr );
                    size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
                    if( last_index==0 ) last_index = items_per_page;

                    cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item );
                    cur_page = cur_page->next;
                }
                tail_page.store(cur_page, std::memory_order_relaxed);
            }).on_exception( [&] {
                padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
                tail_page.store(invalid_page, std::memory_order_relaxed);
            });
        } else {
            head_page.store(nullptr, std::memory_order_relaxed);
            tail_page.store(nullptr, std::memory_order_relaxed);
        }
        return *this;
    }

    padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page,
        size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item )
    {
        page_allocator_type page_allocator(allocator);
        padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1);
        new_page->next = nullptr;
        new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
        for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
            if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
                copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
            }
        }
        return new_page;
    }

    void invalidate_page( ticket_type k )  {
        // Append an invalid page at address 1 so that no more pushes are allowed.
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        {
            spin_mutex::scoped_lock lock( page_mutex );
            tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed);
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = invalid_page;
            } else {
                head_page.store(invalid_page, std::memory_order_relaxed);
            }
            tail_page.store(invalid_page, std::memory_order_relaxed);
        }
    }

    padded_page* get_head_page() {
        return head_page.load(std::memory_order_relaxed);
    }

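    // Destroys every item still present in the queue (as indicated by the page
    // masks), releases all pages, and resets the counters. The optional
    // arguments let the caller install replacement head/tail pages, which
    // clear_and_invalidate() uses to poison the queue.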
    void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
        padded_page* curr_page = get_head_page();
        size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
        page_allocator_type page_allocator(allocator);

        while (curr_page && is_valid_page(curr_page)) {
            while (index != items_per_page) {
                if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                    page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
                }
                ++index;
            }

            index = 0;
            padded_page* next_page = curr_page->next;
            page_allocator_traits::destroy(page_allocator, curr_page);
            page_allocator_traits::deallocate(page_allocator, curr_page, 1);
            curr_page = next_page;
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        head_page.store(new_head, std::memory_order_relaxed);
        tail_page.store(new_tail, std::memory_order_relaxed);
    }

    void clear_and_invalidate(queue_allocator_type& allocator) {
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        clear(allocator, invalid_page, invalid_page);
    }

private:
    // template <typename U, typename A>
    friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>;

    // Class used to ensure exception-safety of method "pop"
    class destroyer  {
        value_type& my_value;
    public:
        destroyer( reference value ) : my_value(value) {}
        destroyer( const destroyer& ) = delete;
        destroyer& operator=( const destroyer& ) = delete;
        ~destroyer() {my_value.~T();}
    }; // class destroyer

    void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
        item_constructor_type construct_item )
    {
        auto& src_item = src[sindex];
        construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
        auto& from = src[index];
        destroyer d(from);
        *static_cast<T*>(dst) = std::move(from);
    }

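    // Spins until counter reaches ticket k. An odd counter value is the
    // "invalidated" marker written by invalidate_page(), so a waiting producer
    // gives up and reports bad_last_alloc instead of spinning forever.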
    void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
        for (atomic_backoff b{};; b.pause()) {
            ticket_type c = counter.load(std::memory_order_acquire);
            if (c == k) return;
            else if (c & 1) {
                ++rb.n_invalid_entries;
                throw_exception( exception_id::bad_last_alloc);
            }
        }
    }

    std::atomic<padded_page*> head_page{};
    std::atomic<ticket_type> head_counter{};

    std::atomic<padded_page*> tail_page{};
    std::atomic<ticket_type> tail_counter{};

    spin_mutex page_mutex{};
}; // class micro_queue

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

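// RAII helper used by micro_queue::pop: its destructor publishes the new
// head_counter value and, when the pop consumed the last slot of a page,
// unlinks and deallocates that page under the page mutex.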
template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer {
public:
    using padded_page = typename Container::padded_page;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;

    micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) :
        my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc)
    {}

    micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete;
    micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete;

    ~micro_queue_pop_finalizer() {
        padded_page* p = my_page;
        if( is_valid_page(p) ) {
            spin_mutex::scoped_lock lock( my_queue.page_mutex );
            padded_page* q = p->next;
            my_queue.head_page.store(q, std::memory_order_relaxed);
            if( !is_valid_page(q) ) {
                my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
            }
        }
        my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
        if ( is_valid_page(p) ) {
            allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p));
            allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1);
        }
    }
private:
    ticket_type my_ticket_type;
    Container& my_queue;
    padded_page* my_page;
    Allocator& allocator;
}; // class micro_queue_pop_finalizer

#if _MSC_VER && !defined(__INTEL_COMPILER)
// structure was padded due to alignment specifier
#pragma warning( push )
#pragma warning( disable: 4324 )
#endif

template <typename T, typename Allocator>
struct concurrent_queue_rep {
    using self_type = concurrent_queue_rep<T, Allocator>;
    using size_type = std::size_t;
    using micro_queue_type = micro_queue<T, Allocator>;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using padded_page = typename micro_queue_type::padded_page;
    using page_allocator_type = typename micro_queue_type::page_allocator_type;
    using item_constructor_type = typename micro_queue_type::item_constructor_type;
private:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>;

public:
    // must be power of 2
    static constexpr size_type n_queue = 8;
    // Approximately n_queue/golden ratio
    static constexpr size_type phi = 3;
    static constexpr size_type item_size = micro_queue_type::item_size;
    static constexpr size_type items_per_page = micro_queue_type::items_per_page;

    concurrent_queue_rep() {}

    concurrent_queue_rep( const concurrent_queue_rep& ) = delete;
    concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;

    void clear( queue_allocator_type& alloc ) {
        for (size_type index = 0; index < n_queue; ++index) {
            array[index].clear(alloc);
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        n_invalid_entries.store(0, std::memory_order_relaxed);
    }

    void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed);

        // copy or move micro_queues
        size_type queue_idx = 0;
        try_call( [&] {
            for (; queue_idx < n_queue; ++queue_idx) {
                array[queue_idx].assign(src.array[queue_idx], alloc, construct_item);
            }
        }).on_exception( [&] {
            for (size_type i = 0; i < queue_idx + 1; ++i) {
                array[i].clear_and_invalidate(alloc);
            }
            head_counter.store(0, std::memory_order_relaxed);
            tail_counter.store(0, std::memory_order_relaxed);
            n_invalid_entries.store(0, std::memory_order_relaxed);
        });

        __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) &&
                     tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed),
                     "the source concurrent queue should not be concurrently modified." );
    }

    bool empty() const {
        ticket_type tc = tail_counter.load(std::memory_order_acquire);
        ticket_type hc = head_counter.load(std::memory_order_relaxed);
        // If tc != tail_counter on the re-read below, the queue was not empty at some point between the two reads.
        return tc == tail_counter.load(std::memory_order_relaxed) &&
               std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0;
    }

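    // Snapshot of (pushes - pops - invalidated entries); slots invalidated by
    // failed or aborted pushes are not counted. Under concurrent modification
    // the value is only an approximation.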
    std::ptrdiff_t size() const {
        __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), nullptr);
        std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire);
        std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed);
        std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed);

        return tc - hc - nie;
    }

    friend class micro_queue<T, Allocator>;

    // Map ticket_type to an array index
    static size_type index( ticket_type k ) {
        return k * phi % n_queue;
    }
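    // With n_queue == 8 and phi == 3, consecutive tickets visit the micro
    // queues in the order 0, 3, 6, 1, 4, 7, 2, 5, so neighboring operations
    // land on different micro queues and contend less on their counters.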

    micro_queue_type& choose( ticket_type k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }

    alignas(max_nfs_size) micro_queue_type array[n_queue];

    alignas(max_nfs_size) std::atomic<ticket_type> head_counter{};
    alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{};
    alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{};
}; // class concurrent_queue_rep

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif
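
// A rough sketch (not the actual derived queue implementation, which lives in
// the public concurrent_queue headers) of how these primitives are expected to
// be driven; my_rep, my_allocator, value and dst are placeholder names:
//
//     // Producer: claim a ticket, then hand the item to its micro queue.
//     ticket_type k = my_rep->tail_counter.fetch_add(1);
//     my_rep->choose(k).push(k, *my_rep, my_allocator, value);
//
//     // Consumer: claim the oldest ticket and try to take its item. A real
//     // consumer must first check for emptiness, typically with a
//     // compare-exchange loop on head_counter rather than a plain fetch_add.
//     ticket_type h = my_rep->head_counter.fetch_add(1);
//     bool popped = my_rep->choose(h).pop(&dst, h, *my_rep, my_allocator);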

template <typename Value, typename Allocator>
class concurrent_queue_iterator_base {
    using queue_rep_type = concurrent_queue_rep<Value, Allocator>;
    using padded_page = typename queue_rep_type::padded_page;
protected:
    concurrent_queue_iterator_base() = default;

    concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) {
        assign(other);
    }

    concurrent_queue_iterator_base( queue_rep_type* queue_rep )
        : my_queue_rep(queue_rep),
          my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed))
    {
        for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
            my_array[i] = my_queue_rep->array[i].get_head_page();
        }

        if (!get_item(my_item, my_head_counter)) advance();
    }

    void assign( const concurrent_queue_iterator_base& other ) {
        my_item = other.my_item;
        my_queue_rep = other.my_queue_rep;

        if (my_queue_rep != nullptr) {
            my_head_counter = other.my_head_counter;

            for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
                my_array[i] = other.my_array[i];
            }
        }
    }

    void advance() {
        __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue");
        std::size_t k = my_head_counter;
#if TBB_USE_ASSERT
        Value* tmp;
        get_item(tmp, k);
        __TBB_ASSERT(my_item == tmp, nullptr);
#endif
        std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
        if (i == my_queue_rep->items_per_page - 1) {
            padded_page*& root = my_array[queue_rep_type::index(k)];
            root = root->next;
        }
        // Advance k
        my_head_counter = ++k;
        if (!get_item(my_item, k)) advance();
    }

    concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) {
        this->assign(other);
        return *this;
    }

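    // Loads the address of the item for ticket k into `item`. Returns true if
    // that slot holds a constructed item, or if k is the end of the queue (in
    // which case item is set to nullptr); returns false for slots skipped
    // because of failed pushes, which makes advance() keep stepping.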
    bool get_item( Value*& item, std::size_t k ) {
        if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) {
            item = nullptr;
            return true;
        } else {
            padded_page* p = my_array[queue_rep_type::index(k)];
            __TBB_ASSERT(p, nullptr);
            std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
            item = &(*p)[i];
            return (p->mask & uintptr_t(1) << i) != 0;
        }
    }

    Value* my_item{ nullptr };
    queue_rep_type* my_queue_rep{ nullptr };
    ticket_type my_head_counter{};
    padded_page* my_array[queue_rep_type::n_queue]{};
}; // class concurrent_queue_iterator_base

struct concurrent_queue_iterator_provider {
    template <typename Iterator, typename Container>
    static Iterator get( const Container& container ) {
        return Iterator(container);
    }
}; // struct concurrent_queue_iterator_provider

template <typename Container, typename Value, typename Allocator>
class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> {
    using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>;
public:
    using value_type = Value;
    using pointer = value_type*;
    using reference = value_type&;
    using difference_type = std::ptrdiff_t;
    using iterator_category = std::forward_iterator_tag;

    concurrent_queue_iterator() = default;

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other )
        : base_type(other) {}

private:
    concurrent_queue_iterator( const Container& container )
        : base_type(container.my_queue_representation) {}
public:
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) {
        this->assign(other);
        return *this;
    }

    reference operator*() const {
        return *static_cast<pointer>(this->my_item);
    }

    pointer operator->() const { return &operator*(); }

    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    concurrent_queue_iterator operator++(int) {
        concurrent_queue_iterator tmp = *this;
        ++*this;
        return tmp;
    }

    friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item == rhs.my_item;
    }

    friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item != rhs.my_item;
    }
private:
    friend struct concurrent_queue_iterator_provider;
}; // class concurrent_queue_iterator

} // namespace d2
} // namespace detail
} // namespace tbb

#endif // __TBB_detail__concurrent_queue_base_H