1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_detail__concurrent_queue_base_H 18 #define __TBB_detail__concurrent_queue_base_H 19 20 #include "_utils.h" 21 #include "_exception.h" 22 #include "_machine.h" 23 #include "_allocator_traits.h" 24 25 #include "../profiling.h" 26 #include "../spin_mutex.h" 27 #include "../cache_aligned_allocator.h" 28 29 #include <atomic> 30 31 namespace tbb { 32 namespace detail { 33 namespace d2 { 34 35 using ticket_type = std::size_t; 36 37 template <typename Page> 38 inline bool is_valid_page(const Page p) { 39 return reinterpret_cast<std::uintptr_t>(p) > 1; 40 } 41 42 template <typename T, typename Allocator> 43 struct concurrent_queue_rep; 44 45 template <typename Container, typename T, typename Allocator> 46 class micro_queue_pop_finalizer; 47 48 #if _MSC_VER && !defined(__INTEL_COMPILER) 49 // unary minus operator applied to unsigned type, result still unsigned 50 #pragma warning( push ) 51 #pragma warning( disable: 4146 ) 52 #endif 53 54 // A queue using simple locking. 55 // For efficiency, this class has no constructor. 56 // The caller is expected to zero-initialize it. 57 template <typename T, typename Allocator> 58 class micro_queue { 59 private: 60 using queue_rep_type = concurrent_queue_rep<T, Allocator>; 61 using self_type = micro_queue<T, Allocator>; 62 public: 63 using size_type = std::size_t; 64 using value_type = T; 65 using reference = value_type&; 66 using const_reference = const value_type&; 67 68 using allocator_type = Allocator; 69 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>; 70 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>; 71 72 static constexpr size_type item_size = sizeof(T); 73 static constexpr size_type items_per_page = item_size <= 8 ? 32 : 74 item_size <= 16 ? 16 : 75 item_size <= 32 ? 8 : 76 item_size <= 64 ? 4 : 77 item_size <= 128 ? 2 : 1; 78 79 struct padded_page { 80 padded_page() {} 81 ~padded_page() {} 82 83 reference operator[] (std::size_t index) { 84 __TBB_ASSERT(index < items_per_page, "Index out of range"); 85 return items[index]; 86 } 87 88 const_reference operator[] (std::size_t index) const { 89 __TBB_ASSERT(index < items_per_page, "Index out of range"); 90 return items[index]; 91 } 92 93 padded_page* next{ nullptr }; 94 std::atomic<std::uintptr_t> mask{}; 95 96 union { 97 value_type items[items_per_page]; 98 }; 99 }; // struct padded_page 100 101 using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>; 102 protected: 103 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>; 104 105 public: 106 using item_constructor_type = void (*)(value_type* location, const void* src); 107 micro_queue() = default; 108 micro_queue( const micro_queue& ) = delete; 109 micro_queue& operator=( const micro_queue& ) = delete; 110 111 size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator, 112 padded_page*& p ) { 113 __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page"); 114 k &= -queue_rep_type::n_queue; 115 size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page); 116 if (!index) { 117 try_call( [&] { 118 p = page_allocator_traits::allocate(page_allocator, 1); 119 }).on_exception( [&] { 120 ++base.n_invalid_entries; 121 invalidate_page( k ); 122 }); 123 page_allocator_traits::construct(page_allocator, p); 124 } 125 126 if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base); 127 d1::call_itt_notify(d1::acquired, &tail_counter); 128 129 if (p) { 130 spin_mutex::scoped_lock lock( page_mutex ); 131 padded_page* q = tail_page.load(std::memory_order_relaxed); 132 if (is_valid_page(q)) { 133 q->next = p; 134 } else { 135 head_page.store(p, std::memory_order_relaxed); 136 } 137 tail_page.store(p, std::memory_order_release); 138 } else { 139 p = tail_page.load(std::memory_order_acquire); // TODO may be relaxed ? 140 } 141 return index; 142 } 143 144 template<typename... Args> 145 void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args ) 146 { 147 padded_page* p = nullptr; 148 page_allocator_type page_allocator(allocator); 149 size_type index = prepare_page(k, base, page_allocator, p); 150 __TBB_ASSERT(p != nullptr, "Page was not prepared"); 151 152 // try_call API is not convenient here due to broken 153 // variadic capture on GCC 4.8.5 154 auto value_guard = make_raii_guard([&] { 155 ++base.n_invalid_entries; 156 d1::call_itt_notify(d1::releasing, &tail_counter); 157 tail_counter.fetch_add(queue_rep_type::n_queue); 158 }); 159 160 page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...); 161 // If no exception was thrown, mark item as present. 162 p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed); 163 d1::call_itt_notify(d1::releasing, &tail_counter); 164 165 value_guard.dismiss(); 166 tail_counter.fetch_add(queue_rep_type::n_queue); 167 } 168 169 void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) { 170 padded_page* p = nullptr; 171 prepare_page(k, base, allocator, p); 172 ++base.n_invalid_entries; 173 tail_counter.fetch_add(queue_rep_type::n_queue); 174 } 175 176 bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator) { 177 k &= -queue_rep_type::n_queue; 178 spin_wait_until_eq(head_counter, k); 179 d1::call_itt_notify(d1::acquired, &head_counter); 180 spin_wait_while_eq(tail_counter, k); 181 d1::call_itt_notify(d1::acquired, &tail_counter); 182 padded_page *p = head_page.load(std::memory_order_acquire); 183 __TBB_ASSERT( p, nullptr ); 184 size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page ); 185 bool success = false; 186 { 187 page_allocator_type page_allocator(allocator); 188 micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator, 189 k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr ); 190 if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) { 191 success = true; 192 assign_and_destroy_item( dst, *p, index ); 193 } else { 194 --base.n_invalid_entries; 195 } 196 } 197 return success; 198 } 199 200 micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator, 201 item_constructor_type construct_item ) 202 { 203 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 204 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 205 206 const padded_page* srcp = src.head_page.load(std::memory_order_relaxed); 207 if( is_valid_page(srcp) ) { 208 ticket_type g_index = head_counter.load(std::memory_order_relaxed); 209 size_type n_items = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed)) 210 / queue_rep_type::n_queue; 211 size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page); 212 size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page; 213 214 try_call( [&] { 215 head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed); 216 }).on_exception( [&] { 217 head_counter.store(0, std::memory_order_relaxed); 218 tail_counter.store(0, std::memory_order_relaxed); 219 }); 220 padded_page* cur_page = head_page.load(std::memory_order_relaxed); 221 222 try_call( [&] { 223 if (srcp != src.tail_page.load(std::memory_order_relaxed)) { 224 for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) { 225 cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item ); 226 cur_page = cur_page->next; 227 } 228 229 __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr ); 230 size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page); 231 if( last_index==0 ) last_index = items_per_page; 232 233 cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item ); 234 cur_page = cur_page->next; 235 } 236 tail_page.store(cur_page, std::memory_order_relaxed); 237 }).on_exception( [&] { 238 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1)); 239 tail_page.store(invalid_page, std::memory_order_relaxed); 240 }); 241 } else { 242 head_page.store(nullptr, std::memory_order_relaxed); 243 tail_page.store(nullptr, std::memory_order_relaxed); 244 } 245 return *this; 246 } 247 248 padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page, 249 size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item ) 250 { 251 page_allocator_type page_allocator(allocator); 252 padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1); 253 new_page->next = nullptr; 254 new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed); 255 for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) { 256 if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) { 257 copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item); 258 } 259 } 260 return new_page; 261 } 262 263 void invalidate_page( ticket_type k ) { 264 // Append an invalid page at address 1 so that no more pushes are allowed. 265 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1)); 266 { 267 spin_mutex::scoped_lock lock( page_mutex ); 268 tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed); 269 padded_page* q = tail_page.load(std::memory_order_relaxed); 270 if (is_valid_page(q)) { 271 q->next = invalid_page; 272 } else { 273 head_page.store(invalid_page, std::memory_order_relaxed); 274 } 275 tail_page.store(invalid_page, std::memory_order_relaxed); 276 } 277 } 278 279 padded_page* get_tail_page() { 280 return tail_page.load(std::memory_order_relaxed); 281 } 282 283 padded_page* get_head_page() { 284 return head_page.load(std::memory_order_relaxed); 285 } 286 287 void set_tail_page( padded_page* pg ) { 288 tail_page.store(pg, std::memory_order_relaxed); 289 } 290 291 void clear(queue_allocator_type& allocator ) { 292 padded_page* curr_page = head_page.load(std::memory_order_relaxed); 293 std::size_t index = head_counter.load(std::memory_order_relaxed); 294 page_allocator_type page_allocator(allocator); 295 296 while (curr_page) { 297 for (; index != items_per_page - 1; ++index) { 298 curr_page->operator[](index).~value_type(); 299 } 300 padded_page* next_page = curr_page->next; 301 page_allocator_traits::destroy(page_allocator, curr_page); 302 page_allocator_traits::deallocate(page_allocator, curr_page, 1); 303 curr_page = next_page; 304 } 305 306 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1)); 307 head_page.store(invalid_page, std::memory_order_relaxed); 308 tail_page.store(invalid_page, std::memory_order_relaxed); 309 } 310 311 private: 312 // template <typename U, typename A> 313 friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>; 314 315 // Class used to ensure exception-safety of method "pop" 316 class destroyer { 317 value_type& my_value; 318 public: 319 destroyer( reference value ) : my_value(value) {} 320 destroyer( const destroyer& ) = delete; 321 destroyer& operator=( const destroyer& ) = delete; 322 ~destroyer() {my_value.~T();} 323 }; // class destroyer 324 325 void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex, 326 item_constructor_type construct_item ) 327 { 328 auto& src_item = src[sindex]; 329 construct_item( &dst[dindex], static_cast<const void*>(&src_item) ); 330 } 331 332 void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) { 333 auto& from = src[index]; 334 destroyer d(from); 335 *static_cast<T*>(dst) = std::move(from); 336 } 337 338 void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const { 339 for (atomic_backoff b(true);; b.pause()) { 340 ticket_type c = counter; 341 if (c == k) return; 342 else if (c & 1) { 343 ++rb.n_invalid_entries; 344 throw_exception( exception_id::bad_last_alloc); 345 } 346 } 347 } 348 349 std::atomic<padded_page*> head_page{}; 350 std::atomic<ticket_type> head_counter{}; 351 352 std::atomic<padded_page*> tail_page{}; 353 std::atomic<ticket_type> tail_counter{}; 354 355 spin_mutex page_mutex{}; 356 }; // class micro_queue 357 358 #if _MSC_VER && !defined(__INTEL_COMPILER) 359 #pragma warning( pop ) 360 #endif // warning 4146 is back 361 362 template <typename Container, typename T, typename Allocator> 363 class micro_queue_pop_finalizer { 364 public: 365 using padded_page = typename Container::padded_page; 366 using allocator_type = Allocator; 367 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>; 368 369 micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) : 370 my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc) 371 {} 372 373 micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete; 374 micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete; 375 376 ~micro_queue_pop_finalizer() { 377 padded_page* p = my_page; 378 if( is_valid_page(p) ) { 379 spin_mutex::scoped_lock lock( my_queue.page_mutex ); 380 padded_page* q = p->next; 381 my_queue.head_page.store(q, std::memory_order_release); 382 if( !is_valid_page(q) ) { 383 my_queue.tail_page.store(nullptr, std::memory_order_release); 384 } 385 } 386 my_queue.head_counter.store(my_ticket_type, std::memory_order_release); 387 if ( is_valid_page(p) ) { 388 allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p)); 389 allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1); 390 } 391 } 392 private: 393 ticket_type my_ticket_type; 394 Container& my_queue; 395 padded_page* my_page; 396 Allocator& allocator; 397 }; // class micro_queue_pop_finalizer 398 399 #if _MSC_VER && !defined(__INTEL_COMPILER) 400 // structure was padded due to alignment specifier 401 #pragma warning( push ) 402 #pragma warning( disable: 4324 ) 403 #endif 404 405 template <typename T, typename Allocator> 406 struct concurrent_queue_rep { 407 using self_type = concurrent_queue_rep<T, Allocator>; 408 using size_type = std::size_t; 409 using micro_queue_type = micro_queue<T, Allocator>; 410 using allocator_type = Allocator; 411 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>; 412 using padded_page = typename micro_queue_type::padded_page; 413 using page_allocator_type = typename micro_queue_type::page_allocator_type; 414 using item_constructor_type = typename micro_queue_type::item_constructor_type; 415 private: 416 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>; 417 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>; 418 419 public: 420 // must be power of 2 421 static constexpr size_type n_queue = 8; 422 // Approximately n_queue/golden ratio 423 static constexpr size_type phi = 3; 424 static constexpr size_type item_size = micro_queue_type::item_size; 425 static constexpr size_type items_per_page = micro_queue_type::items_per_page; 426 427 concurrent_queue_rep() {} 428 429 concurrent_queue_rep( const concurrent_queue_rep& ) = delete; 430 concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete; 431 432 void clear( queue_allocator_type& alloc ) { 433 page_allocator_type page_allocator(alloc); 434 for (size_type i = 0; i < n_queue; ++i) { 435 padded_page* tail_page = array[i].get_tail_page(); 436 if( is_valid_page(tail_page) ) { 437 __TBB_ASSERT(array[i].get_head_page() == tail_page, "at most one page should remain" ); 438 page_allocator_traits::destroy(page_allocator, static_cast<padded_page*>(tail_page)); 439 page_allocator_traits::deallocate(page_allocator, static_cast<padded_page*>(tail_page), 1); 440 array[i].set_tail_page(nullptr); 441 } else { 442 __TBB_ASSERT(!is_valid_page(array[i].get_head_page()), "head page pointer corrupt?"); 443 } 444 } 445 } 446 447 void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) { 448 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 449 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 450 n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed); 451 452 // copy or move micro_queues 453 size_type queue_idx = 0; 454 try_call( [&] { 455 for (; queue_idx < n_queue; ++queue_idx) { 456 array[queue_idx].assign(src.array[queue_idx], alloc, construct_item); 457 } 458 }).on_exception( [&] { 459 for (size_type i = 0; i < queue_idx + 1; ++i) { 460 array[i].clear(alloc); 461 } 462 head_counter.store(0, std::memory_order_relaxed); 463 tail_counter.store(0, std::memory_order_relaxed); 464 n_invalid_entries.store(0, std::memory_order_relaxed); 465 }); 466 467 __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) && 468 tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed), 469 "the source concurrent queue should not be concurrently modified." ); 470 } 471 472 bool empty() const { 473 ticket_type tc = tail_counter.load(std::memory_order_acquire); 474 ticket_type hc = head_counter.load(std::memory_order_relaxed); 475 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads. 476 return tc == tail_counter.load(std::memory_order_relaxed) && 477 std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0; 478 } 479 480 std::ptrdiff_t size() const { 481 __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), NULL); 482 std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire); 483 std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed); 484 std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed); 485 486 return tc - hc - nie; 487 } 488 489 friend class micro_queue<T, Allocator>; 490 491 // Map ticket_type to an array index 492 static size_type index( ticket_type k ) { 493 return k * phi % n_queue; 494 } 495 496 micro_queue_type& choose( ticket_type k ) { 497 // The formula here approximates LRU in a cache-oblivious way. 498 return array[index(k)]; 499 } 500 501 alignas(max_nfs_size) micro_queue_type array[n_queue]; 502 503 alignas(max_nfs_size) std::atomic<ticket_type> head_counter{}; 504 alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{}; 505 alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{}; 506 }; // class concurrent_queue_rep 507 508 #if _MSC_VER && !defined(__INTEL_COMPILER) 509 #pragma warning( pop ) 510 #endif 511 512 template <typename Value, typename Allocator> 513 class concurrent_queue_iterator_base { 514 using queue_rep_type = concurrent_queue_rep<Value, Allocator>; 515 using padded_page = typename queue_rep_type::padded_page; 516 protected: 517 concurrent_queue_iterator_base() = default; 518 519 concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) { 520 assign(other); 521 } 522 523 concurrent_queue_iterator_base( queue_rep_type* queue_rep ) 524 : my_queue_rep(queue_rep), 525 my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed)) 526 { 527 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) { 528 my_array[i] = my_queue_rep->array[i].get_head_page(); 529 } 530 531 if (!get_item(my_item, my_head_counter)) advance(); 532 } 533 534 void assign( const concurrent_queue_iterator_base& other ) { 535 my_item = other.my_item; 536 my_queue_rep = other.my_queue_rep; 537 538 if (my_queue_rep != nullptr) { 539 my_head_counter = other.my_head_counter; 540 541 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) { 542 my_array[i] = other.my_array[i]; 543 } 544 } 545 } 546 547 void advance() { 548 __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue"); 549 std::size_t k = my_head_counter; 550 #if TBB_USE_ASSERT 551 Value* tmp; 552 get_item(tmp, k); 553 __TBB_ASSERT(my_item == tmp, nullptr); 554 #endif 555 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page); 556 if (i == my_queue_rep->items_per_page - 1) { 557 padded_page*& root = my_array[queue_rep_type::index(k)]; 558 root = root->next; 559 } 560 // Advance k 561 my_head_counter = ++k; 562 if (!get_item(my_item, k)) advance(); 563 } 564 565 concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) { 566 this->assign(other); 567 return *this; 568 } 569 570 bool get_item( Value*& item, std::size_t k ) { 571 if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) { 572 item = nullptr; 573 return true; 574 } else { 575 padded_page* p = my_array[queue_rep_type::index(k)]; 576 __TBB_ASSERT(p, nullptr); 577 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page); 578 item = &(*p)[i]; 579 return (p->mask & uintptr_t(1) << i) != 0; 580 } 581 } 582 583 Value* my_item{ nullptr }; 584 queue_rep_type* my_queue_rep{ nullptr }; 585 ticket_type my_head_counter{}; 586 padded_page* my_array[queue_rep_type::n_queue]{}; 587 }; // class concurrent_queue_iterator_base 588 589 struct concurrent_queue_iterator_provider { 590 template <typename Iterator, typename Container> 591 static Iterator get( const Container& container ) { 592 return Iterator(container); 593 } 594 }; // struct concurrent_queue_iterator_provider 595 596 template <typename Container, typename Value, typename Allocator> 597 class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> { 598 using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>; 599 public: 600 using value_type = Value; 601 using pointer = value_type*; 602 using reference = value_type&; 603 using difference_type = std::ptrdiff_t; 604 using iterator_category = std::forward_iterator_tag; 605 606 concurrent_queue_iterator() = default; 607 608 /** If Value==Container::value_type, then this routine is the copy constructor. 609 If Value==const Container::value_type, then this routine is a conversion constructor. */ 610 concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) 611 : base_type(other) {} 612 613 private: 614 concurrent_queue_iterator( const Container& container ) 615 : base_type(container.my_queue_representation) {} 616 public: 617 concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) { 618 this->assign(other); 619 return *this; 620 } 621 622 reference operator*() const { 623 return *static_cast<pointer>(this->my_item); 624 } 625 626 pointer operator->() const { return &operator*(); } 627 628 concurrent_queue_iterator& operator++() { 629 this->advance(); 630 return *this; 631 } 632 633 concurrent_queue_iterator operator++(int) { 634 concurrent_queue_iterator tmp = *this; 635 ++*this; 636 return tmp; 637 } 638 639 friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) { 640 return lhs.my_item == rhs.my_item; 641 } 642 643 friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) { 644 return lhs.my_item != rhs.my_item; 645 } 646 private: 647 friend struct concurrent_queue_iterator_provider; 648 }; // class concurrent_queue_iterator 649 650 } // namespace d2 651 } // namespace detail 652 } // tbb 653 654 #endif // __TBB_detail__concurrent_queue_base_H 655