/*
Copyright (c) 2022-2023 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "thread_dispatcher.h"
#include "threading_control.h"

namespace tbb {
namespace detail {
namespace r1 {

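//! Constructs the thread dispatcher and creates the RML server that supplies its worker threads.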
thread_dispatcher::thread_dispatcher(threading_control& tc, unsigned hard_limit, std::size_t stack_size)
    : my_threading_control(tc)
    , my_num_workers_hard_limit(hard_limit)
    , my_stack_size(stack_size)
{
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

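//! Poisons the server pointer to help catch accidental use after release in debug builds.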
thread_dispatcher::~thread_dispatcher() {
    poison_pointer(my_server);
}

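//! Returns the first client from the highest-priority non-empty list that outranks the hint,
//! or the hint itself if no higher-priority client is available.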
thread_dispatcher_client* thread_dispatcher::select_next_client(thread_dispatcher_client* hint) {
    unsigned next_client_priority_level = num_priority_levels;
    if (hint) {
        next_client_priority_level = hint->priority_level();
    }

    for (unsigned idx = 0; idx < next_client_priority_level; ++idx) {
        if (!my_client_list[idx].empty()) {
            return &*my_client_list[idx].begin();
        }
    }

    return hint;
}

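//! Allocates and constructs a new client for the given arena, stamped with the current ABA epoch.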
thread_dispatcher_client* thread_dispatcher::create_client(arena& a) {
    return new (cache_aligned_allocate(sizeof(thread_dispatcher_client))) thread_dispatcher_client(a, my_clients_aba_epoch);
}

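//! Inserts the client into the priority lists under the list lock.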
void thread_dispatcher::register_client(thread_dispatcher_client* client) {
    client_list_mutex_type::scoped_lock lock(my_list_mutex);
    insert_client(*client);
}

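//! Destroys the client if it is still registered under the given ABA epoch and is abandoned,
//! i.e. has no references and no pending worker request. Returns true if the client was destroyed.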
bool thread_dispatcher::try_unregister_client(thread_dispatcher_client* client, std::uint64_t aba_epoch, unsigned priority) {
    __TBB_ASSERT(client, nullptr);
    // We hold a reference to the server, so the thread dispatcher cannot be destroyed at any moment here.
    __TBB_ASSERT(!is_poisoned(my_server), nullptr);
    my_list_mutex.lock();
    for (auto& it : my_client_list[priority]) {
        if (client == &it) {
            if (it.get_aba_epoch() == aba_epoch) {
                // The client is alive.
                // Acquire my_references to sync with threads that just left the arena.
                // Note that references should be read before workers_requested because,
                // if the reference count is not zero, some other thread might call adjust_demand()
                // and cause a race over workers_requested.
                if (!client->references() && !client->has_request()) {
                    // The client is abandoned. Destroy it.
                    remove_client(*client);
                    ++my_clients_aba_epoch;

                    my_list_mutex.unlock();
                    destroy_client(client);

                    return true;
                }
            }
            break;
        }
    }
    my_list_mutex.unlock();
    return false;
}

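//! Destroys and deallocates a client previously created by create_client().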
void thread_dispatcher::destroy_client(thread_dispatcher_client* client) {
    client->~thread_dispatcher_client();
    cache_aligned_deallocate(client);
}

// Should be called under the list lock.
void thread_dispatcher::insert_client(thread_dispatcher_client& client) {
    __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr);
    my_client_list[client.priority_level()].push_front(client);

    __TBB_ASSERT(!my_next_client || my_next_client->priority_level() < num_priority_levels, nullptr);
    my_next_client = select_next_client(my_next_client);
}

// Should be called under the list lock.
void thread_dispatcher::remove_client(thread_dispatcher_client& client) {
    __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr);
    my_client_list[client.priority_level()].remove(client);

    if (my_next_client == &client) {
        my_next_client = nullptr;
    }
    my_next_client = select_next_client(my_next_client);
}

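//! Checks whether the client is still present in one of the priority lists.
//! Only addresses are compared because the object itself might already be destroyed.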
bool thread_dispatcher::is_client_alive(thread_dispatcher_client* client) {
    if (!client) {
        return false;
    }

    // We still cannot access the client's internals since the object itself might already be destroyed.
    for (auto& priority_list : my_client_list) {
        for (auto& c : priority_list) {
            if (client == &c) {
                return true;
            }
        }
    }
    return false;
}

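//! Scans the priority lists round-robin, starting at the hint, and returns the first
//! client that the calling worker can join, or nullptr if none accepts it.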
thread_dispatcher_client* thread_dispatcher::client_in_need(client_list_type* clients, thread_dispatcher_client* hint) {
    // TODO: make sure a client with higher priority is returned only if it has available slots.
    hint = select_next_client(hint);
    if (!hint) {
        return nullptr;
    }

    client_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->priority_level();
    __TBB_ASSERT(it != clients[curr_priority_level].end(), nullptr);
    do {
        thread_dispatcher_client& t = *it;
        if (++it == clients[curr_priority_level].end()) {
            do {
                ++curr_priority_level %= num_priority_levels;
            } while (clients[curr_priority_level].empty());
            it = clients[curr_priority_level].begin();
        }
        if (t.try_join()) {
            return &t;
        }
    } while (it != hint);
    return nullptr;
}

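//! Finds a client in need of a worker, preferring the previously served client if it is still alive.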
thread_dispatcher_client* thread_dispatcher::client_in_need(thread_dispatcher_client* prev) {
    client_list_mutex_type::scoped_lock lock(my_list_mutex, /*is_writer=*/false);
    if (is_client_alive(prev)) {
        return client_in_need(my_client_list, prev);
    }
    return client_in_need(my_client_list, my_next_client);
}

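//! Forwards the change in the number of requested workers to the RML server.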
void thread_dispatcher::adjust_job_count_estimate(int delta) {
    my_server->adjust_job_count_estimate(delta);
}

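//! Requests RML to close the connection; with blocking_terminate set, shutdown joins the workers.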
void thread_dispatcher::release(bool blocking_terminate) {
    my_join_workers = blocking_terminate;
    my_server->request_close_connection();
}

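//! The worker thread loop: serves clients in need until none remain, then yields once
//! and retries before returning the thread to RML.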
void thread_dispatcher::process(job& j) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_last_client can be dead. Do not access it until client_in_need() is called.
    thread_dispatcher_client* client = td.my_last_client;
    for (int i = 0; i < 2; ++i) {
        while ((client = client_in_need(client))) {
            td.my_last_client = client;
            client->process(td);
        }
        // A worker leaves the thread dispatcher because there is no client in need. This can
        // happen before adjust_job_count_estimate() decreases my_slack and RML puts the thread
        // to sleep, which might result in a busy loop that checks for my_slack < 0 and re-enters
        // this method immediately. The yield mitigates this spinning.
        if (!i) {
            yield();
        }
    }
}

//! Used when RML asks for the join mode during workers termination.
bool thread_dispatcher::must_join_workers() const { return my_join_workers; }

//! Returns the requested stack size of worker threads.
std::size_t thread_dispatcher::worker_stack_size() const { return my_stack_size; }

void thread_dispatcher::acknowledge_close_connection() {
    my_threading_control.destroy();
}

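//! Called by RML to create a job for a new worker thread: allocates its thread_data
//! and registers it with the threading control.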
::rml::job* thread_dispatcher::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT(index > 0, nullptr);
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // The index serves as a hint that decreases conflicts between workers when they migrate between arenas.
    thread_data* td = new (cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT(index <= my_num_workers_hard_limit, nullptr);
    my_threading_control.register_thread(*td);
    return td;
}

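//! Called by RML to clean up a worker job: unregisters the thread and runs governor auto-termination for it.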
void thread_dispatcher::cleanup(job& j) {
    my_threading_control.unregister_thread(static_cast<thread_data&>(j));
    governor::auto_terminate(&j);
}

} // namespace r1
} // namespace detail
} // namespace tbb