1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "oneapi/tbb/global_control.h" // global_control::active_value 18 19 #include "market.h" 20 #include "main.h" 21 #include "governor.h" 22 #include "arena.h" 23 #include "thread_data.h" 24 #include "itt_notify.h" 25 26 #include <cstring> // std::memset() 27 28 namespace tbb { 29 namespace detail { 30 namespace r1 { 31 32 /** This method must be invoked under my_arenas_list_mutex. **/ 33 arena* market::select_next_arena( arena* hint ) { 34 unsigned next_arena_priority_level = num_priority_levels; 35 if ( hint ) 36 next_arena_priority_level = hint->my_priority_level; 37 for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) { 38 if ( !my_arenas[idx].empty() ) 39 return &*my_arenas[idx].begin(); 40 } 41 // don't change if arena with higher priority is not found. 42 return hint; 43 } 44 45 void market::insert_arena_into_list ( arena& a ) { 46 __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr ); 47 my_arenas[a.my_priority_level].push_front( a ); 48 __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr ); 49 my_next_arena = select_next_arena( my_next_arena ); 50 } 51 52 void market::remove_arena_from_list ( arena& a ) { 53 __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr ); 54 my_arenas[a.my_priority_level].remove( a ); 55 if ( my_next_arena == &a ) 56 my_next_arena = nullptr; 57 my_next_arena = select_next_arena( my_next_arena ); 58 } 59 60 //------------------------------------------------------------------------ 61 // market 62 //------------------------------------------------------------------------ 63 64 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size ) 65 : my_num_workers_hard_limit(workers_hard_limit) 66 , my_num_workers_soft_limit(workers_soft_limit) 67 , my_next_arena(nullptr) 68 , my_ref_count(1) 69 , my_stack_size(stack_size) 70 , my_workers_soft_limit_to_report(workers_soft_limit) 71 { 72 // Once created RML server will start initializing workers that will need 73 // global market instance to get worker stack size 74 my_server = governor::create_rml_server( *this ); 75 __TBB_ASSERT( my_server, "Failed to create RML server" ); 76 } 77 78 static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) { 79 if( int soft_limit = market::app_parallelism_limit() ) 80 workers_soft_limit = soft_limit-1; 81 else // if user set no limits (yet), use market's parameter 82 workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit ); 83 if( workers_soft_limit >= workers_hard_limit ) 84 workers_soft_limit = workers_hard_limit-1; 85 return workers_soft_limit; 86 } 87 88 bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) { 89 market *m = theMarket; 90 if( m ) { 91 ++m->my_ref_count; 92 const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1; 93 lock.release(); 94 if( old_public_count==0 ) 95 set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) ); 96 97 // do not warn if default number of workers is requested 98 if( workers_requested != governor::default_num_threads()-1 ) { 99 __TBB_ASSERT( skip_soft_limit_warning > workers_requested, 100 "skip_soft_limit_warning must be larger than any valid workers_requested" ); 101 unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed); 102 if( soft_limit_to_report < workers_requested ) { 103 runtime_warning( "The number of workers is currently limited to %u. " 104 "The request for %u workers is ignored. Further requests for more workers " 105 "will be silently ignored until the limit changes.\n", 106 soft_limit_to_report, workers_requested ); 107 // The race is possible when multiple threads report warnings. 108 // We are OK with that, as there are just multiple warnings. 109 unsigned expected_limit = soft_limit_to_report; 110 m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning); 111 } 112 113 } 114 if( m->my_stack_size < stack_size ) 115 runtime_warning( "Thread stack size has been already set to %u. " 116 "The request for larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size ); 117 return true; 118 } 119 return false; 120 } 121 122 market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) { 123 global_market_mutex_type::scoped_lock lock( theMarketMutex ); 124 if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) { 125 // TODO: A lot is done under theMarketMutex locked. Can anything be moved out? 126 if( stack_size == 0 ) 127 stack_size = global_control::active_value(global_control::thread_stack_size); 128 // Expecting that 4P is suitable for most applications. 129 // Limit to 2P for large thread number. 130 // TODO: ask RML for max concurrency and possibly correct hard_limit 131 const unsigned factor = governor::default_num_threads()<=128? 4 : 2; 132 // The requested number of threads is intentionally not considered in 133 // computation of the hard limit, in order to separate responsibilities 134 // and avoid complicated interactions between global_control and task_scheduler_init. 135 // The market guarantees that at least 256 threads might be created. 136 const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit()); 137 const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit); 138 // Create the global market instance 139 std::size_t size = sizeof(market); 140 __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(thread_data*) == sizeof(market), 141 "my_workers must be the last data field of the market class"); 142 size += sizeof(thread_data*) * (workers_hard_limit - 1); 143 __TBB_InitOnce::add_ref(); 144 void* storage = cache_aligned_allocate(size); 145 std::memset( storage, 0, size ); 146 // Initialize and publish global market 147 market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size ); 148 if( is_public ) 149 m->my_public_ref_count.store(1, std::memory_order_relaxed); 150 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 151 if (market::is_lifetime_control_present()) { 152 ++m->my_public_ref_count; 153 ++m->my_ref_count; 154 } 155 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE 156 theMarket = m; 157 // This check relies on the fact that for shared RML default_concurrency==max_concurrency 158 if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit ) 159 runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n" 160 , m->my_server->default_concurrency(), workers_soft_limit ); 161 } 162 return *theMarket; 163 } 164 165 void market::destroy () { 166 this->market::~market(); // qualified to suppress warning 167 cache_aligned_deallocate( this ); 168 __TBB_InitOnce::remove_ref(); 169 } 170 171 bool market::release ( bool is_public, bool blocking_terminate ) { 172 market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?"); 173 bool do_release = false; 174 { 175 global_market_mutex_type::scoped_lock lock( theMarketMutex ); 176 if ( blocking_terminate ) { 177 __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" ); 178 while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 && 179 my_ref_count.load(std::memory_order_relaxed) > 1 ) { 180 lock.release(); 181 // To guarantee that request_close_connection() is called by the last master, we need to wait till all 182 // references are released. Re-read my_public_ref_count to limit waiting if new masters are created. 183 // Theoretically, new private references to the market can be added during waiting making it potentially 184 // endless. 185 // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait. 186 // Note that the market should know about its schedulers for cancellation/exception/priority propagation, 187 // see e.g. task_group_context::cancel_group_execution() 188 while ( my_public_ref_count.load(std::memory_order_acquire) == 1 && 189 my_ref_count.load(std::memory_order_acquire) > 1 ) { 190 yield(); 191 } 192 lock.acquire( theMarketMutex ); 193 } 194 } 195 if ( is_public ) { 196 __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" ); 197 __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), NULL ); 198 --my_public_ref_count; 199 } 200 if ( --my_ref_count == 0 ) { 201 __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), NULL ); 202 do_release = true; 203 theMarket = NULL; 204 } 205 } 206 if( do_release ) { 207 __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), 208 "No public references remain if we remove the market." ); 209 // inform RML that blocking termination is required 210 my_join_workers = blocking_terminate; 211 my_server->request_close_connection(); 212 return blocking_terminate; 213 } 214 return false; 215 } 216 217 int market::update_workers_request() { 218 int old_request = my_num_workers_requested; 219 my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed), 220 (int)my_num_workers_soft_limit.load(std::memory_order_relaxed)); 221 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 222 if (my_mandatory_num_requested > 0) { 223 __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL); 224 my_num_workers_requested = 1; 225 } 226 #endif 227 update_allotment(my_num_workers_requested); 228 return my_num_workers_requested - old_request; 229 } 230 231 void market::set_active_num_workers ( unsigned soft_limit ) { 232 market *m; 233 234 { 235 global_market_mutex_type::scoped_lock lock( theMarketMutex ); 236 if ( !theMarket ) 237 return; // actual value will be used at market creation 238 m = theMarket; 239 if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit) 240 return; 241 ++m->my_ref_count; 242 } 243 // have my_ref_count for market, use it safely 244 245 int delta = 0; 246 { 247 arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex ); 248 __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL); 249 250 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 251 arena_list_type* arenas = m->my_arenas; 252 253 if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 && 254 m->my_mandatory_num_requested > 0) 255 { 256 for (unsigned level = 0; level < num_priority_levels; ++level ) 257 for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it) 258 if (it->my_global_concurrency_mode.load(std::memory_order_relaxed)) 259 m->disable_mandatory_concurrency_impl(&*it); 260 } 261 __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL); 262 #endif 263 264 m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release); 265 // report only once after new soft limit value is set 266 m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed); 267 268 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 269 if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) { 270 for (unsigned level = 0; level < num_priority_levels; ++level ) 271 for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it) 272 if (it->has_enqueued_tasks()) 273 m->enable_mandatory_concurrency_impl(&*it); 274 } 275 #endif 276 277 delta = m->update_workers_request(); 278 } 279 // adjust_job_count_estimate must be called outside of any locks 280 if( delta!=0 ) 281 m->my_server->adjust_job_count_estimate( delta ); 282 // release internal market reference to match ++m->my_ref_count above 283 m->release( /*is_public=*/false, /*blocking_terminate=*/false ); 284 } 285 286 bool governor::does_client_join_workers (const rml::tbb_client &client) { 287 return ((const market&)client).must_join_workers(); 288 } 289 290 arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level, 291 std::size_t stack_size ) 292 { 293 __TBB_ASSERT( num_slots > 0, NULL ); 294 __TBB_ASSERT( num_reserved_slots <= num_slots, NULL ); 295 // Add public market reference for master thread/task_arena (that adds an internal reference in exchange). 296 market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size ); 297 arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level ); 298 // Add newly created arena into the existing market's list. 299 arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex); 300 m.insert_arena_into_list(a); 301 return &a; 302 } 303 304 /** This method must be invoked under my_arenas_list_mutex. **/ 305 void market::detach_arena ( arena& a ) { 306 market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?"); 307 __TBB_ASSERT( !a.my_slots[0].is_occupied(), NULL ); 308 if (a.my_global_concurrency_mode.load(std::memory_order_relaxed)) 309 disable_mandatory_concurrency_impl(&a); 310 311 remove_arena_from_list(a); 312 if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) { 313 my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); 314 } 315 } 316 317 void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) { 318 bool locked = true; 319 __TBB_ASSERT( a, NULL ); 320 // we hold reference to the market, so it cannot be destroyed at any moment here 321 market::enforce([this] { return theMarket == this; }, NULL); 322 __TBB_ASSERT( my_ref_count!=0, NULL ); 323 my_arenas_list_mutex.lock(); 324 arena_list_type::iterator it = my_arenas[priority_level].begin(); 325 for ( ; it != my_arenas[priority_level].end(); ++it ) { 326 if ( a == &*it ) { 327 if ( it->my_aba_epoch == aba_epoch ) { 328 // Arena is alive 329 if ( !a->my_num_workers_requested && !a->my_references.load(std::memory_order_relaxed) ) { 330 __TBB_ASSERT( 331 !a->my_num_workers_allotted.load(std::memory_order_relaxed) && 332 (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers), 333 "Inconsistent arena state" 334 ); 335 // Arena is abandoned. Destroy it. 336 detach_arena( *a ); 337 my_arenas_list_mutex.unlock(); 338 locked = false; 339 a->free_arena(); 340 } 341 } 342 if (locked) 343 my_arenas_list_mutex.unlock(); 344 return; 345 } 346 } 347 my_arenas_list_mutex.unlock(); 348 } 349 350 /** This method must be invoked under my_arenas_list_mutex. **/ 351 arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) { 352 // TODO: make sure arena with higher priority returned only if there are available slots in it. 353 hint = select_next_arena( hint ); 354 if ( !hint ) 355 return nullptr; 356 arena_list_type::iterator it = hint; 357 unsigned curr_priority_level = hint->my_priority_level; 358 __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr ); 359 do { 360 arena& a = *it; 361 if ( ++it == arenas[curr_priority_level].end() ) { 362 do { 363 ++curr_priority_level %= num_priority_levels; 364 } while ( arenas[curr_priority_level].empty() ); 365 it = arenas[curr_priority_level].begin(); 366 } 367 if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) { 368 a.my_references += arena::ref_worker; 369 return &a; 370 } 371 } while ( it != hint ); 372 return nullptr; 373 } 374 375 arena* market::arena_in_need(arena* prev) { 376 if (my_total_demand.load(std::memory_order_acquire) <= 0) 377 return nullptr; 378 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false); 379 // TODO: introduce three state response: alive, not_alive, no_market_arenas 380 if ( is_arena_alive(prev) ) 381 return arena_in_need(my_arenas, prev); 382 return arena_in_need(my_arenas, my_next_arena); 383 } 384 385 int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) { 386 __TBB_ASSERT( workers_demand > 0, nullptr ); 387 max_workers = min(workers_demand, max_workers); 388 int unassigned_workers = max_workers; 389 int assigned = 0; 390 int carry = 0; 391 unsigned max_priority_level = num_priority_levels; 392 for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) { 393 int assigned_per_priority = 0; 394 for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) { 395 arena& a = *it; 396 __TBB_ASSERT(a.my_num_workers_requested >= 0 && a.my_num_workers_requested <= int(a.my_max_num_workers), nullptr); 397 if (a.my_num_workers_requested == 0) { 398 __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr); 399 continue; 400 } 401 402 if (max_priority_level == num_priority_levels) { 403 max_priority_level = list_idx; 404 } 405 406 int allotted = 0; 407 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 408 if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) { 409 __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr); 410 allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) && 411 assigned < max_workers ? 1 : 0; 412 } else 413 #endif 414 { 415 int tmp = a.my_num_workers_requested * unassigned_workers + carry; 416 allotted = tmp / my_priority_level_demand[list_idx]; 417 carry = tmp % my_priority_level_demand[list_idx]; 418 // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers 419 allotted = min(allotted, (int)a.my_max_num_workers); 420 } 421 a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed); 422 a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed); 423 assigned += allotted; 424 assigned_per_priority += allotted; 425 } 426 unassigned_workers -= assigned_per_priority; 427 } 428 __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr ); 429 return assigned; 430 } 431 432 /** This method must be invoked under my_arenas_list_mutex. **/ 433 bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) { 434 __TBB_ASSERT( a, "Expected non-null pointer to arena." ); 435 for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it ) 436 if ( a == &*it ) 437 return true; 438 return false; 439 } 440 441 /** This method must be invoked under my_arenas_list_mutex. **/ 442 bool market::is_arena_alive(arena* a) { 443 if ( !a ) 444 return false; 445 446 // Still cannot access internals of the arena since the object itself might be destroyed. 447 448 for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) { 449 if ( is_arena_in_list( my_arenas[idx], a ) ) 450 return true; 451 } 452 return false; 453 } 454 455 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 456 void market::enable_mandatory_concurrency_impl ( arena *a ) { 457 __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL); 458 __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL); 459 460 a->my_global_concurrency_mode.store(true, std::memory_order_relaxed); 461 my_mandatory_num_requested++; 462 } 463 464 void market::enable_mandatory_concurrency ( arena *a ) { 465 int delta = 0; 466 { 467 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex); 468 if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 || 469 a->my_global_concurrency_mode.load(std::memory_order_relaxed)) 470 return; 471 472 enable_mandatory_concurrency_impl(a); 473 delta = update_workers_request(); 474 } 475 476 if (delta != 0) 477 my_server->adjust_job_count_estimate(delta); 478 } 479 480 void market::disable_mandatory_concurrency_impl(arena* a) { 481 __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL); 482 __TBB_ASSERT(my_mandatory_num_requested > 0, NULL); 483 484 a->my_global_concurrency_mode.store(false, std::memory_order_relaxed); 485 my_mandatory_num_requested--; 486 } 487 488 void market::mandatory_concurrency_disable ( arena *a ) { 489 int delta = 0; 490 { 491 arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex); 492 if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed)) 493 return; 494 // There is a racy window in advertise_new_work between mandtory concurrency enabling and 495 // setting SNAPSHOT_FULL. It gives a chance to spawn request to disable mandatory concurrency. 496 // Therefore, we double check that there is no enqueued tasks. 497 if (a->has_enqueued_tasks()) 498 return; 499 500 __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL); 501 disable_mandatory_concurrency_impl(a); 502 503 delta = update_workers_request(); 504 } 505 if (delta != 0) 506 my_server->adjust_job_count_estimate(delta); 507 } 508 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */ 509 510 void market::adjust_demand ( arena& a, int delta ) { 511 market::enforce([] { return theMarket != nullptr; }, "market instance was destroyed prematurely?"); 512 if ( !delta ) 513 return; 514 my_arenas_list_mutex.lock(); 515 a.my_total_num_workers_requested += delta; 516 int target_workers = 0; 517 // Cap target_workers into interval [0, a.my_max_num_workers] 518 if (a.my_total_num_workers_requested > 0) { 519 target_workers = a.my_total_num_workers_requested < int(a.my_max_num_workers) ? 520 a.my_total_num_workers_requested : a.my_max_num_workers; 521 } 522 523 delta = target_workers - a.my_num_workers_requested; 524 525 if (delta == 0) { 526 my_arenas_list_mutex.unlock(); 527 return; 528 } 529 530 a.my_num_workers_requested += delta; 531 if (a.my_num_workers_requested == 0) { 532 a.my_num_workers_allotted.store(0, std::memory_order_relaxed); 533 } 534 535 int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta; 536 my_total_demand.store(total_demand, std::memory_order_relaxed); 537 my_priority_level_demand[a.my_priority_level] += delta; 538 unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed); 539 if (my_mandatory_num_requested > 0) { 540 __TBB_ASSERT(effective_soft_limit == 0, NULL); 541 effective_soft_limit = 1; 542 } 543 544 update_allotment(effective_soft_limit); 545 if ( delta > 0 ) { 546 // can't overflow soft_limit, but remember values request by arenas in 547 // my_total_demand to not prematurely release workers to RML 548 if ( my_num_workers_requested+delta > (int)effective_soft_limit) 549 delta = effective_soft_limit - my_num_workers_requested; 550 } else { 551 // the number of workers should not be decreased below my_total_demand 552 if ( my_num_workers_requested+delta < total_demand ) 553 delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested; 554 } 555 my_num_workers_requested += delta; 556 __TBB_ASSERT( my_num_workers_requested <= (int)effective_soft_limit, NULL ); 557 558 int target_epoch = my_adjust_demand_target_epoch++; 559 560 my_arenas_list_mutex.unlock(); 561 562 spin_wait_until_eq(my_adjust_demand_current_epoch, target_epoch); 563 // Must be called outside of any locks 564 my_server->adjust_job_count_estimate( delta ); 565 my_adjust_demand_current_epoch.store(target_epoch + 1, std::memory_order_release); 566 } 567 568 void market::process( job& j ) { 569 thread_data& td = static_cast<thread_data&>(j); 570 // td.my_arena can be dead. Don't access it until arena_in_need is called 571 arena *a = td.my_arena; 572 for (int i = 0; i < 2; ++i) { 573 while ( (a = arena_in_need(a)) ) { 574 a->process(td); 575 a = nullptr; // to avoid double checks in arena_in_need(arena*) for the same priority level 576 } 577 // Workers leave market because there is no arena in need. It can happen earlier than 578 // adjust_job_count_estimate() decreases my_slack and RML can put this thread to sleep. 579 // It might result in a busy-loop checking for my_slack<0 and calling this method instantly. 580 // the yield refines this spinning. 581 if ( !i ) { 582 yield(); 583 } 584 } 585 } 586 587 void market::cleanup( job& j) { 588 market::enforce([this] { return theMarket != this; }, NULL ); 589 governor::auto_terminate(&j); 590 } 591 592 void market::acknowledge_close_connection() { 593 destroy(); 594 } 595 596 ::rml::job* market::create_one_job() { 597 unsigned short index = ++my_first_unused_worker_idx; 598 __TBB_ASSERT( index > 0, NULL ); 599 ITT_THREAD_SET_NAME(_T("TBB Worker Thread")); 600 // index serves as a hint decreasing conflicts between workers when they migrate between arenas 601 thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true }; 602 __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL ); 603 __TBB_ASSERT( my_workers[index - 1] == nullptr, NULL ); 604 my_workers[index - 1] = td; 605 return td; 606 } 607 608 void market::add_external_thread(thread_data& td) { 609 context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex); 610 my_masters.push_front(td); 611 } 612 613 void market::remove_external_thread(thread_data& td) { 614 context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex); 615 my_masters.remove(td); 616 } 617 618 } // namespace r1 619 } // namespace detail 620 } // namespace tbb 621