// /oneTBB/src/tbb/thread_dispatcher.cpp (revision c4568449)
/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "thread_dispatcher.h"
#include "threading_control.h"

namespace tbb {
namespace detail {
namespace r1 {

thread_dispatcher::thread_dispatcher(threading_control& tc, unsigned hard_limit, std::size_t stack_size)
    : my_threading_control(tc)
    , my_num_workers_hard_limit(hard_limit)
    , my_stack_size(stack_size)
{
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

thread_dispatcher::~thread_dispatcher() {
    poison_pointer(my_server);
}

thread_dispatcher_client* thread_dispatcher::select_next_client(thread_dispatcher_client* hint) {
    unsigned next_client_priority_level = num_priority_levels;
    if (hint) {
        next_client_priority_level = hint->priority_level();
    }

    // Prefer the highest-priority (lowest index) non-empty list that outranks the hint.
    for (unsigned idx = 0; idx < next_client_priority_level; ++idx) {
        if (!my_client_list[idx].empty()) {
            return &*my_client_list[idx].begin();
        }
    }

    return hint;
}
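
// A comment-only sketch (not oneTBB code; the layout below is hypothetical) of
// the selection rule above, assuming num_priority_levels == 3 where level 0 is
// the most urgent:
//
//     lists: P0: []   P1: [B]   P2: [C]
//     select_next_client(nullptr) -> B   // full scan; P1 is the first non-empty list
//     select_next_client(C)       -> B   // B outranks the level-2 hint
//     select_next_client(B)       -> B   // nothing outranks it, so the hint is kept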

thread_dispatcher_client* thread_dispatcher::create_client(arena& a) {
    return new (cache_aligned_allocate(sizeof(thread_dispatcher_client))) thread_dispatcher_client(a, my_clients_aba_epoch);
}

void thread_dispatcher::register_client(thread_dispatcher_client* client) {
    client_list_mutex_type::scoped_lock lock(my_list_mutex);
    insert_client(*client);
}

bool thread_dispatcher::try_unregister_client(thread_dispatcher_client* client, std::uint64_t aba_epoch, unsigned priority) {
    __TBB_ASSERT(client, nullptr);
    // We hold a reference to the server, so the market cannot be destroyed at this point.
    __TBB_ASSERT(!is_poisoned(my_server), nullptr);
    my_list_mutex.lock();
    for (auto& it : my_client_list[priority]) {
        if (client == &it) {
            if (it.get_aba_epoch() == aba_epoch) {
                // Client is alive.
                // Acquire my_references to sync with threads that just left the arena.
                // Note that references must be read before workers_requested: if references
                // is not zero, some other thread might call adjust_demand and cause a race
                // over workers_requested.
                if (!client->references() && !client->has_request()) {
                    // Client is abandoned. Destroy it.
                    remove_client(*client);
                    ++my_clients_aba_epoch;

                    my_list_mutex.unlock();
                    destroy_client(client);

                    return true;
                }
            }
            break;
        }
    }
    my_list_mutex.unlock();
    return false;
}
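
// A comment-only sketch (hypothetical timeline, not oneTBB code) of the ABA
// hazard the epoch check above guards against: a client can be destroyed and a
// new one allocated at the same address after a caller captured its pointer,
// so pointer equality alone would not prove identity.
//
//     thread_dispatcher_client* c = td.create_client(a);   // stores epoch N
//     std::uint64_t epoch = c->get_aba_epoch();
//     // ... another thread unregisters and destroys c, bumping
//     // my_clients_aba_epoch, and a new client reuses the same address ...
//     td.try_unregister_client(c, epoch, priority);
//     // returns false: the address matches a live client, but that client's
//     // stored epoch differs from N, so the unrelated newcomer is left intact.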

void thread_dispatcher::destroy_client(thread_dispatcher_client* client) {
    client->~thread_dispatcher_client();
    cache_aligned_deallocate(client);
}

// Must be called under the list lock.
void thread_dispatcher::insert_client(thread_dispatcher_client& client) {
    __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr);
    my_client_list[client.priority_level()].push_front(client);

    __TBB_ASSERT(!my_next_client || my_next_client->priority_level() < num_priority_levels, nullptr);
    my_next_client = select_next_client(my_next_client);
}

// Must be called under the list lock.
void thread_dispatcher::remove_client(thread_dispatcher_client& client) {
    __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr);
    my_client_list[client.priority_level()].remove(client);

    if (my_next_client == &client) {
        my_next_client = nullptr;
    }
    my_next_client = select_next_client(my_next_client);
}

bool thread_dispatcher::is_client_alive(thread_dispatcher_client* client) {
    if (!client) {
        return false;
    }

    // We still cannot access the client's internals: the object itself might
    // already be destroyed, so liveness is established by list membership only.
    for (auto& priority_list : my_client_list) {
        for (auto& c : priority_list) {
            if (client == &c) {
                return true;
            }
        }
    }
    return false;
}

thread_dispatcher_client* thread_dispatcher::client_in_need(client_list_type* clients, thread_dispatcher_client* hint) {
    // TODO: make sure a client with higher priority is returned only if it has available slots.
    hint = select_next_client(hint);
    if (!hint) {
        return nullptr;
    }

    client_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->priority_level();
    __TBB_ASSERT(it != clients[curr_priority_level].end(), nullptr);
    do {
        thread_dispatcher_client& t = *it;
        if (++it == clients[curr_priority_level].end()) {
            // Wrap circularly to the next non-empty priority list.
            do {
                ++curr_priority_level %= num_priority_levels;
            } while (clients[curr_priority_level].empty());
            it = clients[curr_priority_level].begin();
        }
        if (t.try_join()) {
            return &t;
        }
    } while (it != hint);
    return nullptr;
}
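
// A comment-only sketch (hypothetical layout, not oneTBB code) of the circular
// traversal above, assuming three priority lists:
//
//     lists: P0: [A]   P1: [B, C]   P2: []
//     hint = B  =>  try_join order: B, C, A, then stop upon returning to B
//
// Each live client is offered the join at most once per call; the first client
// whose try_join() succeeds is returned.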

thread_dispatcher_client* thread_dispatcher::client_in_need(thread_dispatcher_client* prev) {
    client_list_mutex_type::scoped_lock lock(my_list_mutex, /*is_writer=*/false);
    if (is_client_alive(prev)) {
        return client_in_need(my_client_list, prev);
    }
    return client_in_need(my_client_list, my_next_client);
}

void thread_dispatcher::adjust_job_count_estimate(int delta) {
    my_server->adjust_job_count_estimate(delta);
}

void thread_dispatcher::release(bool blocking_terminate) {
    my_join_workers = blocking_terminate;
    my_server->request_close_connection();
}

void thread_dispatcher::process(job& j) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_last_client may be dead; do not access it until client_in_need is called.
    thread_dispatcher_client* client = td.my_last_client;
    for (int i = 0; i < 2; ++i) {
        while ((client = client_in_need(client))) {
            td.my_last_client = client;
            client->process(td);
        }
        // Workers leave the thread_dispatcher when there is no client in need. This can
        // happen before adjust_job_count_estimate() decreases my_slack and RML puts this
        // thread to sleep, which may result in a busy loop that checks for my_slack < 0
        // and re-enters this method immediately. The yield below mitigates that spinning.
        if ( !i ) {
            yield();
        }
    }
}
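
// A rough sketch (hypothetical interleaving, not oneTBB code) of the race the
// second pass with yield() smooths over:
//
//     worker thread:                       thread lowering demand:
//     client_in_need(...) == nullptr       // demand has just dropped
//     leave process()                      adjust_job_count_estimate(-delta)
//     RML re-dispatches the worker         // my_slack not yet decreased
//
// Without the yield, the worker could bounce between RML and process() in a
// tight loop until the lowered estimate is published; yielding once gives the
// other thread a chance to update my_slack first.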

//! Used when RML asks for the join mode during worker termination.
bool thread_dispatcher::must_join_workers() const { return my_join_workers; }

//! Returns the requested stack size of worker threads.
std::size_t thread_dispatcher::worker_stack_size() const { return my_stack_size; }

void thread_dispatcher::acknowledge_close_connection() {
    my_threading_control.destroy();
}

::rml::job* thread_dispatcher::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT(index > 0, nullptr);
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // The index serves as a hint that decreases conflicts between workers when
    // they migrate between arenas.
    thread_data* td = new (cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT(index <= my_num_workers_hard_limit, nullptr);
    my_threading_control.register_thread(*td);
    return td;
}

void thread_dispatcher::cleanup(job& j) {
    my_threading_control.unregister_thread(static_cast<thread_data&>(j));
    governor::auto_terminate(&j);
}

} // namespace r1
} // namespace detail
} // namespace tbb