xref: /oneTBB/src/tbb/tcm_adaptor.cpp (revision 627cac6d)
1 /*
2     Copyright (c) 2023-2024 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "oneapi/tbb/detail/_intrusive_list_node.h"
18 #include "oneapi/tbb/detail/_template_helpers.h"
19 #include "oneapi/tbb/task_arena.h"
20 
21 #include "pm_client.h"
22 #include "dynamic_link.h"
23 #include "misc.h"
24 #include "tcm.h"
25 #include "tcm_adaptor.h"
26 
27 #include <iostream>
28 
29 namespace tbb {
30 namespace detail {
31 namespace r1 {
32 
33 namespace {
34 #if __TBB_WEAK_SYMBOLS_PRESENT
35 #pragma weak tcmConnect
36 #pragma weak tcmDisconnect
37 #pragma weak tcmRequestPermit
38 #pragma weak tcmGetPermitData
39 #pragma weak tcmReleasePermit
40 #pragma weak tcmIdlePermit
41 #pragma weak tcmDeactivatePermit
42 #pragma weak tcmActivatePermit
43 #pragma weak tcmRegisterThread
44 #pragma weak tcmUnregisterThread
45 #pragma weak tcmGetVersionInfo
46 #endif /* __TBB_WEAK_SYMBOLS_PRESENT */
47 
48 tcm_result_t(*tcm_connect)(tcm_callback_t callback, tcm_client_id_t* client_id){nullptr};
49 tcm_result_t(*tcm_disconnect)(tcm_client_id_t client_id){ nullptr };
50 tcm_result_t(*tcm_request_permit)(tcm_client_id_t client_id, tcm_permit_request_t request,
51     void* callback_arg, tcm_permit_handle_t* permit_handle, tcm_permit_t* permit){nullptr};
52 tcm_result_t(*tcm_get_permit_data)(tcm_permit_handle_t permit_handle, tcm_permit_t* permit){nullptr};
53 tcm_result_t(*tcm_release_permit)(tcm_permit_handle_t permit){nullptr};
54 tcm_result_t(*tcm_idle_permit)(tcm_permit_handle_t permit_handle){nullptr};
55 tcm_result_t(*tcm_deactivate_permit)(tcm_permit_handle_t permit_handle){nullptr};
56 tcm_result_t(*tcm_activate_permit)(tcm_permit_handle_t permit_handle){nullptr};
57 tcm_result_t(*tcm_register_thread)(tcm_permit_handle_t permit_handle){nullptr};
58 tcm_result_t(*tcm_unregister_thread)(){nullptr};
59 tcm_result_t (*tcm_get_version_info)(char* buffer, uint32_t buffer_size){nullptr};
60 
61 static const dynamic_link_descriptor tcm_link_table[] = {
62     DLD(tcmConnect, tcm_connect),
63     DLD(tcmDisconnect, tcm_disconnect),
64     DLD(tcmRequestPermit, tcm_request_permit),
65     DLD(tcmGetPermitData, tcm_get_permit_data),
66     DLD(tcmReleasePermit, tcm_release_permit),
67     DLD(tcmIdlePermit, tcm_idle_permit),
68     DLD(tcmDeactivatePermit, tcm_deactivate_permit),
69     DLD(tcmActivatePermit, tcm_activate_permit),
70     DLD(tcmRegisterThread, tcm_register_thread),
71     DLD(tcmUnregisterThread, tcm_unregister_thread),
72     DLD(tcmGetVersionInfo, tcm_get_version_info)
73 };
74 
75 #if TBB_USE_DEBUG
76 #define DEBUG_SUFFIX "_debug"
77 #else
78 #define DEBUG_SUFFIX
79 #endif /* TBB_USE_DEBUG */
80 
81 #if _WIN32 || _WIN64
82 #define LIBRARY_EXTENSION ".dll"
83 #define LIBRARY_PREFIX
84 #elif __unix__
85 #define LIBRARY_EXTENSION ".so.1"
86 #define LIBRARY_PREFIX "lib"
87 #else
88 #define LIBRARY_EXTENSION
89 #define LIBRARY_PREFIX
90 #endif /* __unix__ */
91 
92 #define TCMLIB_NAME LIBRARY_PREFIX "tcm" DEBUG_SUFFIX LIBRARY_EXTENSION
93 
94 static bool tcm_functions_loaded{ false };
95 }
96 
97 class tcm_client : public pm_client {
98     using tcm_client_mutex_type = d1::mutex;
99 public:
tcm_client(tcm_adaptor & adaptor,arena & a)100     tcm_client(tcm_adaptor& adaptor, arena& a) : pm_client(a), my_tcm_adaptor(adaptor) {}
101 
~tcm_client()102     ~tcm_client() {
103         if (my_permit_handle) {
104             __TBB_ASSERT(tcm_release_permit, nullptr);
105             auto res = tcm_release_permit(my_permit_handle);
106             __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
107         }
108     }
109 
update_concurrency(uint32_t concurrency)110     int update_concurrency(uint32_t concurrency) {
111         return my_arena.update_concurrency(concurrency);
112     }
113 
priority_level()114     unsigned priority_level() {
115         return my_arena.priority_level();
116     }
117 
permit_request()118     tcm_permit_request_t& permit_request() {
119         return my_permit_request;
120     }
121 
permit_handle()122     tcm_permit_handle_t& permit_handle() {
123         return my_permit_handle;
124     }
125 
actualize_permit()126     void actualize_permit() {
127         __TBB_ASSERT(tcm_get_permit_data, nullptr);
128         int delta{};
129         {
130             tcm_client_mutex_type::scoped_lock lock(my_permit_mutex);
131 
132             uint32_t new_concurrency{};
133             tcm_permit_t new_permit{ &new_concurrency, nullptr, 1, TCM_PERMIT_STATE_VOID, {} };
134             auto res = tcm_get_permit_data(my_permit_handle, &new_permit);
135             __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
136 
137             // The permit has changed during the reading, so the callback will be invoked soon one more time and
138             // we can just skip this renegotiation iteration.
139             if (!new_permit.flags.stale) {
140                 // If there is no other demand in TCM, the permit may still have granted concurrency but
141                 // be in the deactivated state thus we enforce 0 allotment to preserve arena invariants.
142                 delta = update_concurrency(new_permit.state != TCM_PERMIT_STATE_INACTIVE ? new_concurrency : 0);
143             }
144         }
145         if (delta) {
146             my_tcm_adaptor.notify_thread_request(delta);
147         }
148     }
149 
request_permit(tcm_client_id_t client_id)150     void request_permit(tcm_client_id_t client_id) {
151         __TBB_ASSERT(tcm_request_permit, nullptr);
152 
153         my_permit_request.max_sw_threads = max_workers();
154         my_permit_request.min_sw_threads = my_permit_request.max_sw_threads == 0 ? 0 : min_workers();
155 
156         if (my_permit_request.constraints_size > 0) {
157             my_permit_request.cpu_constraints->min_concurrency = my_permit_request.min_sw_threads;
158             my_permit_request.cpu_constraints->max_concurrency = my_permit_request.max_sw_threads;
159         }
160 
161         __TBB_ASSERT(my_permit_request.max_sw_threads >= my_permit_request.min_sw_threads, nullptr);
162 
163         tcm_result_t res = tcm_request_permit(client_id, my_permit_request, this, &my_permit_handle, nullptr);
164         __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
165     }
166 
deactivate_permit()167     void deactivate_permit() {
168          __TBB_ASSERT(tcm_deactivate_permit, nullptr);
169         tcm_result_t res = tcm_deactivate_permit(my_permit_handle);
170         __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr);
171     }
172 
init(d1::constraints & constraints)173     void init(d1::constraints& constraints) {
174         __TBB_ASSERT(tcm_request_permit, nullptr);
175         __TBB_ASSERT(tcm_deactivate_permit, nullptr);
176 
177         if (constraints.core_type            != d1::task_arena::automatic ||
178             constraints.numa_id              != d1::task_arena::automatic ||
179             constraints.max_threads_per_core != d1::task_arena::automatic)
180         {
181             my_permit_constraints.max_concurrency = constraints.max_concurrency;
182             my_permit_constraints.min_concurrency = 0;
183             my_permit_constraints.core_type_id = constraints.core_type;
184             my_permit_constraints.numa_id = constraints.numa_id;
185             my_permit_constraints.threads_per_core = constraints.max_threads_per_core;
186 
187             my_permit_request.cpu_constraints = &my_permit_constraints;
188             my_permit_request.constraints_size = 1;
189         }
190 
191         my_permit_request.min_sw_threads = 0;
192         my_permit_request.max_sw_threads = 0;
193     }
194 
register_thread()195     void register_thread() override {
196         __TBB_ASSERT(tcm_register_thread, nullptr);
197         auto return_code = tcm_register_thread(my_permit_handle);
198         __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr);
199     }
200 
unregister_thread()201     void unregister_thread() override {
202         __TBB_ASSERT(tcm_unregister_thread, nullptr);
203         auto return_code = tcm_unregister_thread();
204         __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr);
205     }
206 
207 private:
208     tcm_cpu_constraints_t my_permit_constraints = TCM_PERMIT_REQUEST_CONSTRAINTS_INITIALIZER;
209     tcm_permit_request_t my_permit_request = TCM_PERMIT_REQUEST_INITIALIZER;
210     tcm_permit_handle_t my_permit_handle{};
211     tcm_client_mutex_type my_permit_mutex;
212     tcm_adaptor& my_tcm_adaptor;
213 };
214 
215 //------------------------------------------------------------------------
216 // tcm_adaptor_impl
217 //------------------------------------------------------------------------
218 
219 struct tcm_adaptor_impl {
220     using demand_mutex_type = d1::mutex;
221     demand_mutex_type my_demand_mutex;
222     tcm_client_id_t client_id{};
223 
tcm_adaptor_impltbb::detail::r1::tcm_adaptor_impl224     tcm_adaptor_impl(tcm_client_id_t id) : client_id(id)
225     {}
226 };
227 
228 //------------------------------------------------------------------------
229 // tcm_adaptor
230 //------------------------------------------------------------------------
231 
renegotiation_callback(tcm_permit_handle_t,void * client_ptr,tcm_callback_flags_t)232 tcm_result_t renegotiation_callback(tcm_permit_handle_t, void* client_ptr, tcm_callback_flags_t) {
233     __TBB_ASSERT(client_ptr, nullptr);
234     static_cast<tcm_client*>(client_ptr)->actualize_permit();
235     return TCM_RESULT_SUCCESS;
236 }
237 
initialize()238 void tcm_adaptor::initialize() {
239     tcm_functions_loaded = dynamic_link(TCMLIB_NAME, tcm_link_table, /* tcm_link_table size = */ 11);
240 }
241 
is_initialized()242 bool tcm_adaptor::is_initialized() {
243     return tcm_functions_loaded;
244 }
245 
print_version()246 void tcm_adaptor::print_version() {
247     if (is_initialized()) {
248         __TBB_ASSERT(tcm_get_version_info, nullptr);
249         char buffer[1024];
250         tcm_get_version_info(buffer, 1024);
251         std::fprintf(stderr, "%.*s", 1024, buffer);
252     }
253 }
254 
tcm_adaptor()255 tcm_adaptor::tcm_adaptor() {
256     __TBB_ASSERT(tcm_connect, nullptr);
257     tcm_client_id_t client_id{};
258     auto return_code = tcm_connect(renegotiation_callback, &client_id);
259     if (return_code == TCM_RESULT_SUCCESS) {
260         my_impl = make_cache_aligned_unique<tcm_adaptor_impl>(client_id);
261     }
262 }
263 
~tcm_adaptor()264 tcm_adaptor::~tcm_adaptor() {
265     if (my_impl) {
266         __TBB_ASSERT(tcm_disconnect, nullptr);
267         auto return_code = tcm_disconnect(my_impl->client_id);
268         __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr);
269         my_impl = nullptr;
270     }
271 }
272 
is_connected()273 bool tcm_adaptor::is_connected() {
274     return my_impl != nullptr;
275 }
276 
create_client(arena & a)277 pm_client* tcm_adaptor::create_client(arena& a) {
278     return new (cache_aligned_allocate(sizeof(tcm_client))) tcm_client(*this, a);
279 }
280 
register_client(pm_client * c,d1::constraints & constraints)281 void tcm_adaptor::register_client(pm_client* c, d1::constraints& constraints) {
282     static_cast<tcm_client*>(c)->init(constraints);
283 }
284 
unregister_and_destroy_client(pm_client & c)285 void tcm_adaptor::unregister_and_destroy_client(pm_client& c) {
286     auto& client = static_cast<tcm_client&>(c);
287 
288     {
289         tcm_adaptor_impl::demand_mutex_type::scoped_lock lock(my_impl->my_demand_mutex);
290         client.~tcm_client();
291     }
292     cache_aligned_deallocate(&client);
293 }
294 
set_active_num_workers(int)295 void tcm_adaptor::set_active_num_workers(int) {}
296 
297 
adjust_demand(pm_client & c,int mandatory_delta,int workers_delta)298 void tcm_adaptor::adjust_demand(pm_client& c, int mandatory_delta, int workers_delta) {
299     __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);
300 
301     auto& client = static_cast<tcm_client&>(c);
302     {
303         tcm_adaptor_impl::demand_mutex_type::scoped_lock lock(my_impl->my_demand_mutex);
304 
305         // Update client's state
306         workers_delta = client.update_request(mandatory_delta, workers_delta);
307         if (workers_delta == 0) return;
308 
309         if (client.max_workers() == 0) {
310             client.deactivate_permit();
311         } else {
312             client.request_permit(my_impl->client_id);
313         }
314     }
315 
316     client.actualize_permit();
317 }
318 
319 } // namespace r1
320 } // namespace detail
321 } // namespace tbb
322