/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "main.h"
#include "governor.h"
#include "arena.h"
#include "thread_data.h"
#include "itt_notify.h"

#include <cstring> // std::memset()

namespace tbb {
namespace detail {
namespace r1 {

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::select_next_arena( arena* hint ) {
    unsigned next_arena_priority_level = num_priority_levels;
    if ( hint )
        next_arena_priority_level = hint->my_priority_level;
    for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
        if ( !my_arenas[idx].empty() )
            return &*my_arenas[idx].begin();
    }
    // Don't change the hint if no arena with higher priority is found.
    return hint;
}

void market::insert_arena_into_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].push_front( a );
    __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
    my_next_arena = select_next_arena( my_next_arena );
}

void market::remove_arena_from_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].remove( a );
    if ( my_next_arena == &a )
        my_next_arena = nullptr;
    my_next_arena = select_next_arena( my_next_arena );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
    : my_num_workers_hard_limit(workers_hard_limit)
    , my_num_workers_soft_limit(workers_soft_limit)
    , my_next_arena(nullptr)
    , my_ref_count(1)
    , my_stack_size(stack_size)
    , my_workers_soft_limit_to_report(workers_soft_limit)
{
    // Once created, the RML server will start initializing workers, which need the
    // global market instance to obtain the worker stack size.
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

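// Illustrative example (hypothetical values, not from the original source): on a machine where
// governor::default_num_threads() returns 8, with no app_parallelism_limit() set, a requested
// soft limit of 3 and a hard limit of 256, the function below returns max(8-1, 3) == 7 workers.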
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if the user has set no limit (yet), use the market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}

bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // Do not warn if the default number of workers is requested.
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // A race is possible when multiple threads report warnings.
                // We are OK with that, as it only results in multiple warnings.
                unsigned expected_limit = soft_limit_to_report;
                m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
            }

        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "Thread stack size has already been set to %u. "
                             "The request for a larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size );
        return true;
    }
    return false;
}

market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
        // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // Expecting that 4P is suitable for most applications.
        // Limit to 2P for large thread counts.
        // TODO: ask RML for max concurrency and possibly correct hard_limit
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in the
        // computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads can be created.
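        // Illustrative example (hypothetical machine): with default_num_threads() == 16,
        // factor == 4 and no app_parallelism_limit() set, the hard limit below evaluates to
        // max(max(4*16, 256u), 0) == 256.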
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
        // Create the global market instance
        std::size_t size = sizeof(market);
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(thread_data*) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(thread_data*) * (workers_hard_limit - 1);
        __TBB_InitOnce::add_ref();
        void* storage = cache_aligned_allocate(size);
        std::memset( storage, 0, size );
        // Initialize and publish global market
        market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count.store(1, std::memory_order_relaxed);
#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        if (market::is_lifetime_control_present()) {
            ++m->my_public_ref_count;
            ++m->my_ref_count;
        }
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        theMarket = m;
        // This check relies on the fact that for shared RML default_concurrency==max_concurrency
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                             , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *theMarket;
}

void market::destroy () {
    this->market::~market(); // qualified to suppress warning
    cache_aligned_deallocate( this );
    __TBB_InitOnce::remove_ref();
}

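// Note on reference counting: my_ref_count counts all references to the market (public and
// internal), while my_public_ref_count counts only the public ones taken by external threads and
// task_arena instances. The market is torn down once my_ref_count drops to zero; a blocking
// terminate additionally waits, while this is the last public reference, for all internal
// references to be released before requesting RML to close the connection.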
bool market::release ( bool is_public, bool blocking_terminate ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
                    my_ref_count.load(std::memory_order_relaxed) > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last external thread, we need to wait
                // until all references are released. Re-read my_public_ref_count to limit waiting if new external
                // threads are created. Theoretically, new private references to the market can be added during the
                // wait, making it potentially endless.
                // TODO: revise why the weak scheduler needs the market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
                // see e.g. task_group_context::cancel_group_execution()
                while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
                        my_ref_count.load(std::memory_order_acquire) > 1 ) {
                    yield();
                }
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), NULL );
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), NULL );
            do_release = true;
            theMarket = NULL;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
                      "No public references must remain when the market is destroyed." );
        // Inform RML whether blocking termination is required.
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}

int market::update_workers_request() {
    int old_request = my_num_workers_requested;
    my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
                                   (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
        my_num_workers_requested = 1;
    }
#endif
    update_allotment(my_num_workers_requested);
    return my_num_workers_requested - old_request;
}

void market::set_active_num_workers ( unsigned soft_limit ) {
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // the actual value will be used at market creation
        m = theMarket;
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
            return;
        ++m->my_ref_count;
    }
    // We hold my_ref_count for the market, so it can be used safely.

    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        arena_list_type* arenas = m->my_arenas;

        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
            m->my_mandatory_num_requested > 0)
        {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
                        m->disable_mandatory_concurrency_impl(&*it);
        }
        __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL);
#endif

        m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
        // Report only once after the new soft limit value is set.
        m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->has_enqueued_tasks())
                        m->enable_mandatory_concurrency_impl(&*it);
        }
#endif

        delta = m->update_workers_request();
    }
    // adjust_job_count_estimate must be called outside of any locks
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // Release the internal market reference to match ++m->my_ref_count above.
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}

bool governor::does_client_join_workers (const rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
                              std::size_t stack_size )
{
    __TBB_ASSERT( num_slots > 0, NULL );
    __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
    // Add a public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
    // Add the newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

/** This method must be invoked under my_arenas_list_mutex. **/
void market::detach_arena ( arena& a ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    __TBB_ASSERT( !a.my_slots[0].is_occupied(), NULL );
    if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
        disable_mandatory_concurrency_impl(&a);

    remove_arena_from_list(a);
    if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
        my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
    }
}

void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
    bool locked = true;
    __TBB_ASSERT( a, NULL );
    // We hold a reference to the market, so it cannot be destroyed here.
    market::enforce([this] { return theMarket == this; }, NULL);
    __TBB_ASSERT( my_ref_count!=0, NULL );
    my_arenas_list_mutex.lock();
    arena_list_type::iterator it = my_arenas[priority_level].begin();
    for ( ; it != my_arenas[priority_level].end(); ++it ) {
        if ( a == &*it ) {
            if ( it->my_aba_epoch == aba_epoch ) {
                // Arena is alive
                if ( !a->my_num_workers_requested && !a->my_references.load(std::memory_order_relaxed) ) {
                    __TBB_ASSERT(
                        !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
                        (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
                        "Inconsistent arena state"
                    );
                    // Arena is abandoned. Destroy it.
                    detach_arena( *a );
                    my_arenas_list_mutex.unlock();
                    locked = false;
                    a->free_arena();
                }
            }
            if (locked)
                my_arenas_list_mutex.unlock();
            return;
        }
    }
    my_arenas_list_mutex.unlock();
}

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
    // TODO: make sure an arena with higher priority is returned only if it has available slots.
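    // Starting from the hinted arena (or the highest-priority non-empty list), scan arenas in a
    // round-robin fashion across priority levels and return the first one whose number of active
    // workers is below its current allotment, taking a worker reference on it.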
    hint = select_next_arena( hint );
    if ( !hint )
        return nullptr;
    arena_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->my_priority_level;
    __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
    do {
        arena& a = *it;
        if ( ++it == arenas[curr_priority_level].end() ) {
            do {
                ++curr_priority_level %= num_priority_levels;
            } while ( arenas[curr_priority_level].empty() );
            it = arenas[curr_priority_level].begin();
        }
        if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return nullptr;
}

arena* market::arena_in_need(arena* prev) {
    if (my_total_demand.load(std::memory_order_acquire) <= 0)
        return nullptr;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    // TODO: introduce a three-state response: alive, not_alive, no_market_arenas
    if ( is_arena_alive(prev) )
        return arena_in_need(my_arenas, prev);
    return arena_in_need(my_arenas, my_next_arena);
}

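// Workers are distributed across arenas of the same priority level proportionally to their demand,
// using integer division with a running carry so that no fractional worker is lost. Illustrative
// example (hypothetical numbers): three arenas at one level requesting 3, 3 and 4 workers, with
// 5 workers assigned to that level (level demand 10), receive 1, 2 and 2 workers respectively:
// (3*5+0)/10 = 1 carry 5, (3*5+5)/10 = 2 carry 0, (4*5+0)/10 = 2 carry 0.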
int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand > 0, nullptr );
    max_workers = min(workers_demand, max_workers);
    int unassigned_workers = max_workers;
    int assigned = 0;
    int carry = 0;
    unsigned max_priority_level = num_priority_levels;
    for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
        int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
        unassigned_workers -= assigned_per_priority;
        for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
            arena& a = *it;
            __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
            __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
                || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
            if (a.my_num_workers_requested == 0) {
                __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
                continue;
            }

            if (max_priority_level == num_priority_levels) {
                max_priority_level = list_idx;
            }

            int allotted = 0;
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
                __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
                allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
                    assigned < max_workers ? 1 : 0;
            } else
#endif
            {
                int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
                allotted = tmp / my_priority_level_demand[list_idx];
                carry = tmp % my_priority_level_demand[list_idx];
                __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
                __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
            }
            a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
            a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
            assigned += allotted;
        }
    }
    __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
    return assigned;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
    __TBB_ASSERT( a, "Expected non-null pointer to arena." );
    for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
        if ( a == &*it )
            return true;
    return false;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_alive(arena* a) {
    if ( !a )
        return false;

    // Still cannot access internals of the arena since the object itself might be destroyed.

    for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
        if ( is_arena_in_list( my_arenas[idx], a ) )
            return true;
    }
    return false;
}

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
void market::enable_mandatory_concurrency_impl ( arena *a ) {
    __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);

    a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
    my_mandatory_num_requested++;
}

void market::enable_mandatory_concurrency ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
            a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;

        enable_mandatory_concurrency_impl(a);
        delta = update_workers_request();
    }

    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}

void market::disable_mandatory_concurrency_impl(arena* a) {
    __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_mandatory_num_requested > 0, NULL);

    a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
    my_mandatory_num_requested--;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;
        // There is a racy window in advertise_new_work between enabling mandatory concurrency and
        // setting SNAPSHOT_FULL. It gives the spawn request a chance to disable mandatory concurrency.
        // Therefore, we double-check that there are no enqueued tasks.
        if (a->has_enqueued_tasks())
            return;

        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
        disable_mandatory_concurrency_impl(a);

        delta = update_workers_request();
    }
    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */

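// adjust_demand updates the arena's and the market's demand bookkeeping under my_arenas_list_mutex,
// but defers the RML call: each caller takes a ticket from my_adjust_demand_target_epoch and waits
// for my_adjust_demand_current_epoch to reach it, so that adjust_job_count_estimate() calls are
// issued one at a time, in ticket order, outside of the lock.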
void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
    if (!delta) {
        return;
    }
    int target_epoch{};
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (mandatory) {
            __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
            // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
            a.my_local_concurrency_requests += delta;
            if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
                (delta < 0 && a.my_local_concurrency_requests != 0))
            {
                return;
            }
        }
#endif
        a.my_total_num_workers_requested += delta;
        int target_workers = 0;
        // Cap target_workers to the interval [0, a.my_max_num_workers].
        if (a.my_total_num_workers_requested > 0) {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            // At least one thread should be requested when mandatory concurrency is enabled.
            int max_num_workers = int(a.my_max_num_workers);
            if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
                max_num_workers = 1;
            }
#endif
            target_workers = min(a.my_total_num_workers_requested, max_num_workers);
        }

        delta = target_workers - a.my_num_workers_requested;

        if (delta == 0) {
            return;
        }

        a.my_num_workers_requested += delta;
        if (a.my_num_workers_requested == 0) {
            a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
        }

        int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
        my_total_demand.store(total_demand, std::memory_order_relaxed);
        my_priority_level_demand[a.my_priority_level] += delta;
        unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
        if (my_mandatory_num_requested > 0) {
            __TBB_ASSERT(effective_soft_limit == 0, NULL);
            effective_soft_limit = 1;
        }

        update_allotment(effective_soft_limit);
        if (delta > 0) {
            // Cannot overflow the soft limit, but remember the values requested by arenas in
            // my_total_demand so that workers are not prematurely released to RML.
            if (my_num_workers_requested + delta > (int)effective_soft_limit)
                delta = effective_soft_limit - my_num_workers_requested;
        }
        else {
            // The number of workers should not be decreased below my_total_demand.
            if (my_num_workers_requested + delta < total_demand)
                delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
        }
        my_num_workers_requested += delta;
        __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, NULL);

        target_epoch = my_adjust_demand_target_epoch++;
    }

    my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_acquire);
    // Must be called outside of any locks
    my_server->adjust_job_count_estimate( delta );
    my_adjust_demand_current_epoch.exchange(target_epoch + 1);
    my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
}

void market::process( job& j ) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_arena can be dead. Don't access it until arena_in_need is called.
    arena *a = td.my_arena;
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(td);
        }
        // Workers leave the market because there is no arena in need. This can happen before
        // adjust_job_count_estimate() decreases my_slack and before RML can put this thread to sleep.
        // It might result in a busy loop that checks for my_slack<0 and calls this method immediately.
        // The yield mitigates this spinning.
        if ( !i ) {
            yield();
        }
    }
}

void market::cleanup( job& j) {
    market::enforce([this] { return theMarket != this; }, NULL );
    governor::auto_terminate(&j);
}

void market::acknowledge_close_connection() {
    destroy();
}

::rml::job* market::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, NULL );
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // The index serves as a hint that decreases conflicts between workers when they migrate between arenas.
    thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
    __TBB_ASSERT( my_workers[index - 1] == nullptr, NULL );
    my_workers[index - 1] = td;
    return td;
}

void market::add_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.push_front(td);
}

void market::remove_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.remove(td);
}

} // namespace r1
} // namespace detail
} // namespace tbb