1 /* 2 Copyright (c) 2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "oneapi/tbb/detail/_intrusive_list_node.h" 18 #include "oneapi/tbb/detail/_template_helpers.h" 19 #include "oneapi/tbb/task_arena.h" 20 21 #include "pm_client.h" 22 #include "dynamic_link.h" 23 #include "misc.h" 24 #include "tcm.h" 25 #include "tcm_adaptor.h" 26 27 #include <iostream> 28 29 namespace tbb { 30 namespace detail { 31 namespace r1 { 32 33 namespace { 34 #if __TBB_WEAK_SYMBOLS_PRESENT 35 #pragma weak tcmConnect 36 #pragma weak tcmDisconnect 37 #pragma weak tcmRequestPermit 38 #pragma weak tcmGetPermitData 39 #pragma weak tcmReleasePermit 40 #pragma weak tcmIdlePermit 41 #pragma weak tcmDeactivatePermit 42 #pragma weak tcmActivatePermit 43 #pragma weak tcmRegisterThread 44 #pragma weak tcmUnregisterThread 45 #pragma weak tcmGetVersionInfo 46 #endif /* __TBB_WEAK_SYMBOLS_PRESENT */ 47 48 tcm_result_t(*tcm_connect)(tcm_callback_t callback, tcm_client_id_t* client_id){nullptr}; 49 tcm_result_t(*tcm_disconnect)(tcm_client_id_t client_id){ nullptr }; 50 tcm_result_t(*tcm_request_permit)(tcm_client_id_t client_id, tcm_permit_request_t request, 51 void* callback_arg, tcm_permit_handle_t* permit_handle, tcm_permit_t* permit){nullptr}; 52 tcm_result_t(*tcm_get_permit_data)(tcm_permit_handle_t permit_handle, tcm_permit_t* permit){nullptr}; 53 tcm_result_t(*tcm_release_permit)(tcm_permit_handle_t permit){nullptr}; 54 tcm_result_t(*tcm_idle_permit)(tcm_permit_handle_t permit_handle){nullptr}; 55 tcm_result_t(*tcm_deactivate_permit)(tcm_permit_handle_t permit_handle){nullptr}; 56 tcm_result_t(*tcm_activate_permit)(tcm_permit_handle_t permit_handle){nullptr}; 57 tcm_result_t(*tcm_register_thread)(tcm_permit_handle_t permit_handle){nullptr}; 58 tcm_result_t(*tcm_unregister_thread)(){nullptr}; 59 tcm_result_t (*tcm_get_version_info)(char* buffer, uint32_t buffer_size){nullptr}; 60 61 static const dynamic_link_descriptor tcm_link_table[] = { 62 DLD(tcmConnect, tcm_connect), 63 DLD(tcmDisconnect, tcm_disconnect), 64 DLD(tcmRequestPermit, tcm_request_permit), 65 DLD(tcmGetPermitData, tcm_get_permit_data), 66 DLD(tcmReleasePermit, tcm_release_permit), 67 DLD(tcmIdlePermit, tcm_idle_permit), 68 DLD(tcmDeactivatePermit, tcm_deactivate_permit), 69 DLD(tcmActivatePermit, tcm_activate_permit), 70 DLD(tcmRegisterThread, tcm_register_thread), 71 DLD(tcmUnregisterThread, tcm_unregister_thread), 72 DLD(tcmGetVersionInfo, tcm_get_version_info) 73 }; 74 75 #if TBB_USE_DEBUG 76 #define DEBUG_SUFFIX "_debug" 77 #else 78 #define DEBUG_SUFFIX 79 #endif /* TBB_USE_DEBUG */ 80 81 #if _WIN32 || _WIN64 82 #define LIBRARY_EXTENSION ".dll" 83 #define LIBRARY_PREFIX 84 #elif __unix__ 85 #define LIBRARY_EXTENSION ".so.1" 86 #define LIBRARY_PREFIX "lib" 87 #else 88 #define LIBRARY_EXTENSION 89 #define LIBRARY_PREFIX 90 #endif /* __unix__ */ 91 92 #define TCMLIB_NAME LIBRARY_PREFIX "tcm" DEBUG_SUFFIX LIBRARY_EXTENSION 93 94 static bool tcm_functions_loaded{ false }; 95 } 96 97 class tcm_client : public pm_client { 98 using tcm_client_mutex_type = d1::mutex; 99 public: 100 tcm_client(tcm_adaptor& adaptor, arena& a) : pm_client(a), my_tcm_adaptor(adaptor) {} 101 102 ~tcm_client() { 103 if (my_permit_handle) { 104 __TBB_ASSERT(tcm_release_permit, nullptr); 105 auto res = tcm_release_permit(my_permit_handle); 106 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr); 107 } 108 } 109 110 int update_concurrency(uint32_t concurrency) { 111 return my_arena.update_concurrency(concurrency); 112 } 113 114 unsigned priority_level() { 115 return my_arena.priority_level(); 116 } 117 118 tcm_permit_request_t& permit_request() { 119 return my_permit_request; 120 } 121 122 tcm_permit_handle_t& permit_handle() { 123 return my_permit_handle; 124 } 125 126 void actualize_permit() { 127 __TBB_ASSERT(tcm_get_permit_data, nullptr); 128 int delta{}; 129 { 130 tcm_client_mutex_type::scoped_lock lock(my_permit_mutex); 131 132 uint32_t new_concurrency{}; 133 tcm_permit_t new_permit{ &new_concurrency, nullptr, 1, TCM_PERMIT_STATE_VOID, {} }; 134 auto res = tcm_get_permit_data(my_permit_handle, &new_permit); 135 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr); 136 137 // The permit has changed during the reading, so the callback will be invoked soon one more time and 138 // we can just skip this renegotiation iteration. 139 if (!new_permit.flags.stale) { 140 __TBB_ASSERT( 141 new_permit.state != TCM_PERMIT_STATE_INACTIVE || new_concurrency == 0, 142 "TCM did not nullify resources while deactivating the permit" 143 ); 144 delta = update_concurrency(new_concurrency); 145 } 146 } 147 if (delta) { 148 my_tcm_adaptor.notify_thread_request(delta); 149 } 150 } 151 152 void request_permit(tcm_client_id_t client_id) { 153 __TBB_ASSERT(tcm_request_permit, nullptr); 154 155 my_permit_request.max_sw_threads = max_workers(); 156 my_permit_request.min_sw_threads = my_permit_request.max_sw_threads == 0 ? 0 : min_workers(); 157 158 if (my_permit_request.constraints_size > 0) { 159 my_permit_request.cpu_constraints->min_concurrency = my_permit_request.min_sw_threads; 160 my_permit_request.cpu_constraints->max_concurrency = my_permit_request.max_sw_threads; 161 } 162 163 __TBB_ASSERT(my_permit_request.max_sw_threads >= my_permit_request.min_sw_threads, nullptr); 164 165 tcm_result_t res = tcm_request_permit(client_id, my_permit_request, this, &my_permit_handle, nullptr); 166 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr); 167 } 168 169 void deactivate_permit() { 170 __TBB_ASSERT(tcm_deactivate_permit, nullptr); 171 tcm_result_t res = tcm_deactivate_permit(my_permit_handle); 172 __TBB_ASSERT_EX(res == TCM_RESULT_SUCCESS, nullptr); 173 } 174 175 void init(d1::constraints& constraints) { 176 __TBB_ASSERT(tcm_request_permit, nullptr); 177 __TBB_ASSERT(tcm_deactivate_permit, nullptr); 178 179 if (constraints.core_type != d1::task_arena::automatic || 180 constraints.numa_id != d1::task_arena::automatic || 181 constraints.max_threads_per_core != d1::task_arena::automatic) 182 { 183 my_permit_constraints.max_concurrency = constraints.max_concurrency; 184 my_permit_constraints.min_concurrency = 0; 185 my_permit_constraints.core_type_id = constraints.core_type; 186 my_permit_constraints.numa_id = constraints.numa_id; 187 my_permit_constraints.threads_per_core = constraints.max_threads_per_core; 188 189 my_permit_request.cpu_constraints = &my_permit_constraints; 190 my_permit_request.constraints_size = 1; 191 } 192 193 my_permit_request.min_sw_threads = 0; 194 my_permit_request.max_sw_threads = 0; 195 } 196 197 void register_thread() override { 198 __TBB_ASSERT(tcm_register_thread, nullptr); 199 auto return_code = tcm_register_thread(my_permit_handle); 200 __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr); 201 } 202 203 void unregister_thread() override { 204 __TBB_ASSERT(tcm_unregister_thread, nullptr); 205 auto return_code = tcm_unregister_thread(); 206 __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr); 207 } 208 209 private: 210 tcm_cpu_constraints_t my_permit_constraints = TCM_PERMIT_REQUEST_CONSTRAINTS_INITIALIZER; 211 tcm_permit_request_t my_permit_request = TCM_PERMIT_REQUEST_INITIALIZER; 212 tcm_permit_handle_t my_permit_handle{}; 213 tcm_client_mutex_type my_permit_mutex; 214 tcm_adaptor& my_tcm_adaptor; 215 }; 216 217 //------------------------------------------------------------------------ 218 // tcm_adaptor_impl 219 //------------------------------------------------------------------------ 220 221 struct tcm_adaptor_impl { 222 using demand_mutex_type = d1::mutex; 223 demand_mutex_type my_demand_mutex; 224 tcm_client_id_t client_id{}; 225 226 tcm_adaptor_impl(tcm_client_id_t id) : client_id(id) 227 {} 228 }; 229 230 //------------------------------------------------------------------------ 231 // tcm_adaptor 232 //------------------------------------------------------------------------ 233 234 tcm_result_t renegotiation_callback(tcm_permit_handle_t, void* client_ptr, tcm_callback_flags_t) { 235 __TBB_ASSERT(client_ptr, nullptr); 236 static_cast<tcm_client*>(client_ptr)->actualize_permit(); 237 return TCM_RESULT_SUCCESS; 238 } 239 240 void tcm_adaptor::initialize() { 241 tcm_functions_loaded = dynamic_link(TCMLIB_NAME, tcm_link_table, /* tcm_link_table size = */ 11); 242 } 243 244 bool tcm_adaptor::is_initialized() { 245 return tcm_functions_loaded; 246 } 247 248 void tcm_adaptor::print_version() { 249 if (is_initialized()) { 250 __TBB_ASSERT(tcm_get_version_info, nullptr); 251 char buffer[1024]; 252 tcm_get_version_info(buffer, 1024); 253 std::fprintf(stderr, "%.*s", 1024, buffer); 254 } 255 } 256 257 tcm_adaptor::tcm_adaptor() { 258 __TBB_ASSERT(tcm_connect, nullptr); 259 tcm_client_id_t client_id{}; 260 auto return_code = tcm_connect(renegotiation_callback, &client_id); 261 if (return_code == TCM_RESULT_SUCCESS) { 262 my_impl = make_cache_aligned_unique<tcm_adaptor_impl>(client_id); 263 } 264 } 265 266 tcm_adaptor::~tcm_adaptor() { 267 if (my_impl) { 268 __TBB_ASSERT(tcm_disconnect, nullptr); 269 auto return_code = tcm_disconnect(my_impl->client_id); 270 __TBB_ASSERT_EX(return_code == TCM_RESULT_SUCCESS, nullptr); 271 my_impl = nullptr; 272 } 273 } 274 275 bool tcm_adaptor::is_connected() { 276 return my_impl != nullptr; 277 } 278 279 pm_client* tcm_adaptor::create_client(arena& a) { 280 return new (cache_aligned_allocate(sizeof(tcm_client))) tcm_client(*this, a); 281 } 282 283 void tcm_adaptor::register_client(pm_client* c, d1::constraints& constraints) { 284 static_cast<tcm_client*>(c)->init(constraints); 285 } 286 287 void tcm_adaptor::unregister_and_destroy_client(pm_client& c) { 288 auto& client = static_cast<tcm_client&>(c); 289 290 { 291 tcm_adaptor_impl::demand_mutex_type::scoped_lock lock(my_impl->my_demand_mutex); 292 client.~tcm_client(); 293 } 294 cache_aligned_deallocate(&client); 295 } 296 297 void tcm_adaptor::set_active_num_workers(int) {} 298 299 300 void tcm_adaptor::adjust_demand(pm_client& c, int mandatory_delta, int workers_delta) { 301 __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr); 302 303 auto& client = static_cast<tcm_client&>(c); 304 { 305 tcm_adaptor_impl::demand_mutex_type::scoped_lock lock(my_impl->my_demand_mutex); 306 307 // Update client's state 308 workers_delta = client.update_request(mandatory_delta, workers_delta); 309 if (workers_delta == 0) return; 310 311 if (client.max_workers() == 0) { 312 client.deactivate_permit(); 313 } else { 314 client.request_permit(my_impl->client_id); 315 } 316 } 317 318 client.actualize_permit(); 319 } 320 321 } // namespace r1 322 } // namespace detail 323 } // namespace tbb 324