/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "main.h"
#include "governor.h"
#include "arena.h"
#include "thread_data.h"
#include "itt_notify.h"

#include <cstring> // std::memset()

namespace tbb {
namespace detail {
namespace r1 {

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::select_next_arena( arena* hint ) {
    unsigned next_arena_priority_level = num_priority_levels;
    if ( hint )
        next_arena_priority_level = hint->my_priority_level;
    for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
        if ( !my_arenas[idx].empty() )
            return &*my_arenas[idx].begin();
    }
    // Don't change the hint if no arena with higher priority is found.
    return hint;
}

void market::insert_arena_into_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].push_front( a );
    __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
    my_next_arena = select_next_arena( my_next_arena );
}

void market::remove_arena_from_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].remove( a );
    if ( my_next_arena == &a )
        my_next_arena = nullptr;
    my_next_arena = select_next_arena( my_next_arena );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
    : my_num_workers_hard_limit(workers_hard_limit)
    , my_num_workers_soft_limit(workers_soft_limit)
    , my_next_arena(nullptr)
    , my_ref_count(1)
    , my_stack_size(stack_size)
    , my_workers_soft_limit_to_report(workers_soft_limit)
{
    // Once created, the RML server starts initializing workers, which need the
    // global market instance to get the worker stack size.
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if user set no limits (yet), use market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}

bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count =
            is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // Do not warn if the default number of workers is requested.
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // A race is possible when multiple threads report warnings.
                // We are OK with that, as there are just multiple warnings.
                unsigned expected_limit = soft_limit_to_report;
                m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
            }
        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "Thread stack size has already been set to %u. "
                             "The request for a larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size );
        return true;
    }
    return false;
}

market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
        // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // Expecting that 4P is suitable for most applications.
        // Limit to 2P for large numbers of threads.
        // TODO: ask RML for max concurrency and possibly correct hard_limit
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in
        // the computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads might be created.
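        // For illustration (hypothetical values): with default_num_threads() == 16 the factor is 4,
        // so factor*P == 64 and the expression below evaluates to max(max(64, 256u), app_parallelism_limit()),
        // i.e. 256 unless an application parallelism limit above 256 has been set.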
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
        // Create the global market instance
        std::size_t size = sizeof(market);
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(std::atomic<thread_data*>) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(std::atomic<thread_data*>) * (workers_hard_limit - 1);
        __TBB_InitOnce::add_ref();
        void* storage = cache_aligned_allocate(size);
        std::memset( storage, 0, size );
        // Initialize and publish the global market
        market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count.store(1, std::memory_order_relaxed);
        if (market::is_lifetime_control_present()) {
            ++m->my_public_ref_count;
            ++m->my_ref_count;
        }
        theMarket = m;
        // This check relies on the fact that for shared RML default_concurrency==max_concurrency
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                             , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *theMarket;
}

void market::destroy () {
    this->market::~market(); // qualified to suppress warning
    cache_aligned_deallocate( this );
    __TBB_InitOnce::remove_ref();
}

bool market::release ( bool is_public, bool blocking_terminate ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
                    my_ref_count.load(std::memory_order_relaxed) > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last external thread, we need to
                // wait until all references are released. Re-read my_public_ref_count to limit waiting if new
                // external threads are created. Theoretically, new private references to the market can be added
                // during the wait, making it potentially endless.
                // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
                // see e.g. task_group_context::cancel_group_execution()
                while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
                        my_ref_count.load(std::memory_order_acquire) > 1 ) {
                    yield();
                }
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), nullptr);
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), nullptr);
            do_release = true;
            theMarket = nullptr;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
                      "No public references remain if we remove the market."
        );
        // Inform RML that blocking termination is required.
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}

int market::update_workers_request() {
    int old_request = my_num_workers_requested;
    my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
                                   (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
        my_num_workers_requested = 1;
    }
#endif
    update_allotment(my_num_workers_requested);
    return my_num_workers_requested - old_request;
}

void market::set_active_num_workers ( unsigned soft_limit ) {
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // actual value will be used at market creation
        m = theMarket;
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
            return;
        ++m->my_ref_count;
    }
    // We now hold my_ref_count for the market, so it can be used safely.

    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, nullptr);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        arena_list_type* arenas = m->my_arenas;

        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
            m->my_mandatory_num_requested > 0)
        {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
                        m->disable_mandatory_concurrency_impl(&*it);
        }
        __TBB_ASSERT(m->my_mandatory_num_requested == 0, nullptr);
#endif

        m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
        // Report only once after the new soft limit value is set.
        m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->has_enqueued_tasks())
                        m->enable_mandatory_concurrency_impl(&*it);
        }
#endif

        delta = m->update_workers_request();
    }
    // adjust_job_count_estimate must be called outside of any locks
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // Release the internal market reference to match ++m->my_ref_count above.
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}

bool governor::does_client_join_workers (const rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
                              std::size_t stack_size )
{
    __TBB_ASSERT( num_slots > 0, nullptr);
    __TBB_ASSERT( num_reserved_slots <= num_slots, nullptr);
    // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
    // Add the newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

/** This method must be invoked under my_arenas_list_mutex. **/
void market::detach_arena ( arena& a ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    __TBB_ASSERT( !a.my_slots[0].is_occupied(), nullptr);
    if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
        disable_mandatory_concurrency_impl(&a);

    remove_arena_from_list(a);
    if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
        my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
    }
}

void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
    bool locked = true;
    __TBB_ASSERT( a, nullptr);
    // We hold a reference to the market, so it cannot be destroyed at any moment here.
    market::enforce([this] { return theMarket == this; }, nullptr);
    __TBB_ASSERT( my_ref_count!=0, nullptr);
    my_arenas_list_mutex.lock();
    arena_list_type::iterator it = my_arenas[priority_level].begin();
    for ( ; it != my_arenas[priority_level].end(); ++it ) {
        if ( a == &*it ) {
            if ( it->my_aba_epoch == aba_epoch ) {
                // Arena is alive.
                // Acquire my_references to sync with threads that just left the arena.
                if (!a->my_num_workers_requested && !a->my_references.load(std::memory_order_acquire)) {
                    __TBB_ASSERT(
                        !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
                        (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
                        "Inconsistent arena state"
                    );
                    // Arena is abandoned. Destroy it.
                    detach_arena( *a );
                    my_arenas_list_mutex.unlock();
                    locked = false;
                    a->free_arena();
                }
            }
            if (locked)
                my_arenas_list_mutex.unlock();
            return;
        }
    }
    my_arenas_list_mutex.unlock();
}

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
    // TODO: make sure an arena with higher priority is returned only if there are available slots in it.
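    // The loop below performs a round-robin scan starting from the hint returned by select_next_arena(),
    // wrapping to the next non-empty priority level, until it finds an arena with fewer active workers
    // than its allotment or gets back to the starting point.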
    hint = select_next_arena( hint );
    if ( !hint )
        return nullptr;
    arena_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->my_priority_level;
    __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
    do {
        arena& a = *it;
        if ( ++it == arenas[curr_priority_level].end() ) {
            do {
                ++curr_priority_level %= num_priority_levels;
            } while ( arenas[curr_priority_level].empty() );
            it = arenas[curr_priority_level].begin();
        }
        if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return nullptr;
}

arena* market::arena_in_need(arena* prev) {
    if (my_total_demand.load(std::memory_order_acquire) <= 0)
        return nullptr;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    // TODO: introduce three state response: alive, not_alive, no_market_arenas
    if ( is_arena_alive(prev) )
        return arena_in_need(my_arenas, prev);
    return arena_in_need(my_arenas, my_next_arena);
}

int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand > 0, nullptr );
    max_workers = min(workers_demand, max_workers);
    int unassigned_workers = max_workers;
    int assigned = 0;
    int carry = 0;
    unsigned max_priority_level = num_priority_levels;
    for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
        int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
        unassigned_workers -= assigned_per_priority;
        for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
            arena& a = *it;
            __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
            __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
                || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
            if (a.my_num_workers_requested == 0) {
                __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
                continue;
            }

            if (max_priority_level == num_priority_levels) {
                max_priority_level = list_idx;
            }

            int allotted = 0;
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
                __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
                allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
                           assigned < max_workers ? 1 : 0;
            } else
#endif
            {
                int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
                allotted = tmp / my_priority_level_demand[list_idx];
                carry = tmp % my_priority_level_demand[list_idx];
                __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
                __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
            }
            a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
            a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
            assigned += allotted;
        }
    }
    __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
    return assigned;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
    __TBB_ASSERT( a, "Expected non-null pointer to arena."
    );
    for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
        if ( a == &*it )
            return true;
    return false;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_alive(arena* a) {
    if ( !a )
        return false;

    // Still cannot access the internals of the arena since the object itself might be destroyed.

    for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
        if ( is_arena_in_list( my_arenas[idx], a ) )
            return true;
    }
    return false;
}

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
void market::enable_mandatory_concurrency_impl ( arena *a ) {
    __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
    __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);

    a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
    my_mandatory_num_requested++;
}

void market::enable_mandatory_concurrency ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
            a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;

        enable_mandatory_concurrency_impl(a);
        delta = update_workers_request();
    }

    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}

void market::disable_mandatory_concurrency_impl(arena* a) {
    __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
    __TBB_ASSERT(my_mandatory_num_requested > 0, nullptr);

    a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
    my_mandatory_num_requested--;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;
        // There is a racy window in advertise_new_work between enabling mandatory concurrency and
        // setting SNAPSHOT_FULL. It gives a spawn request a chance to disable mandatory concurrency.
        // Therefore, we double-check that there are no enqueued tasks.
        if (a->has_enqueued_tasks())
            return;

        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
        disable_mandatory_concurrency_impl(a);

        delta = update_workers_request();
    }
    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
    if (!delta) {
        return;
    }
    int target_epoch{};
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (mandatory) {
            __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
            // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
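            // For example, a second mandatory request raises the counter from 1 to 2, so the early return
            // below leaves the global demand unchanged; only 0->1 and 1->0 transitions fall through.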
            a.my_local_concurrency_requests += delta;
            if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
                (delta < 0 && a.my_local_concurrency_requests != 0))
            {
                return;
            }
        }
#endif
        a.my_total_num_workers_requested += delta;
        int target_workers = 0;
        // Clamp target_workers to the interval [0, a.my_max_num_workers].
        if (a.my_total_num_workers_requested > 0) {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            // At least one thread should be requested when mandatory concurrency is enabled.
            int max_num_workers = int(a.my_max_num_workers);
            if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
                max_num_workers = 1;
            }
#endif
            target_workers = min(a.my_total_num_workers_requested, max_num_workers);
        }

        delta = target_workers - a.my_num_workers_requested;

        if (delta == 0) {
            return;
        }

        a.my_num_workers_requested += delta;
        if (a.my_num_workers_requested == 0) {
            a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
        }

        int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
        my_total_demand.store(total_demand, std::memory_order_relaxed);
        my_priority_level_demand[a.my_priority_level] += delta;
        unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
        if (my_mandatory_num_requested > 0) {
            __TBB_ASSERT(effective_soft_limit == 0, nullptr);
            effective_soft_limit = 1;
        }

        update_allotment(effective_soft_limit);
        if (delta > 0) {
            // The request cannot exceed the soft limit, but the values requested by arenas are remembered
            // in my_total_demand so that workers are not prematurely released to RML.
            if (my_num_workers_requested + delta > (int)effective_soft_limit)
                delta = effective_soft_limit - my_num_workers_requested;
        }
        else {
            // The number of workers should not be decreased below my_total_demand.
            if (my_num_workers_requested + delta < total_demand)
                delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
        }
        my_num_workers_requested += delta;
        __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, nullptr);

        target_epoch = a.my_adjust_demand_target_epoch++;
    }

    a.my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_relaxed);
    // Must be called outside of any locks.
    my_server->adjust_job_count_estimate( delta );
    a.my_adjust_demand_current_epoch.exchange(target_epoch + 1);
    a.my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
}

void market::process( job& j ) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_arena can be dead. Don't access it until arena_in_need is called.
    arena *a = td.my_arena;
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(td);
        }
        // Workers leave the market because there is no arena in need. This can happen before
        // adjust_job_count_estimate() decreases my_slack and RML puts this thread to sleep, which might
        // result in a busy loop that checks for my_slack<0 and calls this method immediately.
        // The yield below refines this spinning.
        if ( !i ) {
            yield();
        }
    }
}

void market::cleanup( job& j) {
    market::enforce([this] { return theMarket != this; }, nullptr );
    governor::auto_terminate(&j);
}

void market::acknowledge_close_connection() {
    destroy();
}

::rml::job* market::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, nullptr);
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // index serves as a hint decreasing conflicts between workers when they migrate between arenas
    thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT( index <= my_num_workers_hard_limit, nullptr);
    __TBB_ASSERT( my_workers[index - 1].load(std::memory_order_relaxed) == nullptr, nullptr);
    my_workers[index - 1].store(td, std::memory_order_release);
    return td;
}

void market::add_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.push_front(td);
}

void market::remove_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.remove(td);
}

} // namespace r1
} // namespace detail
} // namespace tbb