1*51c0b2f7Stbbdev /* 2*51c0b2f7Stbbdev Copyright (c) 2020 Intel Corporation 3*51c0b2f7Stbbdev 4*51c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 5*51c0b2f7Stbbdev you may not use this file except in compliance with the License. 6*51c0b2f7Stbbdev You may obtain a copy of the License at 7*51c0b2f7Stbbdev 8*51c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 9*51c0b2f7Stbbdev 10*51c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 11*51c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 12*51c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*51c0b2f7Stbbdev See the License for the specific language governing permissions and 14*51c0b2f7Stbbdev limitations under the License. 15*51c0b2f7Stbbdev */ 16*51c0b2f7Stbbdev 17*51c0b2f7Stbbdev #include "tbb/detail/_utils.h" 18*51c0b2f7Stbbdev #include "tbb/concurrent_queue.h" 19*51c0b2f7Stbbdev #include "tbb/cache_aligned_allocator.h" 20*51c0b2f7Stbbdev #include "concurrent_monitor.h" 21*51c0b2f7Stbbdev 22*51c0b2f7Stbbdev namespace tbb { 23*51c0b2f7Stbbdev namespace detail { 24*51c0b2f7Stbbdev namespace r1 { 25*51c0b2f7Stbbdev 26*51c0b2f7Stbbdev static constexpr std::size_t monitors_number = 2; 27*51c0b2f7Stbbdev 28*51c0b2f7Stbbdev std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size ) 29*51c0b2f7Stbbdev { 30*51c0b2f7Stbbdev std::size_t monitors_mem_size = sizeof(concurrent_monitor) * monitors_number; 31*51c0b2f7Stbbdev std::uint8_t* mem = static_cast<std::uint8_t*>(cache_aligned_allocate(queue_rep_size + monitors_mem_size)); 32*51c0b2f7Stbbdev 33*51c0b2f7Stbbdev concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size); 34*51c0b2f7Stbbdev for (std::size_t i = 0; i < monitors_number; ++i) { 35*51c0b2f7Stbbdev new (monitors + i) concurrent_monitor(); 36*51c0b2f7Stbbdev } 37*51c0b2f7Stbbdev 38*51c0b2f7Stbbdev return mem; 39*51c0b2f7Stbbdev } 40*51c0b2f7Stbbdev 41*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size ) 42*51c0b2f7Stbbdev { 43*51c0b2f7Stbbdev concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size); 44*51c0b2f7Stbbdev for (std::size_t i = 0; i < monitors_number; ++i) { 45*51c0b2f7Stbbdev monitors[i].~concurrent_monitor(); 46*51c0b2f7Stbbdev } 47*51c0b2f7Stbbdev 48*51c0b2f7Stbbdev cache_aligned_deallocate(mem); 49*51c0b2f7Stbbdev } 50*51c0b2f7Stbbdev 51*51c0b2f7Stbbdev static bool call_predicate( d1::delegate_base& predicate, concurrent_monitor& monitor, concurrent_monitor::thread_context& thr_ctx ) { 52*51c0b2f7Stbbdev bool res = false; 53*51c0b2f7Stbbdev tbb::detail::d0::try_call( [&] { 54*51c0b2f7Stbbdev res = predicate(); 55*51c0b2f7Stbbdev }).on_exception( [&] { 56*51c0b2f7Stbbdev monitor.cancel_wait(thr_ctx); 57*51c0b2f7Stbbdev }); 58*51c0b2f7Stbbdev 59*51c0b2f7Stbbdev return res; 60*51c0b2f7Stbbdev } 61*51c0b2f7Stbbdev 62*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag, 63*51c0b2f7Stbbdev std::ptrdiff_t target, d1::delegate_base& predicate ) 64*51c0b2f7Stbbdev { 65*51c0b2f7Stbbdev __TBB_ASSERT(monitor_tag < monitors_number, nullptr); 66*51c0b2f7Stbbdev concurrent_monitor& monitor = monitors[monitor_tag]; 67*51c0b2f7Stbbdev 68*51c0b2f7Stbbdev concurrent_monitor::thread_context thr_ctx; 69*51c0b2f7Stbbdev monitor.prepare_wait(thr_ctx, target); 70*51c0b2f7Stbbdev while (call_predicate(predicate, monitor, thr_ctx)) { 71*51c0b2f7Stbbdev if (monitor.commit_wait(thr_ctx)) { 72*51c0b2f7Stbbdev return; 73*51c0b2f7Stbbdev } 74*51c0b2f7Stbbdev monitor.prepare_wait(thr_ctx, target); 75*51c0b2f7Stbbdev } 76*51c0b2f7Stbbdev 77*51c0b2f7Stbbdev monitor.cancel_wait(thr_ctx); 78*51c0b2f7Stbbdev } 79*51c0b2f7Stbbdev 80*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors ) { 81*51c0b2f7Stbbdev concurrent_monitor& items_avail = monitors[d1::cbq_items_avail_tag]; 82*51c0b2f7Stbbdev concurrent_monitor& slots_avail = monitors[d1::cbq_slots_avail_tag]; 83*51c0b2f7Stbbdev 84*51c0b2f7Stbbdev items_avail.abort_all(); 85*51c0b2f7Stbbdev slots_avail.abort_all(); 86*51c0b2f7Stbbdev } 87*51c0b2f7Stbbdev 88*51c0b2f7Stbbdev struct predicate_leq { 89*51c0b2f7Stbbdev std::size_t my_ticket; 90*51c0b2f7Stbbdev predicate_leq( std::size_t ticket ) : my_ticket(ticket) {} 91*51c0b2f7Stbbdev bool operator() ( std::uintptr_t ticket ) const { return static_cast<std::size_t>(ticket) <= my_ticket; } 92*51c0b2f7Stbbdev }; 93*51c0b2f7Stbbdev 94*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, 95*51c0b2f7Stbbdev std::size_t monitor_tag, std::size_t ticket) 96*51c0b2f7Stbbdev { 97*51c0b2f7Stbbdev __TBB_ASSERT(monitor_tag < monitors_number, nullptr); 98*51c0b2f7Stbbdev concurrent_monitor& monitor = monitors[monitor_tag]; 99*51c0b2f7Stbbdev monitor.notify(predicate_leq(ticket)); 100*51c0b2f7Stbbdev } 101*51c0b2f7Stbbdev 102*51c0b2f7Stbbdev } // namespace r1 103*51c0b2f7Stbbdev } // namespace detail 104*51c0b2f7Stbbdev } // namespace tbb 105