xref: /oneTBB/src/tbb/market.cpp (revision fd76f452)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "oneapi/tbb/global_control.h" // global_control::active_value
18 
19 #include "market.h"
20 #include "main.h"
21 #include "governor.h"
22 #include "arena.h"
23 #include "thread_data.h"
24 #include "itt_notify.h"
25 
26 #include <cstring> // std::memset()
27 
28 namespace tbb {
29 namespace detail {
30 namespace r1 {
31 
32 /** This method must be invoked under my_arenas_list_mutex. **/
33 arena* market::select_next_arena( arena* hint ) {
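    // Scan the arena lists from the highest priority level (index 0) down to the hint's level and
    // return the front arena of the first non-empty list; a lower index means a higher priority.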
34     unsigned next_arena_priority_level = num_priority_levels;
35     if ( hint )
36         next_arena_priority_level = hint->my_priority_level;
37     for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
38         if ( !my_arenas[idx].empty() )
39             return &*my_arenas[idx].begin();
40     }
41     // Do not change the hint if no arena with a higher priority is found.
42     return hint;
43 }
44 
45 void market::insert_arena_into_list ( arena& a ) {
46     __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
47     my_arenas[a.my_priority_level].push_front( a );
48     __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
49     my_next_arena = select_next_arena( my_next_arena );
50 }
51 
52 void market::remove_arena_from_list ( arena& a ) {
53     __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
54     my_arenas[a.my_priority_level].remove( a );
55     if ( my_next_arena == &a )
56         my_next_arena = nullptr;
57     my_next_arena = select_next_arena( my_next_arena );
58 }
59 
60 //------------------------------------------------------------------------
61 // market
62 //------------------------------------------------------------------------
63 
64 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
65     : my_num_workers_hard_limit(workers_hard_limit)
66     , my_num_workers_soft_limit(workers_soft_limit)
67     , my_next_arena(nullptr)
68     , my_ref_count(1)
69     , my_stack_size(stack_size)
70     , my_workers_soft_limit_to_report(workers_soft_limit)
71 {
72     // Once created, the RML server starts initializing workers, which need the
73     // global market instance to obtain the worker stack size.
74     my_server = governor::create_rml_server( *this );
75     __TBB_ASSERT( my_server, "Failed to create RML server" );
76 }
77 
78 static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
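    // The limits here count worker threads only; the "-1" adjustments below account for the
    // external (application) thread included in app_parallelism_limit() and default_num_threads().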
79     if( int soft_limit = market::app_parallelism_limit() )
80         workers_soft_limit = soft_limit-1;
81     else // if the user has set no limit (yet), use the market's parameter
82         workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
83     if( workers_soft_limit >= workers_hard_limit )
84         workers_soft_limit = workers_hard_limit-1;
85     return workers_soft_limit;
86 }
87 
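// Adds references to an already existing global market, assuming the caller holds theMarketMutex
// via `lock`. Returns false when no market exists yet (the lock stays held so the caller can
// create one); otherwise bumps the reference counts, releases the lock, and returns true.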
88 bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
89     market *m = theMarket;
90     if( m ) {
91         ++m->my_ref_count;
92         const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
93         lock.release();
94         if( old_public_count==0 )
95             set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );
96 
97         // do not warn if default number of workers is requested
98         if( workers_requested != governor::default_num_threads()-1 ) {
99             __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
100                           "skip_soft_limit_warning must be larger than any valid workers_requested" );
101             unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
102             if( soft_limit_to_report < workers_requested ) {
103                 runtime_warning( "The number of workers is currently limited to %u. "
104                                  "The request for %u workers is ignored. Further requests for more workers "
105                                  "will be silently ignored until the limit changes.\n",
106                                  soft_limit_to_report, workers_requested );
107                 // A race is possible when multiple threads report warnings concurrently.
108                 // We are OK with that, as it merely results in multiple warnings.
109                 unsigned expected_limit = soft_limit_to_report;
110                 m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
111             }
112 
113         }
114         if( m->my_stack_size < stack_size )
115             runtime_warning( "Thread stack size has already been set to %u. "
116                              "The request for a larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size );
117         return true;
118     }
119     return false;
120 }
121 
122 market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
123     global_market_mutex_type::scoped_lock lock( theMarketMutex );
124     if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
125         // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
126         if( stack_size == 0 )
127             stack_size = global_control::active_value(global_control::thread_stack_size);
128         // Expecting that 4P is suitable for most applications.
129         // Limit to 2P for large thread counts.
130         // TODO: ask RML for max concurrency and possibly correct hard_limit
131         const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
132         // The requested number of threads is intentionally not considered in the
133         // computation of the hard limit, in order to separate responsibilities
134         // and avoid complicated interactions between global_control and task_scheduler_init.
135         // The market guarantees that at least 256 threads can be created.
136         const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
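        // For example, with default_num_threads() == 16 this evaluates to
        // max(max(4*16, 256u), app_parallelism_limit()) == max(256u, app_parallelism_limit()).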
137         const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
138         // Create the global market instance
139         std::size_t size = sizeof(market);
140         __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(std::atomic<thread_data*>) == sizeof(market),
141                       "my_workers must be the last data field of the market class");
142         size += sizeof(std::atomic<thread_data*>) * (workers_hard_limit - 1);
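        // The market is allocated with a trailing array: my_workers provides the first slot inside
        // the class, and the extra space appended here holds the remaining (workers_hard_limit - 1)
        // thread_data pointers.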
143         __TBB_InitOnce::add_ref();
144         void* storage = cache_aligned_allocate(size);
145         std::memset( storage, 0, size );
146         // Initialize and publish global market
147         market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
148         if( is_public )
149             m->my_public_ref_count.store(1, std::memory_order_relaxed);
150         if (market::is_lifetime_control_present()) {
151             ++m->my_public_ref_count;
152             ++m->my_ref_count;
153         }
154         theMarket = m;
155         // This check relies on the fact that for shared RML default_concurrency==max_concurrency
156         if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
157             runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
158                     , m->my_server->default_concurrency(), workers_soft_limit );
159     }
160     return *theMarket;
161 }
162 
163 void market::destroy () {
164     this->market::~market(); // qualified to suppress warning
165     cache_aligned_deallocate( this );
166     __TBB_InitOnce::remove_ref();
167 }
168 
169 bool market::release ( bool is_public, bool blocking_terminate ) {
170     market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
171     bool do_release = false;
172     {
173         global_market_mutex_type::scoped_lock lock( theMarketMutex );
174         if ( blocking_terminate ) {
175             __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
176             while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
177                     my_ref_count.load(std::memory_order_relaxed) > 1 ) {
178                 lock.release();
179                 // To guarantee that request_close_connection() is called by the last external thread, we need to wait until all
180                 // references are released. Re-read my_public_ref_count to limit the wait if new external threads are created.
181                 // Theoretically, new private references to the market can be added during the wait, making it potentially
182                 // endless.
183                 // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
184                 // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
185                 // see e.g. task_group_context::cancel_group_execution()
186                 while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
187                         my_ref_count.load(std::memory_order_acquire) > 1 ) {
188                     yield();
189                 }
190                 lock.acquire( theMarketMutex );
191             }
192         }
193         if ( is_public ) {
194             __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
195             __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), nullptr);
196             --my_public_ref_count;
197         }
198         if ( --my_ref_count == 0 ) {
199             __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), nullptr);
200             do_release = true;
201             theMarket = nullptr;
202         }
203     }
204     if( do_release ) {
205         __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
206             "No public references remain if we remove the market." );
207         // inform RML that blocking termination is required
208         my_join_workers = blocking_terminate;
209         my_server->request_close_connection();
210         return blocking_terminate;
211     }
212     return false;
213 }
214 
215 int market::update_workers_request() {
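    // Recompute how many workers to request from RML: the total demand across all arenas capped
    // by the soft limit (kept at one worker when only mandatory concurrency is requested).
    // Returns the delta relative to the previously requested number.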
216     int old_request = my_num_workers_requested;
217     my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
218                                    (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
219 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
220     if (my_mandatory_num_requested > 0) {
221         __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
222         my_num_workers_requested = 1;
223     }
224 #endif
225     update_allotment(my_num_workers_requested);
226     return my_num_workers_requested - old_request;
227 }
228 
229 void market::set_active_num_workers ( unsigned soft_limit ) {
230     market *m;
231 
232     {
233         global_market_mutex_type::scoped_lock lock( theMarketMutex );
234         if ( !theMarket )
235             return; // actual value will be used at market creation
236         m = theMarket;
237         if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
238             return;
239         ++m->my_ref_count;
240     }
241     // We now hold a market reference (my_ref_count), so the market can be used safely.
242 
243     int delta = 0;
244     {
245         arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
246         __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, nullptr);
247 
248 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
249         arena_list_type* arenas = m->my_arenas;
250 
251         if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
252             m->my_mandatory_num_requested > 0)
253         {
254             for (unsigned level = 0; level < num_priority_levels; ++level )
255                 for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
256                     if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
257                         m->disable_mandatory_concurrency_impl(&*it);
258         }
259         __TBB_ASSERT(m->my_mandatory_num_requested == 0, nullptr);
260 #endif
261 
262         m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
263         // report only once after new soft limit value is set
264         m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);
265 
266 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
267         if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
268             for (unsigned level = 0; level < num_priority_levels; ++level )
269                 for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
270                     if (it->has_enqueued_tasks())
271                         m->enable_mandatory_concurrency_impl(&*it);
272         }
273 #endif
274 
275         delta = m->update_workers_request();
276     }
277     // adjust_job_count_estimate must be called outside of any locks
278     if( delta!=0 )
279         m->my_server->adjust_job_count_estimate( delta );
280     // release internal market reference to match ++m->my_ref_count above
281     m->release( /*is_public=*/false, /*blocking_terminate=*/false );
282 }
283 
284 bool governor::does_client_join_workers (const rml::tbb_client &client) {
285     return ((const market&)client).must_join_workers();
286 }
287 
288 arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
289                               std::size_t stack_size )
290 {
291     __TBB_ASSERT( num_slots > 0, nullptr);
292     __TBB_ASSERT( num_reserved_slots <= num_slots, nullptr);
293     // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
294     market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
295     arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
296     // Add newly created arena into the existing market's list.
297     arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
298     m.insert_arena_into_list(a);
299     return &a;
300 }
301 
302 /** This method must be invoked under my_arenas_list_mutex. **/
303 void market::detach_arena ( arena& a ) {
304     market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
305     __TBB_ASSERT( !a.my_slots[0].is_occupied(), nullptr);
306     if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
307         disable_mandatory_concurrency_impl(&a);
308 
309     remove_arena_from_list(a);
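    // Advance the ABA epoch so that a delayed try_destroy_arena() call carrying the old epoch
    // cannot mistake a later arena allocated at the same address for this one.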
310     if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
311         my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
312     }
313 }
314 
315 void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
316     bool locked = true;
317     __TBB_ASSERT( a, nullptr);
318     // We hold a reference to the market, so it cannot be destroyed at any moment here.
319     market::enforce([this] { return theMarket == this; }, nullptr);
320     __TBB_ASSERT( my_ref_count!=0, nullptr);
321     my_arenas_list_mutex.lock();
322         arena_list_type::iterator it = my_arenas[priority_level].begin();
323         for ( ; it != my_arenas[priority_level].end(); ++it ) {
324             if ( a == &*it ) {
325                 if ( it->my_aba_epoch == aba_epoch ) {
326                     // Arena is alive
327                     // Acquire my_references to sync with threads that just left the arena
328                     if (!a->my_num_workers_requested && !a->my_references.load(std::memory_order_acquire)) {
329                         __TBB_ASSERT(
330                             !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
331                             (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
332                             "Inconsistent arena state"
333                         );
334                         // Arena is abandoned. Destroy it.
335                         detach_arena( *a );
336                         my_arenas_list_mutex.unlock();
337                         locked = false;
338                         a->free_arena();
339                     }
340                 }
341                 if (locked)
342                     my_arenas_list_mutex.unlock();
343                 return;
344             }
345         }
346     my_arenas_list_mutex.unlock();
347 }
348 
349 /** This method must be invoked under my_arenas_list_mutex. **/
350 arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
351     // TODO: make sure an arena with higher priority is returned only if it has available slots.
352     hint = select_next_arena( hint );
353     if ( !hint )
354         return nullptr;
355     arena_list_type::iterator it = hint;
356     unsigned curr_priority_level = hint->my_priority_level;
357     __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
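    // Round-robin over the arenas of all priority levels, starting from the hint: pick the first
    // arena that has fewer active workers than its current allotment and register this worker in it.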
358     do {
359         arena& a = *it;
360         if ( ++it == arenas[curr_priority_level].end() ) {
361             do {
362                 ++curr_priority_level %= num_priority_levels;
363             } while ( arenas[curr_priority_level].empty() );
364             it = arenas[curr_priority_level].begin();
365         }
366         if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
367             a.my_references += arena::ref_worker;
368             return &a;
369         }
370     } while ( it != hint );
371     return nullptr;
372 }
373 
374 arena* market::arena_in_need(arena* prev) {
375     if (my_total_demand.load(std::memory_order_acquire) <= 0)
376         return nullptr;
377     arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
378     // TODO: introduce three state response: alive, not_alive, no_market_arenas
379     if ( is_arena_alive(prev) )
380         return arena_in_need(my_arenas, prev);
381     return arena_in_need(my_arenas, my_next_arena);
382 }
383 
384 int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
385     __TBB_ASSERT( workers_demand > 0, nullptr );
386     max_workers = min(workers_demand, max_workers);
387     int unassigned_workers = max_workers;
388     int assigned = 0;
389     int carry = 0;
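    // Within each priority level, workers are distributed proportionally to each arena's request;
    // 'carry' forwards the remainder of the integer division to the next arena so the level's
    // allotment is not lost to truncation.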
390     unsigned max_priority_level = num_priority_levels;
391     for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
392         int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
393         unassigned_workers -= assigned_per_priority;
394         for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
395             arena& a = *it;
396             __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
397             __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
398                 || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
399             if (a.my_num_workers_requested == 0) {
400                 __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
401                 continue;
402             }
403 
404             if (max_priority_level == num_priority_levels) {
405                 max_priority_level = list_idx;
406             }
407 
408             int allotted = 0;
409 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
410             if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
411                 __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
412                 allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
413                     assigned < max_workers ? 1 : 0;
414             } else
415 #endif
416             {
417                 int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
418                 allotted = tmp / my_priority_level_demand[list_idx];
419                 carry = tmp % my_priority_level_demand[list_idx];
420                 __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
421                 __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
422             }
423             a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
424             a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
425             assigned += allotted;
426         }
427     }
428     __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
429     return assigned;
430 }
431 
432 /** This method must be invoked under my_arenas_list_mutex. **/
433 bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
434     __TBB_ASSERT( a, "Expected non-null pointer to arena." );
435     for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
436         if ( a == &*it )
437             return true;
438     return false;
439 }
440 
441 /** This method must be invoked under my_arenas_list_mutex. **/
442 bool market::is_arena_alive(arena* a) {
443     if ( !a )
444         return false;
445 
446     // Still cannot access internals of the arena since the object itself might be destroyed.
447 
448     for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
449         if ( is_arena_in_list( my_arenas[idx], a ) )
450             return true;
451     }
452     return false;
453 }
454 
455 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
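// Mandatory (enforced) concurrency: when the workers soft limit is zero, an arena that has
// enqueued tasks is still granted a single worker so that enqueued work can make progress.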
456 void market::enable_mandatory_concurrency_impl ( arena *a ) {
457     __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
458     __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
459 
460     a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
461     my_mandatory_num_requested++;
462 }
463 
464 void market::enable_mandatory_concurrency ( arena *a ) {
465     int delta = 0;
466     {
467         arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
468         if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
469             a->my_global_concurrency_mode.load(std::memory_order_relaxed))
470             return;
471 
472         enable_mandatory_concurrency_impl(a);
473         delta = update_workers_request();
474     }
475 
476     if (delta != 0)
477         my_server->adjust_job_count_estimate(delta);
478 }
479 
480 void market::disable_mandatory_concurrency_impl(arena* a) {
481     __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
482     __TBB_ASSERT(my_mandatory_num_requested > 0, nullptr);
483 
484     a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
485     my_mandatory_num_requested--;
486 }
487 
488 void market::mandatory_concurrency_disable ( arena *a ) {
489     int delta = 0;
490     {
491         arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
492         if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
493             return;
494         // There is a racy window in advertise_new_work() between enabling mandatory concurrency and
495         // setting SNAPSHOT_FULL. It gives a spawn request a chance to disable mandatory concurrency.
496         // Therefore, we double-check that there are no enqueued tasks.
497         if (a->has_enqueued_tasks())
498             return;
499 
500         __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
501         disable_mandatory_concurrency_impl(a);
502 
503         delta = update_workers_request();
504     }
505     if (delta != 0)
506         my_server->adjust_job_count_estimate(delta);
507 }
508 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
509 
510 void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
511     if (!delta) {
512         return;
513     }
514     int target_epoch{};
515     {
516         arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
517         __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
518 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
519         if (mandatory) {
520             __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
521             // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
522             a.my_local_concurrency_requests += delta;
523             if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
524                 (delta < 0 && a.my_local_concurrency_requests != 0))
525             {
526                 return;
527             }
528         }
529 #endif
530         a.my_total_num_workers_requested += delta;
531         int target_workers = 0;
532         // Clamp target_workers to the interval [0, a.my_max_num_workers].
533         if (a.my_total_num_workers_requested > 0) {
534 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
535             // At least one thread should be requested when mandatory concurrency is enabled.
536             int max_num_workers = int(a.my_max_num_workers);
537             if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
538                 max_num_workers = 1;
539             }
540 #endif
541             target_workers = min(a.my_total_num_workers_requested, max_num_workers);
542         }
543 
544         delta = target_workers - a.my_num_workers_requested;
545 
546         if (delta == 0) {
547             return;
548         }
549 
550         a.my_num_workers_requested += delta;
551         if (a.my_num_workers_requested == 0) {
552             a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
553         }
554 
555         int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
556         my_total_demand.store(total_demand, std::memory_order_relaxed);
557         my_priority_level_demand[a.my_priority_level] += delta;
558         unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
559         if (my_mandatory_num_requested > 0) {
560             __TBB_ASSERT(effective_soft_limit == 0, nullptr);
561             effective_soft_limit = 1;
562         }
563 
564         update_allotment(effective_soft_limit);
565         if (delta > 0) {
566             // Do not exceed the soft limit, but remember the values requested by arenas in
567             // my_total_demand so that workers are not released to RML prematurely.
568             if (my_num_workers_requested + delta > (int)effective_soft_limit)
569                 delta = effective_soft_limit - my_num_workers_requested;
570         }
571         else {
572             // the number of workers should not be decreased below my_total_demand
573             if (my_num_workers_requested + delta < total_demand)
574                 delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
575         }
576         my_num_workers_requested += delta;
577         __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, nullptr);
578 
579         target_epoch = a.my_adjust_demand_target_epoch++;
580     }
581 
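    // target_epoch is a ticket taken under the arenas list lock above. Waiting for it here
    // serializes concurrent adjust_demand() calls for this arena so that their deltas are
    // forwarded to RML in the order in which the demand was updated.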
582     a.my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_relaxed);
583     // Must be called outside of any locks
584     my_server->adjust_job_count_estimate( delta );
585     a.my_adjust_demand_current_epoch.exchange(target_epoch + 1);
586     a.my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
587 }
588 
589 void market::process( job& j ) {
590     thread_data& td = static_cast<thread_data&>(j);
591     // td.my_arena can be dead. Don't access it until arena_in_need is called
592     arena *a = td.my_arena;
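    // A worker repeatedly picks an arena that needs more workers and processes tasks there.
    // After a pass that finds no such arena it yields once and retries; after the second
    // empty pass it returns, letting RML decide what to do with the thread.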
593     for (int i = 0; i < 2; ++i) {
594         while ( (a = arena_in_need(a)) ) {
595             a->process(td);
596         }
597         // Workers leave the market because there is no arena in need. This can happen before
598         // adjust_job_count_estimate() decreases my_slack and RML puts this thread to sleep.
599         // It might result in a busy loop that checks for my_slack<0 and calls this method again immediately.
600         // The yield below mitigates this spinning.
601         if ( !i ) {
602             yield();
603         }
604     }
605 }
606 
607 void market::cleanup( job& j) {
608     market::enforce([this] { return theMarket != this; }, nullptr );
609     governor::auto_terminate(&j);
610 }
611 
612 void market::acknowledge_close_connection() {
613     destroy();
614 }
615 
616 ::rml::job* market::create_one_job() {
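    // Called by RML for each worker thread it creates: allocate the worker's thread_data and
    // publish it in my_workers so the market can later reach the worker (indices are 1-based,
    // so worker index i is stored in my_workers[i - 1]).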
617     unsigned short index = ++my_first_unused_worker_idx;
618     __TBB_ASSERT( index > 0, nullptr);
619     ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
620     // The index serves as a hint that decreases conflicts between workers when they migrate between arenas.
621     thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
622     __TBB_ASSERT( index <= my_num_workers_hard_limit, nullptr);
623     __TBB_ASSERT( my_workers[index - 1].load(std::memory_order_relaxed) == nullptr, nullptr);
624     my_workers[index - 1].store(td, std::memory_order_release);
625     return td;
626 }
627 
628 void market::add_external_thread(thread_data& td) {
629     context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
630     my_masters.push_front(td);
631 }
632 
633 void market::remove_external_thread(thread_data& td) {
634     context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
635     my_masters.remove(td);
636 }
637 
638 } // namespace r1
639 } // namespace detail
640 } // namespace tbb
641