1*51c0b2f7Stbbdev /*
2*51c0b2f7Stbbdev     Copyright (c) 2020 Intel Corporation
3*51c0b2f7Stbbdev 
4*51c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
5*51c0b2f7Stbbdev     you may not use this file except in compliance with the License.
6*51c0b2f7Stbbdev     You may obtain a copy of the License at
7*51c0b2f7Stbbdev 
8*51c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
9*51c0b2f7Stbbdev 
10*51c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
11*51c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
12*51c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*51c0b2f7Stbbdev     See the License for the specific language governing permissions and
14*51c0b2f7Stbbdev     limitations under the License.
15*51c0b2f7Stbbdev */
16*51c0b2f7Stbbdev 
17*51c0b2f7Stbbdev #include "tbb/detail/_utils.h"
18*51c0b2f7Stbbdev #include "tbb/concurrent_queue.h"
19*51c0b2f7Stbbdev #include "tbb/cache_aligned_allocator.h"
20*51c0b2f7Stbbdev #include "concurrent_monitor.h"
21*51c0b2f7Stbbdev 
22*51c0b2f7Stbbdev namespace tbb {
23*51c0b2f7Stbbdev namespace detail {
24*51c0b2f7Stbbdev namespace r1 {
25*51c0b2f7Stbbdev 
26*51c0b2f7Stbbdev static constexpr std::size_t monitors_number = 2;
27*51c0b2f7Stbbdev 
28*51c0b2f7Stbbdev std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size )
29*51c0b2f7Stbbdev {
30*51c0b2f7Stbbdev     std::size_t monitors_mem_size = sizeof(concurrent_monitor) * monitors_number;
31*51c0b2f7Stbbdev     std::uint8_t* mem = static_cast<std::uint8_t*>(cache_aligned_allocate(queue_rep_size + monitors_mem_size));
32*51c0b2f7Stbbdev 
33*51c0b2f7Stbbdev     concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size);
34*51c0b2f7Stbbdev     for (std::size_t i = 0; i < monitors_number; ++i) {
35*51c0b2f7Stbbdev         new (monitors + i) concurrent_monitor();
36*51c0b2f7Stbbdev     }
37*51c0b2f7Stbbdev 
38*51c0b2f7Stbbdev     return mem;
39*51c0b2f7Stbbdev }
40*51c0b2f7Stbbdev 
41*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size )
42*51c0b2f7Stbbdev {
43*51c0b2f7Stbbdev     concurrent_monitor* monitors = reinterpret_cast<concurrent_monitor*>(mem + queue_rep_size);
44*51c0b2f7Stbbdev     for (std::size_t i = 0; i < monitors_number; ++i) {
45*51c0b2f7Stbbdev         monitors[i].~concurrent_monitor();
46*51c0b2f7Stbbdev     }
47*51c0b2f7Stbbdev 
48*51c0b2f7Stbbdev     cache_aligned_deallocate(mem);
49*51c0b2f7Stbbdev }
50*51c0b2f7Stbbdev 
51*51c0b2f7Stbbdev static bool call_predicate( d1::delegate_base& predicate, concurrent_monitor& monitor, concurrent_monitor::thread_context& thr_ctx ) {
52*51c0b2f7Stbbdev     bool res = false;
53*51c0b2f7Stbbdev     tbb::detail::d0::try_call( [&] {
54*51c0b2f7Stbbdev         res = predicate();
55*51c0b2f7Stbbdev     }).on_exception( [&] {
56*51c0b2f7Stbbdev         monitor.cancel_wait(thr_ctx);
57*51c0b2f7Stbbdev     });
58*51c0b2f7Stbbdev 
59*51c0b2f7Stbbdev     return res;
60*51c0b2f7Stbbdev }
61*51c0b2f7Stbbdev 
62*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
63*51c0b2f7Stbbdev                                                         std::ptrdiff_t target, d1::delegate_base& predicate )
64*51c0b2f7Stbbdev {
65*51c0b2f7Stbbdev     __TBB_ASSERT(monitor_tag < monitors_number, nullptr);
66*51c0b2f7Stbbdev     concurrent_monitor& monitor = monitors[monitor_tag];
67*51c0b2f7Stbbdev 
68*51c0b2f7Stbbdev     concurrent_monitor::thread_context thr_ctx;
69*51c0b2f7Stbbdev     monitor.prepare_wait(thr_ctx, target);
70*51c0b2f7Stbbdev     while (call_predicate(predicate, monitor, thr_ctx)) {
71*51c0b2f7Stbbdev         if (monitor.commit_wait(thr_ctx)) {
72*51c0b2f7Stbbdev             return;
73*51c0b2f7Stbbdev         }
74*51c0b2f7Stbbdev         monitor.prepare_wait(thr_ctx, target);
75*51c0b2f7Stbbdev     }
76*51c0b2f7Stbbdev 
77*51c0b2f7Stbbdev     monitor.cancel_wait(thr_ctx);
78*51c0b2f7Stbbdev }
79*51c0b2f7Stbbdev 
80*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors ) {
81*51c0b2f7Stbbdev     concurrent_monitor& items_avail = monitors[d1::cbq_items_avail_tag];
82*51c0b2f7Stbbdev     concurrent_monitor& slots_avail = monitors[d1::cbq_slots_avail_tag];
83*51c0b2f7Stbbdev 
84*51c0b2f7Stbbdev     items_avail.abort_all();
85*51c0b2f7Stbbdev     slots_avail.abort_all();
86*51c0b2f7Stbbdev }
87*51c0b2f7Stbbdev 
88*51c0b2f7Stbbdev struct predicate_leq {
89*51c0b2f7Stbbdev     std::size_t my_ticket;
90*51c0b2f7Stbbdev     predicate_leq( std::size_t ticket ) : my_ticket(ticket) {}
91*51c0b2f7Stbbdev     bool operator() ( std::uintptr_t ticket ) const { return static_cast<std::size_t>(ticket) <= my_ticket; }
92*51c0b2f7Stbbdev };
93*51c0b2f7Stbbdev 
94*51c0b2f7Stbbdev void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors,
95*51c0b2f7Stbbdev                                                                std::size_t monitor_tag, std::size_t ticket)
96*51c0b2f7Stbbdev {
97*51c0b2f7Stbbdev     __TBB_ASSERT(monitor_tag < monitors_number, nullptr);
98*51c0b2f7Stbbdev     concurrent_monitor& monitor = monitors[monitor_tag];
99*51c0b2f7Stbbdev     monitor.notify(predicate_leq(ticket));
100*51c0b2f7Stbbdev }
101*51c0b2f7Stbbdev 
102*51c0b2f7Stbbdev } // namespace r1
103*51c0b2f7Stbbdev } // namespace detail
104*51c0b2f7Stbbdev } // namespace tbb
105