151c0b2f7Stbbdev /*
251c0b2f7Stbbdev     Copyright (c) 2020 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17*49e08aacStbbdev #include "oneapi/tbb/detail/_utils.h"
18*49e08aacStbbdev #include "oneapi/tbb/concurrent_queue.h"
19*49e08aacStbbdev #include "oneapi/tbb/cache_aligned_allocator.h"
2051c0b2f7Stbbdev #include "concurrent_monitor.h"
2151c0b2f7Stbbdev 
2251c0b2f7Stbbdev namespace tbb {
2351c0b2f7Stbbdev namespace detail {
2451c0b2f7Stbbdev namespace r1 {
2551c0b2f7Stbbdev 
// Count of concurrent_monitor objects laid out directly after the queue
// representation in the chunk produced by allocate_bounded_queue_rep().
// NOTE(review): presumably one per wait condition, indexed by
// d1::cbq_items_avail_tag / d1::cbq_slots_avail_tag — confirm.
static constexpr std::size_t monitors_number{2};
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size )
2951c0b2f7Stbbdev {
3051c0b2f7Stbbdev     std::size_t monitors_mem_size = sizeof(concurrent_monitor) * monitors_number;
3151c0b2f7Stbbdev     std::uint8_t* mem = static_cast<std::uint8_t*>(cache_aligned_allocate(queue_rep_size + monitors_mem_size));
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev     concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size);
3451c0b2f7Stbbdev     for (std::size_t i = 0; i < monitors_number; ++i) {
3551c0b2f7Stbbdev         new (monitors + i) concurrent_monitor();
3651c0b2f7Stbbdev     }
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev     return mem;
3951c0b2f7Stbbdev }
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size )
4251c0b2f7Stbbdev {
4351c0b2f7Stbbdev     concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size);
4451c0b2f7Stbbdev     for (std::size_t i = 0; i < monitors_number; ++i) {
4551c0b2f7Stbbdev         monitors[i].~concurrent_monitor();
4651c0b2f7Stbbdev     }
4751c0b2f7Stbbdev 
4851c0b2f7Stbbdev     cache_aligned_deallocate(mem);
4951c0b2f7Stbbdev }
5051c0b2f7Stbbdev 
5151c0b2f7Stbbdev static bool call_predicate( d1::delegate_base& predicate, concurrent_monitor& monitor, concurrent_monitor::thread_context& thr_ctx ) {
5251c0b2f7Stbbdev     bool res = false;
5351c0b2f7Stbbdev     tbb::detail::d0::try_call( [&] {
5451c0b2f7Stbbdev         res = predicate();
5551c0b2f7Stbbdev     }).on_exception( [&] {
5651c0b2f7Stbbdev         monitor.cancel_wait(thr_ctx);
5751c0b2f7Stbbdev     });
5851c0b2f7Stbbdev 
5951c0b2f7Stbbdev     return res;
6051c0b2f7Stbbdev }
6151c0b2f7Stbbdev 
6251c0b2f7Stbbdev void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
6351c0b2f7Stbbdev                                                         std::ptrdiff_t target, d1::delegate_base& predicate )
6451c0b2f7Stbbdev {
6551c0b2f7Stbbdev     __TBB_ASSERT(monitor_tag < monitors_number, nullptr);
6651c0b2f7Stbbdev     concurrent_monitor& monitor = monitors[monitor_tag];
6751c0b2f7Stbbdev 
6851c0b2f7Stbbdev     concurrent_monitor::thread_context thr_ctx;
69*49e08aacStbbdev     monitor.prepare_wait(thr_ctx, std::uintptr_t(target));
7051c0b2f7Stbbdev     while (call_predicate(predicate, monitor, thr_ctx)) {
7151c0b2f7Stbbdev         if (monitor.commit_wait(thr_ctx)) {
7251c0b2f7Stbbdev             return;
7351c0b2f7Stbbdev         }
74*49e08aacStbbdev         monitor.prepare_wait(thr_ctx, std::uintptr_t(target));
7551c0b2f7Stbbdev     }
7651c0b2f7Stbbdev 
7751c0b2f7Stbbdev     monitor.cancel_wait(thr_ctx);
7851c0b2f7Stbbdev }
7951c0b2f7Stbbdev 
8051c0b2f7Stbbdev void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors ) {
8151c0b2f7Stbbdev     concurrent_monitor& items_avail = monitors[d1::cbq_items_avail_tag];
8251c0b2f7Stbbdev     concurrent_monitor& slots_avail = monitors[d1::cbq_slots_avail_tag];
8351c0b2f7Stbbdev 
8451c0b2f7Stbbdev     items_avail.abort_all();
8551c0b2f7Stbbdev     slots_avail.abort_all();
8651c0b2f7Stbbdev }
8751c0b2f7Stbbdev 
// Filter handed to concurrent_monitor::notify() by notify_bounded_queue_monitor().
// For a given waiter value it returns true when that value is <= my_ticket.
// NOTE(review): the argument is presumably the value the waiter recorded via
// prepare_wait() in wait_bounded_queue_monitor() — confirm against concurrent_monitor.
struct predicate_leq {
    std::size_t my_ticket;

    // explicit: a bare std::size_t must not silently convert into a predicate.
    explicit predicate_leq( std::size_t ticket ) : my_ticket(ticket) {}

    bool operator() ( std::uintptr_t ticket ) const { return static_cast<std::size_t>(ticket) <= my_ticket; }
};
9351c0b2f7Stbbdev 
9451c0b2f7Stbbdev void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors,
9551c0b2f7Stbbdev                                                                std::size_t monitor_tag, std::size_t ticket)
9651c0b2f7Stbbdev {
9751c0b2f7Stbbdev     __TBB_ASSERT(monitor_tag < monitors_number, nullptr);
9851c0b2f7Stbbdev     concurrent_monitor& monitor = monitors[monitor_tag];
9951c0b2f7Stbbdev     monitor.notify(predicate_leq(ticket));
10051c0b2f7Stbbdev }
10151c0b2f7Stbbdev 
10251c0b2f7Stbbdev } // namespace r1
10351c0b2f7Stbbdev } // namespace detail
10451c0b2f7Stbbdev } // namespace tbb
105