xref: /oneTBB/src/tbb/market.cpp (revision 2eccd5f9)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "oneapi/tbb/global_control.h" // global_control::active_value
18 
19 #include "market.h"
20 #include "main.h"
21 #include "governor.h"
22 #include "arena.h"
23 #include "thread_data.h"
24 #include "itt_notify.h"
25 
26 #include <cstring> // std::memset()
27 
28 namespace tbb {
29 namespace detail {
30 namespace r1 {
31 
32 /** This method must be invoked under my_arenas_list_mutex. **/
33 arena* market::select_next_arena( arena* hint ) {
34     unsigned next_arena_priority_level = num_priority_levels;
35     if ( hint )
36         next_arena_priority_level = hint->my_priority_level;
37     for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
38         if ( !my_arenas[idx].empty() )
39             return &*my_arenas[idx].begin();
40     }
41     // Don't change the hint if no arena with a higher priority is found.
42     return hint;
43 }
44 
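// insert_arena_into_list() and remove_arena_from_list() keep my_next_arena pointing at an arena
// from the highest-priority non-empty list (or nullptr when all lists are empty); it is used as
// the starting hint when searching for an arena in need (see arena_in_need() below).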
45 void market::insert_arena_into_list ( arena& a ) {
46     __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
47     my_arenas[a.my_priority_level].push_front( a );
48     __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
49     my_next_arena = select_next_arena( my_next_arena );
50 }
51 
52 void market::remove_arena_from_list ( arena& a ) {
53     __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
54     my_arenas[a.my_priority_level].remove( a );
55     if ( my_next_arena == &a )
56         my_next_arena = nullptr;
57     my_next_arena = select_next_arena( my_next_arena );
58 }
59 
60 //------------------------------------------------------------------------
61 // market
62 //------------------------------------------------------------------------
63 
64 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
65     : my_num_workers_hard_limit(workers_hard_limit)
66     , my_num_workers_soft_limit(workers_soft_limit)
67     , my_next_arena(nullptr)
68     , my_ref_count(1)
69     , my_stack_size(stack_size)
70     , my_workers_soft_limit_to_report(workers_soft_limit)
71 {
72     // Once created, the RML server starts initializing workers, which need the
73     // global market instance to obtain the worker stack size
74     my_server = governor::create_rml_server( *this );
75     __TBB_ASSERT( my_server, "Failed to create RML server" );
76 }
77 
78 market::~market() {
79     poison_pointer(my_server);
80     poison_pointer(my_next_arena);
81 }
82 
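// Computes the effective soft limit on the number of workers: the application-set limit
// (app_parallelism_limit()) minus one for the external thread if such a limit is set, otherwise
// at least default_num_threads() - 1; the result is always kept below workers_hard_limit.
// For example (illustrative numbers): with default_num_threads() == 8, no application limit and
// workers_soft_limit == 2 the result is 7; with an application limit of 4 the result is 3.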
83 static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
84     if( int soft_limit = market::app_parallelism_limit() )
85         workers_soft_limit = soft_limit-1;
86     else // if user set no limits (yet), use market's parameter
87         workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
88     if( workers_soft_limit >= workers_hard_limit )
89         workers_soft_limit = workers_hard_limit-1;
90     return workers_soft_limit;
91 }
92 
93 bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
94     market *m = theMarket;
95     if( m ) {
96         ++m->my_ref_count;
97         const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
98         lock.release();
99         if( old_public_count==0 )
100             set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );
101 
102         // do not warn if default number of workers is requested
103         if( workers_requested != governor::default_num_threads()-1 ) {
104             __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
105                           "skip_soft_limit_warning must be larger than any valid workers_requested" );
106             unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
107             if( soft_limit_to_report < workers_requested ) {
108                 runtime_warning( "The number of workers is currently limited to %u. "
109                                  "The request for %u workers is ignored. Further requests for more workers "
110                                  "will be silently ignored until the limit changes.\n",
111                                  soft_limit_to_report, workers_requested );
112                 // A race is possible when multiple threads report warnings.
113                 // That is acceptable, as it only results in duplicate warnings.
114                 unsigned expected_limit = soft_limit_to_report;
115                 m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
116             }
117 
118         }
119         if( m->my_stack_size < stack_size )
120             runtime_warning( "Thread stack size has already been set to %u. "
121                              "The request for a larger stack (%u) cannot be satisfied.\n", m->my_stack_size, stack_size );
122         return true;
123     }
124     return false;
125 }
126 
127 market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
128     global_market_mutex_type::scoped_lock lock( theMarketMutex );
129     if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
130         // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
131         if( stack_size == 0 )
132             stack_size = global_control::active_value(global_control::thread_stack_size);
133         // We expect that 4P is suitable for most applications.
134         // Limit to 2P for large numbers of threads.
135         // TODO: ask RML for max concurrency and possibly correct hard_limit
136         const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
137         // The requested number of threads is intentionally not considered in
138         // computation of the hard limit, in order to separate responsibilities
139         // and avoid complicated interactions between global_control and task_scheduler_init.
140         // The market guarantees that at least 256 threads can be created.
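        // For example (illustrative): with default_num_threads() == 12 the factor is 4, so the
        // hard limit is max(4 * 12, 256) == 256 unless app_parallelism_limit() is larger.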
141         const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
142         const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
143         // Create the global market instance
144         std::size_t size = sizeof(market);
145         __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(std::atomic<thread_data*>) == sizeof(market),
146                       "my_workers must be the last data field of the market class");
147         size += sizeof(std::atomic<thread_data*>) * (workers_hard_limit - 1);
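        // Together with sizeof(market) this reserves one my_workers slot per possible worker:
        // my_workers is the last field, so the extra (workers_hard_limit - 1) entries extend
        // the same cache-aligned allocation.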
148         __TBB_InitOnce::add_ref();
149         void* storage = cache_aligned_allocate(size);
150         std::memset( storage, 0, size );
151         // Initialize and publish global market
152         market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
153         if( is_public )
154             m->my_public_ref_count.store(1, std::memory_order_relaxed);
155         if (market::is_lifetime_control_present()) {
156             ++m->my_public_ref_count;
157             ++m->my_ref_count;
158         }
159         theMarket = m;
160         // This check relies on the fact that for shared RML default_concurrency==max_concurrency
161         if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
162             runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
163                     , m->my_server->default_concurrency(), workers_soft_limit );
164     }
165     return *theMarket;
166 }
167 
168 void market::destroy () {
169     this->market::~market(); // qualified to suppress warning
170     cache_aligned_deallocate( this );
171     __TBB_InitOnce::remove_ref();
172 }
173 
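// Note on reference counting: my_ref_count counts every client of the market, while
// my_public_ref_count counts only public references (external threads / task_arena, see
// global_market() above, and lifetime control). The market is released and the RML connection
// closed only when my_ref_count drops to zero.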
174 bool market::release ( bool is_public, bool blocking_terminate ) {
175     market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
176     bool do_release = false;
177     {
178         global_market_mutex_type::scoped_lock lock( theMarketMutex );
179         if ( blocking_terminate ) {
180             __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
181             while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
182                     my_ref_count.load(std::memory_order_relaxed) > 1 ) {
183                 lock.release();
184                 // To guarantee that request_close_connection() is called by the last external thread, we need to wait until all
185                 // references are released. Re-read my_public_ref_count to limit waiting if new external threads are created.
186                 // Theoretically, new private references to the market can be added during the wait, making it potentially
187                 // endless.
188                 // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
189                 // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
190                 // see e.g. task_group_context::cancel_group_execution()
191                 while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
192                         my_ref_count.load(std::memory_order_acquire) > 1 ) {
193                     yield();
194                 }
195                 lock.acquire( theMarketMutex );
196             }
197         }
198         if ( is_public ) {
199             __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
200             __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), nullptr);
201             --my_public_ref_count;
202         }
203         if ( --my_ref_count == 0 ) {
204             __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), nullptr);
205             do_release = true;
206             theMarket = nullptr;
207         }
208     }
209     if( do_release ) {
210         __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
211             "No public references remain if we remove the market." );
212         // inform RML that blocking termination is required
213         my_join_workers = blocking_terminate;
214         my_server->request_close_connection();
215         return blocking_terminate;
216     }
217     return false;
218 }
219 
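// Recomputes how many workers should be requested from RML: the total demand capped by the
// workers soft limit, or exactly one worker while mandatory concurrency is in effect. Updates
// the per-arena allotment and returns the change relative to the previous request, which the
// caller passes to adjust_job_count_estimate().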
220 int market::update_workers_request() {
221     int old_request = my_num_workers_requested;
222     my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
223                                    (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
224 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
225     if (my_mandatory_num_requested > 0) {
226         __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
227         my_num_workers_requested = 1;
228     }
229 #endif
230     update_allotment(my_num_workers_requested);
231     return my_num_workers_requested - old_request;
232 }
233 
234 void market::set_active_num_workers ( unsigned soft_limit ) {
235     market *m;
236 
237     {
238         global_market_mutex_type::scoped_lock lock( theMarketMutex );
239         if ( !theMarket )
240             return; // actual value will be used at market creation
241         m = theMarket;
242         if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
243             return;
244         ++m->my_ref_count;
245     }
246     // We now hold a reference (my_ref_count) to the market, so it can be used safely
247 
248     int delta = 0;
249     {
250         arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
251         __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, nullptr);
252 
253 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
254         arena_list_type* arenas = m->my_arenas;
255 
256         if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
257             m->my_mandatory_num_requested > 0)
258         {
259             for (unsigned level = 0; level < num_priority_levels; ++level )
260                 for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
261                     if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
262                         m->disable_mandatory_concurrency_impl(&*it);
263         }
264         __TBB_ASSERT(m->my_mandatory_num_requested == 0, nullptr);
265 #endif
266 
267         m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
268         // report only once after new soft limit value is set
269         m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);
270 
271 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
272         if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
273             for (unsigned level = 0; level < num_priority_levels; ++level )
274                 for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
275                     if (it->has_enqueued_tasks())
276                         m->enable_mandatory_concurrency_impl(&*it);
277         }
278 #endif
279 
280         delta = m->update_workers_request();
281     }
282     // adjust_job_count_estimate must be called outside of any locks
283     if( delta!=0 )
284         m->my_server->adjust_job_count_estimate( delta );
285     // release internal market reference to match ++m->my_ref_count above
286     m->release( /*is_public=*/false, /*blocking_terminate=*/false );
287 }
288 
289 bool governor::does_client_join_workers (const rml::tbb_client &client) {
290     return ((const market&)client).must_join_workers();
291 }
292 
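// Creates an arena and registers it in the market's list. The market is asked for
// num_slots - num_reserved_slots workers; for example, an arena with 8 slots of which 1 is
// reserved would request up to 7 workers.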
293 arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
294                               std::size_t stack_size )
295 {
296     __TBB_ASSERT( num_slots > 0, nullptr);
297     __TBB_ASSERT( num_reserved_slots <= num_slots, nullptr);
298     // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
299     market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
300     arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
301     // Add newly created arena into the existing market's list.
302     arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
303     m.insert_arena_into_list(a);
304     return &a;
305 }
306 
307 /** This method must be invoked under my_arenas_list_mutex. **/
308 void market::detach_arena ( arena& a ) {
309     market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
310     __TBB_ASSERT( !a.my_slots[0].is_occupied(), nullptr);
311     if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
312         disable_mandatory_concurrency_impl(&a);
313 
314     remove_arena_from_list(a);
315     if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
316         my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
317     }
318 }
319 
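// The aba_epoch argument protects against ABA: the arena pointer may refer to memory that has
// already been freed and reused for a new arena, so it is only trusted after it has been found
// in the list with a matching epoch.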
320 void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
321     bool locked = true;
322     __TBB_ASSERT( a, nullptr);
323     // We hold a reference to the server, so the market cannot be destroyed at any point here
324     __TBB_ASSERT(!is_poisoned(my_server), nullptr);
325     my_arenas_list_mutex.lock();
326         arena_list_type::iterator it = my_arenas[priority_level].begin();
327         for ( ; it != my_arenas[priority_level].end(); ++it ) {
328             if ( a == &*it ) {
329                 if ( it->my_aba_epoch == aba_epoch ) {
330                     // Arena is alive
331                     // Acquire my_references to sync with threads that just left the arena
332                     if (!a->my_num_workers_requested && !a->my_references.load(std::memory_order_acquire)) {
333                         __TBB_ASSERT(
334                             !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
335                             (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
336                             "Inconsistent arena state"
337                         );
338                         // Arena is abandoned. Destroy it.
339                         detach_arena( *a );
340                         my_arenas_list_mutex.unlock();
341                         locked = false;
342                         a->free_arena();
343                     }
344                 }
345                 if (locked)
346                     my_arenas_list_mutex.unlock();
347                 return;
348             }
349         }
350     my_arenas_list_mutex.unlock();
351 }
352 
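// Round-robin search for an arena that needs more workers, starting from the hint chosen by
// select_next_arena(). The first arena whose number of active workers is below its allotment is
// returned with an extra worker reference taken; nullptr means no arena currently needs workers.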
353 /** This method must be invoked under my_arenas_list_mutex. **/
354 arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
355     // TODO: make sure an arena with higher priority is returned only if it has available slots.
356     hint = select_next_arena( hint );
357     if ( !hint )
358         return nullptr;
359     arena_list_type::iterator it = hint;
360     unsigned curr_priority_level = hint->my_priority_level;
361     __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
362     do {
363         arena& a = *it;
364         if ( ++it == arenas[curr_priority_level].end() ) {
365             do {
366                 ++curr_priority_level %= num_priority_levels;
367             } while ( arenas[curr_priority_level].empty() );
368             it = arenas[curr_priority_level].begin();
369         }
370         if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
371             a.my_references += arena::ref_worker;
372             return &a;
373         }
374     } while ( it != hint );
375     return nullptr;
376 }
377 
378 arena* market::arena_in_need(arena* prev) {
379     if (my_total_demand.load(std::memory_order_acquire) <= 0)
380         return nullptr;
381     arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
382     // TODO: introduce three state response: alive, not_alive, no_market_arenas
383     if ( is_arena_alive(prev) )
384         return arena_in_need(my_arenas, prev);
385     return arena_in_need(my_arenas, my_next_arena);
386 }
387 
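// Distributes at most min(workers_demand, max_workers) workers over the arenas, serving higher
// priority levels first and splitting each level's share proportionally to the arenas' requests.
// The remainder of the integer division is carried from one arena to the next so that rounding
// does not lose workers. For example (illustrative): two arenas on the same level requesting
// 3 and 1 workers, with 2 workers available for that level, are allotted 1 worker each.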
388 int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
389     __TBB_ASSERT( workers_demand > 0, nullptr );
390     max_workers = min(workers_demand, max_workers);
391     int unassigned_workers = max_workers;
392     int assigned = 0;
393     int carry = 0;
394     unsigned max_priority_level = num_priority_levels;
395     for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
396         int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
397         unassigned_workers -= assigned_per_priority;
398         for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
399             arena& a = *it;
400             __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
401             __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
402                 || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
403             if (a.my_num_workers_requested == 0) {
404                 __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
405                 continue;
406             }
407 
408             if (max_priority_level == num_priority_levels) {
409                 max_priority_level = list_idx;
410             }
411 
412             int allotted = 0;
413 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
414             if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
415                 __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
416                 allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
417                     assigned < max_workers ? 1 : 0;
418             } else
419 #endif
420             {
421                 int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
422                 allotted = tmp / my_priority_level_demand[list_idx];
423                 carry = tmp % my_priority_level_demand[list_idx];
424                 __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
425                 __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
426             }
427             a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
428             a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
429             assigned += allotted;
430         }
431     }
432     __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
433     return assigned;
434 }
435 
436 /** This method must be invoked under my_arenas_list_mutex. **/
437 bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
438     __TBB_ASSERT( a, "Expected non-null pointer to arena." );
439     for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
440         if ( a == &*it )
441             return true;
442     return false;
443 }
444 
445 /** This method must be invoked under my_arenas_list_mutex. **/
446 bool market::is_arena_alive(arena* a) {
447     if ( !a )
448         return false;
449 
450     // Still cannot access internals of the arena since the object itself might be destroyed.
451 
452     for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
453         if ( is_arena_in_list( my_arenas[idx], a ) )
454             return true;
455     }
456     return false;
457 }
458 
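// Mandatory concurrency support: when the workers soft limit is zero, an arena that has enqueued
// tasks is still granted a single worker so that enqueued work can make progress. Arenas in this
// mode are counted in my_mandatory_num_requested, and update_workers_request() asks RML for
// exactly one worker while that count is non-zero.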
459 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
460 void market::enable_mandatory_concurrency_impl ( arena *a ) {
461     __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
462     __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
463 
464     a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
465     my_mandatory_num_requested++;
466 }
467 
468 void market::enable_mandatory_concurrency ( arena *a ) {
469     int delta = 0;
470     {
471         arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
472         if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
473             a->my_global_concurrency_mode.load(std::memory_order_relaxed))
474             return;
475 
476         enable_mandatory_concurrency_impl(a);
477         delta = update_workers_request();
478     }
479 
480     if (delta != 0)
481         my_server->adjust_job_count_estimate(delta);
482 }
483 
484 void market::disable_mandatory_concurrency_impl(arena* a) {
485     __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
486     __TBB_ASSERT(my_mandatory_num_requested > 0, nullptr);
487 
488     a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
489     my_mandatory_num_requested--;
490 }
491 
492 void market::mandatory_concurrency_disable ( arena *a ) {
493     int delta = 0;
494     {
495         arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
496         if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
497             return;
498         // There is a racy window in advertise_new_work between enabling mandatory concurrency and
499         // setting SNAPSHOT_FULL. It gives a spawn request a chance to disable mandatory concurrency.
500         // Therefore, we double-check that there are no enqueued tasks.
501         if (a->has_enqueued_tasks())
502             return;
503 
504         __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
505         disable_mandatory_concurrency_impl(a);
506 
507         delta = update_workers_request();
508     }
509     if (delta != 0)
510         my_server->adjust_job_count_estimate(delta);
511 }
512 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
513 
514 void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
515     if (!delta) {
516         return;
517     }
518     int target_epoch{};
519     {
520         arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
521         __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
522 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
523         if (mandatory) {
524             __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
525             // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
526             a.my_local_concurrency_requests += delta;
527             if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
528                 (delta < 0 && a.my_local_concurrency_requests != 0))
529             {
530                 return;
531             }
532         }
533 #endif
534         a.my_total_num_workers_requested += delta;
535         int target_workers = 0;
536         // Cap target_workers to the interval [0, a.my_max_num_workers]
537         if (a.my_total_num_workers_requested > 0) {
538             int max_num_workers = int(a.my_max_num_workers);
539 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
540             // At least one thread should be requested when mandatory concurrency is enabled.
541             if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
542                 max_num_workers = 1;
543             }
544 #endif
545             target_workers = min(a.my_total_num_workers_requested, max_num_workers);
546         }
547 
548         delta = target_workers - a.my_num_workers_requested;
549 
550         if (delta == 0) {
551             return;
552         }
553 
554         a.my_num_workers_requested += delta;
555         if (a.my_num_workers_requested == 0) {
556             a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
557         }
558 
559         int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
560         my_total_demand.store(total_demand, std::memory_order_relaxed);
561         my_priority_level_demand[a.my_priority_level] += delta;
562         unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
563         if (my_mandatory_num_requested > 0) {
564             __TBB_ASSERT(effective_soft_limit == 0, nullptr);
565             effective_soft_limit = 1;
566         }
567 
568         update_allotment(effective_soft_limit);
569         if (delta > 0) {
570             // We can't exceed soft_limit, but remember the values requested by arenas in
571             // my_total_demand so that workers are not released to RML prematurely
572             if (my_num_workers_requested + delta > (int)effective_soft_limit)
573                 delta = effective_soft_limit - my_num_workers_requested;
574         }
575         else {
576             // the number of workers should not be decreased below my_total_demand
577             if (my_num_workers_requested + delta < total_demand)
578                 delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
579         }
580         my_num_workers_requested += delta;
581         __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, nullptr);
582 
583         target_epoch = a.my_adjust_demand_target_epoch++;
584     }
585 
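    // Deltas are applied to RML in ticket order: each caller obtained target_epoch under the lock
    // above, waits here for its turn, applies its delta outside the lock, and then publishes
    // target_epoch + 1 to let the next caller proceed.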
586     a.my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_relaxed);
587     // Must be called outside of any locks
588     my_server->adjust_job_count_estimate( delta );
589     a.my_adjust_demand_current_epoch.exchange(target_epoch + 1);
590     a.my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
591 }
592 
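// Worker thread entry point called by RML: keep serving arenas that need workers; when no arena
// is in need, yield once and retry before returning the thread to RML.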
593 void market::process( job& j ) {
594     thread_data& td = static_cast<thread_data&>(j);
595     // td.my_arena can be dead. Don't access it until arena_in_need is called
596     arena *a = td.my_arena;
597     for (int i = 0; i < 2; ++i) {
598         while ( (a = arena_in_need(a)) ) {
599             a->process(td);
600         }
601         // Workers leave the market because there is no arena in need. This can happen before
602         // adjust_job_count_estimate() decreases my_slack and before RML can put this thread to sleep.
603         // That might result in a busy loop that checks for my_slack<0 and calls this method immediately.
604         // The yield mitigates this spinning.
605         if ( !i ) {
606             yield();
607         }
608     }
609 }
610 
611 void market::cleanup( job& j) {
612     market::enforce([this] { return theMarket != this; }, nullptr );
613     governor::auto_terminate(&j);
614 }
615 
616 void market::acknowledge_close_connection() {
617     destroy();
618 }
619 
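// Called by RML to create the job object (thread_data) for a new worker thread. Worker indices
// are handed out starting from 1, and the new thread_data is published into my_workers[index - 1].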
620 ::rml::job* market::create_one_job() {
621     unsigned short index = ++my_first_unused_worker_idx;
622     __TBB_ASSERT( index > 0, nullptr);
623     ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
624     // The index serves as a hint that reduces conflicts between workers when they migrate between arenas
625     thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
626     __TBB_ASSERT( index <= my_num_workers_hard_limit, nullptr);
627     __TBB_ASSERT( my_workers[index - 1].load(std::memory_order_relaxed) == nullptr, nullptr);
628     my_workers[index - 1].store(td, std::memory_order_release);
629     return td;
630 }
631 
632 void market::add_external_thread(thread_data& td) {
633     context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
634     my_masters.push_front(td);
635 }
636 
637 void market::remove_external_thread(thread_data& td) {
638     context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
639     my_masters.remove(td);
640 }
641 
642 } // namespace r1
643 } // namespace detail
644 } // namespace tbb
645