// xref: /oneTBB/src/tbb/market.cpp (revision 6caecf96)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/global_control.h" // global_control::active_value

#include "market.h"
#include "main.h"
#include "governor.h"
#include "arena.h"
#include "thread_data.h"
#include "itt_notify.h"

#include <cstring> // std::memset()

namespace tbb {
namespace detail {
namespace r1 {

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::select_next_arena( arena* hint ) {
    unsigned next_arena_priority_level = num_priority_levels;
    if ( hint )
        next_arena_priority_level = hint->my_priority_level;
    for ( unsigned idx = 0; idx < next_arena_priority_level; ++idx ) {
        if ( !my_arenas[idx].empty() )
            return &*my_arenas[idx].begin();
    }
    // Do not change the hint if no arena with a higher priority is found.
    return hint;
}
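
// select_next_arena() keeps the my_next_arena hint pointing at an arena from
// the highest-priority non-empty list; arena_in_need() further below uses
// that hint as the starting point of its search for an arena to serve.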

void market::insert_arena_into_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].push_front( a );
    __TBB_ASSERT( !my_next_arena || my_next_arena->my_priority_level < num_priority_levels, nullptr );
    my_next_arena = select_next_arena( my_next_arena );
}

void market::remove_arena_from_list ( arena& a ) {
    __TBB_ASSERT( a.my_priority_level < num_priority_levels, nullptr );
    my_arenas[a.my_priority_level].remove( a );
    if ( my_next_arena == &a )
        my_next_arena = nullptr;
    my_next_arena = select_next_arena( my_next_arena );
}

//------------------------------------------------------------------------
// market
//------------------------------------------------------------------------

market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, std::size_t stack_size )
    : my_num_workers_hard_limit(workers_hard_limit)
    , my_num_workers_soft_limit(workers_soft_limit)
    , my_next_arena(nullptr)
    , my_ref_count(1)
    , my_stack_size(stack_size)
    , my_workers_soft_limit_to_report(workers_soft_limit)
{
    // Once created, the RML server starts initializing workers, which need
    // the global market instance to obtain the worker stack size.
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
    if( int soft_limit = market::app_parallelism_limit() )
        workers_soft_limit = soft_limit-1;
    else // if the user has set no limit (yet), use the market's parameter
        workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
    if( workers_soft_limit >= workers_hard_limit )
        workers_soft_limit = workers_hard_limit-1;
    return workers_soft_limit;
}
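
// A worked example for calc_workers_soft_limit() (hypothetical numbers): on a
// 16-way machine with no global_control limit set, a request for 4 workers
// yields max(16 - 1, 4) == 15; with global_control::max_allowed_parallelism
// set to 8, the same request yields 8 - 1 == 7. Either result is then clipped
// to workers_hard_limit - 1 if it would reach the hard limit.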

bool market::add_ref_unsafe( global_market_mutex_type::scoped_lock& lock, bool is_public, unsigned workers_requested, std::size_t stack_size ) {
    market *m = theMarket;
    if( m ) {
        ++m->my_ref_count;
        const unsigned old_public_count = is_public ? m->my_public_ref_count++ : /*any non-zero value*/1;
        lock.release();
        if( old_public_count==0 )
            set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );

        // do not warn if the default number of workers is requested
        if( workers_requested != governor::default_num_threads()-1 ) {
            __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
                          "skip_soft_limit_warning must be larger than any valid workers_requested" );
            unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report.load(std::memory_order_relaxed);
            if( soft_limit_to_report < workers_requested ) {
                runtime_warning( "The number of workers is currently limited to %u. "
                                 "The request for %u workers is ignored. Further requests for more workers "
                                 "will be silently ignored until the limit changes.\n",
                                 soft_limit_to_report, workers_requested );
                // A race is possible when multiple threads report warnings.
                // We are OK with that, as it merely produces multiple warnings.
                unsigned expected_limit = soft_limit_to_report;
                m->my_workers_soft_limit_to_report.compare_exchange_strong(expected_limit, skip_soft_limit_warning);
            }
        }
        if( m->my_stack_size < stack_size )
            runtime_warning( "The thread stack size has already been set to %lu. "
                             "The request for a larger stack (%lu) cannot be satisfied.\n",
                             (unsigned long)m->my_stack_size, (unsigned long)stack_size );
        return true;
    }
    return false;
}

market& market::global_market(bool is_public, unsigned workers_requested, std::size_t stack_size) {
    global_market_mutex_type::scoped_lock lock( theMarketMutex );
    if( !market::add_ref_unsafe(lock, is_public, workers_requested, stack_size) ) {
        // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
        if( stack_size == 0 )
            stack_size = global_control::active_value(global_control::thread_stack_size);
        // Expect 4P to be suitable for most applications.
        // Limit it to 2P for large thread counts.
        // TODO: ask RML for max concurrency and possibly correct hard_limit
        const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
        // The requested number of threads is intentionally not considered in
        // the computation of the hard limit, in order to separate responsibilities
        // and avoid complicated interactions between global_control and task_scheduler_init.
        // The market guarantees that at least 256 threads can be created.
        const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
        const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
        // Create the global market instance
        std::size_t size = sizeof(market);
        __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(thread_data*) == sizeof(market),
                      "my_workers must be the last data field of the market class");
        size += sizeof(thread_data*) * (workers_hard_limit - 1);
        __TBB_InitOnce::add_ref();
        void* storage = cache_aligned_allocate(size);
        std::memset( storage, 0, size );
        // Initialize and publish the global market
        market* m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
        if( is_public )
            m->my_public_ref_count.store(1, std::memory_order_relaxed);
#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        if (market::is_lifetime_control_present()) {
            ++m->my_public_ref_count;
            ++m->my_ref_count;
        }
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
        theMarket = m;
        // This check relies on the fact that for shared RML default_concurrency==max_concurrency
        if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
            runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
                    , m->my_server->default_concurrency(), workers_soft_limit );
    }
    return *theMarket;
}
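
// A worked example of the hard-limit formula above (hypothetical numbers): on
// a 16-way machine, factor == 4, so workers_hard_limit ==
// max(max(4*16, 256u), app_parallelism_limit()), i.e. 256 unless the
// application has requested an even higher parallelism limit; on a 256-way
// machine, factor == 2 and the result is 512. The my_workers array is
// allocated inline right behind the market object, which is why the size
// computation above appends (workers_hard_limit - 1) extra pointers.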

void market::destroy () {
    this->market::~market(); // qualified to suppress warning
    cache_aligned_deallocate( this );
    __TBB_InitOnce::remove_ref();
}

bool market::release ( bool is_public, bool blocking_terminate ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    bool do_release = false;
    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( blocking_terminate ) {
            __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
            while ( my_public_ref_count.load(std::memory_order_relaxed) == 1 &&
                    my_ref_count.load(std::memory_order_relaxed) > 1 ) {
                lock.release();
                // To guarantee that request_close_connection() is called by the last external thread, we need to
                // wait until all references are released. Re-read my_public_ref_count to limit waiting if new
                // external threads are created. Theoretically, new private references to the market can be added
                // during the wait, making it potentially endless.
                // TODO: revise why the weak scheduler needs the market's pointer and try to remove this wait.
                // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
                // see e.g. task_group_context::cancel_group_execution()
                while ( my_public_ref_count.load(std::memory_order_acquire) == 1 &&
                        my_ref_count.load(std::memory_order_acquire) > 1 ) {
                    yield();
                }
                lock.acquire( theMarketMutex );
            }
        }
        if ( is_public ) {
            __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
            __TBB_ASSERT( my_public_ref_count.load(std::memory_order_relaxed), NULL );
            --my_public_ref_count;
        }
        if ( --my_ref_count == 0 ) {
            __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed), NULL );
            do_release = true;
            theMarket = NULL;
        }
    }
    if( do_release ) {
        __TBB_ASSERT( !my_public_ref_count.load(std::memory_order_relaxed),
            "No public references must remain if we remove the market." );
        // inform RML that blocking termination is required
        my_join_workers = blocking_terminate;
        my_server->request_close_connection();
        return blocking_terminate;
    }
    return false;
}
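
// Note on the blocking-terminate loop above: the outer while re-checks the
// reference counts under theMarketMutex, while the inner while spins with
// yield() outside the lock, so threads that are releasing their references
// never contend on the mutex with the terminating thread.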

int market::update_workers_request() {
    int old_request = my_num_workers_requested;
    my_num_workers_requested = min(my_total_demand.load(std::memory_order_relaxed),
                                   (int)my_num_workers_soft_limit.load(std::memory_order_relaxed));
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_mandatory_num_requested > 0) {
        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
        my_num_workers_requested = 1;
    }
#endif
    update_allotment(my_num_workers_requested);
    return my_num_workers_requested - old_request;
}
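
// Example: with my_total_demand == 10 and a soft limit of 4, the market
// requests 4 workers from RML; with a soft limit of 0 but outstanding
// mandatory requests (some arena has enqueued work), exactly one worker is
// requested so that enqueued tasks can still make progress. The returned
// delta is what callers pass on to adjust_job_count_estimate().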

void market::set_active_num_workers ( unsigned soft_limit ) {
    market *m;

    {
        global_market_mutex_type::scoped_lock lock( theMarketMutex );
        if ( !theMarket )
            return; // the actual value will be used at market creation
        m = theMarket;
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == soft_limit)
            return;
        ++m->my_ref_count;
    }
    // We now hold a market reference (my_ref_count), so m can be used safely.

    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock( m->my_arenas_list_mutex );
        __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        arena_list_type* arenas = m->my_arenas;

        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0 &&
            m->my_mandatory_num_requested > 0)
        {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->my_global_concurrency_mode.load(std::memory_order_relaxed))
                        m->disable_mandatory_concurrency_impl(&*it);
        }
        __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL);
#endif

        m->my_num_workers_soft_limit.store(soft_limit, std::memory_order_release);
        // report only once after the new soft limit value is set
        m->my_workers_soft_limit_to_report.store(soft_limit, std::memory_order_relaxed);

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (m->my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
            for (unsigned level = 0; level < num_priority_levels; ++level )
                for (arena_list_type::iterator it = arenas[level].begin(); it != arenas[level].end(); ++it)
                    if (it->has_enqueued_tasks())
                        m->enable_mandatory_concurrency_impl(&*it);
        }
#endif

        delta = m->update_workers_request();
    }
    // adjust_job_count_estimate must be called outside of any locks
    if( delta!=0 )
        m->my_server->adjust_job_count_estimate( delta );
    // release the internal market reference to match ++m->my_ref_count above
    m->release( /*is_public=*/false, /*blocking_terminate=*/false );
}
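
// A minimal usage sketch (assumes the public oneTBB API; not part of this
// file): constructing a global_control object with max_allowed_parallelism
// eventually funnels into set_active_num_workers() above.
//
//     #include <oneapi/tbb/global_control.h>
//     // Limit the whole process to 4 threads (1 external + 3 workers):
//     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 4);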

bool governor::does_client_join_workers (const rml::tbb_client &client) {
    return ((const market&)client).must_join_workers();
}

arena* market::create_arena ( int num_slots, int num_reserved_slots, unsigned arena_priority_level,
                              std::size_t stack_size )
{
    __TBB_ASSERT( num_slots > 0, NULL );
    __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
    // Add a public market reference for an external thread/task_arena (it adds an internal reference in exchange).
    market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
    arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots, arena_priority_level );
    // Add the newly created arena into the existing market's list.
    arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
    m.insert_arena_into_list(a);
    return &a;
}

/** This method must be invoked under my_arenas_list_mutex. **/
void market::detach_arena ( arena& a ) {
    market::enforce([this] { return theMarket == this; }, "Global market instance was destroyed prematurely?");
    __TBB_ASSERT( !a.my_slots[0].is_occupied(), NULL );
    if (a.my_global_concurrency_mode.load(std::memory_order_relaxed))
        disable_mandatory_concurrency_impl(&a);

    remove_arena_from_list(a);
    if (a.my_aba_epoch == my_arenas_aba_epoch.load(std::memory_order_relaxed)) {
        // Advance the epoch so that concurrent try_destroy_arena() calls targeting this arena fail the ABA check.
        my_arenas_aba_epoch.store(my_arenas_aba_epoch.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
    }
}

void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch, unsigned priority_level ) {
    bool locked = true;
    __TBB_ASSERT( a, NULL );
    // We hold a reference to the market, so it cannot be destroyed here.
    market::enforce([this] { return theMarket == this; }, NULL);
    __TBB_ASSERT( my_ref_count!=0, NULL );
    my_arenas_list_mutex.lock();
    arena_list_type::iterator it = my_arenas[priority_level].begin();
    for ( ; it != my_arenas[priority_level].end(); ++it ) {
        if ( a == &*it ) {
            if ( it->my_aba_epoch == aba_epoch ) {
                // Arena is alive
                if ( !a->my_num_workers_requested && !a->my_references.load(std::memory_order_relaxed) ) {
                    __TBB_ASSERT(
                        !a->my_num_workers_allotted.load(std::memory_order_relaxed) &&
                        (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers),
                        "Inconsistent arena state"
                    );
                    // Arena is abandoned. Destroy it.
                    detach_arena( *a );
                    my_arenas_list_mutex.unlock();
                    locked = false;
                    a->free_arena();
                }
            }
            if (locked)
                my_arenas_list_mutex.unlock();
            return;
        }
    }
    my_arenas_list_mutex.unlock();
}

/** This method must be invoked under my_arenas_list_mutex. **/
arena* market::arena_in_need ( arena_list_type* arenas, arena* hint ) {
    // TODO: make sure an arena with higher priority is returned only if there are available slots in it.
    hint = select_next_arena( hint );
    if ( !hint )
        return nullptr;
    arena_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->my_priority_level;
    __TBB_ASSERT( it != arenas[curr_priority_level].end(), nullptr );
    do {
        arena& a = *it;
        if ( ++it == arenas[curr_priority_level].end() ) {
            // Reached the end of this level's list: wrap around to the next non-empty priority level.
            do {
                ++curr_priority_level %= num_priority_levels;
            } while ( arenas[curr_priority_level].empty() );
            it = arenas[curr_priority_level].begin();
        }
        if( a.num_workers_active() < a.my_num_workers_allotted.load(std::memory_order_relaxed) ) {
            a.my_references += arena::ref_worker;
            return &a;
        }
    } while ( it != hint );
    return nullptr;
}
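
// The search above treats all arena lists as one circular sequence spanning
// the priority levels: within a level the walk is round-robin, and when a
// level is exhausted it wraps to the next non-empty level. The first arena
// whose active worker count is below its allotment receives a worker
// reference (arena::ref_worker) and is returned.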

arena* market::arena_in_need(arena* prev) {
    if (my_total_demand.load(std::memory_order_acquire) <= 0)
        return nullptr;
    arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
    // TODO: introduce a three-state response: alive, not_alive, no_market_arenas
    if ( is_arena_alive(prev) )
        return arena_in_need(my_arenas, prev);
    return arena_in_need(my_arenas, my_next_arena);
}

int market::update_allotment ( arena_list_type* arenas, int workers_demand, int max_workers ) {
    __TBB_ASSERT( workers_demand > 0, nullptr );
    max_workers = min(workers_demand, max_workers);
    int unassigned_workers = max_workers;
    int assigned = 0;
    int carry = 0;
    unsigned max_priority_level = num_priority_levels;
    for (unsigned list_idx = 0; list_idx < num_priority_levels; ++list_idx ) {
        int assigned_per_priority = min(my_priority_level_demand[list_idx], unassigned_workers);
        unassigned_workers -= assigned_per_priority;
        for (arena_list_type::iterator it = arenas[list_idx].begin(); it != arenas[list_idx].end(); ++it) {
            arena& a = *it;
            __TBB_ASSERT(a.my_num_workers_requested >= 0, nullptr);
            __TBB_ASSERT(a.my_num_workers_requested <= int(a.my_max_num_workers)
                || (a.my_max_num_workers == 0 && a.my_local_concurrency_requests > 0 && a.my_num_workers_requested == 1), nullptr);
            if (a.my_num_workers_requested == 0) {
                __TBB_ASSERT(!a.my_num_workers_allotted.load(std::memory_order_relaxed), nullptr);
                continue;
            }

            if (max_priority_level == num_priority_levels) {
                max_priority_level = list_idx;
            }

            int allotted = 0;
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            if (my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0) {
                __TBB_ASSERT(max_workers == 0 || max_workers == 1, nullptr);
                allotted = a.my_global_concurrency_mode.load(std::memory_order_relaxed) &&
                    assigned < max_workers ? 1 : 0;
            } else
#endif
            {
                // Proportional distribution with a running remainder (carry) so that
                // rounding losses do not accumulate across arenas.
                int tmp = a.my_num_workers_requested * assigned_per_priority + carry;
                allotted = tmp / my_priority_level_demand[list_idx];
                carry = tmp % my_priority_level_demand[list_idx];
                __TBB_ASSERT(allotted <= a.my_num_workers_requested, nullptr);
                __TBB_ASSERT(allotted <= int(a.my_num_slots - a.my_num_reserved_slots), nullptr);
            }
            a.my_num_workers_allotted.store(allotted, std::memory_order_relaxed);
            a.my_is_top_priority.store(list_idx == max_priority_level, std::memory_order_relaxed);
            assigned += allotted;
        }
    }
    __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, nullptr );
    return assigned;
}
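
// A worked example of the proportional allotment above (hypothetical numbers):
// one priority level with total demand 4, where arena A requests 3 workers and
// arena B requests 1, and max_workers == 2. For A: tmp = 3*2 + 0 = 6,
// allotted = 6/4 = 1, carry = 2. For B: tmp = 1*2 + 2 = 4, allotted = 4/4 = 1,
// carry = 0. Both arenas get one worker and no worker is lost to rounding.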

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
    __TBB_ASSERT( a, "Expected non-null pointer to arena." );
    for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
        if ( a == &*it )
            return true;
    return false;
}

/** This method must be invoked under my_arenas_list_mutex. **/
bool market::is_arena_alive(arena* a) {
    if ( !a )
        return false;

    // Still cannot access internals of the arena since the object itself might be destroyed.

    for ( unsigned idx = 0; idx < num_priority_levels; ++idx ) {
        if ( is_arena_in_list( my_arenas[idx], a ) )
            return true;
    }
    return false;
}

#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
void market::enable_mandatory_concurrency_impl ( arena *a ) {
    __TBB_ASSERT(!a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);

    a->my_global_concurrency_mode.store(true, std::memory_order_relaxed);
    my_mandatory_num_requested++;
}

void market::enable_mandatory_concurrency ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (my_num_workers_soft_limit.load(std::memory_order_relaxed) != 0 ||
            a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;

        enable_mandatory_concurrency_impl(a);
        delta = update_workers_request();
    }

    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}

void market::disable_mandatory_concurrency_impl(arena* a) {
    __TBB_ASSERT(a->my_global_concurrency_mode.load(std::memory_order_relaxed), NULL);
    __TBB_ASSERT(my_mandatory_num_requested > 0, NULL);

    a->my_global_concurrency_mode.store(false, std::memory_order_relaxed);
    my_mandatory_num_requested--;
}

void market::mandatory_concurrency_disable ( arena *a ) {
    int delta = 0;
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        if (!a->my_global_concurrency_mode.load(std::memory_order_relaxed))
            return;
        // There is a racy window in advertise_new_work between enabling mandatory concurrency
        // and setting SNAPSHOT_FULL. It gives a concurrent request to disable mandatory
        // concurrency a chance to slip in. Therefore, we double-check that there are no
        // enqueued tasks.
        if (a->has_enqueued_tasks())
            return;

        __TBB_ASSERT(my_num_workers_soft_limit.load(std::memory_order_relaxed) == 0, NULL);
        disable_mandatory_concurrency_impl(a);

        delta = update_workers_request();
    }
    if (delta != 0)
        my_server->adjust_job_count_estimate(delta);
}
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
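
// Summary of the mandatory-concurrency machinery above: when the soft limit is
// 0 (no workers are allowed) but an arena holds enqueued tasks, the market
// still requests exactly one worker so those tasks can make progress; the
// extra worker is given back once no arena has enqueued work left.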

void market::adjust_demand ( arena& a, int delta, bool mandatory ) {
    if (!delta) {
        return;
    }
    int target_epoch{};
    {
        arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
        __TBB_ASSERT(theMarket != nullptr, "market instance was destroyed prematurely?");
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
        if (mandatory) {
            __TBB_ASSERT(delta == 1 || delta == -1, nullptr);
            // Count the number of mandatory requests and proceed only for 0->1 and 1->0 transitions.
            a.my_local_concurrency_requests += delta;
            if ((delta > 0 && a.my_local_concurrency_requests != 1) ||
                (delta < 0 && a.my_local_concurrency_requests != 0))
            {
                return;
            }
        }
#endif
        a.my_total_num_workers_requested += delta;
        int target_workers = 0;
        // Clamp target_workers to the interval [0, a.my_max_num_workers]
        if (a.my_total_num_workers_requested > 0) {
            int max_num_workers = int(a.my_max_num_workers);
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            // At least one thread should be requested when mandatory concurrency is enabled.
            if (a.my_local_concurrency_requests > 0 && max_num_workers == 0) {
                max_num_workers = 1;
            }
#endif
            target_workers = min(a.my_total_num_workers_requested, max_num_workers);
        }

        delta = target_workers - a.my_num_workers_requested;

        if (delta == 0) {
            return;
        }

        a.my_num_workers_requested += delta;
        if (a.my_num_workers_requested == 0) {
            a.my_num_workers_allotted.store(0, std::memory_order_relaxed);
        }

        int total_demand = my_total_demand.load(std::memory_order_relaxed) + delta;
        my_total_demand.store(total_demand, std::memory_order_relaxed);
        my_priority_level_demand[a.my_priority_level] += delta;
        unsigned effective_soft_limit = my_num_workers_soft_limit.load(std::memory_order_relaxed);
        if (my_mandatory_num_requested > 0) {
            __TBB_ASSERT(effective_soft_limit == 0, NULL);
            effective_soft_limit = 1;
        }

        update_allotment(effective_soft_limit);
        if (delta > 0) {
            // The requested number of workers cannot exceed the soft limit, but the values
            // requested by arenas are remembered in my_total_demand so that workers are not
            // prematurely released back to RML.
            if (my_num_workers_requested + delta > (int)effective_soft_limit)
                delta = effective_soft_limit - my_num_workers_requested;
        }
        else {
            // The number of workers should not be decreased below my_total_demand.
            if (my_num_workers_requested + delta < total_demand)
                delta = min(total_demand, (int)effective_soft_limit) - my_num_workers_requested;
        }
        my_num_workers_requested += delta;
        __TBB_ASSERT(my_num_workers_requested <= (int)effective_soft_limit, NULL);

        target_epoch = my_adjust_demand_target_epoch++;
    }

    my_adjust_demand_current_epoch.wait_until(target_epoch, /* context = */ target_epoch, std::memory_order_acquire);
    // Must be called outside of any locks
    my_server->adjust_job_count_estimate( delta );
    my_adjust_demand_current_epoch.exchange(target_epoch + 1);
    my_adjust_demand_current_epoch.notify_relaxed(target_epoch + 1);
}
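
// The epoch pair used above (my_adjust_demand_target_epoch /
// my_adjust_demand_current_epoch) works like a ticket lock: each caller takes
// a ticket while still holding my_arenas_list_mutex, then waits for its turn
// outside the lock, so calls to adjust_job_count_estimate() reach RML in the
// same order in which the demand updates were applied to the market state.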

void market::process( job& j ) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_arena can be dead. Don't access it until arena_in_need is called.
    arena *a = td.my_arena;
    for (int i = 0; i < 2; ++i) {
        while ( (a = arena_in_need(a)) ) {
            a->process(td);
        }
        // A worker leaves the market when no arena is in need. This can happen before
        // adjust_job_count_estimate() decreases my_slack and RML puts the thread to sleep,
        // which might result in a busy loop that checks for my_slack<0 and calls this
        // method again immediately. The yield below tames that spinning.
        if ( !i ) {
            yield();
        }
    }
}
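
// process() is the rml::job entry point for a worker thread: RML calls it when
// the market has requested workers, and the worker keeps serving whichever
// arena needs threads until the total demand is exhausted, after which it
// returns to RML and may be put to sleep.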

void market::cleanup( job& j) {
    market::enforce([this] { return theMarket != this; }, NULL );
    governor::auto_terminate(&j);
}

void market::acknowledge_close_connection() {
    destroy();
}

::rml::job* market::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT( index > 0, NULL );
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // The index serves as a hint that decreases conflicts between workers when they migrate between arenas.
    thread_data* td = new(cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
    __TBB_ASSERT( my_workers[index - 1] == nullptr, NULL );
    my_workers[index - 1] = td;
    return td;
}

void market::add_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.push_front(td);
}

void market::remove_external_thread(thread_data& td) {
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    my_masters.remove(td);
}

} // namespace r1
} // namespace detail
} // namespace tbb