1 /*
2 Copyright (c) 2023-2024 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "oneapi/tbb/detail/_intrusive_list_node.h"
18 #include "oneapi/tbb/detail/_template_helpers.h"
19 #include "oneapi/tbb/task_arena.h"
20
21 #include "pm_client.h"
22 #include "dynamic_link.h"
23 #include "misc.h"
24 #include "tcm.h"
25 #include "tcm_adaptor.h"
26
27 #include <iostream>
28
29 namespace tbb {
30 namespace detail {
31 namespace r1 {
32
33 namespace {
34 #if __TBB_WEAK_SYMBOLS_PRESENT
35 #pragma weak tcmConnect
36 #pragma weak tcmDisconnect
37 #pragma weak tcmRequestPermit
38 #pragma weak tcmGetPermitData
39 #pragma weak tcmReleasePermit
40 #pragma weak tcmIdlePermit
41 #pragma weak tcmDeactivatePermit
42 #pragma weak tcmActivatePermit
43 #pragma weak tcmRegisterThread
44 #pragma weak tcmUnregisterThread
45 #pragma weak tcmGetVersionInfo
46 #endif /* __TBB_WEAK_SYMBOLS_PRESENT */
47
48 tcm_result_t(*tcm_connect)(tcm_callback_t callback, tcm_client_id_t* client_id){nullptr};
49 tcm_result_t(*tcm_disconnect)(tcm_client_id_t client_id){ nullptr };
50 tcm_result_t(*tcm_request_permit)(tcm_client_id_t client_id, tcm_permit_request_t request,
51 void* callback_arg, tcm_permit_handle_t* permit_handle, tcm_permit_t* permit){nullptr};
52 tcm_result_t(*tcm_get_permit_data)(tcm_permit_handle_t permit_handle, tcm_permit_t* permit){nullptr};
53 tcm_result_t(*tcm_release_permit)(tcm_permit_handle_t permit){nullptr};
54 tcm_result_t(*tcm_idle_permit)(tcm_permit_handle_t permit_handle){nullptr};
55 tcm_result_t(*tcm_deactivate_permit)(tcm_permit_handle_t permit_handle){nullptr};
56 tcm_result_t(*tcm_activate_permit)(tcm_permit_handle_t permit_handle){nullptr};
57 tcm_result_t(*tcm_register_thread)(tcm_permit_handle_t permit_handle){nullptr};
58 tcm_result_t(*tcm_unregister_thread)(){nullptr};
59 tcm_result_t (*tcm_get_version_info)(char* buffer, uint32_t buffer_size){nullptr};
60
61 static const dynamic_link_descriptor tcm_link_table[] = {
62 DLD(tcmConnect, tcm_connect),
63 DLD(tcmDisconnect, tcm_disconnect),
64 DLD(tcmRequestPermit, tcm_request_permit),
65 DLD(tcmGetPermitData, tcm_get_permit_data),
66 DLD(tcmReleasePermit, tcm_release_permit),
67 DLD(tcmIdlePermit, tcm_idle_permit),
68 DLD(tcmDeactivatePermit, tcm_deactivate_permit),
69 DLD(tcmActivatePermit, tcm_activate_permit),
70 DLD(tcmRegisterThread, tcm_register_thread),
71 DLD(tcmUnregisterThread, tcm_unregister_thread),
72 DLD(tcmGetVersionInfo, tcm_get_version_info)
73 };
74
75 #if TBB_USE_DEBUG
76 #define DEBUG_SUFFIX "_debug"
77 #else
78 #define DEBUG_SUFFIX
79 #endif /* TBB_USE_DEBUG */
80
81 #if _WIN32 || _WIN64
82 #define LIBRARY_EXTENSION ".dll"
83 #define LIBRARY_PREFIX
84 #elif __unix__
85 #define LIBRARY_EXTENSION ".so.1"
86 #define LIBRARY_PREFIX "lib"
87 #else
88 #define LIBRARY_EXTENSION
89 #define LIBRARY_PREFIX
90 #endif /* __unix__ */
91
92 #define TCMLIB_NAME LIBRARY_PREFIX "tcm" DEBUG_SUFFIX LIBRARY_EXTENSION
93
94 static bool tcm_functions_loaded{ false };
95 }
96
97 class tcm_client : public pm_client {
98 using tcm_client_mutex_type = d1::mutex;
99 public:
tcm_client(tcm_adaptor & adaptor,arena & a)100 tcm_client(tcm_adaptor& adaptor, arena& a) : pm_client(a), my_tcm_adaptor(adaptor) {}
101
~tcm_client()102 ~tcm_client() {
103 if (my_permit_handle) {
104 __TBB_ASSERT(tcm_release_permit, nullptr);
105 auto res = tcm_release_permit(my_permit_handle);
106 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
107 }
108 }
109
update_concurrency(uint32_t concurrency)110 int update_concurrency(uint32_t concurrency) {
111 return my_arena.update_concurrency(concurrency);
112 }
113
priority_level()114 unsigned priority_level() {
115 return my_arena.priority_level();
116 }
117
permit_request()118 tcm_permit_request_t& permit_request() {
119 return my_permit_request;
120 }
121
permit_handle()122 tcm_permit_handle_t& permit_handle() {
123 return my_permit_handle;
124 }
125
actualize_permit()126 void actualize_permit() {
127 __TBB_ASSERT(tcm_get_permit_data, nullptr);
128 int delta{};
129 {
130 tcm_client_mutex_type::scoped_lock lock(my_permit_mutex);
131
132 uint32_t new_concurrency{};
133 tcm_permit_t new_permit{ &new_concurrency, nullptr, 1, TCM_PERMIT_STATE_VOID, {} };
134 auto res = tcm_get_permit_data(my_permit_handle, &new_permit);
135 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
136
137 // The permit has changed during the reading, so the callback will be invoked soon one more time and
138 // we can just skip this renegotiation iteration.
139 if (!new_permit.flags.stale) {
140 // If there is no other demand in TCM, the permit may still have granted concurrency but
141 // be in the deactivated state thus we enforce 0 allotment to preserve arena invariants.
142 delta = update_concurrency(new_permit.state != TCM_PERMIT_STATE_INACTIVE ? new_concurrency : 0);
143 }
144 }
145 if (delta) {
146 my_tcm_adaptor.notify_thread_request(delta);
147 }
148 }
149
request_permit(tcm_client_id_t client_id)150 void request_permit(tcm_client_id_t client_id) {
151 __TBB_ASSERT(tcm_request_permit, nullptr);
152
153 my_permit_request.max_sw_threads = max_workers();
154 my_permit_request.min_sw_threads = my_permit_request.max_sw_threads == 0 ? 0 : min_workers();
155
156 if (my_permit_request.constraints_size > 0) {
157 my_permit_request.cpu_constraints->min_concurrency = my_permit_request.min_sw_threads;
158 my_permit_request.cpu_constraints->max_concurrency = my_permit_request.max_sw_threads;
159 }
160
161 __TBB_ASSERT(my_permit_request.max_sw_threads >= my_permit_request.min_sw_threads, nullptr);
162
163 tcm_result_t res = tcm_request_permit(client_id, my_permit_request, this, &my_permit_handle, nullptr);
164 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
165 }
166
deactivate_permit()167 void deactivate_permit() {
168 __TBB_ASSERT(tcm_deactivate_permit, nullptr);
169 tcm_result_t res = tcm_deactivate_permit(my_permit_handle);
170 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
171 }
172
init(d1::constraints & constraints)173 void init(d1::constraints& constraints) {
174 __TBB_ASSERT(tcm_request_permit, nullptr);
175 __TBB_ASSERT(tcm_deactivate_permit, nullptr);
176
177 if (constraints.core_type != d1::task_arena::automatic ||
178 constraints.numa_id != d1::task_arena::automatic ||
179 constraints.max_threads_per_core != d1::task_arena::automatic)
180 {
181 my_permit_constraints.max_concurrency = constraints.max_concurrency;
182 my_permit_constraints.min_concurrency = 0;
183 my_permit_constraints.core_type_id = constraints.core_type;
184 my_permit_constraints.numa_id = constraints.numa_id;
185 my_permit_constraints.threads_per_core = constraints.max_threads_per_core;
186
187 my_permit_request.cpu_constraints = &my_permit_constraints;
188 my_permit_request.constraints_size = 1;
189 }
190
191 my_permit_request.min_sw_threads = 0;
192 my_permit_request.max_sw_threads = 0;
193 }
194
register_thread()195 void register_thread() override {
196 __TBB_ASSERT(tcm_register_thread, nullptr);
197 auto return_code = tcm_register_thread(my_permit_handle);
198 __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr);
199 }
200
unregister_thread()201 void unregister_thread() override {
202 __TBB_ASSERT(tcm_unregister_thread, nullptr);
203 auto return_code = tcm_unregister_thread();
204 __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr);
205 }
206
207 private:
208 tcm_cpu_constraints_t my_permit_constraints = TCM_PERMIT_REQUEST_CONSTRAINTS_INITIALIZER;
209 tcm_permit_request_t my_permit_request = TCM_PERMIT_REQUEST_INITIALIZER;
210 tcm_permit_handle_t my_permit_handle{};
211 tcm_client_mutex_type my_permit_mutex;
212 tcm_adaptor& my_tcm_adaptor;
213 };
214
215 //------------------------------------------------------------------------
216 // tcm_adaptor_impl
217 //------------------------------------------------------------------------
218
219 struct tcm_adaptor_impl {
220 using demand_mutex_type = d1::mutex;
221 demand_mutex_type my_demand_mutex;
222 tcm_client_id_t client_id{};
223
tcm_adaptor_impltbb::detail::r1::tcm_adaptor_impl224 tcm_adaptor_impl(tcm_client_id_t id) : client_id(id)
225 {}
226 };
227
228 //------------------------------------------------------------------------
229 // tcm_adaptor
230 //------------------------------------------------------------------------
231
renegotiation_callback(tcm_permit_handle_t,void * client_ptr,tcm_callback_flags_t)232 tcm_result_t renegotiation_callback(tcm_permit_handle_t, void* client_ptr, tcm_callback_flags_t) {
233 __TBB_ASSERT(client_ptr, nullptr);
234 static_cast<tcm_client*>(client_ptr)->actualize_permit();
235 return TCM_RESULT_SUCCESS;
236 }
237
initialize()238 void tcm_adaptor::initialize() {
239 tcm_functions_loaded = dynamic_link(TCMLIB_NAME, tcm_link_table, /* tcm_link_table size = */ 11);
240 }
241
is_initialized()242 bool tcm_adaptor::is_initialized() {
243 return tcm_functions_loaded;
244 }
245
print_version()246 void tcm_adaptor::print_version() {
247 if (is_initialized()) {
248 __TBB_ASSERT(tcm_get_version_info, nullptr);
249 char buffer[1024];
250 tcm_get_version_info(buffer, 1024);
251 std::fprintf(stderr, "%.*s", 1024, buffer);
252 }
253 }
254
tcm_adaptor()255 tcm_adaptor::tcm_adaptor() {
256 __TBB_ASSERT(tcm_connect, nullptr);
257 tcm_client_id_t client_id{};
258 auto return_code = tcm_connect(renegotiation_callback, &client_id);
259 if (return_code == TCM_RESULT_SUCCESS) {
260 my_impl = make_cache_aligned_unique<tcm_adaptor_impl>(client_id);
261 }
262 }
263
~tcm_adaptor()264 tcm_adaptor::~tcm_adaptor() {
265 if (my_impl) {
266 __TBB_ASSERT(tcm_disconnect, nullptr);
267 auto return_code = tcm_disconnect(my_impl->client_id);
268 __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr);
269 my_impl = nullptr;
270 }
271 }
272
is_connected()273 bool tcm_adaptor::is_connected() {
274 return my_impl != nullptr;
275 }
276
create_client(arena & a)277 pm_client* tcm_adaptor::create_client(arena& a) {
278 return new (cache_aligned_allocate(sizeof(tcm_client))) tcm_client(*this, a);
279 }
280
register_client(pm_client * c,d1::constraints & constraints)281 void tcm_adaptor::register_client(pm_client* c, d1::constraints& constraints) {
282 static_cast<tcm_client*>(c)->init(constraints);
283 }
284
unregister_and_destroy_client(pm_client & c)285 void tcm_adaptor::unregister_and_destroy_client(pm_client& c) {
286 auto& client = static_cast<tcm_client&>(c);
287
288 {
289 tcm_adaptor_impl::demand_mutex_type::scoped_lock lock(my_impl->my_demand_mutex);
290 client.~tcm_client();
291 }
292 cache_aligned_deallocate(&client);
293 }
294
set_active_num_workers(int)295 void tcm_adaptor::set_active_num_workers(int) {}
296
297
adjust_demand(pm_client & c,int mandatory_delta,int workers_delta)298 void tcm_adaptor::adjust_demand(pm_client& c, int mandatory_delta, int workers_delta) {
299 __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);
300
301 auto& client = static_cast<tcm_client&>(c);
302 {
303 tcm_adaptor_impl::demand_mutex_type::scoped_lock lock(my_impl->my_demand_mutex);
304
305 // Update client's state
306 workers_delta = client.update_request(mandatory_delta, workers_delta);
307 if (workers_delta == 0) return;
308
309 if (client.max_workers() == 0) {
310 client.deactivate_permit();
311 } else {
312 client.request_permit(my_impl->client_id);
313 }
314 }
315
316 client.actualize_permit();
317 }
318
319 } // namespace r1
320 } // namespace detail
321 } // namespace tbb
322