/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_detail__concurrent_queue_base_H
#define __TBB_detail__concurrent_queue_base_H

#include "_utils.h"
#include "_exception.h"
#include "_machine.h"
#include "_allocator_traits.h"

#include "../profiling.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d2 {

using ticket_type = std::size_t;

template <typename Page>
inline bool is_valid_page(const Page p) {
    return reinterpret_cast<std::uintptr_t>(p) > 1;
}
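// Addresses 0 (null) and 1 are used as sentinels: null means "no page", while
// address 1 marks a page chain invalidated after an allocation failure (see
// micro_queue::invalidate_page), so only pointers above 1 are real pages.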

template <typename T, typename Allocator>
struct concurrent_queue_rep;

template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

// A queue using simple locking.
// For efficiency, this class has no constructor.
// The caller is expected to zero-initialize it.
template <typename T, typename Allocator>
class micro_queue {
private:
    using queue_rep_type = concurrent_queue_rep<T, Allocator>;
    using self_type = micro_queue<T, Allocator>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = value_type&;
    using const_reference = const value_type&;

    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>;

    static constexpr size_type item_size = sizeof(T);
    static constexpr size_type items_per_page = item_size <=   8 ? 32 :
                                                item_size <=  16 ? 16 :
                                                item_size <=  32 ?  8 :
                                                item_size <=  64 ?  4 :
                                                item_size <= 128 ?  2 : 1;
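    // Note: these thresholds keep a page's payload at no more than 256 bytes
    // (32 * 8, 16 * 16, 8 * 32, 4 * 64, 2 * 128), so a page spans only a few
    // cache lines; items larger than 128 bytes get one item per page.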

    struct padded_page {
        padded_page() {}
        ~padded_page() {}

        reference operator[] (std::size_t index) {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        const_reference operator[] (std::size_t index) const {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        padded_page* next{ nullptr };
        std::atomic<std::uintptr_t> mask{};

        union {
            value_type items[items_per_page];
        };
    }; // struct padded_page
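    // Note: wrapping `items` in an anonymous union suppresses default
    // construction and destruction of the array, so slots hold raw storage
    // until push() constructs an element in place; `mask` tracks, one bit per
    // slot, which slots currently contain a live item.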

    using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>;
protected:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;

public:
    using item_constructor_type = void (*)(value_type* location, const void* src);
    micro_queue() = default;
    micro_queue( const micro_queue& ) = delete;
    micro_queue& operator=( const micro_queue& ) = delete;

    size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator,
                            padded_page*& p ) {
        __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page");
        k &= -queue_rep_type::n_queue;
        size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page);
        if (!index) {
            try_call( [&] {
                p = page_allocator_traits::allocate(page_allocator, 1);
            }).on_exception( [&] {
                ++base.n_invalid_entries;
                invalidate_page( k );
            });
            page_allocator_traits::construct(page_allocator, p);
        }

        spin_wait_until_my_turn(tail_counter, k, base);
        d1::call_itt_notify(d1::acquired, &tail_counter);

        if (p) {
            spin_mutex::scoped_lock lock( page_mutex );
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = p;
            } else {
                head_page.store(p, std::memory_order_relaxed);
            }
            tail_page.store(p, std::memory_order_relaxed);
        } else {
            p = tail_page.load(std::memory_order_relaxed);
        }
        return index;
    }
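    // In short: the producer that owns the first slot of a page allocates the
    // page before waiting, every producer then spins until tail_counter
    // reaches its ticket, and the new page (if any) is linked into the chain
    // under page_mutex; producers owning a mid-page slot reuse the tail page.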

    template<typename... Args>
    void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args )
    {
        padded_page* p = nullptr;
        page_allocator_type page_allocator(allocator);
        size_type index = prepare_page(k, base, page_allocator, p);
        __TBB_ASSERT(p != nullptr, "Page was not prepared");

        // try_call API is not convenient here due to broken
        // variadic capture on GCC 4.8.5
        auto value_guard = make_raii_guard([&] {
            ++base.n_invalid_entries;
            d1::call_itt_notify(d1::releasing, &tail_counter);
            tail_counter.fetch_add(queue_rep_type::n_queue);
        });

        page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...);
        // If no exception was thrown, mark item as present.
        p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed);
        d1::call_itt_notify(d1::releasing, &tail_counter);

        value_guard.dismiss();
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

    void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        padded_page* p = nullptr;
        prepare_page(k, base, allocator, p);
        ++base.n_invalid_entries;
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

    bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        k &= -queue_rep_type::n_queue;
        spin_wait_until_eq(head_counter, k);
        d1::call_itt_notify(d1::acquired, &head_counter);
        spin_wait_while_eq(tail_counter, k);
        d1::call_itt_notify(d1::acquired, &tail_counter);
        padded_page* p = head_page.load(std::memory_order_relaxed);
        __TBB_ASSERT( p, nullptr );
        size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
        bool success = false;
        {
            page_allocator_type page_allocator(allocator);
            micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator,
                k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
            if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                success = true;
                assign_and_destroy_item(dst, *p, index);
            } else {
                --base.n_invalid_entries;
            }
        }
        return success;
    }

    micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator,
        item_constructor_type construct_item )
    {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);

        const padded_page* srcp = src.head_page.load(std::memory_order_relaxed);
        if( is_valid_page(srcp) ) {
            ticket_type g_index = head_counter.load(std::memory_order_relaxed);
            size_type n_items  = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed))
                / queue_rep_type::n_queue;
            size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
            size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page;

            try_call( [&] {
                head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
            }).on_exception( [&] {
                head_counter.store(0, std::memory_order_relaxed);
                tail_counter.store(0, std::memory_order_relaxed);
            });
            padded_page* cur_page = head_page.load(std::memory_order_relaxed);

            try_call( [&] {
                if (srcp != src.tail_page.load(std::memory_order_relaxed)) {
                    for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) {
                        cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item );
                        cur_page = cur_page->next;
                    }

                    __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr );
                    size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
                    if( last_index==0 ) last_index = items_per_page;

                    cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item );
                    cur_page = cur_page->next;
                }
                tail_page.store(cur_page, std::memory_order_relaxed);
            }).on_exception( [&] {
                padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
                tail_page.store(invalid_page, std::memory_order_relaxed);
            });
        } else {
            head_page.store(nullptr, std::memory_order_relaxed);
            tail_page.store(nullptr, std::memory_order_relaxed);
        }
        return *this;
    }

    padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page,
        size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item )
    {
        page_allocator_type page_allocator(allocator);
        padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1);
        new_page->next = nullptr;
        new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
        for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
            if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
                copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
            }
        }
        return new_page;
    }

    void invalidate_page( ticket_type k )  {
        // Append an invalid page at address 1 so that no more pushes are allowed.
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        {
            spin_mutex::scoped_lock lock( page_mutex );
            tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed);
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = invalid_page;
            } else {
                head_page.store(invalid_page, std::memory_order_relaxed);
            }
            tail_page.store(invalid_page, std::memory_order_relaxed);
        }
    }
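    // Note: k enters invalidate_page already rounded down to a multiple of
    // n_queue, so storing k + n_queue + 1 leaves tail_counter odd;
    // spin_wait_until_my_turn interprets an odd counter as "queue invalidated"
    // and throws bad_last_alloc to later producers instead of spinning forever.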

    padded_page* get_head_page() {
        return head_page.load(std::memory_order_relaxed);
    }

    void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
        padded_page* curr_page = get_head_page();
        size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
        page_allocator_type page_allocator(allocator);

        while (curr_page && is_valid_page(curr_page)) {
            while (index != items_per_page) {
                if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                    page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
                }
                ++index;
            }

            index = 0;
            padded_page* next_page = curr_page->next;
            page_allocator_traits::destroy(page_allocator, curr_page);
            page_allocator_traits::deallocate(page_allocator, curr_page, 1);
            curr_page = next_page;
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        head_page.store(new_head, std::memory_order_relaxed);
        tail_page.store(new_tail, std::memory_order_relaxed);
    }

    void clear_and_invalidate(queue_allocator_type& allocator) {
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        clear(allocator, invalid_page, invalid_page);
    }

private:
    // template <typename U, typename A>
    friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>;

    // Class used to ensure exception-safety of method "pop"
    class destroyer {
        value_type& my_value;
    public:
        destroyer( reference value ) : my_value(value) {}
        destroyer( const destroyer& ) = delete;
        destroyer& operator=( const destroyer& ) = delete;
        ~destroyer() { my_value.~T(); }
    }; // class destroyer

    void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
        item_constructor_type construct_item )
    {
        auto& src_item = src[sindex];
        construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
        auto& from = src[index];
        destroyer d(from);
        *static_cast<T*>(dst) = std::move(from);
    }

    void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
        for (atomic_backoff b{};; b.pause()) {
            ticket_type c = counter.load(std::memory_order_acquire);
            if (c == k) return;
            else if (c & 1) {
                ++rb.n_invalid_entries;
                throw_exception( exception_id::bad_last_alloc );
            }
        }
    }

    std::atomic<padded_page*> head_page{};
    std::atomic<ticket_type> head_counter{};

    std::atomic<padded_page*> tail_page{};
    std::atomic<ticket_type> tail_counter{};

    spin_mutex page_mutex{};
}; // class micro_queue

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer {
public:
    using padded_page = typename Container::padded_page;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;

    micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) :
        my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc)
    {}

    micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete;
    micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete;

    ~micro_queue_pop_finalizer() {
        padded_page* p = my_page;
        if( is_valid_page(p) ) {
            spin_mutex::scoped_lock lock( my_queue.page_mutex );
            padded_page* q = p->next;
            my_queue.head_page.store(q, std::memory_order_relaxed);
            if( !is_valid_page(q) ) {
                my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
            }
        }
        my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
        if ( is_valid_page(p) ) {
            allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p));
            allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1);
        }
    }
private:
    ticket_type my_ticket_type;
    Container& my_queue;
    padded_page* my_page;
    Allocator& allocator;
}; // class micro_queue_pop_finalizer

#if _MSC_VER && !defined(__INTEL_COMPILER)
// structure was padded due to alignment specifier
#pragma warning( push )
#pragma warning( disable: 4324 )
#endif

template <typename T, typename Allocator>
struct concurrent_queue_rep {
    using self_type = concurrent_queue_rep<T, Allocator>;
    using size_type = std::size_t;
    using micro_queue_type = micro_queue<T, Allocator>;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using padded_page = typename micro_queue_type::padded_page;
    using page_allocator_type = typename micro_queue_type::page_allocator_type;
    using item_constructor_type = typename micro_queue_type::item_constructor_type;
private:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>;

public:
    // must be power of 2
    static constexpr size_type n_queue = 8;
    // Approximately n_queue divided by the golden ratio squared; must be
    // coprime with n_queue so that index() below visits every sub-queue
    static constexpr size_type phi = 3;
    static constexpr size_type item_size = micro_queue_type::item_size;
    static constexpr size_type items_per_page = micro_queue_type::items_per_page;

    concurrent_queue_rep() {}

    concurrent_queue_rep( const concurrent_queue_rep& ) = delete;
    concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;

    void clear( queue_allocator_type& alloc ) {
        for (size_type index = 0; index < n_queue; ++index) {
            array[index].clear(alloc);
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        n_invalid_entries.store(0, std::memory_order_relaxed);
    }

    void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed);

        // copy or move micro_queues
        size_type queue_idx = 0;
        try_call( [&] {
            for (; queue_idx < n_queue; ++queue_idx) {
                array[queue_idx].assign(src.array[queue_idx], alloc, construct_item);
            }
        }).on_exception( [&] {
            for (size_type i = 0; i < queue_idx + 1; ++i) {
                array[i].clear_and_invalidate(alloc);
            }
            head_counter.store(0, std::memory_order_relaxed);
            tail_counter.store(0, std::memory_order_relaxed);
            n_invalid_entries.store(0, std::memory_order_relaxed);
        });

        __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) &&
                     tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed),
                     "the source concurrent queue should not be concurrently modified." );
    }

    bool empty() const {
        ticket_type tc = tail_counter.load(std::memory_order_acquire);
        ticket_type hc = head_counter.load(std::memory_order_relaxed);
        // If tc != tail_counter on the second read, the queue was not empty at some point between the two reads.
        return tc == tail_counter.load(std::memory_order_relaxed) &&
               std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0;
    }

    std::ptrdiff_t size() const {
        __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), nullptr);
        std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire);
        std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed);
        std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed);

        return tc - hc - nie;
    }

    friend class micro_queue<T, Allocator>;

    // Map ticket_type to an array index
    static size_type index( ticket_type k ) {
        return k * phi % n_queue;
    }
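    // With phi == 3 coprime to n_queue == 8, consecutive tickets visit the
    // sub-queues in the order 0, 3, 6, 1, 4, 7, 2, 5, so each of the eight
    // lanes is used exactly once per n_queue tickets and concurrent
    // operations are spread across them.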

    micro_queue_type& choose( ticket_type k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }

    alignas(max_nfs_size) micro_queue_type array[n_queue];

    alignas(max_nfs_size) std::atomic<ticket_type> head_counter{};
    alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{};
    alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{};
}; // class concurrent_queue_rep

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif

template <typename Value, typename Allocator>
class concurrent_queue_iterator_base {
    using queue_rep_type = concurrent_queue_rep<Value, Allocator>;
    using padded_page = typename queue_rep_type::padded_page;
protected:
    concurrent_queue_iterator_base() = default;

    concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) {
        assign(other);
    }

    concurrent_queue_iterator_base( queue_rep_type* queue_rep )
        : my_queue_rep(queue_rep),
          my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed))
    {
        for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
            my_array[i] = my_queue_rep->array[i].get_head_page();
        }

        if (!get_item(my_item, my_head_counter)) advance();
    }

    void assign( const concurrent_queue_iterator_base& other ) {
        my_item = other.my_item;
        my_queue_rep = other.my_queue_rep;

        if (my_queue_rep != nullptr) {
            my_head_counter = other.my_head_counter;

            for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
                my_array[i] = other.my_array[i];
            }
        }
    }

    void advance() {
        __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue");
        std::size_t k = my_head_counter;
#if TBB_USE_ASSERT
        Value* tmp;
        get_item(tmp, k);
        __TBB_ASSERT(my_item == tmp, nullptr);
#endif
        std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
        if (i == my_queue_rep->items_per_page - 1) {
            padded_page*& root = my_array[queue_rep_type::index(k)];
            root = root->next;
        }
        // Advance k
        my_head_counter = ++k;
        if (!get_item(my_item, k)) advance();
    }

    concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) {
        this->assign(other);
        return *this;
    }

    bool get_item( Value*& item, std::size_t k ) {
        if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) {
            item = nullptr;
            return true;
        } else {
            padded_page* p = my_array[queue_rep_type::index(k)];
            __TBB_ASSERT(p, nullptr);
            std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
            item = &(*p)[i];
            return (p->mask & uintptr_t(1) << i) != 0;
        }
    }

    Value* my_item{ nullptr };
    queue_rep_type* my_queue_rep{ nullptr };
    ticket_type my_head_counter{};
    padded_page* my_array[queue_rep_type::n_queue]{};
}; // class concurrent_queue_iterator_base

struct concurrent_queue_iterator_provider {
    template <typename Iterator, typename Container>
    static Iterator get( const Container& container ) {
        return Iterator(container);
    }
}; // struct concurrent_queue_iterator_provider

template <typename Container, typename Value, typename Allocator>
class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> {
    using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>;
public:
    using value_type = Value;
    using pointer = value_type*;
    using reference = value_type&;
    using difference_type = std::ptrdiff_t;
    using iterator_category = std::forward_iterator_tag;

    concurrent_queue_iterator() = default;

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other )
        : base_type(other) {}

private:
    concurrent_queue_iterator( const Container& container )
        : base_type(container.my_queue_representation) {}
public:
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) {
        this->assign(other);
        return *this;
    }

    reference operator*() const {
        return *static_cast<pointer>(this->my_item);
    }

    pointer operator->() const { return &operator*(); }

    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    concurrent_queue_iterator operator++(int) {
        concurrent_queue_iterator tmp = *this;
        ++*this;
        return tmp;
    }

    friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item == rhs.my_item;
    }

    friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item != rhs.my_item;
    }
private:
    friend struct concurrent_queue_iterator_provider;
}; // class concurrent_queue_iterator

} // namespace d2
} // namespace detail
} // namespace tbb

#endif // __TBB_detail__concurrent_queue_base_H