151c0b2f7Stbbdev /*
2b15aabb3Stbbdev Copyright (c) 2020-2021 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1749e08aacStbbdev #include "oneapi/tbb/detail/_utils.h"
1849e08aacStbbdev #include "oneapi/tbb/concurrent_queue.h"
1949e08aacStbbdev #include "oneapi/tbb/cache_aligned_allocator.h"
2051c0b2f7Stbbdev #include "concurrent_monitor.h"
2151c0b2f7Stbbdev
2251c0b2f7Stbbdev namespace tbb {
2351c0b2f7Stbbdev namespace detail {
2451c0b2f7Stbbdev namespace r1 {
2551c0b2f7Stbbdev
2651c0b2f7Stbbdev static constexpr std::size_t monitors_number = 2;
2751c0b2f7Stbbdev
allocate_bounded_queue_rep(std::size_t queue_rep_size)2851c0b2f7Stbbdev std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size )
2951c0b2f7Stbbdev {
3051c0b2f7Stbbdev std::size_t monitors_mem_size = sizeof(concurrent_monitor) * monitors_number;
3151c0b2f7Stbbdev std::uint8_t* mem = static_cast<std::uint8_t*>(cache_aligned_allocate(queue_rep_size + monitors_mem_size));
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size);
3451c0b2f7Stbbdev for (std::size_t i = 0; i < monitors_number; ++i) {
3551c0b2f7Stbbdev new (monitors + i) concurrent_monitor();
3651c0b2f7Stbbdev }
3751c0b2f7Stbbdev
3851c0b2f7Stbbdev return mem;
3951c0b2f7Stbbdev }
4051c0b2f7Stbbdev
deallocate_bounded_queue_rep(std::uint8_t * mem,std::size_t queue_rep_size)4151c0b2f7Stbbdev void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size )
4251c0b2f7Stbbdev {
4351c0b2f7Stbbdev concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size);
4451c0b2f7Stbbdev for (std::size_t i = 0; i < monitors_number; ++i) {
4551c0b2f7Stbbdev monitors[i].~concurrent_monitor();
4651c0b2f7Stbbdev }
4751c0b2f7Stbbdev
4851c0b2f7Stbbdev cache_aligned_deallocate(mem);
4951c0b2f7Stbbdev }
5051c0b2f7Stbbdev
wait_bounded_queue_monitor(concurrent_monitor * monitors,std::size_t monitor_tag,std::ptrdiff_t target,d1::delegate_base & predicate)5151c0b2f7Stbbdev void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
5251c0b2f7Stbbdev std::ptrdiff_t target, d1::delegate_base& predicate )
5351c0b2f7Stbbdev {
5451c0b2f7Stbbdev __TBB_ASSERT(monitor_tag < monitors_number, nullptr);
5551c0b2f7Stbbdev concurrent_monitor& monitor = monitors[monitor_tag];
5651c0b2f7Stbbdev
578dcbd5b1Stbbdev monitor.wait<concurrent_monitor::thread_context>([&] { return !predicate(); }, std::uintptr_t(target));
5851c0b2f7Stbbdev }
5951c0b2f7Stbbdev
abort_bounded_queue_monitors(concurrent_monitor * monitors)6051c0b2f7Stbbdev void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors ) {
61*fbc48b39Svlserov concurrent_monitor& items_avail = monitors[d2::cbq_items_avail_tag];
62*fbc48b39Svlserov concurrent_monitor& slots_avail = monitors[d2::cbq_slots_avail_tag];
6351c0b2f7Stbbdev
6451c0b2f7Stbbdev items_avail.abort_all();
6551c0b2f7Stbbdev slots_avail.abort_all();
6651c0b2f7Stbbdev }
6751c0b2f7Stbbdev
6851c0b2f7Stbbdev struct predicate_leq {
6951c0b2f7Stbbdev std::size_t my_ticket;
predicate_leqtbb::detail::r1::predicate_leq7051c0b2f7Stbbdev predicate_leq( std::size_t ticket ) : my_ticket(ticket) {}
operator ()tbb::detail::r1::predicate_leq7151c0b2f7Stbbdev bool operator() ( std::uintptr_t ticket ) const { return static_cast<std::size_t>(ticket) <= my_ticket; }
7251c0b2f7Stbbdev };
7351c0b2f7Stbbdev
notify_bounded_queue_monitor(concurrent_monitor * monitors,std::size_t monitor_tag,std::size_t ticket)7451c0b2f7Stbbdev void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors,
7551c0b2f7Stbbdev std::size_t monitor_tag, std::size_t ticket)
7651c0b2f7Stbbdev {
7751c0b2f7Stbbdev __TBB_ASSERT(monitor_tag < monitors_number, nullptr);
7851c0b2f7Stbbdev concurrent_monitor& monitor = monitors[monitor_tag];
7951c0b2f7Stbbdev monitor.notify(predicate_leq(ticket));
8051c0b2f7Stbbdev }
8151c0b2f7Stbbdev
8251c0b2f7Stbbdev } // namespace r1
8351c0b2f7Stbbdev } // namespace detail
8451c0b2f7Stbbdev } // namespace tbb
85