/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "main.h"
#include "governor.h"
#include "arena.h"
#include "thread_data.h"
#include "itt_notify.h"

#include <cstring> // std::memset()

namespace tbb {
namespace detail {
namespace r1 {

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::select_next_arena( arena* hint ) {
    unsigned next_arena_priority_level = num_priority_levels;
    if ( hint )
        next_arena_priority_level = hint->my_priority_level;
    for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
        if ( !my_arenas[idx].empty() )
            return &*my_arenas[idx].begin();
    }
    // don't change if arena with higher priority is not found.
    return hint;
}

void market::insert_arena_into_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].push_front( a );
    __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
    my_next_arena = select_next_arena( my_next_arena );
}

void market::remove_arena_from_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].remove( a );
    if ( my_next_arena == &a )
        my_next_arena = nullptr;
    my_next_arena = select_next_arena( my_next_arena );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
    : my_num_workers_hard_limit(workers_hard_limit)
    , my_num_workers_soft_limit(workers_soft_limit)
    , my_next_arena(nullptr)
    , my_ref_count(1)
    , my_stack_size(stack_size)
    , my_workers_soft_limit_to_report(workers_soft_limit)
{
    // Once created, the RML server will start initializing workers, which will need
    // the global market instance to get the worker stack size
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

market::~market() {
    poison_pointer(my_server);
    poison_pointer(my_next_arena);
}

static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if the user has set no limits (yet), use the market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}

bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
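    // If the global market already exists, reuse it: bump its reference counts under theMarketMutex
    // and, on the first public reference, re-evaluate the soft limit on the number of workers.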
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // do not warn if the default number of workers is requested
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // A race is possible when multiple threads report warnings.
                // We are OK with that, as there are just multiple warnings.
                unsigned expected_limit = soft_limit_to_report;
                m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
            }

        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "Thread stack size has already been set to %u. "
                             "The request for a larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size );
        return true;
    }
    return false;
}

market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
        // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // Expecting that 4P is suitable for most applications.
        // Limit to 2P for large thread numbers.
        // TODO: ask RML for max concurrency and possibly correct hard_limit
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in
        // the computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads might be created.
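        // Worked example (illustrative): on a machine where governor::default_num_threads() == 16
        // and no app_parallelism_limit() is set (i.e. it returns 0), factor == 4, so the line below
        // yields workers_hard_limit == max(max(4*16, 256u), 0) == 256.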
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
        // Create the global market instance
        std::size_t size = sizeof(market);
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(std::atomic<thread_data*>) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(std::atomic<thread_data*>) * (workers_hard_limit - 1);
        __TBB_InitOnce::add_ref();
        void* storage = cache_aligned_allocate(size);
        std::memset( storage, 0, size );
        // Initialize and publish the global market
        market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count.store(1, std::memory_order_relaxed);
        if (market::is_lifetime_control_present()) {
            ++m->my_public_ref_count;
            ++m->my_ref_count;
        }
        theMarket = m;
        // This check relies on the fact that for shared RML default_concurrency==max_concurrency
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                    , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *theMarket;
}

void market::destroy () {
    this->market::~market(); // qualified to suppress warning
    cache_aligned_deallocate( this );
    __TBB_InitOnce::remove_ref();
}

bool market::release ( bool is_public, bool blocking_terminate ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
                    my_ref_count.load(std::memory_order_relaxed) > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last external thread, we need to wait
                // until all references are released. Re-read my_public_ref_count to limit waiting if new external
                // threads are created. Theoretically, new private references to the market can be added during
                // waiting, making it potentially endless.
                // TODO: revise why the weak scheduler needs the market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
                // see e.g. task_group_context::cancel_group_execution()
                while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
                        my_ref_count.load(std::memory_order_acquire) > 1 ) {
                    yield();
                }
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), nullptr);
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), nullptr);
            do_release = true;
            theMarket = nullptr;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
                      "No public references remain if we remove the market."
        );
        // inform RML that blocking termination is required
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}

int market::update_workers_request() {
    int old_request = my_num_workers_requested;
    my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
                                   (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
        my_num_workers_requested = 1;
    }
#endif
    update_allotment(my_num_workers_requested);
    return my_num_workers_requested - old_request;
}

void market::set_active_num_workers ( unsigned soft_limit ) {
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // actual value will be used at market creation
        m = theMarket;
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
            return;
        ++m->my_ref_count;
    }
    // have my_ref_count for market, use it safely

    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, nullptr);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        arena_list_type* arenas = m->my_arenas;

        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
            m->my_mandatory_num_requested > 0)
        {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
                        m->disable_mandatory_concurrency_impl(&*it);
        }
        __TBB_ASSERT(m->my_mandatory_num_requested == 0, nullptr);
#endif

        m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
        // report only once after new soft limit value is set
        m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->has_enqueued_tasks())
                        m->enable_mandatory_concurrency_impl(&*it);
        }
#endif

        delta = m->update_workers_request();
    }
    // adjust_job_count_estimate must be called outside of any locks
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // release internal market reference to match ++m->my_ref_count above
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}

bool governor::does_client_join_workers (const rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
                              std::size_t stack_size )
{
    __TBB_ASSERT( num_slots > 0, nullptr);
    __TBB_ASSERT( num_reserved_slots <= num_slots, nullptr);
    // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
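    // Only num_slots - num_reserved_slots workers are requested from the market: the reserved slots
    // are occupied by external threads and do not need dedicated workers.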
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
    // Add the newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

/** This method must be invoked under my_arenas_list_mutex. **/
void market::detach_arena ( arena& a ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    __TBB_ASSERT( !a.my_slots[0].is_occupied(), nullptr);
    if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
        disable_mandatory_concurrency_impl(&a);

    remove_arena_from_list(a);
    if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
        my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
    }
}

void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
    bool locked = true;
    __TBB_ASSERT( a, nullptr);
    // we hold a reference to the server, so the market cannot be destroyed at any moment here
    __TBB_ASSERT(!is_poisoned(my_server), nullptr);
    my_arenas_list_mutex.lock();
    arena_list_type::iterator it = my_arenas[priority_level].begin();
    for ( ; it != my_arenas[priority_level].end(); ++it ) {
        if ( a == &*it ) {
            if ( it->my_aba_epoch == aba_epoch ) {
                // Arena is alive
                // Acquire my_references to sync with threads that just left the arena
                if (!a->my_num_workers_requested && !a->my_references.load(std::memory_order_acquire)) {
                    __TBB_ASSERT(
                        !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
                        (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
                        "Inconsistent arena state"
                    );
                    // Arena is abandoned. Destroy it.
                    detach_arena( *a );
                    my_arenas_list_mutex.unlock();
                    locked = false;
                    a->free_arena();
                }
            }
            if (locked)
                my_arenas_list_mutex.unlock();
            return;
        }
    }
    my_arenas_list_mutex.unlock();
}

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
    // TODO: make sure an arena with higher priority is returned only if there are available slots in it.
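    // Round-robin search: start from the hinted arena (or the highest-priority non-empty list),
    // wrap across priority levels, and return the first arena whose number of active workers is
    // still below its allotment, taking a worker reference on it.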
    hint = select_next_arena( hint );
    if ( !hint )
        return nullptr;
    arena_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->my_priority_level;
    __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
    do {
        arena& a = *it;
        if ( ++it == arenas[curr_priority_level].end() ) {
            do {
                ++curr_priority_level %= num_priority_levels;
            } while ( arenas[curr_priority_level].empty() );
            it = arenas[curr_priority_level].begin();
        }
        if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return nullptr;
}

arena* market::arena_in_need(arena* prev) {
    if (my_total_demand.load(std::memory_order_acquire) <= 0)
        return nullptr;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    // TODO: introduce three state response: alive, not_alive, no_market_arenas
    if ( is_arena_alive(prev) )
        return arena_in_need(my_arenas, prev);
    return arena_in_need(my_arenas, my_next_arena);
}

int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand > 0, nullptr );
    max_workers = min(workers_demand, max_workers);
    int unassigned_workers = max_workers;
    int assigned = 0;
    int carry = 0;
    unsigned max_priority_level = num_priority_levels;
    for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
        int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
        unassigned_workers -= assigned_per_priority;
        for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
            arena& a = *it;
            __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
            __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
                || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
            if (a.my_num_workers_requested == 0) {
                __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
                continue;
            }

            if (max_priority_level == num_priority_levels) {
                max_priority_level = list_idx;
            }

            int allotted = 0;
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
                __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
                allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
                    assigned < max_workers ? 1 : 0;
            } else
#endif
            {
                int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
                allotted = tmp / my_priority_level_demand[list_idx];
                carry = tmp % my_priority_level_demand[list_idx];
                __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
                __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
            }
            a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
            a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
            assigned += allotted;
        }
    }
    __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
    return assigned;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
    __TBB_ASSERT( a, "Expected non-null pointer to arena."
    );
    for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
        if ( a == &*it )
            return true;
    return false;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_alive(arena* a) {
    if ( !a )
        return false;

    // Still cannot access internals of the arena since the object itself might be destroyed.

    for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
        if ( is_arena_in_list( my_arenas[idx], a ) )
            return true;
    }
    return false;
}

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
void market::enable_mandatory_concurrency_impl ( arena *a ) {
    __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
    __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);

    a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
    my_mandatory_num_requested++;
}

void market::enable_mandatory_concurrency ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
            a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;

        enable_mandatory_concurrency_impl(a);
        delta = update_workers_request();
    }

    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}

void market::disable_mandatory_concurrency_impl(arena* a) {
    __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
    __TBB_ASSERT(my_mandatory_num_requested > 0, nullptr);

    a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
    my_mandatory_num_requested--;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;
        // There is a racy window in advertise_new_work between mandatory concurrency enabling and
        // setting SNAPSHOT_FULL. It gives the spawn request a chance to disable mandatory concurrency.
        // Therefore, we double-check that there are no enqueued tasks.
        if (a->has_enqueued_tasks())
            return;

        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
        disable_mandatory_concurrency_impl(a);

        delta = update_workers_request();
    }
    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
    if (!delta) {
        return;
    }
    int target_epoch{};
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (mandatory) {
            __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
            // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
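            // For example, with nested mandatory requests the sequence +1, +1, -1, -1 moves
            // my_local_concurrency_requests through 0->1 (proceed), 1->2 (return), 2->1 (return), 1->0 (proceed).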
            a.my_local_concurrency_requests += delta;
            if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
                (delta < 0 && a.my_local_concurrency_requests != 0))
            {
                return;
            }
        }
#endif
        a.my_total_num_workers_requested += delta;
        int target_workers = 0;
        // Cap target_workers into the interval [0, a.my_max_num_workers]
        if (a.my_total_num_workers_requested > 0) {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            // At least one thread should be requested when mandatory concurrency is enabled
            int max_num_workers = int(a.my_max_num_workers);
            if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
                max_num_workers = 1;
            }
#endif
            target_workers = min(a.my_total_num_workers_requested, max_num_workers);
        }

        delta = target_workers - a.my_num_workers_requested;

        if (delta == 0) {
            return;
        }

        a.my_num_workers_requested += delta;
        if (a.my_num_workers_requested == 0) {
            a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
        }

        int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
        my_total_demand.store(total_demand, std::memory_order_relaxed);
        my_priority_level_demand[a.my_priority_level] += delta;
        unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
        if (my_mandatory_num_requested > 0) {
            __TBB_ASSERT(effective_soft_limit == 0, nullptr);
            effective_soft_limit = 1;
        }

        update_allotment(effective_soft_limit);
        if (delta > 0) {
            // can't overflow soft_limit, but remember the values requested by arenas in
            // my_total_demand so as not to prematurely release workers to RML
            if (my_num_workers_requested + delta > (int)effective_soft_limit)
                delta = effective_soft_limit - my_num_workers_requested;
        }
        else {
            // the number of workers should not be decreased below my_total_demand
            if (my_num_workers_requested + delta < total_demand)
                delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
        }
        my_num_workers_requested += delta;
        __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, nullptr);

        target_epoch = a.my_adjust_demand_target_epoch++;
    }

    a.my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_relaxed);
    // Must be called outside of any locks
    my_server->adjust_job_count_estimate( delta );
    a.my_adjust_demand_current_epoch.exchange(target_epoch + 1);
    a.my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
}

void market::process( job& j ) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_arena can be dead. Don't access it until arena_in_need is called
    arena *a = td.my_arena;
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(td);
        }
        // Workers leave the market because there is no arena in need. This can happen before
        // adjust_job_count_estimate() decreases my_slack and RML puts this thread to sleep.
        // It might result in a busy loop checking for my_slack<0 and calling this method instantly.
        // The yield refines this spinning.
        if ( !i ) {
            yield();
        }
    }
}

void market::cleanup( job& j) {
    market::enforce([this] { return theMarket != this; }, nullptr );
    governor::auto_terminate(&j);
}

void market::acknowledge_close_connection() {
    destroy();
}

::rml::job* market::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, nullptr);
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // index serves as a hint decreasing conflicts between workers when they migrate between arenas
    thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT( index <= my_num_workers_hard_limit, nullptr);
    __TBB_ASSERT( my_workers[index - 1].load(std::memory_order_relaxed) == nullptr, nullptr);
    my_workers[index - 1].store(td, std::memory_order_release);
    return td;
}

void market::add_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.push_front(td);
}

void market::remove_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.remove(td);
}

} // namespace r1
} // namespace detail
} // namespace tbb