xref: /oneTBB/src/tbb/market.cpp (revision d86ed7fb)
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "main.h"
#include "governor.h"
#include "arena.h"
#include "thread_data.h"
#include "itt_notify.h"

#include <cstring> // std::memset()

namespace tbb {
namespace detail {
namespace r1 {

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::select_next_arena( arena* hint ) {
    unsigned next_arena_priority_level = num_priority_levels;
    if ( hint )
        next_arena_priority_level = hint->my_priority_level;
    for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
        if ( !my_arenas[idx].empty() )
            return &*my_arenas[idx].begin();
    }
    // Do not change the hint if no arena with higher priority is found.
    return hint;
}
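
// Illustrative note for select_next_arena above (a lower index means a higher priority):
// if the hint is at priority level 2 while my_arenas[0] is empty and my_arenas[1] is not,
// the front arena of my_arenas[1] is returned; if all higher-priority lists are empty,
// the hint comes back unchanged.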

void market::insert_arena_into_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].push_front( a );
    __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
    my_next_arena = select_next_arena( my_next_arena );
}

void market::remove_arena_from_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].remove( a );
    if ( my_next_arena == &a )
        my_next_arena = nullptr;
    my_next_arena = select_next_arena( my_next_arena );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
    : my_num_workers_hard_limit(workers_hard_limit)
    , my_num_workers_soft_limit(workers_soft_limit)
    , my_next_arena(nullptr)
    , my_ref_count(1)
    , my_stack_size(stack_size)
    , my_workers_soft_limit_to_report(workers_soft_limit)
{
    // Once created, the RML server will start initializing workers, which need
    // the global market instance to get the worker stack size.
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if the user has set no limit (yet), use the market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}
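
// Illustrative arithmetic for calc_workers_soft_limit above (hypothetical values):
// with governor::default_num_threads() == 8, no app_parallelism_limit set,
// workers_soft_limit == 4, and workers_hard_limit == 256, the result is
// max(8 - 1, 4) == 7, already below the hard limit. Had the application set
// app_parallelism_limit() == 16, the result would be 16 - 1 == 15.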

bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // Do not warn if the default number of workers is requested.
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // A race is possible when multiple threads report warnings.
                // That is acceptable: it merely produces several warnings.
                unsigned expected_limit = soft_limit_to_report;
                m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
            }
        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "Thread stack size has already been set to %zu. "
                             "The request for a larger stack (%zu) cannot be satisfied.\n", m->my_stack_size, stack_size );
        return true;
    }
    return false;
}

market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
        // TODO: A lot is done while theMarketMutex is locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // We expect 4P to be suitable for most applications; limit it to 2P for
        // large thread counts.
        // TODO: ask RML for the max concurrency and possibly correct hard_limit.
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in the
        // computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads can be created.
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
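        // Illustrative arithmetic (hypothetical values, assuming app_parallelism_limit()
        // yields 0 when unset, as the soft-limit logic above implies): on a 16-thread
        // machine, factor == 4 and the hard limit is max(max(4*16, 256u), 0) == 256;
        // on a 256-thread machine, factor == 2 and it becomes max(max(2*256, 256u), 0) == 512.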
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
        // Create the global market instance.
        std::size_t size = sizeof(market);
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(thread_data*) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(thread_data*) * (workers_hard_limit - 1);
        __TBB_InitOnce::add_ref();
        void* storage = cache_aligned_allocate(size);
        std::memset( storage, 0, size );
        // Initialize and publish the global market.
        market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count.store(1, std::memory_order_relaxed);
#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        if (market::is_lifetime_control_present()) {
            ++m->my_public_ref_count;
            ++m->my_ref_count;
        }
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        theMarket = m;
        // This check relies on the fact that for a shared RML, default_concurrency == max_concurrency.
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                    , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *theMarket;
}

void market::destroy () {
    this->market::~market(); // qualified to suppress a warning
    cache_aligned_deallocate( this );
    __TBB_InitOnce::remove_ref();
}

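// Reference-counting summary for release() below: my_ref_count counts every reference
// to the market, while my_public_ref_count counts only the public ones (external
// threads and, when enabled, lifetime control). A blocking terminate issued by the last
// public owner therefore waits until all private (worker-side) references are gone
// before asking RML to close the connection.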
bool market::release ( bool is_public, bool blocking_terminate ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
                    my_ref_count.load(std::memory_order_relaxed) > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last master, we need to wait
                // until all references are released. Re-read my_public_ref_count to limit waiting if new
                // masters are created. Theoretically, new private references to the market can be added
                // while waiting, making the wait potentially endless.
                // TODO: revise why the weak scheduler needs the market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority
                // propagation, see e.g. task_group_context::cancel_group_execution().
                while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
                        my_ref_count.load(std::memory_order_acquire) > 1 ) {
                    yield();
                }
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), nullptr );
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), nullptr );
            do_release = true;
            theMarket = nullptr;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
            "No public references must remain when we remove the market." );
        // Inform RML that blocking termination is required.
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}

int market::update_workers_request() {
    int old_request = my_num_workers_requested;
    my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
                                   (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
        my_num_workers_requested = 1;
    }
#endif
    update_allotment(my_num_workers_requested);
    return my_num_workers_requested - old_request;
}
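
// Illustrative values (hypothetical): with my_total_demand == 10 and a soft limit of 4,
// the market requests 4 workers; with a soft limit of 0 but mandatory concurrency
// requested (when built with __TBB_ENQUEUE_ENFORCED_CONCURRENCY), exactly one worker
// is kept so enqueued work can still make progress.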

void market::set_active_num_workers ( unsigned soft_limit ) {
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // the actual value will be used at market creation
        m = theMarket;
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
            return;
        ++m->my_ref_count;
    }
    // We now hold a reference to the market, so it can be used safely.

    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, nullptr);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        arena_list_type* arenas = m->my_arenas;

        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
            m->my_mandatory_num_requested > 0)
        {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
                        m->disable_mandatory_concurrency_impl(&*it);
        }
        __TBB_ASSERT(m->my_mandatory_num_requested == 0, nullptr);
#endif

        m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
        // Report only once after the new soft limit value is set.
        m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->has_enqueued_tasks())
                        m->enable_mandatory_concurrency_impl(&*it);
        }
#endif

        delta = m->update_workers_request();
    }
    // adjust_job_count_estimate() must be called outside of any locks.
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // Release the internal market reference to match ++m->my_ref_count above.
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}
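
// Note on set_active_num_workers() above (illustrative): this appears to be the path
// taken when the effective soft limit changes, e.g. via a
// global_control{global_control::max_allowed_parallelism, N} object. Lowering the limit
// from 8 to 2 stores the new value, toggles mandatory concurrency as needed, and
// forwards the resulting worker-count delta to RML.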

bool governor::does_client_join_workers (const rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
                              std::size_t stack_size )
{
    __TBB_ASSERT( num_slots > 0, nullptr );
    __TBB_ASSERT( num_reserved_slots <= num_slots, nullptr );
    // Add a public market reference for the master thread/task_arena (that adds an internal
    // reference in exchange).
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
    // Add the newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

/** This method must be invoked under my_arenas_list_mutex. **/
void market::detach_arena ( arena& a ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    __TBB_ASSERT( !a.my_slots[0].is_occupied(), nullptr );
    if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
        disable_mandatory_concurrency_impl(&a);

    remove_arena_from_list(a);
    if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
        my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
    }
}

void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
    bool locked = true;
    __TBB_ASSERT( a, nullptr );
    // We hold a reference to the market, so it cannot be destroyed while we are here.
    market::enforce([this] { return theMarket == this; }, nullptr);
    __TBB_ASSERT( my_ref_count!=0, nullptr );
    my_arenas_list_mutex.lock();
    arena_list_type::iterator it = my_arenas[priority_level].begin();
    for ( ; it != my_arenas[priority_level].end(); ++it ) {
        if ( a == &*it ) {
            if ( it->my_aba_epoch == aba_epoch ) {
                // The arena is alive.
                if ( !a->my_num_workers_requested && !a->my_references.load(std::memory_order_relaxed) ) {
                    __TBB_ASSERT(
                        !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
                        (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
                        "Inconsistent arena state"
                    );
                    // The arena is abandoned. Destroy it.
                    detach_arena( *a );
                    my_arenas_list_mutex.unlock();
                    locked = false;
                    a->free_arena();
                }
            }
            if (locked)
                my_arenas_list_mutex.unlock();
            return;
        }
    }
    my_arenas_list_mutex.unlock();
}

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
    // TODO: make sure an arena with higher priority is returned only if it has available slots.
    hint = select_next_arena( hint );
    if ( !hint )
        return nullptr;
    arena_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->my_priority_level;
    __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
    do {
        arena& a = *it;
        if ( ++it == arenas[curr_priority_level].end() ) {
            do {
                curr_priority_level = (curr_priority_level + 1) % num_priority_levels;
            } while ( arenas[curr_priority_level].empty() );
            it = arenas[curr_priority_level].begin();
        }
        if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return nullptr;
}
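
// Illustrative behavior of the scan above: it round-robins through all non-empty
// priority lists starting at the hint and stops at the first arena whose count of
// active workers is below its allotment, taking a worker reference (arena::ref_worker)
// before returning; if no arena needs workers, nullptr is returned.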

arena* market::arena_in_need(arena* prev) {
    if (my_total_demand.load(std::memory_order_acquire) <= 0)
        return nullptr;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    // TODO: introduce a three-state response: alive, not_alive, no_market_arenas.
    if ( is_arena_alive(prev) )
        return arena_in_need(my_arenas, prev);
    return arena_in_need(my_arenas, my_next_arena);
}

int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand > 0, nullptr );
    max_workers = min(workers_demand, max_workers);
    int unassigned_workers = max_workers;
    int assigned = 0;
    int carry = 0;
    unsigned max_priority_level = num_priority_levels;
    for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
        int assigned_per_priority = 0;
        for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
            arena& a = *it;
            __TBB_ASSERT(a.my_num_workers_requested >= 0 && a.my_num_workers_requested <= int(a.my_max_num_workers), nullptr);
            if (a.my_num_workers_requested == 0) {
                __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
                continue;
            }

            if (max_priority_level == num_priority_levels) {
                max_priority_level = list_idx;
            }

            int allotted = 0;
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
                __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
                allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
                    assigned < max_workers ? 1 : 0;
            } else
#endif
            {
                int tmp = a.my_num_workers_requested * unassigned_workers + carry;
                allotted = tmp / my_priority_level_demand[list_idx];
                carry = tmp % my_priority_level_demand[list_idx];
                // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers.
                allotted = min(allotted, (int)a.my_max_num_workers);
            }
            a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
            a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
            assigned += allotted;
            assigned_per_priority += allotted;
        }
        unassigned_workers -= assigned_per_priority;
    }
    __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
    return assigned;
}
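
// Worked example for update_allotment above (hypothetical values): max_workers == 5 and
// a single priority level holding arenas A and B with requests 3 and 7, so the level
// demand is 10.
//   A: tmp = 3*5 + 0 = 15, allotted = 15/10 = 1, carry = 5
//   B: tmp = 7*5 + 5 = 40, allotted = 40/10 = 4, carry = 0
// Total assigned == 5 == max_workers; the carry keeps the integer division from
// losing fractional shares.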

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
    __TBB_ASSERT( a, "Expected a non-null pointer to an arena." );
    for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
        if ( a == &*it )
            return true;
    return false;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_alive(arena* a) {
    if ( !a )
        return false;

    // We still cannot access the internals of the arena, since the object itself might be destroyed.

    for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
        if ( is_arena_in_list( my_arenas[idx], a ) )
            return true;
    }
    return false;
}

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
void market::enable_mandatory_concurrency_impl ( arena *a ) {
    __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
    __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);

    a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
    my_mandatory_num_requested++;
}

void market::enable_mandatory_concurrency ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
            a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;

        enable_mandatory_concurrency_impl(a);
        delta = update_workers_request();
    }

    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}

void market::disable_mandatory_concurrency_impl(arena* a) {
    __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), nullptr);
    __TBB_ASSERT(my_mandatory_num_requested > 0, nullptr);

    a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
    my_mandatory_num_requested--;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;
        // There is a racy window in advertise_new_work between enabling mandatory concurrency
        // and setting SNAPSHOT_FULL. It gives a request to disable mandatory concurrency a
        // chance to be spawned. Therefore, we double-check that there are no enqueued tasks.
        if (a->has_enqueued_tasks())
            return;

        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, nullptr);
        disable_mandatory_concurrency_impl(a);

        delta = update_workers_request();
    }
    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
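
// Summary note: together, the routines above implement "mandatory concurrency". When the
// soft limit is 0 (no workers are normally allowed), an arena that has enqueued tasks is
// still granted exactly one worker (see update_workers_request), so enqueued work cannot
// be starved indefinitely.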

void market::adjust_demand ( arena& a, int delta ) {
    market::enforce([] { return theMarket != nullptr; }, "The market instance was destroyed prematurely?");
    if ( !delta )
        return;
    my_arenas_list_mutex.lock();
    a.my_total_num_workers_requested += delta;
    int target_workers = 0;
    // Cap target_workers to the interval [0, a.my_max_num_workers].
    if (a.my_total_num_workers_requested > 0) {
        target_workers = a.my_total_num_workers_requested < int(a.my_max_num_workers) ?
            a.my_total_num_workers_requested : a.my_max_num_workers;
    }

    delta = target_workers - a.my_num_workers_requested;

    if (delta == 0) {
        my_arenas_list_mutex.unlock();
        return;
    }

    a.my_num_workers_requested += delta;
    if (a.my_num_workers_requested == 0) {
        a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
    }

    int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
    my_total_demand.store(total_demand, std::memory_order_relaxed);
    my_priority_level_demand[a.my_priority_level] += delta;
    unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(effective_soft_limit == 0, nullptr);
        effective_soft_limit = 1;
    }

    update_allotment(effective_soft_limit);
    if ( delta > 0 ) {
        // The request cannot exceed the soft limit, but we remember the values requested by
        // arenas in my_total_demand so as not to prematurely release workers to RML.
        if ( my_num_workers_requested+delta > (int)effective_soft_limit)
            delta = effective_soft_limit - my_num_workers_requested;
    } else {
        // The number of workers should not be decreased below my_total_demand.
        if ( my_num_workers_requested+delta < total_demand )
            delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
    }
    my_num_workers_requested += delta;
    __TBB_ASSERT( my_num_workers_requested <= (int)effective_soft_limit, nullptr );

    int target_epoch = my_adjust_demand_target_epoch++;

    my_arenas_list_mutex.unlock();

    spin_wait_until_eq(my_adjust_demand_current_epoch, target_epoch);
    // Must be called outside of any locks.
    my_server->adjust_job_count_estimate( delta );
    my_adjust_demand_current_epoch.store(target_epoch + 1, std::memory_order_release);
}
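
// Note on the epoch pair in adjust_demand above: it forms a ticket lock that serializes
// calls to adjust_job_count_estimate() in FIFO order without holding
// my_arenas_list_mutex. Each caller takes a ticket (target_epoch), spins until
// my_adjust_demand_current_epoch reaches it, performs the RML call, and then publishes
// target_epoch + 1. E.g. (illustrative), two concurrent callers holding tickets 5 and 6
// invoke RML in exactly that order.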

void market::process( job& j ) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_arena can be dead. Don't access it until arena_in_need is called.
    arena *a = td.my_arena;
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(td);
            a = nullptr; // to avoid double checks in arena_in_need(arena*) for the same priority level
        }
        // Workers leave the market when no arena is in need. That can happen before
        // adjust_job_count_estimate() decreases my_slack and RML puts this thread to sleep,
        // which might result in a busy loop that checks for my_slack<0 and calls this method
        // immediately. The yield below mitigates that spinning.
        if ( !i ) {
            yield();
        }
    }
}

void market::cleanup( job& j) {
    market::enforce([this] { return theMarket != this; }, nullptr );
    governor::auto_terminate(&j);
}

void market::acknowledge_close_connection() {
    destroy();
}

::rml::job* market::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, nullptr );
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // The index serves as a hint decreasing conflicts between workers when they migrate between arenas.
    thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT( index <= my_num_workers_hard_limit, nullptr );
    __TBB_ASSERT( my_workers[index - 1] == nullptr, nullptr );
    my_workers[index - 1] = td;
    return td;
}

void market::add_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.push_front(td);
}

void market::remove_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.remove(td);
}

} // namespace r1
} // namespace detail
} // namespace tbb