1 /*
2 Copyright (c) 2005-2022 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_detail__concurrent_queue_base_H
18 #define __TBB_detail__concurrent_queue_base_H
19
20 #include "_utils.h"
21 #include "_exception.h"
22 #include "_machine.h"
23 #include "_allocator_traits.h"
24
25 #include "../profiling.h"
26 #include "../spin_mutex.h"
27 #include "../cache_aligned_allocator.h"
28
29 #include <atomic>
30
31 namespace tbb {
32 namespace detail {
33 namespace d2 {
34
35 using ticket_type = std::size_t;
36
37 template <typename Page>
is_valid_page(const Page p)38 inline bool is_valid_page(const Page p) {
39 return reinterpret_cast<std::uintptr_t>(p) > 1;
40 }
41
42 template <typename T, typename Allocator>
43 struct concurrent_queue_rep;
44
45 template <typename Container, typename T, typename Allocator>
46 class micro_queue_pop_finalizer;
47
48 #if _MSC_VER && !defined(__INTEL_COMPILER)
49 // unary minus operator applied to unsigned type, result still unsigned
50 #pragma warning( push )
51 #pragma warning( disable: 4146 )
52 #endif
53
54 // A queue using simple locking.
55 // For efficiency, this class has no constructor.
56 // The caller is expected to zero-initialize it.
57 template <typename T, typename Allocator>
58 class micro_queue {
59 private:
60 using queue_rep_type = concurrent_queue_rep<T, Allocator>;
61 using self_type = micro_queue<T, Allocator>;
62 public:
63 using size_type = std::size_t;
64 using value_type = T;
65 using reference = value_type&;
66 using const_reference = const value_type&;
67
68 using allocator_type = Allocator;
69 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
70 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>;
71
72 static constexpr size_type item_size = sizeof(T);
73 static constexpr size_type items_per_page = item_size <= 8 ? 32 :
74 item_size <= 16 ? 16 :
75 item_size <= 32 ? 8 :
76 item_size <= 64 ? 4 :
77 item_size <= 128 ? 2 : 1;
78
79 struct padded_page {
padded_pagepadded_page80 padded_page() {}
~padded_pagepadded_page81 ~padded_page() {}
82
83 reference operator[] (std::size_t index) {
84 __TBB_ASSERT(index < items_per_page, "Index out of range");
85 return items[index];
86 }
87
88 const_reference operator[] (std::size_t index) const {
89 __TBB_ASSERT(index < items_per_page, "Index out of range");
90 return items[index];
91 }
92
93 padded_page* next{ nullptr };
94 std::atomic<std::uintptr_t> mask{};
95
96 union {
97 value_type items[items_per_page];
98 };
99 }; // struct padded_page
100
101 using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>;
102 protected:
103 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
104
105 public:
106 using item_constructor_type = void (*)(value_type* location, const void* src);
107 micro_queue() = default;
108 micro_queue( const micro_queue& ) = delete;
109 micro_queue& operator=( const micro_queue& ) = delete;
110
prepare_page(ticket_type k,queue_rep_type & base,page_allocator_type page_allocator,padded_page * & p)111 size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator,
112 padded_page*& p ) {
113 __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page");
114 k &= -queue_rep_type::n_queue;
115 size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page);
116 if (!index) {
117 try_call( [&] {
118 p = page_allocator_traits::allocate(page_allocator, 1);
119 }).on_exception( [&] {
120 ++base.n_invalid_entries;
121 invalidate_page( k );
122 });
123 page_allocator_traits::construct(page_allocator, p);
124 }
125
126 spin_wait_until_my_turn(tail_counter, k, base);
127 d1::call_itt_notify(d1::acquired, &tail_counter);
128
129 if (p) {
130 spin_mutex::scoped_lock lock( page_mutex );
131 padded_page* q = tail_page.load(std::memory_order_relaxed);
132 if (is_valid_page(q)) {
133 q->next = p;
134 } else {
135 head_page.store(p, std::memory_order_relaxed);
136 }
137 tail_page.store(p, std::memory_order_relaxed);
138 } else {
139 p = tail_page.load(std::memory_order_relaxed);
140 }
141 return index;
142 }
143
144 template<typename... Args>
push(ticket_type k,queue_rep_type & base,queue_allocator_type & allocator,Args &&...args)145 void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args )
146 {
147 padded_page* p = nullptr;
148 page_allocator_type page_allocator(allocator);
149 size_type index = prepare_page(k, base, page_allocator, p);
150 __TBB_ASSERT(p != nullptr, "Page was not prepared");
151
152 // try_call API is not convenient here due to broken
153 // variadic capture on GCC 4.8.5
154 auto value_guard = make_raii_guard([&] {
155 ++base.n_invalid_entries;
156 d1::call_itt_notify(d1::releasing, &tail_counter);
157 tail_counter.fetch_add(queue_rep_type::n_queue);
158 });
159
160 page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...);
161 // If no exception was thrown, mark item as present.
162 p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed);
163 d1::call_itt_notify(d1::releasing, &tail_counter);
164
165 value_guard.dismiss();
166 tail_counter.fetch_add(queue_rep_type::n_queue);
167 }
168
abort_push(ticket_type k,queue_rep_type & base,queue_allocator_type & allocator)169 void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
170 padded_page* p = nullptr;
171 prepare_page(k, base, allocator, p);
172 ++base.n_invalid_entries;
173 tail_counter.fetch_add(queue_rep_type::n_queue);
174 }
175
pop(void * dst,ticket_type k,queue_rep_type & base,queue_allocator_type & allocator)176 bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
177 k &= -queue_rep_type::n_queue;
178 spin_wait_until_eq(head_counter, k);
179 d1::call_itt_notify(d1::acquired, &head_counter);
180 spin_wait_while_eq(tail_counter, k);
181 d1::call_itt_notify(d1::acquired, &tail_counter);
182 padded_page *p = head_page.load(std::memory_order_relaxed);
183 __TBB_ASSERT( p, nullptr );
184 size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
185 bool success = false;
186 {
187 page_allocator_type page_allocator(allocator);
188 micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator,
189 k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
190 if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
191 success = true;
192 assign_and_destroy_item(dst, *p, index);
193 } else {
194 --base.n_invalid_entries;
195 }
196 }
197 return success;
198 }
199
assign(const micro_queue & src,queue_allocator_type & allocator,item_constructor_type construct_item)200 micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator,
201 item_constructor_type construct_item )
202 {
203 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
204 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
205
206 const padded_page* srcp = src.head_page.load(std::memory_order_relaxed);
207 if( is_valid_page(srcp) ) {
208 ticket_type g_index = head_counter.load(std::memory_order_relaxed);
209 size_type n_items = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed))
210 / queue_rep_type::n_queue;
211 size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
212 size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page;
213
214 try_call( [&] {
215 head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
216 }).on_exception( [&] {
217 head_counter.store(0, std::memory_order_relaxed);
218 tail_counter.store(0, std::memory_order_relaxed);
219 });
220 padded_page* cur_page = head_page.load(std::memory_order_relaxed);
221
222 try_call( [&] {
223 if (srcp != src.tail_page.load(std::memory_order_relaxed)) {
224 for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) {
225 cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item );
226 cur_page = cur_page->next;
227 }
228
229 __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr );
230 size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
231 if( last_index==0 ) last_index = items_per_page;
232
233 cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item );
234 cur_page = cur_page->next;
235 }
236 tail_page.store(cur_page, std::memory_order_relaxed);
237 }).on_exception( [&] {
238 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
239 tail_page.store(invalid_page, std::memory_order_relaxed);
240 });
241 } else {
242 head_page.store(nullptr, std::memory_order_relaxed);
243 tail_page.store(nullptr, std::memory_order_relaxed);
244 }
245 return *this;
246 }
247
make_copy(queue_allocator_type & allocator,const padded_page * src_page,size_type begin_in_page,size_type end_in_page,ticket_type & g_index,item_constructor_type construct_item)248 padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page,
249 size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item )
250 {
251 page_allocator_type page_allocator(allocator);
252 padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1);
253 new_page->next = nullptr;
254 new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
255 for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
256 if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
257 copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
258 }
259 }
260 return new_page;
261 }
262
invalidate_page(ticket_type k)263 void invalidate_page( ticket_type k ) {
264 // Append an invalid page at address 1 so that no more pushes are allowed.
265 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
266 {
267 spin_mutex::scoped_lock lock( page_mutex );
268 tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed);
269 padded_page* q = tail_page.load(std::memory_order_relaxed);
270 if (is_valid_page(q)) {
271 q->next = invalid_page;
272 } else {
273 head_page.store(invalid_page, std::memory_order_relaxed);
274 }
275 tail_page.store(invalid_page, std::memory_order_relaxed);
276 }
277 }
278
get_head_page()279 padded_page* get_head_page() {
280 return head_page.load(std::memory_order_relaxed);
281 }
282
283 void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
284 padded_page* curr_page = get_head_page();
285 size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
286 page_allocator_type page_allocator(allocator);
287
288 while (curr_page && is_valid_page(curr_page)) {
289 while (index != items_per_page) {
290 if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
291 page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
292 }
293 ++index;
294 }
295
296 index = 0;
297 padded_page* next_page = curr_page->next;
298 page_allocator_traits::destroy(page_allocator, curr_page);
299 page_allocator_traits::deallocate(page_allocator, curr_page, 1);
300 curr_page = next_page;
301 }
302 head_counter.store(0, std::memory_order_relaxed);
303 tail_counter.store(0, std::memory_order_relaxed);
304 head_page.store(new_head, std::memory_order_relaxed);
305 tail_page.store(new_tail, std::memory_order_relaxed);
306 }
307
clear_and_invalidate(queue_allocator_type & allocator)308 void clear_and_invalidate(queue_allocator_type& allocator) {
309 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
310 clear(allocator, invalid_page, invalid_page);
311 }
312
313 private:
314 // template <typename U, typename A>
315 friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>;
316
317 // Class used to ensure exception-safety of method "pop"
318 class destroyer {
319 value_type& my_value;
320 public:
destroyer(reference value)321 destroyer( reference value ) : my_value(value) {}
322 destroyer( const destroyer& ) = delete;
323 destroyer& operator=( const destroyer& ) = delete;
~destroyer()324 ~destroyer() {my_value.~T();}
325 }; // class destroyer
326
copy_item(padded_page & dst,size_type dindex,const padded_page & src,size_type sindex,item_constructor_type construct_item)327 void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
328 item_constructor_type construct_item )
329 {
330 auto& src_item = src[sindex];
331 construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
332 }
333
assign_and_destroy_item(void * dst,padded_page & src,size_type index)334 void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
335 auto& from = src[index];
336 destroyer d(from);
337 *static_cast<T*>(dst) = std::move(from);
338 }
339
spin_wait_until_my_turn(std::atomic<ticket_type> & counter,ticket_type k,queue_rep_type & rb)340 void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
341 for (atomic_backoff b{};; b.pause()) {
342 ticket_type c = counter.load(std::memory_order_acquire);
343 if (c == k) return;
344 else if (c & 1) {
345 ++rb.n_invalid_entries;
346 throw_exception( exception_id::bad_last_alloc);
347 }
348 }
349 }
350
351 std::atomic<padded_page*> head_page{};
352 std::atomic<ticket_type> head_counter{};
353
354 std::atomic<padded_page*> tail_page{};
355 std::atomic<ticket_type> tail_counter{};
356
357 spin_mutex page_mutex{};
358 }; // class micro_queue
359
360 #if _MSC_VER && !defined(__INTEL_COMPILER)
361 #pragma warning( pop )
362 #endif // warning 4146 is back
363
364 template <typename Container, typename T, typename Allocator>
365 class micro_queue_pop_finalizer {
366 public:
367 using padded_page = typename Container::padded_page;
368 using allocator_type = Allocator;
369 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
370
micro_queue_pop_finalizer(Container & queue,Allocator & alloc,ticket_type k,padded_page * p)371 micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) :
372 my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc)
373 {}
374
375 micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete;
376 micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete;
377
~micro_queue_pop_finalizer()378 ~micro_queue_pop_finalizer() {
379 padded_page* p = my_page;
380 if( is_valid_page(p) ) {
381 spin_mutex::scoped_lock lock( my_queue.page_mutex );
382 padded_page* q = p->next;
383 my_queue.head_page.store(q, std::memory_order_relaxed);
384 if( !is_valid_page(q) ) {
385 my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
386 }
387 }
388 my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
389 if ( is_valid_page(p) ) {
390 allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p));
391 allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1);
392 }
393 }
394 private:
395 ticket_type my_ticket_type;
396 Container& my_queue;
397 padded_page* my_page;
398 Allocator& allocator;
399 }; // class micro_queue_pop_finalizer
400
401 #if _MSC_VER && !defined(__INTEL_COMPILER)
402 // structure was padded due to alignment specifier
403 #pragma warning( push )
404 #pragma warning( disable: 4324 )
405 #endif
406
407 template <typename T, typename Allocator>
408 struct concurrent_queue_rep {
409 using self_type = concurrent_queue_rep<T, Allocator>;
410 using size_type = std::size_t;
411 using micro_queue_type = micro_queue<T, Allocator>;
412 using allocator_type = Allocator;
413 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
414 using padded_page = typename micro_queue_type::padded_page;
415 using page_allocator_type = typename micro_queue_type::page_allocator_type;
416 using item_constructor_type = typename micro_queue_type::item_constructor_type;
417 private:
418 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
419 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>;
420
421 public:
422 // must be power of 2
423 static constexpr size_type n_queue = 8;
424 // Approximately n_queue/golden ratio
425 static constexpr size_type phi = 3;
426 static constexpr size_type item_size = micro_queue_type::item_size;
427 static constexpr size_type items_per_page = micro_queue_type::items_per_page;
428
concurrent_queue_repconcurrent_queue_rep429 concurrent_queue_rep() {}
430
431 concurrent_queue_rep( const concurrent_queue_rep& ) = delete;
432 concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;
433
clearconcurrent_queue_rep434 void clear( queue_allocator_type& alloc ) {
435 for (size_type index = 0; index < n_queue; ++index) {
436 array[index].clear(alloc);
437 }
438 head_counter.store(0, std::memory_order_relaxed);
439 tail_counter.store(0, std::memory_order_relaxed);
440 n_invalid_entries.store(0, std::memory_order_relaxed);
441 }
442
assignconcurrent_queue_rep443 void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
444 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
445 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
446 n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed);
447
448 // copy or move micro_queues
449 size_type queue_idx = 0;
450 try_call( [&] {
451 for (; queue_idx < n_queue; ++queue_idx) {
452 array[queue_idx].assign(src.array[queue_idx], alloc, construct_item);
453 }
454 }).on_exception( [&] {
455 for (size_type i = 0; i < queue_idx + 1; ++i) {
456 array[i].clear_and_invalidate(alloc);
457 }
458 head_counter.store(0, std::memory_order_relaxed);
459 tail_counter.store(0, std::memory_order_relaxed);
460 n_invalid_entries.store(0, std::memory_order_relaxed);
461 });
462
463 __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) &&
464 tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed),
465 "the source concurrent queue should not be concurrently modified." );
466 }
467
emptyconcurrent_queue_rep468 bool empty() const {
469 ticket_type tc = tail_counter.load(std::memory_order_acquire);
470 ticket_type hc = head_counter.load(std::memory_order_relaxed);
471 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
472 return tc == tail_counter.load(std::memory_order_relaxed) &&
473 std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0;
474 }
475
sizeconcurrent_queue_rep476 std::ptrdiff_t size() const {
477 __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), nullptr);
478 std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire);
479 std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed);
480 std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed);
481
482 return tc - hc - nie;
483 }
484
485 friend class micro_queue<T, Allocator>;
486
487 // Map ticket_type to an array index
indexconcurrent_queue_rep488 static size_type index( ticket_type k ) {
489 return k * phi % n_queue;
490 }
491
chooseconcurrent_queue_rep492 micro_queue_type& choose( ticket_type k ) {
493 // The formula here approximates LRU in a cache-oblivious way.
494 return array[index(k)];
495 }
496
497 alignas(max_nfs_size) micro_queue_type array[n_queue];
498
alignasconcurrent_queue_rep499 alignas(max_nfs_size) std::atomic<ticket_type> head_counter{};
alignasconcurrent_queue_rep500 alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{};
alignasconcurrent_queue_rep501 alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{};
502 }; // class concurrent_queue_rep
503
504 #if _MSC_VER && !defined(__INTEL_COMPILER)
505 #pragma warning( pop )
506 #endif
507
508 template <typename Value, typename Allocator>
509 class concurrent_queue_iterator_base {
510 using queue_rep_type = concurrent_queue_rep<Value, Allocator>;
511 using padded_page = typename queue_rep_type::padded_page;
512 protected:
513 concurrent_queue_iterator_base() = default;
514
concurrent_queue_iterator_base(const concurrent_queue_iterator_base & other)515 concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) {
516 assign(other);
517 }
518
concurrent_queue_iterator_base(queue_rep_type * queue_rep)519 concurrent_queue_iterator_base( queue_rep_type* queue_rep )
520 : my_queue_rep(queue_rep),
521 my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed))
522 {
523 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
524 my_array[i] = my_queue_rep->array[i].get_head_page();
525 }
526
527 if (!get_item(my_item, my_head_counter)) advance();
528 }
529
assign(const concurrent_queue_iterator_base & other)530 void assign( const concurrent_queue_iterator_base& other ) {
531 my_item = other.my_item;
532 my_queue_rep = other.my_queue_rep;
533
534 if (my_queue_rep != nullptr) {
535 my_head_counter = other.my_head_counter;
536
537 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
538 my_array[i] = other.my_array[i];
539 }
540 }
541 }
542
advance()543 void advance() {
544 __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue");
545 std::size_t k = my_head_counter;
546 #if TBB_USE_ASSERT
547 Value* tmp;
548 get_item(tmp, k);
549 __TBB_ASSERT(my_item == tmp, nullptr);
550 #endif
551 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
552 if (i == my_queue_rep->items_per_page - 1) {
553 padded_page*& root = my_array[queue_rep_type::index(k)];
554 root = root->next;
555 }
556 // Advance k
557 my_head_counter = ++k;
558 if (!get_item(my_item, k)) advance();
559 }
560
561 concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) {
562 this->assign(other);
563 return *this;
564 }
565
get_item(Value * & item,std::size_t k)566 bool get_item( Value*& item, std::size_t k ) {
567 if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) {
568 item = nullptr;
569 return true;
570 } else {
571 padded_page* p = my_array[queue_rep_type::index(k)];
572 __TBB_ASSERT(p, nullptr);
573 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
574 item = &(*p)[i];
575 return (p->mask & uintptr_t(1) << i) != 0;
576 }
577 }
578
579 Value* my_item{ nullptr };
580 queue_rep_type* my_queue_rep{ nullptr };
581 ticket_type my_head_counter{};
582 padded_page* my_array[queue_rep_type::n_queue]{};
583 }; // class concurrent_queue_iterator_base
584
585 struct concurrent_queue_iterator_provider {
586 template <typename Iterator, typename Container>
getconcurrent_queue_iterator_provider587 static Iterator get( const Container& container ) {
588 return Iterator(container);
589 }
590 }; // struct concurrent_queue_iterator_provider
591
592 template <typename Container, typename Value, typename Allocator>
593 class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> {
594 using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>;
595 public:
596 using value_type = Value;
597 using pointer = value_type*;
598 using reference = value_type&;
599 using difference_type = std::ptrdiff_t;
600 using iterator_category = std::forward_iterator_tag;
601
602 concurrent_queue_iterator() = default;
603
604 /** If Value==Container::value_type, then this routine is the copy constructor.
605 If Value==const Container::value_type, then this routine is a conversion constructor. */
concurrent_queue_iterator(const concurrent_queue_iterator<Container,typename Container::value_type,Allocator> & other)606 concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other )
607 : base_type(other) {}
608
609 private:
concurrent_queue_iterator(const Container & container)610 concurrent_queue_iterator( const Container& container )
611 : base_type(container.my_queue_representation) {}
612 public:
613 concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) {
614 this->assign(other);
615 return *this;
616 }
617
618 reference operator*() const {
619 return *static_cast<pointer>(this->my_item);
620 }
621
622 pointer operator->() const { return &operator*(); }
623
624 concurrent_queue_iterator& operator++() {
625 this->advance();
626 return *this;
627 }
628
629 concurrent_queue_iterator operator++(int) {
630 concurrent_queue_iterator tmp = *this;
631 ++*this;
632 return tmp;
633 }
634
635 friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
636 return lhs.my_item == rhs.my_item;
637 }
638
639 friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
640 return lhs.my_item != rhs.my_item;
641 }
642 private:
643 friend struct concurrent_queue_iterator_provider;
644 }; // class concurrent_queue_iterator
645
646 } // namespace d2
647 } // namespace detail
648 } // tbb
649
650 #endif // __TBB_detail__concurrent_queue_base_H
651