/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_detail__concurrent_queue_base_H
#define __TBB_detail__concurrent_queue_base_H

#include "_utils.h"
#include "_exception.h"
#include "_machine.h"
#include "_allocator_traits.h"

#include "../profiling.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d2 {

using ticket_type = std::size_t;

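// Note: head_page/tail_page may hold two sentinel values besides a real page
// pointer: 0 (no page allocated yet) and 1 (the lane was invalidated), so a
// pointer is a usable page only if its address compares greater than 1.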
template <typename Page>
inline bool is_valid_page(const Page p) {
    return reinterpret_cast<std::uintptr_t>(p) > 1;
}

template <typename T, typename Allocator>
struct concurrent_queue_rep;

template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

// A queue using simple locking; one of the n_queue lanes of a concurrent queue.
// For efficiency, its members are zero-initialized by their default member
// initializers, so the defaulted constructor performs no extra work.
template <typename T, typename Allocator>
class micro_queue {
private:
    using queue_rep_type = concurrent_queue_rep<T, Allocator>;
    using self_type = micro_queue<T, Allocator>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = value_type&;
    using const_reference = const value_type&;

    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>;

    static constexpr size_type item_size = sizeof(T);
    static constexpr size_type items_per_page = item_size <= 8 ? 32 :
                                                item_size <= 16 ? 16 :
                                                item_size <= 32 ? 8 :
                                                item_size <= 64 ? 4 :
                                                item_size <= 128 ? 2 : 1;
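    // The thresholds above cap a page's payload at 256 bytes: 32 items of up
    // to 8 bytes, 16 of up to 16, and so on. Types larger than 128 bytes get
    // a single item per page.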

    struct padded_page {
        padded_page() {}
        ~padded_page() {}

        reference operator[] (std::size_t index) {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        const_reference operator[] (std::size_t index) const {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        padded_page* next{ nullptr };
        std::atomic<std::uintptr_t> mask{};

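        // The anonymous union suppresses implicit construction and destruction
        // of the items array: slots are constructed by push and destroyed by
        // pop, with occupancy tracked by the mask above.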
        union {
            value_type items[items_per_page];
        };
    }; // struct padded_page

    using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>;
protected:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;

public:
    using item_constructor_type = void (*)(value_type* location, const void* src);
    micro_queue() = default;
    micro_queue( const micro_queue& ) = delete;
    micro_queue& operator=( const micro_queue& ) = delete;

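    // Ticket protocol: ticket k is serviced by lane concurrent_queue_rep::index(k),
    // so successive tickets owned by one micro_queue differ by n_queue. Within
    // the lane, k / n_queue selects the slot, head_counter and tail_counter
    // advance in steps of n_queue, and an odd counter value marks the lane as
    // invalidated (see invalidate_page).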
    size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator,
                            padded_page*& p ) {
        __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page");
        k &= -queue_rep_type::n_queue;
        size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page);
        if (!index) {
            try_call( [&] {
                p = page_allocator_traits::allocate(page_allocator, 1);
            }).on_exception( [&] {
                ++base.n_invalid_entries;
                invalidate_page( k );
            });
            page_allocator_traits::construct(page_allocator, p);
        }

        spin_wait_until_my_turn(tail_counter, k, base);
        d1::call_itt_notify(d1::acquired, &tail_counter);

        if (p) {
            spin_mutex::scoped_lock lock( page_mutex );
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = p;
            } else {
                head_page.store(p, std::memory_order_relaxed);
            }
            tail_page.store(p, std::memory_order_relaxed);
        } else {
            p = tail_page.load(std::memory_order_relaxed);
        }
        return index;
    }

    template<typename... Args>
    void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args )
    {
        padded_page* p = nullptr;
        page_allocator_type page_allocator(allocator);
        size_type index = prepare_page(k, base, page_allocator, p);
        __TBB_ASSERT(p != nullptr, "Page was not prepared");

        // try_call API is not convenient here due to broken
        // variadic capture on GCC 4.8.5
        auto value_guard = make_raii_guard([&] {
            ++base.n_invalid_entries;
            d1::call_itt_notify(d1::releasing, &tail_counter);
            tail_counter.fetch_add(queue_rep_type::n_queue);
        });

        page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...);
        // If no exception was thrown, mark item as present.
        p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed);
        d1::call_itt_notify(d1::releasing, &tail_counter);

        value_guard.dismiss();
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

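    // Gives up a claimed ticket: the reserved slot is published as an invalid
    // entry and tail_counter still advances, so later tickets in this lane are
    // not blocked.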
    void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        padded_page* p = nullptr;
        prepare_page(k, base, allocator, p);
        ++base.n_invalid_entries;
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

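    // Blocks until ticket k reaches the head of this lane and the matching push
    // has completed, then moves the item into dst. Returns false if the slot
    // held an invalid (aborted) entry. The finalizer releases head_counter and,
    // once the last slot of a page has been consumed, frees that page.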
    bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        k &= -queue_rep_type::n_queue;
        spin_wait_until_eq(head_counter, k);
        d1::call_itt_notify(d1::acquired, &head_counter);
        spin_wait_while_eq(tail_counter, k);
        d1::call_itt_notify(d1::acquired, &tail_counter);
        padded_page* p = head_page.load(std::memory_order_relaxed);
        __TBB_ASSERT( p, nullptr );
        size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
        bool success = false;
        {
            page_allocator_type page_allocator(allocator);
            micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator,
                k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
            if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                success = true;
                assign_and_destroy_item(dst, *p, index);
            } else {
                --base.n_invalid_entries;
            }
        }
        return success;
    }

    micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator,
                         item_constructor_type construct_item )
    {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);

        const padded_page* srcp = src.head_page.load(std::memory_order_relaxed);
        if( is_valid_page(srcp) ) {
            ticket_type g_index = head_counter.load(std::memory_order_relaxed);
            size_type n_items = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed))
                / queue_rep_type::n_queue;
            size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
            size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page;

            try_call( [&] {
                head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
            }).on_exception( [&] {
                head_counter.store(0, std::memory_order_relaxed);
                tail_counter.store(0, std::memory_order_relaxed);
            });
            padded_page* cur_page = head_page.load(std::memory_order_relaxed);

            try_call( [&] {
                if (srcp != src.tail_page.load(std::memory_order_relaxed)) {
                    for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) {
                        cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item );
                        cur_page = cur_page->next;
                    }

                    __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr );
                    size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
                    if( last_index==0 ) last_index = items_per_page;

                    cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item );
                    cur_page = cur_page->next;
                }
                tail_page.store(cur_page, std::memory_order_relaxed);
            }).on_exception( [&] {
                padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
                tail_page.store(invalid_page, std::memory_order_relaxed);
            });
        } else {
            head_page.store(nullptr, std::memory_order_relaxed);
            tail_page.store(nullptr, std::memory_order_relaxed);
        }
        return *this;
    }

    padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page,
                            size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item )
    {
        page_allocator_type page_allocator(allocator);
        padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1);
        new_page->next = nullptr;
        new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
        for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
            if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
                copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
            }
        }
        return new_page;
    }

    void invalidate_page( ticket_type k ) {
        // Append an invalid page at address 1 so that no more pushes are allowed.
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        {
            spin_mutex::scoped_lock lock( page_mutex );
            tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed);
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = invalid_page;
            } else {
                head_page.store(invalid_page, std::memory_order_relaxed);
            }
            tail_page.store(invalid_page, std::memory_order_relaxed);
        }
    }

    padded_page* get_head_page() {
        return head_page.load(std::memory_order_relaxed);
    }

    void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
        padded_page* curr_page = get_head_page();
        size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
        page_allocator_type page_allocator(allocator);

        while (curr_page && is_valid_page(curr_page)) {
            while (index != items_per_page) {
                if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                    page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
                }
                ++index;
            }

            index = 0;
            padded_page* next_page = curr_page->next;
            page_allocator_traits::destroy(page_allocator, curr_page);
            page_allocator_traits::deallocate(page_allocator, curr_page, 1);
            curr_page = next_page;
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        head_page.store(new_head, std::memory_order_relaxed);
        tail_page.store(new_tail, std::memory_order_relaxed);
    }

    void clear_and_invalidate(queue_allocator_type& allocator) {
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        clear(allocator, invalid_page, invalid_page);
    }

private:
    // template <typename U, typename A>
    friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>;

    // Class used to ensure exception-safety of method "pop"
    class destroyer {
        value_type& my_value;
    public:
        destroyer( reference value ) : my_value(value) {}
        destroyer( const destroyer& ) = delete;
        destroyer& operator=( const destroyer& ) = delete;
        ~destroyer() { my_value.~T(); }
    }; // class destroyer

    void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
                    item_constructor_type construct_item )
    {
        auto& src_item = src[sindex];
        construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
        auto& from = src[index];
        destroyer d(from);
        *static_cast<T*>(dst) = std::move(from);
    }

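    // Waits until the counter reaches ticket k. An odd counter value means the
    // lane was invalidated by a failed push, so the waiter bails out with
    // bad_last_alloc instead of spinning forever.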
    void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
        for (atomic_backoff b{};; b.pause()) {
            ticket_type c = counter.load(std::memory_order_acquire);
            if (c == k) return;
            else if (c & 1) {
                ++rb.n_invalid_entries;
                throw_exception(exception_id::bad_last_alloc);
            }
        }
    }

    std::atomic<padded_page*> head_page{};
    std::atomic<ticket_type> head_counter{};

    std::atomic<padded_page*> tail_page{};
    std::atomic<ticket_type> tail_counter{};

    spin_mutex page_mutex{};
}; // class micro_queue

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer {
public:
    using padded_page = typename Container::padded_page;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;

    micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) :
        my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc)
    {}

    micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete;
    micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete;

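    // Unlinks the consumed page (if any) under the page mutex, releases
    // head_counter to hand the lane to the next ticket, and only then destroys
    // and deallocates the page outside the critical section.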
    ~micro_queue_pop_finalizer() {
        padded_page* p = my_page;
        if( is_valid_page(p) ) {
            spin_mutex::scoped_lock lock( my_queue.page_mutex );
            padded_page* q = p->next;
            my_queue.head_page.store(q, std::memory_order_relaxed);
            if( !is_valid_page(q) ) {
                my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
            }
        }
        my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
        if ( is_valid_page(p) ) {
            allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p));
            allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1);
        }
    }
private:
    ticket_type my_ticket_type;
    Container& my_queue;
    padded_page* my_page;
    Allocator& allocator;
}; // class micro_queue_pop_finalizer

#if _MSC_VER && !defined(__INTEL_COMPILER)
// structure was padded due to alignment specifier
#pragma warning( push )
#pragma warning( disable: 4324 )
#endif

template <typename T, typename Allocator>
struct concurrent_queue_rep {
    using self_type = concurrent_queue_rep<T, Allocator>;
    using size_type = std::size_t;
    using micro_queue_type = micro_queue<T, Allocator>;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using padded_page = typename micro_queue_type::padded_page;
    using page_allocator_type = typename micro_queue_type::page_allocator_type;
    using item_constructor_type = typename micro_queue_type::item_constructor_type;
private:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>;

public:
    // must be power of 2
    static constexpr size_type n_queue = 8;
    // Approximately n_queue/golden ratio
    static constexpr size_type phi = 3;
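    // Because gcd(phi, n_queue) == 1, k * phi % n_queue cycles through all
    // eight lanes (0, 3, 6, 1, 4, 7, 2, 5, ...), scattering consecutive
    // tickets across micro_queues to reduce contention.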
    static constexpr size_type item_size = micro_queue_type::item_size;
    static constexpr size_type items_per_page = micro_queue_type::items_per_page;

    concurrent_queue_rep() {}

    concurrent_queue_rep( const concurrent_queue_rep& ) = delete;
    concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;

    void clear( queue_allocator_type& alloc ) {
        for (size_type index = 0; index < n_queue; ++index) {
            array[index].clear(alloc);
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        n_invalid_entries.store(0, std::memory_order_relaxed);
    }

    void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed);

        // copy or move micro_queues
        size_type queue_idx = 0;
        try_call( [&] {
            for (; queue_idx < n_queue; ++queue_idx) {
                array[queue_idx].assign(src.array[queue_idx], alloc, construct_item);
            }
        }).on_exception( [&] {
            for (size_type i = 0; i < queue_idx + 1; ++i) {
                array[i].clear_and_invalidate(alloc);
            }
            head_counter.store(0, std::memory_order_relaxed);
            tail_counter.store(0, std::memory_order_relaxed);
            n_invalid_entries.store(0, std::memory_order_relaxed);
        });

        __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) &&
                     tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed),
                     "the source concurrent queue should not be concurrently modified." );
    }

    bool empty() const {
        ticket_type tc = tail_counter.load(std::memory_order_acquire);
        ticket_type hc = head_counter.load(std::memory_order_relaxed);
        // If tc != tail_counter here, the queue was not empty at some point between the two reads.
        return tc == tail_counter.load(std::memory_order_relaxed) &&
               std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0;
    }

    std::ptrdiff_t size() const {
        __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), nullptr);
        std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire);
        std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed);
        std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed);

        return tc - hc - nie;
    }

    friend class micro_queue<T, Allocator>;

    // Map ticket_type to an array index
    static size_type index( ticket_type k ) {
        return k * phi % n_queue;
    }

    micro_queue_type& choose( ticket_type k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }

    alignas(max_nfs_size) micro_queue_type array[n_queue];

    alignas(max_nfs_size) std::atomic<ticket_type> head_counter{};
    alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{};
    alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{};
}; // struct concurrent_queue_rep

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif

template <typename Value, typename Allocator>
class concurrent_queue_iterator_base {
    using queue_rep_type = concurrent_queue_rep<Value, Allocator>;
    using padded_page = typename queue_rep_type::padded_page;
protected:
    concurrent_queue_iterator_base() = default;

    concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) {
        assign(other);
    }

    concurrent_queue_iterator_base( queue_rep_type* queue_rep )
        : my_queue_rep(queue_rep),
          my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed))
    {
        for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
            my_array[i] = my_queue_rep->array[i].get_head_page();
        }

        if (!get_item(my_item, my_head_counter)) advance();
    }

    void assign( const concurrent_queue_iterator_base& other ) {
        my_item = other.my_item;
        my_queue_rep = other.my_queue_rep;

        if (my_queue_rep != nullptr) {
            my_head_counter = other.my_head_counter;

            for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
                my_array[i] = other.my_array[i];
            }
        }
    }

    void advance() {
        __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue");
        std::size_t k = my_head_counter;
#if TBB_USE_ASSERT
        Value* tmp;
        get_item(tmp, k);
        __TBB_ASSERT(my_item == tmp, nullptr);
#endif
        std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
        if (i == my_queue_rep->items_per_page - 1) {
            padded_page*& root = my_array[queue_rep_type::index(k)];
            root = root->next;
        }
        // Advance k
        my_head_counter = ++k;
        if (!get_item(my_item, k)) advance();
    }

    concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) {
        this->assign(other);
        return *this;
    }

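    // Sets item to the slot addressed by ticket k. Returns true when the state
    // is final: either a valid item, or the end of the queue (item == nullptr).
    // Returns false for an invalid (aborted) entry, which advance() skips over.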
    bool get_item( Value*& item, std::size_t k ) {
        if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) {
            item = nullptr;
            return true;
        } else {
            padded_page* p = my_array[queue_rep_type::index(k)];
            __TBB_ASSERT(p, nullptr);
            std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
            item = &(*p)[i];
            return (p->mask & uintptr_t(1) << i) != 0;
        }
    }

    Value* my_item{ nullptr };
    queue_rep_type* my_queue_rep{ nullptr };
    ticket_type my_head_counter{};
    padded_page* my_array[queue_rep_type::n_queue]{};
}; // class concurrent_queue_iterator_base

struct concurrent_queue_iterator_provider {
    template <typename Iterator, typename Container>
    static Iterator get( const Container& container ) {
        return Iterator(container);
    }
}; // struct concurrent_queue_iterator_provider

template <typename Container, typename Value, typename Allocator>
class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> {
    using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>;
public:
    using value_type = Value;
    using pointer = value_type*;
    using reference = value_type&;
    using difference_type = std::ptrdiff_t;
    using iterator_category = std::forward_iterator_tag;

    concurrent_queue_iterator() = default;

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other )
        : base_type(other) {}

private:
    concurrent_queue_iterator( const Container& container )
        : base_type(container.my_queue_representation) {}
public:
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) {
        this->assign(other);
        return *this;
    }

    reference operator*() const {
        return *static_cast<pointer>(this->my_item);
    }

    pointer operator->() const { return &operator*(); }

    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    concurrent_queue_iterator operator++(int) {
        concurrent_queue_iterator tmp = *this;
        ++*this;
        return tmp;
    }

    friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item == rhs.my_item;
    }

    friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item != rhs.my_item;
    }
private:
    friend struct concurrent_queue_iterator_provider;
}; // class concurrent_queue_iterator

} // namespace d2
} // namespace detail
} // namespace tbb

#endif // __TBB_detail__concurrent_queue_base_H