1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_detail__concurrent_queue_base_H 18 #define __TBB_detail__concurrent_queue_base_H 19 20 #include "_utils.h" 21 #include "_exception.h" 22 #include "_machine.h" 23 #include "_allocator_traits.h" 24 25 #include "../profiling.h" 26 #include "../spin_mutex.h" 27 #include "../cache_aligned_allocator.h" 28 29 #include <atomic> 30 31 namespace tbb { 32 namespace detail { 33 namespace d2 { 34 35 using ticket_type = std::size_t; 36 37 template <typename Page> 38 inline bool is_valid_page(const Page p) { 39 return reinterpret_cast<std::uintptr_t>(p) > 1; 40 } 41 42 template <typename T, typename Allocator> 43 struct concurrent_queue_rep; 44 45 template <typename Container, typename T, typename Allocator> 46 class micro_queue_pop_finalizer; 47 48 #if _MSC_VER && !defined(__INTEL_COMPILER) 49 // unary minus operator applied to unsigned type, result still unsigned 50 #pragma warning( push ) 51 #pragma warning( disable: 4146 ) 52 #endif 53 54 // A queue using simple locking. 55 // For efficiency, this class has no constructor. 56 // The caller is expected to zero-initialize it. 57 template <typename T, typename Allocator> 58 class micro_queue { 59 private: 60 using queue_rep_type = concurrent_queue_rep<T, Allocator>; 61 using self_type = micro_queue<T, Allocator>; 62 public: 63 using size_type = std::size_t; 64 using value_type = T; 65 using reference = value_type&; 66 using const_reference = const value_type&; 67 68 using allocator_type = Allocator; 69 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>; 70 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>; 71 72 static constexpr size_type item_size = sizeof(T); 73 static constexpr size_type items_per_page = item_size <= 8 ? 32 : 74 item_size <= 16 ? 16 : 75 item_size <= 32 ? 8 : 76 item_size <= 64 ? 4 : 77 item_size <= 128 ? 2 : 1; 78 79 struct padded_page { 80 padded_page() {} 81 ~padded_page() {} 82 83 reference operator[] (std::size_t index) { 84 __TBB_ASSERT(index < items_per_page, "Index out of range"); 85 return items[index]; 86 } 87 88 const_reference operator[] (std::size_t index) const { 89 __TBB_ASSERT(index < items_per_page, "Index out of range"); 90 return items[index]; 91 } 92 93 padded_page* next{ nullptr }; 94 std::atomic<std::uintptr_t> mask{}; 95 96 union { 97 value_type items[items_per_page]; 98 }; 99 }; // struct padded_page 100 101 using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>; 102 protected: 103 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>; 104 105 public: 106 using item_constructor_type = void (*)(value_type* location, const void* src); 107 micro_queue() = default; 108 micro_queue( const micro_queue& ) = delete; 109 micro_queue& operator=( const micro_queue& ) = delete; 110 111 size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator, 112 padded_page*& p ) { 113 __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page"); 114 k &= -queue_rep_type::n_queue; 115 size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page); 116 if (!index) { 117 try_call( [&] { 118 p = page_allocator_traits::allocate(page_allocator, 1); 119 }).on_exception( [&] { 120 ++base.n_invalid_entries; 121 invalidate_page( k ); 122 }); 123 page_allocator_traits::construct(page_allocator, p); 124 } 125 126 spin_wait_until_my_turn(tail_counter, k, base); 127 d1::call_itt_notify(d1::acquired, &tail_counter); 128 129 if (p) { 130 spin_mutex::scoped_lock lock( page_mutex ); 131 padded_page* q = tail_page.load(std::memory_order_relaxed); 132 if (is_valid_page(q)) { 133 q->next = p; 134 } else { 135 head_page.store(p, std::memory_order_relaxed); 136 } 137 tail_page.store(p, std::memory_order_relaxed); 138 } else { 139 p = tail_page.load(std::memory_order_relaxed); 140 } 141 return index; 142 } 143 144 template<typename... Args> 145 void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args ) 146 { 147 padded_page* p = nullptr; 148 page_allocator_type page_allocator(allocator); 149 size_type index = prepare_page(k, base, page_allocator, p); 150 __TBB_ASSERT(p != nullptr, "Page was not prepared"); 151 152 // try_call API is not convenient here due to broken 153 // variadic capture on GCC 4.8.5 154 auto value_guard = make_raii_guard([&] { 155 ++base.n_invalid_entries; 156 d1::call_itt_notify(d1::releasing, &tail_counter); 157 tail_counter.fetch_add(queue_rep_type::n_queue); 158 }); 159 160 page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...); 161 // If no exception was thrown, mark item as present. 162 p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed); 163 d1::call_itt_notify(d1::releasing, &tail_counter); 164 165 value_guard.dismiss(); 166 tail_counter.fetch_add(queue_rep_type::n_queue); 167 } 168 169 void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) { 170 padded_page* p = nullptr; 171 prepare_page(k, base, allocator, p); 172 ++base.n_invalid_entries; 173 tail_counter.fetch_add(queue_rep_type::n_queue); 174 } 175 176 bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) { 177 k &= -queue_rep_type::n_queue; 178 spin_wait_until_eq(head_counter, k); 179 d1::call_itt_notify(d1::acquired, &head_counter); 180 spin_wait_while_eq(tail_counter, k); 181 d1::call_itt_notify(d1::acquired, &tail_counter); 182 padded_page *p = head_page.load(std::memory_order_relaxed); 183 __TBB_ASSERT( p, nullptr ); 184 size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page ); 185 bool success = false; 186 { 187 page_allocator_type page_allocator(allocator); 188 micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator, 189 k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr ); 190 if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) { 191 success = true; 192 assign_and_destroy_item(dst, *p, index); 193 } else { 194 --base.n_invalid_entries; 195 } 196 } 197 return success; 198 } 199 200 micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator, 201 item_constructor_type construct_item ) 202 { 203 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 204 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 205 206 const padded_page* srcp = src.head_page.load(std::memory_order_relaxed); 207 if( is_valid_page(srcp) ) { 208 ticket_type g_index = head_counter.load(std::memory_order_relaxed); 209 size_type n_items = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed)) 210 / queue_rep_type::n_queue; 211 size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page); 212 size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page; 213 214 try_call( [&] { 215 head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed); 216 }).on_exception( [&] { 217 head_counter.store(0, std::memory_order_relaxed); 218 tail_counter.store(0, std::memory_order_relaxed); 219 }); 220 padded_page* cur_page = head_page.load(std::memory_order_relaxed); 221 222 try_call( [&] { 223 if (srcp != src.tail_page.load(std::memory_order_relaxed)) { 224 for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) { 225 cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item ); 226 cur_page = cur_page->next; 227 } 228 229 __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr ); 230 size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page); 231 if( last_index==0 ) last_index = items_per_page; 232 233 cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item ); 234 cur_page = cur_page->next; 235 } 236 tail_page.store(cur_page, std::memory_order_relaxed); 237 }).on_exception( [&] { 238 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1)); 239 tail_page.store(invalid_page, std::memory_order_relaxed); 240 }); 241 } else { 242 head_page.store(nullptr, std::memory_order_relaxed); 243 tail_page.store(nullptr, std::memory_order_relaxed); 244 } 245 return *this; 246 } 247 248 padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page, 249 size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item ) 250 { 251 page_allocator_type page_allocator(allocator); 252 padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1); 253 new_page->next = nullptr; 254 new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed); 255 for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) { 256 if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) { 257 copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item); 258 } 259 } 260 return new_page; 261 } 262 263 void invalidate_page( ticket_type k ) { 264 // Append an invalid page at address 1 so that no more pushes are allowed. 265 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1)); 266 { 267 spin_mutex::scoped_lock lock( page_mutex ); 268 tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed); 269 padded_page* q = tail_page.load(std::memory_order_relaxed); 270 if (is_valid_page(q)) { 271 q->next = invalid_page; 272 } else { 273 head_page.store(invalid_page, std::memory_order_relaxed); 274 } 275 tail_page.store(invalid_page, std::memory_order_relaxed); 276 } 277 } 278 279 padded_page* get_head_page() { 280 return head_page.load(std::memory_order_relaxed); 281 } 282 283 void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) { 284 padded_page* curr_page = get_head_page(); 285 size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page; 286 page_allocator_type page_allocator(allocator); 287 288 while (curr_page && is_valid_page(curr_page)) { 289 while (index != items_per_page) { 290 if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) { 291 page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index)); 292 } 293 ++index; 294 } 295 296 index = 0; 297 padded_page* next_page = curr_page->next; 298 page_allocator_traits::destroy(page_allocator, curr_page); 299 page_allocator_traits::deallocate(page_allocator, curr_page, 1); 300 curr_page = next_page; 301 } 302 head_counter.store(0, std::memory_order_relaxed); 303 tail_counter.store(0, std::memory_order_relaxed); 304 head_page.store(new_head, std::memory_order_relaxed); 305 tail_page.store(new_tail, std::memory_order_relaxed); 306 } 307 308 void clear_and_invalidate(queue_allocator_type& allocator) { 309 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1)); 310 clear(allocator, invalid_page, invalid_page); 311 } 312 313 private: 314 // template <typename U, typename A> 315 friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>; 316 317 // Class used to ensure exception-safety of method "pop" 318 class destroyer { 319 value_type& my_value; 320 public: 321 destroyer( reference value ) : my_value(value) {} 322 destroyer( const destroyer& ) = delete; 323 destroyer& operator=( const destroyer& ) = delete; 324 ~destroyer() {my_value.~T();} 325 }; // class destroyer 326 327 void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex, 328 item_constructor_type construct_item ) 329 { 330 auto& src_item = src[sindex]; 331 construct_item( &dst[dindex], static_cast<const void*>(&src_item) ); 332 } 333 334 void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) { 335 auto& from = src[index]; 336 destroyer d(from); 337 *static_cast<T*>(dst) = std::move(from); 338 } 339 340 void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const { 341 for (atomic_backoff b{};; b.pause()) { 342 ticket_type c = counter.load(std::memory_order_acquire); 343 if (c == k) return; 344 else if (c & 1) { 345 ++rb.n_invalid_entries; 346 throw_exception( exception_id::bad_last_alloc); 347 } 348 } 349 } 350 351 std::atomic<padded_page*> head_page{}; 352 std::atomic<ticket_type> head_counter{}; 353 354 std::atomic<padded_page*> tail_page{}; 355 std::atomic<ticket_type> tail_counter{}; 356 357 spin_mutex page_mutex{}; 358 }; // class micro_queue 359 360 #if _MSC_VER && !defined(__INTEL_COMPILER) 361 #pragma warning( pop ) 362 #endif // warning 4146 is back 363 364 template <typename Container, typename T, typename Allocator> 365 class micro_queue_pop_finalizer { 366 public: 367 using padded_page = typename Container::padded_page; 368 using allocator_type = Allocator; 369 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>; 370 371 micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) : 372 my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc) 373 {} 374 375 micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete; 376 micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete; 377 378 ~micro_queue_pop_finalizer() { 379 padded_page* p = my_page; 380 if( is_valid_page(p) ) { 381 spin_mutex::scoped_lock lock( my_queue.page_mutex ); 382 padded_page* q = p->next; 383 my_queue.head_page.store(q, std::memory_order_relaxed); 384 if( !is_valid_page(q) ) { 385 my_queue.tail_page.store(nullptr, std::memory_order_relaxed); 386 } 387 } 388 my_queue.head_counter.store(my_ticket_type, std::memory_order_release); 389 if ( is_valid_page(p) ) { 390 allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p)); 391 allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1); 392 } 393 } 394 private: 395 ticket_type my_ticket_type; 396 Container& my_queue; 397 padded_page* my_page; 398 Allocator& allocator; 399 }; // class micro_queue_pop_finalizer 400 401 #if _MSC_VER && !defined(__INTEL_COMPILER) 402 // structure was padded due to alignment specifier 403 #pragma warning( push ) 404 #pragma warning( disable: 4324 ) 405 #endif 406 407 template <typename T, typename Allocator> 408 struct concurrent_queue_rep { 409 using self_type = concurrent_queue_rep<T, Allocator>; 410 using size_type = std::size_t; 411 using micro_queue_type = micro_queue<T, Allocator>; 412 using allocator_type = Allocator; 413 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>; 414 using padded_page = typename micro_queue_type::padded_page; 415 using page_allocator_type = typename micro_queue_type::page_allocator_type; 416 using item_constructor_type = typename micro_queue_type::item_constructor_type; 417 private: 418 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>; 419 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>; 420 421 public: 422 // must be power of 2 423 static constexpr size_type n_queue = 8; 424 // Approximately n_queue/golden ratio 425 static constexpr size_type phi = 3; 426 static constexpr size_type item_size = micro_queue_type::item_size; 427 static constexpr size_type items_per_page = micro_queue_type::items_per_page; 428 429 concurrent_queue_rep() {} 430 431 concurrent_queue_rep( const concurrent_queue_rep& ) = delete; 432 concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete; 433 434 void clear( queue_allocator_type& alloc ) { 435 for (size_type index = 0; index < n_queue; ++index) { 436 array[index].clear(alloc); 437 } 438 head_counter.store(0, std::memory_order_relaxed); 439 tail_counter.store(0, std::memory_order_relaxed); 440 n_invalid_entries.store(0, std::memory_order_relaxed); 441 } 442 443 void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) { 444 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 445 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed); 446 n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed); 447 448 // copy or move micro_queues 449 size_type queue_idx = 0; 450 try_call( [&] { 451 for (; queue_idx < n_queue; ++queue_idx) { 452 array[queue_idx].assign(src.array[queue_idx], alloc, construct_item); 453 } 454 }).on_exception( [&] { 455 for (size_type i = 0; i < queue_idx + 1; ++i) { 456 array[i].clear_and_invalidate(alloc); 457 } 458 head_counter.store(0, std::memory_order_relaxed); 459 tail_counter.store(0, std::memory_order_relaxed); 460 n_invalid_entries.store(0, std::memory_order_relaxed); 461 }); 462 463 __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) && 464 tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed), 465 "the source concurrent queue should not be concurrently modified." ); 466 } 467 468 bool empty() const { 469 ticket_type tc = tail_counter.load(std::memory_order_acquire); 470 ticket_type hc = head_counter.load(std::memory_order_relaxed); 471 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads. 472 return tc == tail_counter.load(std::memory_order_relaxed) && 473 std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0; 474 } 475 476 std::ptrdiff_t size() const { 477 __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), nullptr); 478 std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire); 479 std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed); 480 std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed); 481 482 return tc - hc - nie; 483 } 484 485 friend class micro_queue<T, Allocator>; 486 487 // Map ticket_type to an array index 488 static size_type index( ticket_type k ) { 489 return k * phi % n_queue; 490 } 491 492 micro_queue_type& choose( ticket_type k ) { 493 // The formula here approximates LRU in a cache-oblivious way. 494 return array[index(k)]; 495 } 496 497 alignas(max_nfs_size) micro_queue_type array[n_queue]; 498 499 alignas(max_nfs_size) std::atomic<ticket_type> head_counter{}; 500 alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{}; 501 alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{}; 502 }; // class concurrent_queue_rep 503 504 #if _MSC_VER && !defined(__INTEL_COMPILER) 505 #pragma warning( pop ) 506 #endif 507 508 template <typename Value, typename Allocator> 509 class concurrent_queue_iterator_base { 510 using queue_rep_type = concurrent_queue_rep<Value, Allocator>; 511 using padded_page = typename queue_rep_type::padded_page; 512 protected: 513 concurrent_queue_iterator_base() = default; 514 515 concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) { 516 assign(other); 517 } 518 519 concurrent_queue_iterator_base( queue_rep_type* queue_rep ) 520 : my_queue_rep(queue_rep), 521 my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed)) 522 { 523 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) { 524 my_array[i] = my_queue_rep->array[i].get_head_page(); 525 } 526 527 if (!get_item(my_item, my_head_counter)) advance(); 528 } 529 530 void assign( const concurrent_queue_iterator_base& other ) { 531 my_item = other.my_item; 532 my_queue_rep = other.my_queue_rep; 533 534 if (my_queue_rep != nullptr) { 535 my_head_counter = other.my_head_counter; 536 537 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) { 538 my_array[i] = other.my_array[i]; 539 } 540 } 541 } 542 543 void advance() { 544 __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue"); 545 std::size_t k = my_head_counter; 546 #if TBB_USE_ASSERT 547 Value* tmp; 548 get_item(tmp, k); 549 __TBB_ASSERT(my_item == tmp, nullptr); 550 #endif 551 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page); 552 if (i == my_queue_rep->items_per_page - 1) { 553 padded_page*& root = my_array[queue_rep_type::index(k)]; 554 root = root->next; 555 } 556 // Advance k 557 my_head_counter = ++k; 558 if (!get_item(my_item, k)) advance(); 559 } 560 561 concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) { 562 this->assign(other); 563 return *this; 564 } 565 566 bool get_item( Value*& item, std::size_t k ) { 567 if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) { 568 item = nullptr; 569 return true; 570 } else { 571 padded_page* p = my_array[queue_rep_type::index(k)]; 572 __TBB_ASSERT(p, nullptr); 573 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page); 574 item = &(*p)[i]; 575 return (p->mask & uintptr_t(1) << i) != 0; 576 } 577 } 578 579 Value* my_item{ nullptr }; 580 queue_rep_type* my_queue_rep{ nullptr }; 581 ticket_type my_head_counter{}; 582 padded_page* my_array[queue_rep_type::n_queue]{}; 583 }; // class concurrent_queue_iterator_base 584 585 struct concurrent_queue_iterator_provider { 586 template <typename Iterator, typename Container> 587 static Iterator get( const Container& container ) { 588 return Iterator(container); 589 } 590 }; // struct concurrent_queue_iterator_provider 591 592 template <typename Container, typename Value, typename Allocator> 593 class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> { 594 using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>; 595 public: 596 using value_type = Value; 597 using pointer = value_type*; 598 using reference = value_type&; 599 using difference_type = std::ptrdiff_t; 600 using iterator_category = std::forward_iterator_tag; 601 602 concurrent_queue_iterator() = default; 603 604 /** If Value==Container::value_type, then this routine is the copy constructor. 605 If Value==const Container::value_type, then this routine is a conversion constructor. */ 606 concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) 607 : base_type(other) {} 608 609 private: 610 concurrent_queue_iterator( const Container& container ) 611 : base_type(container.my_queue_representation) {} 612 public: 613 concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) { 614 this->assign(other); 615 return *this; 616 } 617 618 reference operator*() const { 619 return *static_cast<pointer>(this->my_item); 620 } 621 622 pointer operator->() const { return &operator*(); } 623 624 concurrent_queue_iterator& operator++() { 625 this->advance(); 626 return *this; 627 } 628 629 concurrent_queue_iterator operator++(int) { 630 concurrent_queue_iterator tmp = *this; 631 ++*this; 632 return tmp; 633 } 634 635 friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) { 636 return lhs.my_item == rhs.my_item; 637 } 638 639 friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) { 640 return lhs.my_item != rhs.my_item; 641 } 642 private: 643 friend struct concurrent_queue_iterator_provider; 644 }; // class concurrent_queue_iterator 645 646 } // namespace d2 647 } // namespace detail 648 } // tbb 649 650 #endif // __TBB_detail__concurrent_queue_base_H 651