1 /* 2 * Copyright (c) 2000-2020 Apple Inc. All rights reserved. 3 * 4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ 5 * 6 * This file contains Original Code and/or Modifications of Original Code 7 * as defined in and that are subject to the Apple Public Source License 8 * Version 2.0 (the 'License'). You may not use this file except in 9 * compliance with the License. The rights granted to you under the License 10 * may not be used to create, or enable the creation or redistribution of, 11 * unlawful or unlicensed copies of an Apple operating system, or to 12 * circumvent, violate, or enable the circumvention or violation of, any 13 * terms of an Apple operating system software license agreement. 14 * 15 * Please obtain a copy of the License at 16 * http://www.opensource.apple.com/apsl/ and read it before using this file. 17 * 18 * The Original Code and all software distributed under the License are 19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, 21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 23 * Please see the License for the specific language governing rights and 24 * limitations under the License. 25 * 26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ 27 */ 28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */ 29 30 #include <sys/cdefs.h> 31 32 #include <kern/assert.h> 33 #include <kern/ast.h> 34 #include <kern/clock.h> 35 #include <kern/cpu_data.h> 36 #include <kern/kern_types.h> 37 #include <kern/policy_internal.h> 38 #include <kern/processor.h> 39 #include <kern/sched_prim.h> /* for thread_exception_return */ 40 #include <kern/task.h> 41 #include <kern/thread.h> 42 #include <kern/zalloc.h> 43 #include <mach/kern_return.h> 44 #include <mach/mach_param.h> 45 #include <mach/mach_port.h> 46 #include <mach/mach_types.h> 47 #include <mach/mach_vm.h> 48 #include <mach/sync_policy.h> 49 #include <mach/task.h> 50 #include <mach/thread_act.h> /* for thread_resume */ 51 #include <mach/thread_policy.h> 52 #include <mach/thread_status.h> 53 #include <mach/vm_prot.h> 54 #include <mach/vm_statistics.h> 55 #include <machine/atomic.h> 56 #include <machine/machine_routines.h> 57 #include <vm/vm_map.h> 58 #include <vm/vm_protos.h> 59 60 #include <sys/eventvar.h> 61 #include <sys/kdebug.h> 62 #include <sys/kernel.h> 63 #include <sys/lock.h> 64 #include <sys/param.h> 65 #include <sys/proc_info.h> /* for fill_procworkqueue */ 66 #include <sys/proc_internal.h> 67 #include <sys/pthread_shims.h> 68 #include <sys/resourcevar.h> 69 #include <sys/signalvar.h> 70 #include <sys/sysctl.h> 71 #include <sys/sysproto.h> 72 #include <sys/systm.h> 73 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */ 74 75 #include <pthread/bsdthread_private.h> 76 #include <pthread/workqueue_syscalls.h> 77 #include <pthread/workqueue_internal.h> 78 #include <pthread/workqueue_trace.h> 79 80 #include <os/log.h> 81 82 static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2; 83 static void workq_schedule_creator(proc_t p, struct workqueue *wq, 84 workq_kern_threadreq_flags_t flags); 85 86 static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth, 87 workq_threadreq_t req); 88 89 static uint32_t workq_constrained_allowance(struct workqueue *wq, 90 thread_qos_t at_qos, struct uthread *uth, bool may_start_timer); 91 92 static bool workq_thread_is_busy(uint64_t cur_ts, 93 _Atomic uint64_t 
*lastblocked_tsp); 94 95 static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS; 96 97 #pragma mark globals 98 99 struct workq_usec_var { 100 uint32_t usecs; 101 uint64_t abstime; 102 }; 103 104 #define WORKQ_SYSCTL_USECS(var, init) \ 105 static struct workq_usec_var var = { .usecs = init }; \ 106 SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \ 107 CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \ 108 workq_sysctl_handle_usecs, "I", "") 109 110 static LCK_GRP_DECLARE(workq_lck_grp, "workq"); 111 os_refgrp_decl(static, workq_refgrp, "workq", NULL); 112 113 static ZONE_DECLARE(workq_zone_workqueue, "workq.wq", 114 sizeof(struct workqueue), ZC_NONE); 115 static ZONE_DECLARE(workq_zone_threadreq, "workq.threadreq", 116 sizeof(struct workq_threadreq_s), ZC_CACHING); 117 118 static struct mpsc_daemon_queue workq_deallocate_queue; 119 120 WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS); 121 WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS); 122 WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS); 123 static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS; 124 static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8; 125 static uint32_t wq_init_constrained_limit = 1; 126 static uint16_t wq_death_max_load; 127 static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS]; 128 129 #pragma mark sysctls 130 131 static int 132 workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS 133 { 134 #pragma unused(arg2) 135 struct workq_usec_var *v = arg1; 136 int error = sysctl_handle_int(oidp, &v->usecs, 0, req); 137 if (error || !req->newptr) { 138 return error; 139 } 140 clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC, 141 &v->abstime); 142 return 0; 143 } 144 145 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED, 146 &wq_max_threads, 0, ""); 147 148 SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED, 149 &wq_max_constrained_threads, 0, ""); 150 151 #pragma mark p_wqptr 152 153 #define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0) 154 155 static struct workqueue * 156 proc_get_wqptr_fast(struct proc *p) 157 { 158 return os_atomic_load(&p->p_wqptr, relaxed); 159 } 160 161 static struct workqueue * 162 proc_get_wqptr(struct proc *p) 163 { 164 struct workqueue *wq = proc_get_wqptr_fast(p); 165 return wq == WQPTR_IS_INITING_VALUE ? 
NULL : wq; 166 } 167 168 static void 169 proc_set_wqptr(struct proc *p, struct workqueue *wq) 170 { 171 wq = os_atomic_xchg(&p->p_wqptr, wq, release); 172 if (wq == WQPTR_IS_INITING_VALUE) { 173 proc_lock(p); 174 thread_wakeup(&p->p_wqptr); 175 proc_unlock(p); 176 } 177 } 178 179 static bool 180 proc_init_wqptr_or_wait(struct proc *p) 181 { 182 struct workqueue *wq; 183 184 proc_lock(p); 185 wq = os_atomic_load(&p->p_wqptr, relaxed); 186 187 if (wq == NULL) { 188 os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed); 189 proc_unlock(p); 190 return true; 191 } 192 193 if (wq == WQPTR_IS_INITING_VALUE) { 194 assert_wait(&p->p_wqptr, THREAD_UNINT); 195 proc_unlock(p); 196 thread_block(THREAD_CONTINUE_NULL); 197 } else { 198 proc_unlock(p); 199 } 200 return false; 201 } 202 203 static inline event_t 204 workq_parked_wait_event(struct uthread *uth) 205 { 206 return (event_t)&uth->uu_workq_stackaddr; 207 } 208 209 static inline void 210 workq_thread_wakeup(struct uthread *uth) 211 { 212 thread_wakeup_thread(workq_parked_wait_event(uth), uth->uu_thread); 213 } 214 215 #pragma mark wq_thactive 216 217 #if defined(__LP64__) 218 // Layout is: 219 // 127 - 115 : 13 bits of zeroes 220 // 114 - 112 : best QoS among all pending constrained requests 221 // 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits 222 #define WQ_THACTIVE_BUCKET_WIDTH 16 223 #define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH) 224 #else 225 // Layout is: 226 // 63 - 61 : best QoS among all pending constrained requests 227 // 60 : Manager bucket (0 or 1) 228 // 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits 229 #define WQ_THACTIVE_BUCKET_WIDTH 10 230 #define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1) 231 #endif 232 #define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1) 233 #define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1)) 234 235 static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3, 236 "Make sure we have space to encode a QoS"); 237 238 static inline wq_thactive_t 239 _wq_thactive(struct workqueue *wq) 240 { 241 return os_atomic_load_wide(&wq->wq_thactive, relaxed); 242 } 243 244 static inline int 245 _wq_bucket(thread_qos_t qos) 246 { 247 // Map both BG and MT to the same bucket by over-shifting down and 248 // clamping MT and BG together. 249 switch (qos) { 250 case THREAD_QOS_MAINTENANCE: 251 return 0; 252 default: 253 return qos - 2; 254 } 255 } 256 257 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \ 258 ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT)) 259 260 static inline thread_qos_t 261 _wq_thactive_best_constrained_req_qos(struct workqueue *wq) 262 { 263 // Avoid expensive atomic operations: the three bits we're loading are in 264 // a single byte, and always updated under the workqueue lock 265 wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive; 266 return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v); 267 } 268 269 static void 270 _wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq) 271 { 272 thread_qos_t old_qos, new_qos; 273 workq_threadreq_t req; 274 275 req = priority_queue_max(&wq->wq_constrained_queue, 276 struct workq_threadreq_s, tr_entry); 277 new_qos = req ? 
req->tr_qos : THREAD_QOS_UNSPECIFIED; 278 old_qos = _wq_thactive_best_constrained_req_qos(wq); 279 if (old_qos != new_qos) { 280 long delta = (long)new_qos - (long)old_qos; 281 wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT; 282 /* 283 * We can do an atomic add relative to the initial load because updates 284 * to this qos are always serialized under the workqueue lock. 285 */ 286 v = os_atomic_add(&wq->wq_thactive, v, relaxed); 287 #ifdef __LP64__ 288 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v, 289 (uint64_t)(v >> 64), 0, 0); 290 #else 291 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0, 0); 292 #endif 293 } 294 } 295 296 static inline wq_thactive_t 297 _wq_thactive_offset_for_qos(thread_qos_t qos) 298 { 299 return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH); 300 } 301 302 static inline wq_thactive_t 303 _wq_thactive_inc(struct workqueue *wq, thread_qos_t qos) 304 { 305 wq_thactive_t v = _wq_thactive_offset_for_qos(qos); 306 return os_atomic_add_orig(&wq->wq_thactive, v, relaxed); 307 } 308 309 static inline wq_thactive_t 310 _wq_thactive_dec(struct workqueue *wq, thread_qos_t qos) 311 { 312 wq_thactive_t v = _wq_thactive_offset_for_qos(qos); 313 return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed); 314 } 315 316 static inline void 317 _wq_thactive_move(struct workqueue *wq, 318 thread_qos_t old_qos, thread_qos_t new_qos) 319 { 320 wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) - 321 _wq_thactive_offset_for_qos(old_qos); 322 os_atomic_add(&wq->wq_thactive, v, relaxed); 323 wq->wq_thscheduled_count[_wq_bucket(old_qos)]--; 324 wq->wq_thscheduled_count[_wq_bucket(new_qos)]++; 325 } 326 327 static inline uint32_t 328 _wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v, 329 thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount) 330 { 331 uint32_t count = 0, active; 332 uint64_t curtime; 333 334 assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX); 335 336 if (busycount) { 337 curtime = mach_absolute_time(); 338 *busycount = 0; 339 } 340 if (max_busycount) { 341 *max_busycount = THREAD_QOS_LAST - qos; 342 } 343 344 int i = _wq_bucket(qos); 345 v >>= i * WQ_THACTIVE_BUCKET_WIDTH; 346 for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) { 347 active = v & WQ_THACTIVE_BUCKET_MASK; 348 count += active; 349 350 if (busycount && wq->wq_thscheduled_count[i] > active) { 351 if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) { 352 /* 353 * We only consider the last blocked thread for a given bucket 354 * as busy because we don't want to take the list lock in each 355 * sched callback. However this is an approximation that could 356 * contribute to thread creation storms. 
357 */ 358 (*busycount)++; 359 } 360 } 361 } 362 363 return count; 364 } 365 366 #pragma mark wq_flags 367 368 static inline uint32_t 369 _wq_flags(struct workqueue *wq) 370 { 371 return os_atomic_load(&wq->wq_flags, relaxed); 372 } 373 374 static inline bool 375 _wq_exiting(struct workqueue *wq) 376 { 377 return _wq_flags(wq) & WQ_EXITING; 378 } 379 380 bool 381 workq_is_exiting(struct proc *p) 382 { 383 struct workqueue *wq = proc_get_wqptr(p); 384 return !wq || _wq_exiting(wq); 385 } 386 387 #pragma mark workqueue lock 388 389 static bool 390 workq_lock_spin_is_acquired_kdp(struct workqueue *wq) 391 { 392 return kdp_lck_spin_is_acquired(&wq->wq_lock); 393 } 394 395 static inline void 396 workq_lock_spin(struct workqueue *wq) 397 { 398 lck_spin_lock_grp(&wq->wq_lock, &workq_lck_grp); 399 } 400 401 static inline void 402 workq_lock_held(__assert_only struct workqueue *wq) 403 { 404 LCK_SPIN_ASSERT(&wq->wq_lock, LCK_ASSERT_OWNED); 405 } 406 407 static inline bool 408 workq_lock_try(struct workqueue *wq) 409 { 410 return lck_spin_try_lock_grp(&wq->wq_lock, &workq_lck_grp); 411 } 412 413 static inline void 414 workq_unlock(struct workqueue *wq) 415 { 416 lck_spin_unlock(&wq->wq_lock); 417 } 418 419 #pragma mark idle thread lists 420 421 #define WORKQ_POLICY_INIT(qos) \ 422 (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos } 423 424 static inline thread_qos_t 425 workq_pri_bucket(struct uu_workq_policy req) 426 { 427 return MAX(MAX(req.qos_req, req.qos_max), req.qos_override); 428 } 429 430 static inline thread_qos_t 431 workq_pri_override(struct uu_workq_policy req) 432 { 433 return MAX(workq_pri_bucket(req), req.qos_bucket); 434 } 435 436 static inline bool 437 workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth) 438 { 439 workq_threadreq_param_t cur_trp, req_trp = { }; 440 441 cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params; 442 if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) { 443 req_trp = kqueue_threadreq_workloop_param(req); 444 } 445 446 /* 447 * CPU percent flags are handled separately to policy changes, so ignore 448 * them for all of these checks. 
449 */ 450 uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT); 451 uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT); 452 453 if (!req_flags && !cur_flags) { 454 return false; 455 } 456 457 if (req_flags != cur_flags) { 458 return true; 459 } 460 461 if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) { 462 return true; 463 } 464 465 if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) { 466 return true; 467 } 468 469 return false; 470 } 471 472 static inline bool 473 workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth) 474 { 475 if (workq_thread_needs_params_change(req, uth)) { 476 return true; 477 } 478 479 return req->tr_qos != workq_pri_override(uth->uu_workq_pri); 480 } 481 482 static void 483 workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth, 484 struct uu_workq_policy old_pri, struct uu_workq_policy new_pri, 485 bool force_run) 486 { 487 thread_qos_t old_bucket = old_pri.qos_bucket; 488 thread_qos_t new_bucket = workq_pri_bucket(new_pri); 489 490 if (old_bucket != new_bucket) { 491 _wq_thactive_move(wq, old_bucket, new_bucket); 492 } 493 494 new_pri.qos_bucket = new_bucket; 495 uth->uu_workq_pri = new_pri; 496 497 if (workq_pri_override(old_pri) != new_bucket) { 498 thread_set_workq_override(uth->uu_thread, new_bucket); 499 } 500 501 if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) { 502 int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS; 503 if (old_bucket > new_bucket) { 504 /* 505 * When lowering our bucket, we may unblock a thread request, 506 * but we can't drop our priority before we have evaluated 507 * whether this is the case, and if we ever drop the workqueue lock 508 * that would cause a priority inversion. 509 * 510 * We hence have to disallow thread creation in that case. 511 */ 512 flags = 0; 513 } 514 workq_schedule_creator(p, wq, flags); 515 } 516 } 517 518 /* 519 * Sets/resets the cpu percent limits on the current thread. We can't set 520 * these limits from outside of the current thread, so this function needs 521 * to be called when we're executing on the intended 522 */ 523 static void 524 workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth) 525 { 526 assert(uth == current_uthread()); 527 workq_threadreq_param_t trp = { }; 528 529 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) { 530 trp = kqueue_threadreq_workloop_param(req); 531 } 532 533 if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) { 534 /* 535 * Going through disable when we have an existing CPU percent limit 536 * set will force the ledger to refill the token bucket of the current 537 * thread. Removing any penalty applied by previous thread use. 538 */ 539 thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0); 540 uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT; 541 } 542 543 if (trp.trp_flags & TRP_CPUPERCENT) { 544 thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent, 545 (uint64_t)trp.trp_refillms * NSEC_PER_SEC); 546 uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT; 547 } 548 } 549 550 static void 551 workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth, 552 workq_threadreq_t req, bool unpark) 553 { 554 thread_t th = uth->uu_thread; 555 thread_qos_t qos = req ? 
req->tr_qos : WORKQ_THREAD_QOS_CLEANUP; 556 workq_threadreq_param_t trp = { }; 557 int priority = 31; 558 int policy = POLICY_TIMESHARE; 559 560 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) { 561 trp = kqueue_threadreq_workloop_param(req); 562 } 563 564 uth->uu_workq_pri = WORKQ_POLICY_INIT(qos); 565 uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS; 566 567 if (unpark) { 568 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value; 569 // qos sent out to userspace (may differ from uu_workq_pri on param threads) 570 uth->uu_save.uus_workq_park_data.qos = qos; 571 } 572 573 if (qos == WORKQ_THREAD_QOS_MANAGER) { 574 uint32_t mgr_pri = wq->wq_event_manager_priority; 575 assert(trp.trp_value == 0); // manager qos and thread policy don't mix 576 577 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) { 578 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK; 579 thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri, 580 POLICY_TIMESHARE); 581 return; 582 } 583 584 qos = _pthread_priority_thread_qos(mgr_pri); 585 } else { 586 if (trp.trp_flags & TRP_PRIORITY) { 587 qos = THREAD_QOS_UNSPECIFIED; 588 priority = trp.trp_pri; 589 uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS; 590 } 591 592 if (trp.trp_flags & TRP_POLICY) { 593 policy = trp.trp_pol; 594 } 595 } 596 597 thread_set_workq_pri(th, qos, priority, policy); 598 } 599 600 /* 601 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held, 602 * every time a servicer is being told about a new max QoS. 603 */ 604 void 605 workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr) 606 { 607 struct uu_workq_policy old_pri, new_pri; 608 struct uthread *uth = current_uthread(); 609 struct workqueue *wq = proc_get_wqptr_fast(p); 610 thread_qos_t qos = kqr->tr_kq_qos_index; 611 612 if (uth->uu_workq_pri.qos_max == qos) { 613 return; 614 } 615 616 workq_lock_spin(wq); 617 old_pri = new_pri = uth->uu_workq_pri; 618 new_pri.qos_max = qos; 619 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 620 workq_unlock(wq); 621 } 622 623 #pragma mark idle threads accounting and handling 624 625 static inline struct uthread * 626 workq_oldest_killable_idle_thread(struct workqueue *wq) 627 { 628 struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head); 629 630 if (uth && !uth->uu_save.uus_workq_park_data.has_stack) { 631 uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry); 632 if (uth) { 633 assert(uth->uu_save.uus_workq_park_data.has_stack); 634 } 635 } 636 return uth; 637 } 638 639 static inline uint64_t 640 workq_kill_delay_for_idle_thread(struct workqueue *wq) 641 { 642 uint64_t delay = wq_reduce_pool_window.abstime; 643 uint16_t idle = wq->wq_thidlecount; 644 645 /* 646 * If we have less than wq_death_max_load threads, have a 5s timer. 647 * 648 * For the next wq_max_constrained_threads ones, decay linearly from 649 * from 5s to 50ms. 
650 */ 651 if (idle <= wq_death_max_load) { 652 return delay; 653 } 654 655 if (wq_max_constrained_threads > idle - wq_death_max_load) { 656 delay *= (wq_max_constrained_threads - (idle - wq_death_max_load)); 657 } 658 return delay / wq_max_constrained_threads; 659 } 660 661 static inline bool 662 workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth, 663 uint64_t now) 664 { 665 uint64_t delay = workq_kill_delay_for_idle_thread(wq); 666 return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay; 667 } 668 669 static void 670 workq_death_call_schedule(struct workqueue *wq, uint64_t deadline) 671 { 672 uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed); 673 674 if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) { 675 return; 676 } 677 os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed); 678 679 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0, 0); 680 681 /* 682 * <rdar://problem/13139182> Due to how long term timers work, the leeway 683 * can't be too short, so use 500ms which is long enough that we will not 684 * wake up the CPU for killing threads, but short enough that it doesn't 685 * fall into long-term timer list shenanigans. 686 */ 687 thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline, 688 wq_reduce_pool_window.abstime / 10, 689 THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND); 690 } 691 692 /* 693 * `decrement` is set to the number of threads that are no longer dying: 694 * - because they have been resuscitated just in time (workq_pop_idle_thread) 695 * - or have been killed (workq_thread_terminate). 696 */ 697 static void 698 workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement) 699 { 700 struct uthread *uth; 701 702 assert(wq->wq_thdying_count >= decrement); 703 if ((wq->wq_thdying_count -= decrement) > 0) { 704 return; 705 } 706 707 if (wq->wq_thidlecount <= 1) { 708 return; 709 } 710 711 if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) { 712 return; 713 } 714 715 uint64_t now = mach_absolute_time(); 716 uint64_t delay = workq_kill_delay_for_idle_thread(wq); 717 718 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) { 719 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START, 720 wq, wq->wq_thidlecount, 0, 0, 0); 721 wq->wq_thdying_count++; 722 uth->uu_workq_flags |= UT_WORKQ_DYING; 723 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) { 724 workq_thread_wakeup(uth); 725 } 726 return; 727 } 728 729 workq_death_call_schedule(wq, 730 uth->uu_save.uus_workq_park_data.idle_stamp + delay); 731 } 732 733 void 734 workq_thread_terminate(struct proc *p, struct uthread *uth) 735 { 736 struct workqueue *wq = proc_get_wqptr_fast(p); 737 738 workq_lock_spin(wq); 739 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry); 740 if (uth->uu_workq_flags & UT_WORKQ_DYING) { 741 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END, 742 wq, wq->wq_thidlecount, 0, 0, 0); 743 workq_death_policy_evaluate(wq, 1); 744 } 745 if (wq->wq_nthreads-- == wq_max_threads) { 746 /* 747 * We got under the thread limit again, which may have prevented 748 * thread creation from happening, redrive if there are pending requests 749 */ 750 if (wq->wq_reqcount) { 751 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 752 } 753 } 754 workq_unlock(wq); 755 756 thread_deallocate(uth->uu_thread); 757 } 758 759 static void 760 workq_kill_old_threads_call(void *param0, void *param1 __unused) 761 { 762 struct workqueue *wq = param0; 763 764 workq_lock_spin(wq); 765 
WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0, 0); 766 os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed); 767 workq_death_policy_evaluate(wq, 0); 768 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0, 0); 769 workq_unlock(wq); 770 } 771 772 static struct uthread * 773 workq_pop_idle_thread(struct workqueue *wq, uint8_t uu_flags, 774 bool *needs_wakeup) 775 { 776 struct uthread *uth; 777 778 if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) { 779 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry); 780 } else { 781 uth = TAILQ_FIRST(&wq->wq_thnewlist); 782 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry); 783 } 784 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry); 785 786 assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0); 787 uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags; 788 if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) { 789 wq->wq_constrained_threads_scheduled++; 790 } 791 wq->wq_threads_scheduled++; 792 wq->wq_thidlecount--; 793 794 if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) { 795 uth->uu_workq_flags ^= UT_WORKQ_DYING; 796 workq_death_policy_evaluate(wq, 1); 797 *needs_wakeup = false; 798 } else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) { 799 *needs_wakeup = false; 800 } else { 801 *needs_wakeup = true; 802 } 803 return uth; 804 } 805 806 /* 807 * Called by thread_create_workq_waiting() during thread initialization, before 808 * assert_wait, before the thread has been started. 809 */ 810 event_t 811 workq_thread_init_and_wq_lock(task_t task, thread_t th) 812 { 813 struct uthread *uth = get_bsdthread_info(th); 814 815 uth->uu_workq_flags = UT_WORKQ_NEW; 816 uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY); 817 uth->uu_workq_thport = MACH_PORT_NULL; 818 uth->uu_workq_stackaddr = 0; 819 uth->uu_workq_pthread_kill_allowed = 0; 820 821 thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE); 822 thread_reset_workq_qos(th, THREAD_QOS_LEGACY); 823 824 workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task))); 825 return workq_parked_wait_event(uth); 826 } 827 828 /** 829 * Try to add a new workqueue thread. 
830 * 831 * - called with workq lock held 832 * - dropped and retaken around thread creation 833 * - return with workq lock held 834 */ 835 static bool 836 workq_add_new_idle_thread(proc_t p, struct workqueue *wq) 837 { 838 mach_vm_offset_t th_stackaddr; 839 kern_return_t kret; 840 thread_t th; 841 842 wq->wq_nthreads++; 843 844 workq_unlock(wq); 845 846 vm_map_t vmap = get_task_map(p->task); 847 848 kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr); 849 if (kret != KERN_SUCCESS) { 850 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, 851 kret, 1, 0, 0); 852 goto out; 853 } 854 855 kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th); 856 if (kret != KERN_SUCCESS) { 857 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq, 858 kret, 0, 0, 0); 859 pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr); 860 goto out; 861 } 862 863 // thread_create_workq_waiting() will return with the wq lock held 864 // on success, because it calls workq_thread_init_and_wq_lock() above 865 866 struct uthread *uth = get_bsdthread_info(th); 867 868 wq->wq_creations++; 869 wq->wq_thidlecount++; 870 uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr; 871 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry); 872 873 WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0, 0); 874 return true; 875 876 out: 877 workq_lock_spin(wq); 878 /* 879 * Do not redrive here if we went under wq_max_threads again, 880 * it is the responsibility of the callers of this function 881 * to do so when it fails. 882 */ 883 wq->wq_nthreads--; 884 return false; 885 } 886 887 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1 888 889 __attribute__((noreturn, noinline)) 890 static void 891 workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq, 892 struct uthread *uth, uint32_t death_flags, uint32_t setup_flags) 893 { 894 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri); 895 bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW; 896 897 if (qos > WORKQ_THREAD_QOS_CLEANUP) { 898 workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true); 899 qos = WORKQ_THREAD_QOS_CLEANUP; 900 } 901 902 workq_thread_reset_cpupercent(NULL, uth); 903 904 if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) { 905 wq->wq_thidlecount--; 906 if (first_use) { 907 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry); 908 } else { 909 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry); 910 } 911 } 912 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry); 913 914 workq_unlock(wq); 915 916 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) { 917 __assert_only kern_return_t kr; 918 kr = thread_set_voucher_name(MACH_PORT_NULL); 919 assert(kr == KERN_SUCCESS); 920 } 921 922 uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS; 923 thread_t th = uth->uu_thread; 924 vm_map_t vmap = get_task_map(p->task); 925 926 if (!first_use) { 927 flags |= WQ_FLAG_THREAD_REUSE; 928 } 929 930 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr, 931 uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags); 932 __builtin_unreachable(); 933 } 934 935 bool 936 workq_is_current_thread_updating_turnstile(struct workqueue *wq) 937 { 938 return wq->wq_turnstile_updater == current_thread(); 939 } 940 941 __attribute__((always_inline)) 942 static inline void 943 workq_perform_turnstile_operation_locked(struct workqueue *wq, 944 void (^operation)(void)) 945 { 946 workq_lock_held(wq); 947 wq->wq_turnstile_updater = current_thread(); 948 operation(); 949 
wq->wq_turnstile_updater = THREAD_NULL; 950 } 951 952 static void 953 workq_turnstile_update_inheritor(struct workqueue *wq, 954 turnstile_inheritor_t inheritor, 955 turnstile_update_flags_t flags) 956 { 957 if (wq->wq_inheritor == inheritor) { 958 return; 959 } 960 wq->wq_inheritor = inheritor; 961 workq_perform_turnstile_operation_locked(wq, ^{ 962 turnstile_update_inheritor(wq->wq_turnstile, inheritor, 963 flags | TURNSTILE_IMMEDIATE_UPDATE); 964 turnstile_update_inheritor_complete(wq->wq_turnstile, 965 TURNSTILE_INTERLOCK_HELD); 966 }); 967 } 968 969 static void 970 workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth, 971 uint32_t setup_flags) 972 { 973 uint64_t now = mach_absolute_time(); 974 bool is_creator = (uth == wq->wq_creator); 975 976 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) { 977 wq->wq_constrained_threads_scheduled--; 978 } 979 uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT); 980 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry); 981 wq->wq_threads_scheduled--; 982 983 if (is_creator) { 984 wq->wq_creator = NULL; 985 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0, 986 uth->uu_save.uus_workq_park_data.yields, 0); 987 } 988 989 if (wq->wq_inheritor == uth->uu_thread) { 990 assert(wq->wq_creator == NULL); 991 if (wq->wq_reqcount) { 992 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ); 993 } else { 994 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 995 } 996 } 997 998 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 999 assert(is_creator || (_wq_flags(wq) & WQ_EXITING)); 1000 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry); 1001 wq->wq_thidlecount++; 1002 return; 1003 } 1004 1005 if (!is_creator) { 1006 _wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket); 1007 wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--; 1008 uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP; 1009 } 1010 1011 uth->uu_save.uus_workq_park_data.idle_stamp = now; 1012 1013 struct uthread *oldest = workq_oldest_killable_idle_thread(wq); 1014 uint16_t cur_idle = wq->wq_thidlecount; 1015 1016 if (cur_idle >= wq_max_constrained_threads || 1017 (wq->wq_thdying_count == 0 && oldest && 1018 workq_should_kill_idle_thread(wq, oldest, now))) { 1019 /* 1020 * Immediately kill threads if we have too may of them. 1021 * 1022 * And swap "place" with the oldest one we'd have woken up. 1023 * This is a relatively desperate situation where we really 1024 * need to kill threads quickly and it's best to kill 1025 * the one that's currently on core than context switching. 
1026 */ 1027 if (oldest) { 1028 oldest->uu_save.uus_workq_park_data.idle_stamp = now; 1029 TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry); 1030 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry); 1031 } 1032 1033 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START, 1034 wq, cur_idle, 0, 0, 0); 1035 wq->wq_thdying_count++; 1036 uth->uu_workq_flags |= UT_WORKQ_DYING; 1037 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP; 1038 workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags); 1039 __builtin_unreachable(); 1040 } 1041 1042 struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head); 1043 1044 cur_idle += 1; 1045 wq->wq_thidlecount = cur_idle; 1046 1047 if (cur_idle >= wq_death_max_load && tail && 1048 tail->uu_save.uus_workq_park_data.has_stack) { 1049 uth->uu_save.uus_workq_park_data.has_stack = false; 1050 TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry); 1051 } else { 1052 uth->uu_save.uus_workq_park_data.has_stack = true; 1053 TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry); 1054 } 1055 1056 if (!tail) { 1057 uint64_t delay = workq_kill_delay_for_idle_thread(wq); 1058 workq_death_call_schedule(wq, now + delay); 1059 } 1060 } 1061 1062 #pragma mark thread requests 1063 1064 static inline int 1065 workq_priority_for_req(workq_threadreq_t req) 1066 { 1067 thread_qos_t qos = req->tr_qos; 1068 1069 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 1070 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req); 1071 assert(trp.trp_flags & TRP_PRIORITY); 1072 return trp.trp_pri; 1073 } 1074 return thread_workq_pri_for_qos(qos); 1075 } 1076 1077 static inline struct priority_queue_sched_max * 1078 workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req) 1079 { 1080 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 1081 return &wq->wq_special_queue; 1082 } else if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) { 1083 return &wq->wq_overcommit_queue; 1084 } else { 1085 return &wq->wq_constrained_queue; 1086 } 1087 } 1088 1089 /* 1090 * returns true if the the enqueued request is the highest priority item 1091 * in its priority queue. 1092 */ 1093 static bool 1094 workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req) 1095 { 1096 assert(req->tr_state == WORKQ_TR_STATE_NEW); 1097 1098 req->tr_state = WORKQ_TR_STATE_QUEUED; 1099 wq->wq_reqcount += req->tr_count; 1100 1101 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 1102 assert(wq->wq_event_manager_threadreq == NULL); 1103 assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT); 1104 assert(req->tr_count == 1); 1105 wq->wq_event_manager_threadreq = req; 1106 return true; 1107 } 1108 1109 struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req); 1110 priority_queue_entry_set_sched_pri(q, &req->tr_entry, 1111 workq_priority_for_req(req), false); 1112 1113 if (priority_queue_insert(q, &req->tr_entry)) { 1114 if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) { 1115 _wq_thactive_refresh_best_constrained_req_qos(wq); 1116 } 1117 return true; 1118 } 1119 return false; 1120 } 1121 1122 /* 1123 * returns true if the the dequeued request was the highest priority item 1124 * in its priority queue. 
1125 */ 1126 static bool 1127 workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req) 1128 { 1129 wq->wq_reqcount--; 1130 1131 if (--req->tr_count == 0) { 1132 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 1133 assert(wq->wq_event_manager_threadreq == req); 1134 assert(req->tr_count == 0); 1135 wq->wq_event_manager_threadreq = NULL; 1136 return true; 1137 } 1138 if (priority_queue_remove(workq_priority_queue_for_req(wq, req), 1139 &req->tr_entry)) { 1140 if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) { 1141 _wq_thactive_refresh_best_constrained_req_qos(wq); 1142 } 1143 return true; 1144 } 1145 } 1146 return false; 1147 } 1148 1149 static void 1150 workq_threadreq_destroy(proc_t p, workq_threadreq_t req) 1151 { 1152 req->tr_state = WORKQ_TR_STATE_CANCELED; 1153 if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) { 1154 kqueue_threadreq_cancel(p, req); 1155 } else { 1156 zfree(workq_zone_threadreq, req); 1157 } 1158 } 1159 1160 #pragma mark workqueue thread creation thread calls 1161 1162 static inline bool 1163 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend, 1164 uint32_t fail_mask) 1165 { 1166 uint32_t old_flags, new_flags; 1167 1168 os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, { 1169 if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) { 1170 os_atomic_rmw_loop_give_up(return false); 1171 } 1172 if (__improbable(old_flags & WQ_PROC_SUSPENDED)) { 1173 new_flags = old_flags | pend; 1174 } else { 1175 new_flags = old_flags | sched; 1176 } 1177 }); 1178 1179 return (old_flags & WQ_PROC_SUSPENDED) == 0; 1180 } 1181 1182 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1 1183 1184 static bool 1185 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags) 1186 { 1187 assert(!preemption_enabled()); 1188 1189 if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED, 1190 WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED | 1191 WQ_IMMEDIATE_CALL_SCHEDULED)) { 1192 return false; 1193 } 1194 1195 uint64_t now = mach_absolute_time(); 1196 1197 if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) { 1198 /* do not change the window */ 1199 } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) { 1200 wq->wq_timer_interval *= 2; 1201 if (wq->wq_timer_interval > wq_max_timer_interval.abstime) { 1202 wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime; 1203 } 1204 } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) { 1205 wq->wq_timer_interval /= 2; 1206 if (wq->wq_timer_interval < wq_stalled_window.abstime) { 1207 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime; 1208 } 1209 } 1210 1211 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, 1212 _wq_flags(wq), wq->wq_timer_interval, 0); 1213 1214 thread_call_t call = wq->wq_delayed_call; 1215 uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED; 1216 uint64_t deadline = now + wq->wq_timer_interval; 1217 if (thread_call_enter1_delayed(call, (void *)arg, deadline)) { 1218 panic("delayed_call was already enqueued"); 1219 } 1220 return true; 1221 } 1222 1223 static void 1224 workq_schedule_immediate_thread_creation(struct workqueue *wq) 1225 { 1226 assert(!preemption_enabled()); 1227 1228 if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED, 1229 WQ_IMMEDIATE_CALL_PENDED, 0)) { 1230 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount, 1231 _wq_flags(wq), 0, 0); 1232 1233 uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED; 1234 if 
(thread_call_enter1(wq->wq_immediate_call, (void *)arg)) { 1235 panic("immediate_call was already enqueued"); 1236 } 1237 } 1238 } 1239 1240 void 1241 workq_proc_suspended(struct proc *p) 1242 { 1243 struct workqueue *wq = proc_get_wqptr(p); 1244 1245 if (wq) { 1246 os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed); 1247 } 1248 } 1249 1250 void 1251 workq_proc_resumed(struct proc *p) 1252 { 1253 struct workqueue *wq = proc_get_wqptr(p); 1254 uint32_t wq_flags; 1255 1256 if (!wq) { 1257 return; 1258 } 1259 1260 wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED | 1261 WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed); 1262 if ((wq_flags & WQ_EXITING) == 0) { 1263 disable_preemption(); 1264 if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) { 1265 workq_schedule_immediate_thread_creation(wq); 1266 } else if (wq_flags & WQ_DELAYED_CALL_PENDED) { 1267 workq_schedule_delayed_thread_creation(wq, 1268 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART); 1269 } 1270 enable_preemption(); 1271 } 1272 } 1273 1274 /** 1275 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now 1276 */ 1277 static bool 1278 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp) 1279 { 1280 uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed); 1281 if (now <= lastblocked_ts) { 1282 /* 1283 * Because the update of the timestamp when a thread blocks 1284 * isn't serialized against us looking at it (i.e. we don't hold 1285 * the workq lock), it's possible to have a timestamp that matches 1286 * the current time or that even looks to be in the future relative 1287 * to when we grabbed the current time... 1288 * 1289 * Just treat this as a busy thread since it must have just blocked. 1290 */ 1291 return true; 1292 } 1293 return (now - lastblocked_ts) < wq_stalled_window.abstime; 1294 } 1295 1296 static void 1297 workq_add_new_threads_call(void *_p, void *flags) 1298 { 1299 proc_t p = _p; 1300 struct workqueue *wq = proc_get_wqptr(p); 1301 uint32_t my_flag = (uint32_t)(uintptr_t)flags; 1302 1303 /* 1304 * workq_exit() will set the workqueue to NULL before 1305 * it cancels thread calls. 
1306 */ 1307 if (!wq) { 1308 return; 1309 } 1310 1311 assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) || 1312 (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED)); 1313 1314 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq), 1315 wq->wq_nthreads, wq->wq_thidlecount, 0); 1316 1317 workq_lock_spin(wq); 1318 1319 wq->wq_thread_call_last_run = mach_absolute_time(); 1320 os_atomic_andnot(&wq->wq_flags, my_flag, release); 1321 1322 /* This can drop the workqueue lock, and take it again */ 1323 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 1324 1325 workq_unlock(wq); 1326 1327 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0, 1328 wq->wq_nthreads, wq->wq_thidlecount, 0); 1329 } 1330 1331 #pragma mark thread state tracking 1332 1333 static void 1334 workq_sched_callback(int type, thread_t thread) 1335 { 1336 struct uthread *uth = get_bsdthread_info(thread); 1337 proc_t proc = get_bsdtask_info(get_threadtask(thread)); 1338 struct workqueue *wq = proc_get_wqptr(proc); 1339 thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket; 1340 wq_thactive_t old_thactive; 1341 bool start_timer = false; 1342 1343 if (qos == WORKQ_THREAD_QOS_MANAGER) { 1344 return; 1345 } 1346 1347 switch (type) { 1348 case SCHED_CALL_BLOCK: 1349 old_thactive = _wq_thactive_dec(wq, qos); 1350 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive); 1351 1352 /* 1353 * Remember the timestamp of the last thread that blocked in this 1354 * bucket, it used used by admission checks to ignore one thread 1355 * being inactive if this timestamp is recent enough. 1356 * 1357 * If we collide with another thread trying to update the 1358 * last_blocked (really unlikely since another thread would have to 1359 * get scheduled and then block after we start down this path), it's 1360 * not a problem. Either timestamp is adequate, so no need to retry 1361 */ 1362 os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)], 1363 thread_last_run_time(thread), relaxed); 1364 1365 if (req_qos == THREAD_QOS_UNSPECIFIED) { 1366 /* 1367 * No pending request at the moment we could unblock, move on. 1368 */ 1369 } else if (qos < req_qos) { 1370 /* 1371 * The blocking thread is at a lower QoS than the highest currently 1372 * pending constrained request, nothing has to be redriven 1373 */ 1374 } else { 1375 uint32_t max_busycount, old_req_count; 1376 old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive, 1377 req_qos, NULL, &max_busycount); 1378 /* 1379 * If it is possible that may_start_constrained_thread had refused 1380 * admission due to being over the max concurrency, we may need to 1381 * spin up a new thread. 1382 * 1383 * We take into account the maximum number of busy threads 1384 * that can affect may_start_constrained_thread as looking at the 1385 * actual number may_start_constrained_thread will see is racy. 1386 * 1387 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is 1388 * between NCPU (4) and NCPU - 2 (2) we need to redrive. 
1389 */ 1390 uint32_t conc = wq_max_parallelism[_wq_bucket(qos)]; 1391 if (old_req_count <= conc && conc <= old_req_count + max_busycount) { 1392 start_timer = workq_schedule_delayed_thread_creation(wq, 0); 1393 } 1394 } 1395 if (__improbable(kdebug_enable)) { 1396 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq, 1397 old_thactive, qos, NULL, NULL); 1398 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq, 1399 old - 1, qos | (req_qos << 8), 1400 wq->wq_reqcount << 1 | start_timer, 0); 1401 } 1402 break; 1403 1404 case SCHED_CALL_UNBLOCK: 1405 /* 1406 * we cannot take the workqueue_lock here... 1407 * an UNBLOCK can occur from a timer event which 1408 * is run from an interrupt context... if the workqueue_lock 1409 * is already held by this processor, we'll deadlock... 1410 * the thread lock for the thread being UNBLOCKED 1411 * is also held 1412 */ 1413 old_thactive = _wq_thactive_inc(wq, qos); 1414 if (__improbable(kdebug_enable)) { 1415 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq, 1416 old_thactive, qos, NULL, NULL); 1417 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive); 1418 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq, 1419 old + 1, qos | (req_qos << 8), 1420 wq->wq_threads_scheduled, 0); 1421 } 1422 break; 1423 } 1424 } 1425 1426 #pragma mark workq lifecycle 1427 1428 void 1429 workq_reference(struct workqueue *wq) 1430 { 1431 os_ref_retain(&wq->wq_refcnt); 1432 } 1433 1434 static void 1435 workq_deallocate_queue_invoke(mpsc_queue_chain_t e, 1436 __assert_only mpsc_daemon_queue_t dq) 1437 { 1438 struct workqueue *wq; 1439 struct turnstile *ts; 1440 1441 wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link); 1442 assert(dq == &workq_deallocate_queue); 1443 1444 turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS); 1445 assert(ts); 1446 turnstile_cleanup(); 1447 turnstile_deallocate(ts); 1448 1449 lck_spin_destroy(&wq->wq_lock, &workq_lck_grp); 1450 zfree(workq_zone_workqueue, wq); 1451 } 1452 1453 static void 1454 workq_deallocate(struct workqueue *wq) 1455 { 1456 if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) { 1457 workq_deallocate_queue_invoke(&wq->wq_destroy_link, 1458 &workq_deallocate_queue); 1459 } 1460 } 1461 1462 void 1463 workq_deallocate_safe(struct workqueue *wq) 1464 { 1465 if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) { 1466 mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link, 1467 MPSC_QUEUE_DISABLE_PREEMPTION); 1468 } 1469 } 1470 1471 /** 1472 * Setup per-process state for the workqueue. 
1473 */ 1474 int 1475 workq_open(struct proc *p, __unused struct workq_open_args *uap, 1476 __unused int32_t *retval) 1477 { 1478 struct workqueue *wq; 1479 int error = 0; 1480 1481 if ((p->p_lflag & P_LREGISTER) == 0) { 1482 return EINVAL; 1483 } 1484 1485 if (wq_init_constrained_limit) { 1486 uint32_t limit, num_cpus = ml_wait_max_cpus(); 1487 1488 /* 1489 * set up the limit for the constrained pool 1490 * this is a virtual pool in that we don't 1491 * maintain it on a separate idle and run list 1492 */ 1493 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR; 1494 1495 if (limit > wq_max_constrained_threads) { 1496 wq_max_constrained_threads = limit; 1497 } 1498 1499 if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) { 1500 wq_max_threads = WQ_THACTIVE_BUCKET_HALF; 1501 } 1502 if (wq_max_threads > CONFIG_THREAD_MAX - 20) { 1503 wq_max_threads = CONFIG_THREAD_MAX - 20; 1504 } 1505 1506 wq_death_max_load = (uint16_t)fls(num_cpus) + 1; 1507 1508 for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) { 1509 wq_max_parallelism[_wq_bucket(qos)] = 1510 qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL); 1511 } 1512 1513 wq_init_constrained_limit = 0; 1514 } 1515 1516 if (proc_get_wqptr(p) == NULL) { 1517 if (proc_init_wqptr_or_wait(p) == FALSE) { 1518 assert(proc_get_wqptr(p) != NULL); 1519 goto out; 1520 } 1521 1522 wq = (struct workqueue *)zalloc(workq_zone_workqueue); 1523 bzero(wq, sizeof(struct workqueue)); 1524 1525 os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1); 1526 1527 // Start the event manager at the priority hinted at by the policy engine 1528 thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task()); 1529 pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0); 1530 wq->wq_event_manager_priority = (uint32_t)pp; 1531 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime; 1532 wq->wq_proc = p; 1533 turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(), 1534 TURNSTILE_WORKQS); 1535 1536 TAILQ_INIT(&wq->wq_thrunlist); 1537 TAILQ_INIT(&wq->wq_thnewlist); 1538 TAILQ_INIT(&wq->wq_thidlelist); 1539 priority_queue_init(&wq->wq_overcommit_queue); 1540 priority_queue_init(&wq->wq_constrained_queue); 1541 priority_queue_init(&wq->wq_special_queue); 1542 1543 wq->wq_delayed_call = thread_call_allocate_with_options( 1544 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL, 1545 THREAD_CALL_OPTIONS_ONCE); 1546 wq->wq_immediate_call = thread_call_allocate_with_options( 1547 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL, 1548 THREAD_CALL_OPTIONS_ONCE); 1549 wq->wq_death_call = thread_call_allocate_with_options( 1550 workq_kill_old_threads_call, wq, 1551 THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE); 1552 1553 lck_spin_init(&wq->wq_lock, &workq_lck_grp, LCK_ATTR_NULL); 1554 1555 WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq, 1556 VM_KERNEL_ADDRHIDE(wq), 0, 0, 0); 1557 proc_set_wqptr(p, wq); 1558 } 1559 out: 1560 1561 return error; 1562 } 1563 1564 /* 1565 * Routine: workq_mark_exiting 1566 * 1567 * Function: Mark the work queue such that new threads will not be added to the 1568 * work queue after we return. 1569 * 1570 * Conditions: Called against the current process. 
1571 */ 1572 void 1573 workq_mark_exiting(struct proc *p) 1574 { 1575 struct workqueue *wq = proc_get_wqptr(p); 1576 uint32_t wq_flags; 1577 workq_threadreq_t mgr_req; 1578 1579 if (!wq) { 1580 return; 1581 } 1582 1583 WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0, 0); 1584 1585 workq_lock_spin(wq); 1586 1587 wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed); 1588 if (__improbable(wq_flags & WQ_EXITING)) { 1589 panic("workq_mark_exiting called twice"); 1590 } 1591 1592 /* 1593 * Opportunistically try to cancel thread calls that are likely in flight. 1594 * workq_exit() will do the proper cleanup. 1595 */ 1596 if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) { 1597 thread_call_cancel(wq->wq_immediate_call); 1598 } 1599 if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) { 1600 thread_call_cancel(wq->wq_delayed_call); 1601 } 1602 if (wq_flags & WQ_DEATH_CALL_SCHEDULED) { 1603 thread_call_cancel(wq->wq_death_call); 1604 } 1605 1606 mgr_req = wq->wq_event_manager_threadreq; 1607 wq->wq_event_manager_threadreq = NULL; 1608 wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */ 1609 wq->wq_creator = NULL; 1610 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 1611 1612 workq_unlock(wq); 1613 1614 if (mgr_req) { 1615 kqueue_threadreq_cancel(p, mgr_req); 1616 } 1617 /* 1618 * No one touches the priority queues once WQ_EXITING is set. 1619 * It is hence safe to do the tear down without holding any lock. 1620 */ 1621 priority_queue_destroy(&wq->wq_overcommit_queue, 1622 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 1623 workq_threadreq_destroy(p, e); 1624 }); 1625 priority_queue_destroy(&wq->wq_constrained_queue, 1626 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 1627 workq_threadreq_destroy(p, e); 1628 }); 1629 priority_queue_destroy(&wq->wq_special_queue, 1630 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){ 1631 workq_threadreq_destroy(p, e); 1632 }); 1633 1634 WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0, 0); 1635 } 1636 1637 /* 1638 * Routine: workq_exit 1639 * 1640 * Function: clean up the work queue structure(s) now that there are no threads 1641 * left running inside the work queue (except possibly current_thread). 1642 * 1643 * Conditions: Called by the last thread in the process. 1644 * Called against current process. 1645 */ 1646 void 1647 workq_exit(struct proc *p) 1648 { 1649 struct workqueue *wq; 1650 struct uthread *uth, *tmp; 1651 1652 wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed); 1653 if (wq != NULL) { 1654 thread_t th = current_thread(); 1655 1656 WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0, 0); 1657 1658 if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) { 1659 /* 1660 * <rdar://problem/40111515> Make sure we will no longer call the 1661 * sched call, if we ever block this thread, which the cancel_wait 1662 * below can do. 1663 */ 1664 thread_sched_call(th, NULL); 1665 } 1666 1667 /* 1668 * Thread calls are always scheduled by the proc itself or under the 1669 * workqueue spinlock if WQ_EXITING is not yet set. 1670 * 1671 * Either way, when this runs, the proc has no threads left beside 1672 * the one running this very code, so we know no thread call can be 1673 * dispatched anymore. 
1674 */ 1675 thread_call_cancel_wait(wq->wq_delayed_call); 1676 thread_call_cancel_wait(wq->wq_immediate_call); 1677 thread_call_cancel_wait(wq->wq_death_call); 1678 thread_call_free(wq->wq_delayed_call); 1679 thread_call_free(wq->wq_immediate_call); 1680 thread_call_free(wq->wq_death_call); 1681 1682 /* 1683 * Clean up workqueue data structures for threads that exited and 1684 * didn't get a chance to clean up after themselves. 1685 * 1686 * idle/new threads should have been interrupted and died on their own 1687 */ 1688 TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) { 1689 thread_sched_call(uth->uu_thread, NULL); 1690 thread_deallocate(uth->uu_thread); 1691 } 1692 assert(TAILQ_EMPTY(&wq->wq_thnewlist)); 1693 assert(TAILQ_EMPTY(&wq->wq_thidlelist)); 1694 1695 WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq, 1696 VM_KERNEL_ADDRHIDE(wq), 0, 0, 0); 1697 1698 workq_deallocate(wq); 1699 1700 WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0, 0); 1701 } 1702 } 1703 1704 1705 #pragma mark bsd thread control 1706 1707 static bool 1708 _pthread_priority_to_policy(pthread_priority_t priority, 1709 thread_qos_policy_data_t *data) 1710 { 1711 data->qos_tier = _pthread_priority_thread_qos(priority); 1712 data->tier_importance = _pthread_priority_relpri(priority); 1713 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 || 1714 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) { 1715 return false; 1716 } 1717 return true; 1718 } 1719 1720 static int 1721 bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority, 1722 mach_port_name_t voucher, enum workq_set_self_flags flags) 1723 { 1724 struct uthread *uth = get_bsdthread_info(th); 1725 struct workqueue *wq = proc_get_wqptr(p); 1726 1727 kern_return_t kr; 1728 int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0; 1729 bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE); 1730 1731 if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) { 1732 if (!is_wq_thread) { 1733 unbind_rv = EINVAL; 1734 goto qos; 1735 } 1736 1737 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 1738 unbind_rv = EINVAL; 1739 goto qos; 1740 } 1741 1742 workq_threadreq_t kqr = uth->uu_kqr_bound; 1743 if (kqr == NULL) { 1744 unbind_rv = EALREADY; 1745 goto qos; 1746 } 1747 1748 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 1749 unbind_rv = EINVAL; 1750 goto qos; 1751 } 1752 1753 kqueue_threadreq_unbind(p, kqr); 1754 } 1755 1756 qos: 1757 if (flags & WORKQ_SET_SELF_QOS_FLAG) { 1758 thread_qos_policy_data_t new_policy; 1759 1760 if (!_pthread_priority_to_policy(priority, &new_policy)) { 1761 qos_rv = EINVAL; 1762 goto voucher; 1763 } 1764 1765 if (!is_wq_thread) { 1766 /* 1767 * Threads opted out of QoS can't change QoS 1768 */ 1769 if (!thread_has_qos_policy(th)) { 1770 qos_rv = EPERM; 1771 goto voucher; 1772 } 1773 } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER || 1774 uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) { 1775 /* 1776 * Workqueue manager threads or threads above UI can't change QoS 1777 */ 1778 qos_rv = EINVAL; 1779 goto voucher; 1780 } else { 1781 /* 1782 * For workqueue threads, possibly adjust buckets and redrive thread 1783 * requests. 
1784 */ 1785 bool old_overcommit = uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT; 1786 bool new_overcommit = priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG; 1787 struct uu_workq_policy old_pri, new_pri; 1788 bool force_run = false; 1789 1790 workq_lock_spin(wq); 1791 1792 if (old_overcommit != new_overcommit) { 1793 uth->uu_workq_flags ^= UT_WORKQ_OVERCOMMIT; 1794 if (old_overcommit) { 1795 wq->wq_constrained_threads_scheduled++; 1796 } else if (wq->wq_constrained_threads_scheduled-- == 1797 wq_max_constrained_threads) { 1798 force_run = true; 1799 } 1800 } 1801 1802 old_pri = new_pri = uth->uu_workq_pri; 1803 new_pri.qos_req = (thread_qos_t)new_policy.qos_tier; 1804 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run); 1805 workq_unlock(wq); 1806 } 1807 1808 kr = thread_policy_set_internal(th, THREAD_QOS_POLICY, 1809 (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT); 1810 if (kr != KERN_SUCCESS) { 1811 qos_rv = EINVAL; 1812 } 1813 } 1814 1815 voucher: 1816 if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) { 1817 kr = thread_set_voucher_name(voucher); 1818 if (kr != KERN_SUCCESS) { 1819 voucher_rv = ENOENT; 1820 goto fixedpri; 1821 } 1822 } 1823 1824 fixedpri: 1825 if (qos_rv) { 1826 goto done; 1827 } 1828 if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) { 1829 thread_extended_policy_data_t extpol = {.timeshare = 0}; 1830 1831 if (is_wq_thread) { 1832 /* Not allowed on workqueue threads */ 1833 fixedpri_rv = ENOTSUP; 1834 goto done; 1835 } 1836 1837 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY, 1838 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT); 1839 if (kr != KERN_SUCCESS) { 1840 fixedpri_rv = EINVAL; 1841 goto done; 1842 } 1843 } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) { 1844 thread_extended_policy_data_t extpol = {.timeshare = 1}; 1845 1846 if (is_wq_thread) { 1847 /* Not allowed on workqueue threads */ 1848 fixedpri_rv = ENOTSUP; 1849 goto done; 1850 } 1851 1852 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY, 1853 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT); 1854 if (kr != KERN_SUCCESS) { 1855 fixedpri_rv = EINVAL; 1856 goto done; 1857 } 1858 } 1859 1860 done: 1861 if (qos_rv && voucher_rv) { 1862 /* Both failed, give that a unique error. 
*/ 1863 return EBADMSG; 1864 } 1865 1866 if (unbind_rv) { 1867 return unbind_rv; 1868 } 1869 1870 if (qos_rv) { 1871 return qos_rv; 1872 } 1873 1874 if (voucher_rv) { 1875 return voucher_rv; 1876 } 1877 1878 if (fixedpri_rv) { 1879 return fixedpri_rv; 1880 } 1881 1882 1883 return 0; 1884 } 1885 1886 static int 1887 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport, 1888 pthread_priority_t pp, user_addr_t resource) 1889 { 1890 thread_qos_t qos = _pthread_priority_thread_qos(pp); 1891 if (qos == THREAD_QOS_UNSPECIFIED) { 1892 return EINVAL; 1893 } 1894 1895 thread_t th = port_name_to_thread(kport, 1896 PORT_TO_THREAD_IN_CURRENT_TASK); 1897 if (th == THREAD_NULL) { 1898 return ESRCH; 1899 } 1900 1901 int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE, 1902 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE); 1903 1904 thread_deallocate(th); 1905 return rv; 1906 } 1907 1908 static int 1909 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport, 1910 user_addr_t resource) 1911 { 1912 thread_t th = port_name_to_thread(kport, 1913 PORT_TO_THREAD_IN_CURRENT_TASK); 1914 if (th == THREAD_NULL) { 1915 return ESRCH; 1916 } 1917 1918 int rv = proc_thread_qos_remove_override(p->task, th, 0, resource, 1919 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE); 1920 1921 thread_deallocate(th); 1922 return rv; 1923 } 1924 1925 static int 1926 workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport, 1927 pthread_priority_t pp, user_addr_t ulock_addr) 1928 { 1929 struct uu_workq_policy old_pri, new_pri; 1930 struct workqueue *wq = proc_get_wqptr(p); 1931 1932 thread_qos_t qos_override = _pthread_priority_thread_qos(pp); 1933 if (qos_override == THREAD_QOS_UNSPECIFIED) { 1934 return EINVAL; 1935 } 1936 1937 thread_t thread = port_name_to_thread(kport, 1938 PORT_TO_THREAD_IN_CURRENT_TASK); 1939 if (thread == THREAD_NULL) { 1940 return ESRCH; 1941 } 1942 1943 struct uthread *uth = get_bsdthread_info(thread); 1944 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) { 1945 thread_deallocate(thread); 1946 return EPERM; 1947 } 1948 1949 WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE, 1950 wq, thread_tid(thread), 1, pp, 0); 1951 1952 thread_mtx_lock(thread); 1953 1954 if (ulock_addr) { 1955 uint32_t val; 1956 int rc; 1957 /* 1958 * Workaround lack of explicit support for 'no-fault copyin' 1959 * <rdar://problem/24999882>, as disabling preemption prevents paging in 1960 */ 1961 disable_preemption(); 1962 rc = copyin_atomic32(ulock_addr, &val); 1963 enable_preemption(); 1964 if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) { 1965 goto out; 1966 } 1967 } 1968 1969 workq_lock_spin(wq); 1970 1971 old_pri = uth->uu_workq_pri; 1972 if (old_pri.qos_override >= qos_override) { 1973 /* Nothing to do */ 1974 } else if (thread == current_thread()) { 1975 new_pri = old_pri; 1976 new_pri.qos_override = qos_override; 1977 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 1978 } else { 1979 uth->uu_workq_pri.qos_override = qos_override; 1980 if (qos_override > workq_pri_override(old_pri)) { 1981 thread_set_workq_override(thread, qos_override); 1982 } 1983 } 1984 1985 workq_unlock(wq); 1986 1987 out: 1988 thread_mtx_unlock(thread); 1989 thread_deallocate(thread); 1990 return 0; 1991 } 1992 1993 static int 1994 workq_thread_reset_dispatch_override(proc_t p, thread_t thread) 1995 { 1996 struct uu_workq_policy old_pri, new_pri; 1997 struct workqueue *wq = proc_get_wqptr(p); 1998 struct uthread *uth = get_bsdthread_info(thread); 1999 
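	/* As in the add path above, only workqueue threads carry dispatch overrides; others are rejected. */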
2000 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) { 2001 return EPERM; 2002 } 2003 2004 WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0, 0); 2005 2006 workq_lock_spin(wq); 2007 old_pri = new_pri = uth->uu_workq_pri; 2008 new_pri.qos_override = THREAD_QOS_UNSPECIFIED; 2009 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false); 2010 workq_unlock(wq); 2011 return 0; 2012 } 2013 2014 static int 2015 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable) 2016 { 2017 if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) { 2018 // If the thread isn't a workqueue thread, don't set the 2019 // kill_allowed bit; however, we still need to return 0 2020 // instead of an error code since this code is executed 2021 // on the abort path which needs to not depend on the 2022 // pthread_t (returning an error depends on pthread_t via 2023 // cerror_nocancel) 2024 return 0; 2025 } 2026 struct uthread *uth = get_bsdthread_info(thread); 2027 uth->uu_workq_pthread_kill_allowed = enable; 2028 return 0; 2029 } 2030 2031 static int 2032 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags, 2033 int *retval) 2034 { 2035 static_assert(QOS_PARALLELISM_COUNT_LOGICAL == 2036 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical"); 2037 static_assert(QOS_PARALLELISM_REALTIME == 2038 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime"); 2039 2040 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL)) { 2041 return EINVAL; 2042 } 2043 2044 if (flags & QOS_PARALLELISM_REALTIME) { 2045 if (qos) { 2046 return EINVAL; 2047 } 2048 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) { 2049 return EINVAL; 2050 } 2051 2052 *retval = qos_max_parallelism(qos, flags); 2053 return 0; 2054 } 2055 2056 #define ENSURE_UNUSED(arg) \ 2057 ({ if ((arg) != 0) { return EINVAL; } }) 2058 2059 int 2060 bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval) 2061 { 2062 switch (uap->cmd) { 2063 case BSDTHREAD_CTL_QOS_OVERRIDE_START: 2064 return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1, 2065 (pthread_priority_t)uap->arg2, uap->arg3); 2066 case BSDTHREAD_CTL_QOS_OVERRIDE_END: 2067 ENSURE_UNUSED(uap->arg3); 2068 return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1, 2069 (user_addr_t)uap->arg2); 2070 2071 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH: 2072 return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1, 2073 (pthread_priority_t)uap->arg2, uap->arg3); 2074 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET: 2075 return workq_thread_reset_dispatch_override(p, current_thread()); 2076 2077 case BSDTHREAD_CTL_SET_SELF: 2078 return bsdthread_set_self(p, current_thread(), 2079 (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2, 2080 (enum workq_set_self_flags)uap->arg3); 2081 2082 case BSDTHREAD_CTL_QOS_MAX_PARALLELISM: 2083 ENSURE_UNUSED(uap->arg3); 2084 return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1, 2085 (unsigned long)uap->arg2, retval); 2086 case BSDTHREAD_CTL_WORKQ_ALLOW_KILL: 2087 ENSURE_UNUSED(uap->arg2); 2088 ENSURE_UNUSED(uap->arg3); 2089 return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1); 2090 2091 case BSDTHREAD_CTL_SET_QOS: 2092 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD: 2093 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET: 2094 /* no longer supported */ 2095 return ENOTSUP; 2096 2097 default: 2098 return EINVAL; 2099 } 2100 } 2101 2102 #pragma mark workqueue thread manipulation 2103 2104 static void __dead2 2105 
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 2106 struct uthread *uth, uint32_t setup_flags); 2107 2108 static void __dead2 2109 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 2110 struct uthread *uth, uint32_t setup_flags); 2111 2112 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2; 2113 2114 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD 2115 static inline uint64_t 2116 workq_trace_req_id(workq_threadreq_t req) 2117 { 2118 struct kqworkloop *kqwl; 2119 if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 2120 kqwl = __container_of(req, struct kqworkloop, kqwl_request); 2121 return kqwl->kqwl_dynamicid; 2122 } 2123 2124 return VM_KERNEL_ADDRHIDE(req); 2125 } 2126 #endif 2127 2128 /** 2129 * Entry point for libdispatch to ask for threads 2130 */ 2131 static int 2132 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp) 2133 { 2134 thread_qos_t qos = _pthread_priority_thread_qos(pp); 2135 struct workqueue *wq = proc_get_wqptr(p); 2136 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI; 2137 2138 if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX || 2139 qos == THREAD_QOS_UNSPECIFIED) { 2140 return EINVAL; 2141 } 2142 2143 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE, 2144 wq, reqcount, pp, 0, 0); 2145 2146 workq_threadreq_t req = zalloc(workq_zone_threadreq); 2147 priority_queue_entry_init(&req->tr_entry); 2148 req->tr_state = WORKQ_TR_STATE_NEW; 2149 req->tr_flags = 0; 2150 req->tr_qos = qos; 2151 2152 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) { 2153 req->tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT; 2154 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 2155 } 2156 2157 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, 2158 wq, workq_trace_req_id(req), req->tr_qos, reqcount, 0); 2159 2160 workq_lock_spin(wq); 2161 do { 2162 if (_wq_exiting(wq)) { 2163 goto exiting; 2164 } 2165 2166 /* 2167 * When userspace is asking for parallelism, wake up to (reqcount - 1) 2168 * threads without pacing, to inform the scheduler of that workload. 2169 * 2170 * The last requests, or the ones that failed the admission checks, are 2171 * enqueued and go through the regular creator codepath. 2172 * 2173 * If there aren't enough threads, add one, but re-evaluate everything 2174 * as conditions may now have changed. 2175 */ 2176 if (reqcount > 1 && (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) { 2177 unpaced = workq_constrained_allowance(wq, qos, NULL, false); 2178 if (unpaced >= reqcount - 1) { 2179 unpaced = reqcount - 1; 2180 } 2181 } else { 2182 unpaced = reqcount - 1; 2183 } 2184 2185 /* 2186 * This path does not currently handle custom workloop parameters 2187 * when creating threads for parallelism.
2188 */ 2189 assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)); 2190 2191 /* 2192 * This is a trimmed down version of workq_threadreq_bind_and_unlock() 2193 */ 2194 while (unpaced > 0 && wq->wq_thidlecount) { 2195 struct uthread *uth; 2196 bool needs_wakeup; 2197 uint8_t uu_flags = UT_WORKQ_EARLY_BOUND; 2198 2199 if (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) { 2200 uu_flags |= UT_WORKQ_OVERCOMMIT; 2201 } 2202 2203 uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup); 2204 2205 _wq_thactive_inc(wq, qos); 2206 wq->wq_thscheduled_count[_wq_bucket(qos)]++; 2207 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 2208 wq->wq_fulfilled++; 2209 2210 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags; 2211 uth->uu_save.uus_workq_park_data.thread_request = req; 2212 if (needs_wakeup) { 2213 workq_thread_wakeup(uth); 2214 } 2215 unpaced--; 2216 reqcount--; 2217 } 2218 } while (unpaced && wq->wq_nthreads < wq_max_threads && 2219 workq_add_new_idle_thread(p, wq)); 2220 2221 if (_wq_exiting(wq)) { 2222 goto exiting; 2223 } 2224 2225 req->tr_count = (uint16_t)reqcount; 2226 if (workq_threadreq_enqueue(wq, req)) { 2227 /* This can drop the workqueue lock, and take it again */ 2228 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 2229 } 2230 workq_unlock(wq); 2231 return 0; 2232 2233 exiting: 2234 workq_unlock(wq); 2235 zfree(workq_zone_threadreq, req); 2236 return ECANCELED; 2237 } 2238 2239 bool 2240 workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req, 2241 struct turnstile *workloop_ts, thread_qos_t qos, 2242 workq_kern_threadreq_flags_t flags) 2243 { 2244 struct workqueue *wq = proc_get_wqptr_fast(p); 2245 struct uthread *uth = NULL; 2246 2247 assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)); 2248 2249 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 2250 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req); 2251 qos = thread_workq_qos_for_pri(trp.trp_pri); 2252 if (qos == THREAD_QOS_UNSPECIFIED) { 2253 qos = WORKQ_THREAD_QOS_ABOVEUI; 2254 } 2255 } 2256 2257 assert(req->tr_state == WORKQ_TR_STATE_IDLE); 2258 priority_queue_entry_init(&req->tr_entry); 2259 req->tr_count = 1; 2260 req->tr_state = WORKQ_TR_STATE_NEW; 2261 req->tr_qos = qos; 2262 2263 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq, 2264 workq_trace_req_id(req), qos, 1, 0); 2265 2266 if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) { 2267 /* 2268 * we're called back synchronously from the context of 2269 * kqueue_threadreq_unbind from within workq_thread_return() 2270 * we can try to match up this thread with this request ! 2271 */ 2272 uth = current_uthread(); 2273 assert(uth->uu_kqr_bound == NULL); 2274 } 2275 2276 workq_lock_spin(wq); 2277 if (_wq_exiting(wq)) { 2278 req->tr_state = WORKQ_TR_STATE_IDLE; 2279 workq_unlock(wq); 2280 return false; 2281 } 2282 2283 if (uth && workq_threadreq_admissible(wq, uth, req)) { 2284 assert(uth != wq->wq_creator); 2285 if (uth->uu_workq_pri.qos_bucket != req->tr_qos) { 2286 _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos); 2287 workq_thread_reset_pri(wq, uth, req, /*unpark*/ false); 2288 } 2289 /* 2290 * We're called from workq_kern_threadreq_initiate() 2291 * due to an unbind, with the kq req held. 
2292 */ 2293 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 2294 workq_trace_req_id(req), 0, 0, 0); 2295 wq->wq_fulfilled++; 2296 kqueue_threadreq_bind(p, req, uth->uu_thread, 0); 2297 } else { 2298 if (workloop_ts) { 2299 workq_perform_turnstile_operation_locked(wq, ^{ 2300 turnstile_update_inheritor(workloop_ts, wq->wq_turnstile, 2301 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE); 2302 turnstile_update_inheritor_complete(workloop_ts, 2303 TURNSTILE_INTERLOCK_HELD); 2304 }); 2305 } 2306 if (workq_threadreq_enqueue(wq, req)) { 2307 workq_schedule_creator(p, wq, flags); 2308 } 2309 } 2310 2311 workq_unlock(wq); 2312 2313 return true; 2314 } 2315 2316 void 2317 workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req, 2318 thread_qos_t qos, workq_kern_threadreq_flags_t flags) 2319 { 2320 struct workqueue *wq = proc_get_wqptr_fast(p); 2321 bool make_overcommit = false; 2322 2323 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) { 2324 /* Requests outside-of-QoS shouldn't accept modify operations */ 2325 return; 2326 } 2327 2328 workq_lock_spin(wq); 2329 2330 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 2331 assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)); 2332 2333 if (req->tr_state == WORKQ_TR_STATE_BINDING) { 2334 kqueue_threadreq_bind(p, req, req->tr_thread, 0); 2335 workq_unlock(wq); 2336 return; 2337 } 2338 2339 if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) { 2340 make_overcommit = (req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0; 2341 } 2342 2343 if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) { 2344 workq_unlock(wq); 2345 return; 2346 } 2347 2348 assert(req->tr_count == 1); 2349 if (req->tr_state != WORKQ_TR_STATE_QUEUED) { 2350 panic("Invalid thread request (%p) state %d", req, req->tr_state); 2351 } 2352 2353 WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq, 2354 workq_trace_req_id(req), qos, 0, 0); 2355 2356 struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req); 2357 workq_threadreq_t req_max; 2358 2359 /* 2360 * Stage 1: Dequeue the request from its priority queue. 2361 * 2362 * If we dequeue the root item of the constrained priority queue, 2363 * maintain the best constrained request qos invariant. 2364 */ 2365 if (priority_queue_remove(pq, &req->tr_entry)) { 2366 if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) { 2367 _wq_thactive_refresh_best_constrained_req_qos(wq); 2368 } 2369 } 2370 2371 /* 2372 * Stage 2: Apply changes to the thread request 2373 * 2374 * If the item will not become the root of the priority queue it belongs to, 2375 * then we need to wait in line, just enqueue and return quickly. 2376 */ 2377 if (__improbable(make_overcommit)) { 2378 req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT; 2379 pq = workq_priority_queue_for_req(wq, req); 2380 } 2381 req->tr_qos = qos; 2382 2383 req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry); 2384 if (req_max && req_max->tr_qos >= qos) { 2385 priority_queue_entry_set_sched_pri(pq, &req->tr_entry, 2386 workq_priority_for_req(req), false); 2387 priority_queue_insert(pq, &req->tr_entry); 2388 workq_unlock(wq); 2389 return; 2390 } 2391 2392 /* 2393 * Stage 3: Reevaluate whether we should run the thread request. 2394 * 2395 * Pretend the thread request is new again: 2396 * - adjust wq_reqcount to not count it anymore. 
2397 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock 2398 * properly attempts a synchronous bind) 2399 */ 2400 wq->wq_reqcount--; 2401 req->tr_state = WORKQ_TR_STATE_NEW; 2402 if (workq_threadreq_enqueue(wq, req)) { 2403 workq_schedule_creator(p, wq, flags); 2404 } 2405 workq_unlock(wq); 2406 } 2407 2408 void 2409 workq_kern_threadreq_lock(struct proc *p) 2410 { 2411 workq_lock_spin(proc_get_wqptr_fast(p)); 2412 } 2413 2414 void 2415 workq_kern_threadreq_unlock(struct proc *p) 2416 { 2417 workq_unlock(proc_get_wqptr_fast(p)); 2418 } 2419 2420 void 2421 workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req, 2422 thread_t owner, struct turnstile *wl_ts, 2423 turnstile_update_flags_t flags) 2424 { 2425 struct workqueue *wq = proc_get_wqptr_fast(p); 2426 turnstile_inheritor_t inheritor; 2427 2428 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER); 2429 assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP); 2430 workq_lock_held(wq); 2431 2432 if (req->tr_state == WORKQ_TR_STATE_BINDING) { 2433 kqueue_threadreq_bind(p, req, req->tr_thread, 2434 KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE); 2435 return; 2436 } 2437 2438 if (_wq_exiting(wq)) { 2439 inheritor = TURNSTILE_INHERITOR_NULL; 2440 } else { 2441 if (req->tr_state != WORKQ_TR_STATE_QUEUED) { 2442 panic("Invalid thread request (%p) state %d", req, req->tr_state); 2443 } 2444 2445 if (owner) { 2446 inheritor = owner; 2447 flags |= TURNSTILE_INHERITOR_THREAD; 2448 } else { 2449 inheritor = wq->wq_turnstile; 2450 flags |= TURNSTILE_INHERITOR_TURNSTILE; 2451 } 2452 } 2453 2454 workq_perform_turnstile_operation_locked(wq, ^{ 2455 turnstile_update_inheritor(wl_ts, inheritor, flags); 2456 }); 2457 } 2458 2459 void 2460 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags) 2461 { 2462 struct workqueue *wq = proc_get_wqptr_fast(p); 2463 2464 workq_lock_spin(wq); 2465 workq_schedule_creator(p, wq, flags); 2466 workq_unlock(wq); 2467 } 2468 2469 void 2470 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked) 2471 { 2472 if (locked) { 2473 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE); 2474 } else { 2475 workq_schedule_immediate_thread_creation(wq); 2476 } 2477 } 2478 2479 static int 2480 workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap, 2481 struct workqueue *wq) 2482 { 2483 thread_t th = current_thread(); 2484 struct uthread *uth = get_bsdthread_info(th); 2485 workq_threadreq_t kqr = uth->uu_kqr_bound; 2486 workq_threadreq_param_t trp = { }; 2487 int nevents = uap->affinity, error; 2488 user_addr_t eventlist = uap->item; 2489 2490 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) || 2491 (uth->uu_workq_flags & UT_WORKQ_DYING)) { 2492 return EINVAL; 2493 } 2494 2495 if (eventlist && nevents && kqr == NULL) { 2496 return EINVAL; 2497 } 2498 2499 /* reset signal mask on the workqueue thread to default state */ 2500 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) { 2501 proc_lock(p); 2502 uth->uu_sigmask = ~workq_threadmask; 2503 proc_unlock(p); 2504 } 2505 2506 if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) { 2507 /* 2508 * Ensure we store the threadreq param before unbinding 2509 * the kqr from this thread. 2510 */ 2511 trp = kqueue_threadreq_workloop_param(kqr); 2512 } 2513 2514 /* 2515 * Freeze the base pri while we decide the fate of this thread.
2516 * 2517 * Either: 2518 * - we return to user and kevent_cleanup will have unfrozen the base pri, 2519 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will. 2520 */ 2521 thread_freeze_base_pri(th); 2522 2523 if (kqr) { 2524 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE; 2525 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 2526 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT; 2527 } else { 2528 upcall_flags |= WQ_FLAG_THREAD_KEVENT; 2529 } 2530 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 2531 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER; 2532 } else { 2533 if (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) { 2534 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 2535 } 2536 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) { 2537 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS; 2538 } else { 2539 upcall_flags |= uth->uu_workq_pri.qos_req | 2540 WQ_FLAG_THREAD_PRIO_QOS; 2541 } 2542 } 2543 2544 error = pthread_functions->workq_handle_stack_events(p, th, 2545 get_task_map(p->task), uth->uu_workq_stackaddr, 2546 uth->uu_workq_thport, eventlist, nevents, upcall_flags); 2547 if (error) { 2548 assert(uth->uu_kqr_bound == kqr); 2549 return error; 2550 } 2551 2552 // pthread is supposed to pass KEVENT_FLAG_PARKING here 2553 // which should cause the above call to either: 2554 // - not return 2555 // - return an error 2556 // - return 0 and have unbound properly 2557 assert(uth->uu_kqr_bound == NULL); 2558 } 2559 2560 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0, 0); 2561 2562 thread_sched_call(th, NULL); 2563 thread_will_park_or_terminate(th); 2564 #if CONFIG_WORKLOOP_DEBUG 2565 UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, }); 2566 #endif 2567 2568 workq_lock_spin(wq); 2569 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0); 2570 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value; 2571 workq_select_threadreq_or_park_and_unlock(p, wq, uth, 2572 WQ_SETUP_CLEAR_VOUCHER); 2573 __builtin_unreachable(); 2574 } 2575 2576 /** 2577 * Multiplexed call to interact with the workqueue mechanism 2578 */ 2579 int 2580 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval) 2581 { 2582 int options = uap->options; 2583 int arg2 = uap->affinity; 2584 int arg3 = uap->prio; 2585 struct workqueue *wq = proc_get_wqptr(p); 2586 int error = 0; 2587 2588 if ((p->p_lflag & P_LREGISTER) == 0) { 2589 return EINVAL; 2590 } 2591 2592 switch (options) { 2593 case WQOPS_QUEUE_NEWSPISUPP: { 2594 /* 2595 * arg2 = offset of serialno into dispatch queue 2596 * arg3 = kevent support 2597 */ 2598 int offset = arg2; 2599 if (arg3 & 0x01) { 2600 // If we get here, then userspace has indicated support for kevent delivery. 2601 } 2602 2603 p->p_dispatchqueue_serialno_offset = (uint64_t)offset; 2604 break; 2605 } 2606 case WQOPS_QUEUE_REQTHREADS: { 2607 /* 2608 * arg2 = number of threads to start 2609 * arg3 = priority 2610 */ 2611 error = workq_reqthreads(p, arg2, arg3); 2612 break; 2613 } 2614 case WQOPS_SET_EVENT_MANAGER_PRIORITY: { 2615 /* 2616 * arg2 = priority for the manager thread 2617 * 2618 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set, 2619 * the low bits of the value contains a scheduling priority 2620 * instead of a QOS value 2621 */ 2622 pthread_priority_t pri = arg2; 2623 2624 if (wq == NULL) { 2625 error = EINVAL; 2626 break; 2627 } 2628 2629 /* 2630 * Normalize the incoming priority so that it is ordered numerically. 
2631 */ 2632 if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) { 2633 pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK | 2634 _PTHREAD_PRIORITY_SCHED_PRI_FLAG); 2635 } else { 2636 thread_qos_t qos = _pthread_priority_thread_qos(pri); 2637 int relpri = _pthread_priority_relpri(pri); 2638 if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE || 2639 qos == THREAD_QOS_UNSPECIFIED) { 2640 error = EINVAL; 2641 break; 2642 } 2643 pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK; 2644 } 2645 2646 /* 2647 * If userspace passes a scheduling priority, that wins over any QoS. 2648 * Userspace should take care not to lower the priority this way. 2649 */ 2650 workq_lock_spin(wq); 2651 if (wq->wq_event_manager_priority < (uint32_t)pri) { 2652 wq->wq_event_manager_priority = (uint32_t)pri; 2653 } 2654 workq_unlock(wq); 2655 break; 2656 } 2657 case WQOPS_THREAD_KEVENT_RETURN: 2658 case WQOPS_THREAD_WORKLOOP_RETURN: 2659 case WQOPS_THREAD_RETURN: { 2660 error = workq_thread_return(p, uap, wq); 2661 break; 2662 } 2663 2664 case WQOPS_SHOULD_NARROW: { 2665 /* 2666 * arg2 = priority to test 2667 * arg3 = unused 2668 */ 2669 thread_t th = current_thread(); 2670 struct uthread *uth = get_bsdthread_info(th); 2671 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) || 2672 (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) { 2673 error = EINVAL; 2674 break; 2675 } 2676 2677 thread_qos_t qos = _pthread_priority_thread_qos(arg2); 2678 if (qos == THREAD_QOS_UNSPECIFIED) { 2679 error = EINVAL; 2680 break; 2681 } 2682 workq_lock_spin(wq); 2683 bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false); 2684 workq_unlock(wq); 2685 2686 *retval = should_narrow; 2687 break; 2688 } 2689 case WQOPS_SETUP_DISPATCH: { 2690 /* 2691 * item = pointer to workq_dispatch_config structure 2692 * arg2 = sizeof(item) 2693 */ 2694 struct workq_dispatch_config cfg; 2695 bzero(&cfg, sizeof(cfg)); 2696 2697 error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2)); 2698 if (error) { 2699 break; 2700 } 2701 2702 if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS || 2703 cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) { 2704 error = ENOTSUP; 2705 break; 2706 } 2707 2708 /* Load fields from version 1 */ 2709 p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs; 2710 2711 /* Load fields from version 2 */ 2712 if (cfg.wdc_version >= 2) { 2713 p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs; 2714 } 2715 2716 break; 2717 } 2718 default: 2719 error = EINVAL; 2720 break; 2721 } 2722 2723 return error; 2724 } 2725 2726 /* 2727 * We have no work to do, park ourselves on the idle list. 2728 * 2729 * Consumes the workqueue lock and does not return. 2730 */ 2731 __attribute__((noreturn, noinline)) 2732 static void 2733 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth, 2734 uint32_t setup_flags) 2735 { 2736 assert(uth == current_uthread()); 2737 assert(uth->uu_kqr_bound == NULL); 2738 workq_push_idle_thread(p, wq, uth, setup_flags); // may not return 2739 2740 workq_thread_reset_cpupercent(NULL, uth); 2741 2742 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) && 2743 !(uth->uu_workq_flags & UT_WORKQ_DYING)) { 2744 workq_unlock(wq); 2745 2746 /* 2747 * workq_push_idle_thread() will unset `has_stack` 2748 * if it wants us to free the stack before parking.
2749 */ 2750 if (!uth->uu_save.uus_workq_park_data.has_stack) { 2751 pthread_functions->workq_markfree_threadstack(p, uth->uu_thread, 2752 get_task_map(p->task), uth->uu_workq_stackaddr); 2753 } 2754 2755 /* 2756 * When we remove the voucher from the thread, we may lose our importance 2757 * causing us to get preempted, so we do this after putting the thread on 2758 * the idle list. Then, when we get our importance back we'll be able to 2759 * use this thread from e.g. the kevent call out to deliver a boosting 2760 * message. 2761 */ 2762 __assert_only kern_return_t kr; 2763 kr = thread_set_voucher_name(MACH_PORT_NULL); 2764 assert(kr == KERN_SUCCESS); 2765 2766 workq_lock_spin(wq); 2767 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP; 2768 setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER; 2769 } 2770 2771 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0, 0); 2772 2773 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) { 2774 /* 2775 * While we'd dropped the lock to unset our voucher, someone came 2776 * around and made us runnable. But because we weren't waiting on the 2777 * event their thread_wakeup() was ineffectual. To correct for that, 2778 * we just run the continuation ourselves. 2779 */ 2780 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags); 2781 __builtin_unreachable(); 2782 } 2783 2784 if (uth->uu_workq_flags & UT_WORKQ_DYING) { 2785 workq_unpark_for_death_and_unlock(p, wq, uth, 2786 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags); 2787 __builtin_unreachable(); 2788 } 2789 2790 thread_set_pending_block_hint(uth->uu_thread, kThreadWaitParkedWorkQueue); 2791 assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE); 2792 workq_unlock(wq); 2793 thread_block(workq_unpark_continue); 2794 __builtin_unreachable(); 2795 } 2796 2797 static inline bool 2798 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth) 2799 { 2800 /* 2801 * There's an event manager request and either: 2802 * - no event manager currently running 2803 * - we are re-using the event manager 2804 */ 2805 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 || 2806 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER); 2807 } 2808 2809 static uint32_t 2810 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos, 2811 struct uthread *uth, bool may_start_timer) 2812 { 2813 assert(at_qos != WORKQ_THREAD_QOS_MANAGER); 2814 uint32_t count = 0; 2815 2816 uint32_t max_count = wq->wq_constrained_threads_scheduled; 2817 if (uth && (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) { 2818 /* 2819 * don't count the current thread as scheduled 2820 */ 2821 assert(max_count > 0); 2822 max_count--; 2823 } 2824 if (max_count >= wq_max_constrained_threads) { 2825 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1, 2826 wq->wq_constrained_threads_scheduled, 2827 wq_max_constrained_threads, 0); 2828 /* 2829 * we need 1 or more constrained threads to return to the kernel before 2830 * we can dispatch additional work 2831 */ 2832 return 0; 2833 } 2834 max_count -= wq_max_constrained_threads; 2835 2836 /* 2837 * Compute a metric for how many threads are active. We find the 2838 * highest priority request outstanding and then add up the number of 2839 * active threads in that and all higher-priority buckets. We'll also add 2840 * any "busy" threads which are not active but blocked recently enough that 2841 * we can't be sure they've gone idle yet.
We'll then compare this metric 2842 * to our max concurrency to decide whether to add a new thread. 2843 */ 2844 2845 uint32_t busycount, thactive_count; 2846 2847 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq), 2848 at_qos, &busycount, NULL); 2849 2850 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER && 2851 at_qos <= uth->uu_workq_pri.qos_bucket) { 2852 /* 2853 * Don't count this thread as currently active, but only if it's not 2854 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active 2855 * managers. 2856 */ 2857 assert(thactive_count > 0); 2858 thactive_count--; 2859 } 2860 2861 count = wq_max_parallelism[_wq_bucket(at_qos)]; 2862 if (count > thactive_count + busycount) { 2863 count -= thactive_count + busycount; 2864 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2, 2865 thactive_count, busycount, 0); 2866 return MIN(count, max_count); 2867 } else { 2868 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3, 2869 thactive_count, busycount, 0); 2870 } 2871 2872 if (busycount && may_start_timer) { 2873 /* 2874 * If this is called from the add timer, we won't have another timer 2875 * fire when the thread exits the "busy" state, so rearm the timer. 2876 */ 2877 workq_schedule_delayed_thread_creation(wq, 0); 2878 } 2879 2880 return 0; 2881 } 2882 2883 static bool 2884 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth, 2885 workq_threadreq_t req) 2886 { 2887 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) { 2888 return workq_may_start_event_mgr_thread(wq, uth); 2889 } 2890 if ((req->tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) == 0) { 2891 return workq_constrained_allowance(wq, req->tr_qos, uth, true); 2892 } 2893 return true; 2894 } 2895 2896 static workq_threadreq_t 2897 workq_threadreq_select_for_creator(struct workqueue *wq) 2898 { 2899 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr; 2900 thread_qos_t qos = THREAD_QOS_UNSPECIFIED; 2901 uint8_t pri = 0; 2902 2903 /* 2904 * Compute the best priority request, and ignore the turnstile for now 2905 */ 2906 2907 req_pri = priority_queue_max(&wq->wq_special_queue, 2908 struct workq_threadreq_s, tr_entry); 2909 if (req_pri) { 2910 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue, 2911 &req_pri->tr_entry); 2912 } 2913 2914 /* 2915 * Handle the manager thread request. The special queue might yield 2916 * a higher priority, but the manager always beats the QoS world. 2917 */ 2918 2919 req_mgr = wq->wq_event_manager_threadreq; 2920 if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) { 2921 uint32_t mgr_pri = wq->wq_event_manager_priority; 2922 2923 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) { 2924 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK; 2925 } else { 2926 mgr_pri = thread_workq_pri_for_qos( 2927 _pthread_priority_thread_qos(mgr_pri)); 2928 } 2929 2930 return mgr_pri >= pri ? 
req_mgr : req_pri; 2931 } 2932 2933 /* 2934 * Compute the best QoS Request, and check whether it beats the "pri" one 2935 */ 2936 2937 req_qos = priority_queue_max(&wq->wq_overcommit_queue, 2938 struct workq_threadreq_s, tr_entry); 2939 if (req_qos) { 2940 qos = req_qos->tr_qos; 2941 } 2942 2943 req_tmp = priority_queue_max(&wq->wq_constrained_queue, 2944 struct workq_threadreq_s, tr_entry); 2945 2946 if (req_tmp && qos < req_tmp->tr_qos) { 2947 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) { 2948 return req_pri; 2949 } 2950 2951 if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) { 2952 /* 2953 * If the constrained thread request is the best one and passes 2954 * the admission check, pick it. 2955 */ 2956 return req_tmp; 2957 } 2958 } 2959 2960 if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) { 2961 return req_pri; 2962 } 2963 2964 if (req_qos) { 2965 return req_qos; 2966 } 2967 2968 /* 2969 * If we had no eligible request but we have a turnstile push, 2970 * it must be a non overcommit thread request that failed 2971 * the admission check. 2972 * 2973 * Just fake a BG thread request so that if the push stops the creator 2974 * priority just drops to 4. 2975 */ 2976 if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) { 2977 static struct workq_threadreq_s workq_sync_push_fake_req = { 2978 .tr_qos = THREAD_QOS_BACKGROUND, 2979 }; 2980 2981 return &workq_sync_push_fake_req; 2982 } 2983 2984 return NULL; 2985 } 2986 2987 static workq_threadreq_t 2988 workq_threadreq_select(struct workqueue *wq, struct uthread *uth) 2989 { 2990 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr; 2991 uintptr_t proprietor; 2992 thread_qos_t qos = THREAD_QOS_UNSPECIFIED; 2993 uint8_t pri = 0; 2994 2995 if (uth == wq->wq_creator) { 2996 uth = NULL; 2997 } 2998 2999 /* 3000 * Compute the best priority request (special or turnstile) 3001 */ 3002 3003 pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, 3004 &proprietor); 3005 if (pri) { 3006 struct kqworkloop *kqwl = (struct kqworkloop *)proprietor; 3007 req_pri = &kqwl->kqwl_request; 3008 if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) { 3009 panic("Invalid thread request (%p) state %d", 3010 req_pri, req_pri->tr_state); 3011 } 3012 } else { 3013 req_pri = NULL; 3014 } 3015 3016 req_tmp = priority_queue_max(&wq->wq_special_queue, 3017 struct workq_threadreq_s, tr_entry); 3018 if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue, 3019 &req_tmp->tr_entry)) { 3020 req_pri = req_tmp; 3021 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue, 3022 &req_tmp->tr_entry); 3023 } 3024 3025 /* 3026 * Handle the manager thread request. The special queue might yield 3027 * a higher priority, but the manager always beats the QoS world. 3028 */ 3029 3030 req_mgr = wq->wq_event_manager_threadreq; 3031 if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) { 3032 uint32_t mgr_pri = wq->wq_event_manager_priority; 3033 3034 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) { 3035 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK; 3036 } else { 3037 mgr_pri = thread_workq_pri_for_qos( 3038 _pthread_priority_thread_qos(mgr_pri)); 3039 } 3040 3041 return mgr_pri >= pri ? 
req_mgr : req_pri; 3042 } 3043 3044 /* 3045 * Compute the best QoS Request, and check whether it beats the "pri" one 3046 */ 3047 3048 req_qos = priority_queue_max(&wq->wq_overcommit_queue, 3049 struct workq_threadreq_s, tr_entry); 3050 if (req_qos) { 3051 qos = req_qos->tr_qos; 3052 } 3053 3054 req_tmp = priority_queue_max(&wq->wq_constrained_queue, 3055 struct workq_threadreq_s, tr_entry); 3056 3057 if (req_tmp && qos < req_tmp->tr_qos) { 3058 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) { 3059 return req_pri; 3060 } 3061 3062 if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) { 3063 /* 3064 * If the constrained thread request is the best one and passes 3065 * the admission check, pick it. 3066 */ 3067 return req_tmp; 3068 } 3069 } 3070 3071 if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) { 3072 return req_pri; 3073 } 3074 3075 return req_qos; 3076 } 3077 3078 /* 3079 * The creator is an anonymous thread that is counted as scheduled, 3080 * but otherwise has no scheduler callback set and is not tracked as active; 3081 * it is used to make other threads. 3082 * 3083 * When more requests are added or an existing one is hurried along, 3084 * a creator is elected and set up, or the existing one is overridden accordingly. 3085 * 3086 * While this creator is in flight, because no request has been dequeued, 3087 * already running threads have a chance at stealing thread requests, avoiding 3088 * useless context switches, and the creator, once scheduled, may not find any 3089 * work to do and will then just park again. 3090 * 3091 * The creator serves the dual purpose of informing the scheduler of work that 3092 * hasn't been materialized as threads yet, and of acting as a natural pacing 3093 * mechanism for thread creation. 3094 * 3095 * Because the creator is anonymous (and not bound to anything), thread requests 3096 * can be stolen from it by threads already on core, yielding more 3097 * efficient scheduling and fewer context switches. 3098 */ 3099 static void 3100 workq_schedule_creator(proc_t p, struct workqueue *wq, 3101 workq_kern_threadreq_flags_t flags) 3102 { 3103 workq_threadreq_t req; 3104 struct uthread *uth; 3105 bool needs_wakeup; 3106 3107 workq_lock_held(wq); 3108 assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0); 3109 3110 again: 3111 uth = wq->wq_creator; 3112 3113 if (!wq->wq_reqcount) { 3114 /* 3115 * There is no thread request left. 3116 * 3117 * If there is a creator, leave everything in place, so that it cleans 3118 * up itself in workq_push_idle_thread(). 3119 * 3120 * Else, make sure the turnstile state is reset to no inheritor. 3121 */ 3122 if (uth == NULL) { 3123 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 3124 } 3125 return; 3126 } 3127 3128 req = workq_threadreq_select_for_creator(wq); 3129 if (req == NULL) { 3130 /* 3131 * There isn't a thread request that passes the admission check. 3132 * 3133 * If there is a creator, do not touch anything, the creator will sort 3134 * it out when it runs. 3135 * 3136 * Else, set the inheritor to "WORKQ" so that the turnstile propagation 3137 * code calls us if anything changes.
3138 */ 3139 if (uth == NULL) { 3140 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ); 3141 } 3142 return; 3143 } 3144 3145 if (uth) { 3146 /* 3147 * We need to maybe override the creator we already have 3148 */ 3149 if (workq_thread_needs_priority_change(req, uth)) { 3150 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE, 3151 wq, 1, thread_tid(uth->uu_thread), req->tr_qos, 0); 3152 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 3153 } 3154 assert(wq->wq_inheritor == uth->uu_thread); 3155 } else if (wq->wq_thidlecount) { 3156 /* 3157 * We need to unpark a creator thread 3158 */ 3159 wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT, 3160 &needs_wakeup); 3161 /* Always reset the priorities on the newly chosen creator */ 3162 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 3163 workq_turnstile_update_inheritor(wq, uth->uu_thread, 3164 TURNSTILE_INHERITOR_THREAD); 3165 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE, 3166 wq, 2, thread_tid(uth->uu_thread), req->tr_qos, 0); 3167 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled; 3168 uth->uu_save.uus_workq_park_data.yields = 0; 3169 if (needs_wakeup) { 3170 workq_thread_wakeup(uth); 3171 } 3172 } else { 3173 /* 3174 * We need to allocate a thread... 3175 */ 3176 if (__improbable(wq->wq_nthreads >= wq_max_threads)) { 3177 /* out of threads, just go away */ 3178 flags = WORKQ_THREADREQ_NONE; 3179 } else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) { 3180 act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ); 3181 } else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) { 3182 /* This can drop the workqueue lock, and take it again */ 3183 workq_schedule_immediate_thread_creation(wq); 3184 } else if (workq_add_new_idle_thread(p, wq)) { 3185 goto again; 3186 } else { 3187 workq_schedule_delayed_thread_creation(wq, 0); 3188 } 3189 3190 /* 3191 * If the current thread is the inheritor: 3192 * 3193 * If we set the AST, then the thread will stay the inheritor until 3194 * either the AST calls workq_kern_threadreq_redrive(), or it parks 3195 * and calls workq_push_idle_thread(). 3196 * 3197 * Else, the responsibility of the thread creation is with a thread-call 3198 * and we need to clear the inheritor. 3199 */ 3200 if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 && 3201 wq->wq_inheritor == current_thread()) { 3202 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0); 3203 } 3204 } 3205 } 3206 3207 /** 3208 * Same as workq_unpark_select_threadreq_or_park_and_unlock, 3209 * but do not allow early binds. 3210 * 3211 * Called with the base pri frozen, will unfreeze it. 
3212 */ 3213 __attribute__((noreturn, noinline)) 3214 static void 3215 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 3216 struct uthread *uth, uint32_t setup_flags) 3217 { 3218 workq_threadreq_t req = NULL; 3219 bool is_creator = (wq->wq_creator == uth); 3220 bool schedule_creator = false; 3221 3222 if (__improbable(_wq_exiting(wq))) { 3223 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0, 0); 3224 goto park; 3225 } 3226 3227 if (wq->wq_reqcount == 0) { 3228 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0, 0); 3229 goto park; 3230 } 3231 3232 req = workq_threadreq_select(wq, uth); 3233 if (__improbable(req == NULL)) { 3234 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0, 0); 3235 goto park; 3236 } 3237 3238 uint8_t tr_flags = req->tr_flags; 3239 struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req); 3240 3241 /* 3242 * Attempt to setup ourselves as the new thing to run, moving all priority 3243 * pushes to ourselves. 3244 * 3245 * If the current thread is the creator, then the fact that we are presently 3246 * running is proof that we'll do something useful, so keep going. 3247 * 3248 * For other cases, peek at the AST to know whether the scheduler wants 3249 * to preempt us, if yes, park instead, and move the thread request 3250 * turnstile back to the workqueue. 3251 */ 3252 if (req_ts) { 3253 workq_perform_turnstile_operation_locked(wq, ^{ 3254 turnstile_update_inheritor(req_ts, uth->uu_thread, 3255 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD); 3256 turnstile_update_inheritor_complete(req_ts, 3257 TURNSTILE_INTERLOCK_HELD); 3258 }); 3259 } 3260 3261 if (is_creator) { 3262 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0, 3263 uth->uu_save.uus_workq_park_data.yields, 0); 3264 wq->wq_creator = NULL; 3265 _wq_thactive_inc(wq, req->tr_qos); 3266 wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++; 3267 } else if (uth->uu_workq_pri.qos_bucket != req->tr_qos) { 3268 _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos); 3269 } 3270 3271 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true); 3272 3273 thread_unfreeze_base_pri(uth->uu_thread); 3274 #if 0 // <rdar://problem/55259863> to turn this back on 3275 if (__improbable(thread_unfreeze_base_pri(uth->uu_thread) && !is_creator)) { 3276 if (req_ts) { 3277 workq_perform_turnstile_operation_locked(wq, ^{ 3278 turnstile_update_inheritor(req_ts, wq->wq_turnstile, 3279 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE); 3280 turnstile_update_inheritor_complete(req_ts, 3281 TURNSTILE_INTERLOCK_HELD); 3282 }); 3283 } 3284 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0, 0); 3285 goto park_thawed; 3286 } 3287 #endif 3288 3289 /* 3290 * We passed all checks, dequeue the request, bind to it, and set it up 3291 * to return to user. 
3292 */ 3293 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 3294 workq_trace_req_id(req), 0, 0, 0); 3295 wq->wq_fulfilled++; 3296 schedule_creator = workq_threadreq_dequeue(wq, req); 3297 3298 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) { 3299 kqueue_threadreq_bind_prepost(p, req, uth); 3300 req = NULL; 3301 } else if (req->tr_count > 0) { 3302 req = NULL; 3303 } 3304 3305 workq_thread_reset_cpupercent(req, uth); 3306 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 3307 uth->uu_workq_flags ^= UT_WORKQ_NEW; 3308 setup_flags |= WQ_SETUP_FIRST_USE; 3309 } 3310 if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) { 3311 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) == 0) { 3312 uth->uu_workq_flags |= UT_WORKQ_OVERCOMMIT; 3313 wq->wq_constrained_threads_scheduled--; 3314 } 3315 } else { 3316 if ((uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0) { 3317 uth->uu_workq_flags &= ~UT_WORKQ_OVERCOMMIT; 3318 wq->wq_constrained_threads_scheduled++; 3319 } 3320 } 3321 3322 if (is_creator || schedule_creator) { 3323 /* This can drop the workqueue lock, and take it again */ 3324 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS); 3325 } 3326 3327 workq_unlock(wq); 3328 3329 if (req) { 3330 zfree(workq_zone_threadreq, req); 3331 } 3332 3333 /* 3334 * Run Thread, Run! 3335 */ 3336 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI; 3337 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) { 3338 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER; 3339 } else if (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) { 3340 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT; 3341 } 3342 if (tr_flags & WORKQ_TR_FLAG_KEVENT) { 3343 upcall_flags |= WQ_FLAG_THREAD_KEVENT; 3344 } 3345 if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) { 3346 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT; 3347 } 3348 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags; 3349 3350 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) { 3351 kqueue_threadreq_bind_commit(p, uth->uu_thread); 3352 } 3353 workq_setup_and_run(p, uth, setup_flags); 3354 __builtin_unreachable(); 3355 3356 park: 3357 thread_unfreeze_base_pri(uth->uu_thread); 3358 #if 0 // <rdar://problem/55259863> 3359 park_thawed: 3360 #endif 3361 workq_park_and_unlock(p, wq, uth, setup_flags); 3362 } 3363 3364 /** 3365 * Runs a thread request on a thread 3366 * 3367 * - if thread is THREAD_NULL, will find a thread and run the request there. 3368 * Otherwise, the thread must be the current thread. 3369 * 3370 * - if req is NULL, will find the highest priority request and run that. If 3371 * it is not NULL, it must be a threadreq object in state NEW. If it can not 3372 * be run immediately, it will be enqueued and moved to state QUEUED. 3373 * 3374 * Either way, the thread request object serviced will be moved to state 3375 * BINDING and attached to the uthread. 3376 * 3377 * Should be called with the workqueue lock held. Will drop it. 3378 * Should be called with the base pri not frozen. 3379 */ 3380 __attribute__((noreturn, noinline)) 3381 static void 3382 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq, 3383 struct uthread *uth, uint32_t setup_flags) 3384 { 3385 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) { 3386 if (uth->uu_workq_flags & UT_WORKQ_NEW) { 3387 setup_flags |= WQ_SETUP_FIRST_USE; 3388 } 3389 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND); 3390 /* 3391 * This pointer is possibly freed and only used for tracing purposes. 
3392 */ 3393 workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request; 3394 workq_unlock(wq); 3395 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq, 3396 VM_KERNEL_ADDRHIDE(req), 0, 0, 0); 3397 (void)req; 3398 workq_setup_and_run(p, uth, setup_flags); 3399 __builtin_unreachable(); 3400 } 3401 3402 thread_freeze_base_pri(uth->uu_thread); 3403 workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags); 3404 } 3405 3406 static bool 3407 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth) 3408 { 3409 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri); 3410 3411 if (qos >= THREAD_QOS_USER_INTERACTIVE) { 3412 return false; 3413 } 3414 3415 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot; 3416 if (wq->wq_fulfilled == snapshot) { 3417 return false; 3418 } 3419 3420 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)]; 3421 if (wq->wq_fulfilled - snapshot > conc) { 3422 /* we fulfilled more than NCPU requests since being dispatched */ 3423 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1, 3424 wq->wq_fulfilled, snapshot, 0); 3425 return true; 3426 } 3427 3428 for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) { 3429 cnt += wq->wq_thscheduled_count[i]; 3430 } 3431 if (conc <= cnt) { 3432 /* We fulfilled requests and have more than NCPU scheduled threads */ 3433 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2, 3434 wq->wq_fulfilled, snapshot, 0); 3435 return true; 3436 } 3437 3438 return false; 3439 } 3440 3441 /** 3442 * parked thread wakes up 3443 */ 3444 __attribute__((noreturn, noinline)) 3445 static void 3446 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused) 3447 { 3448 thread_t th = current_thread(); 3449 struct uthread *uth = get_bsdthread_info(th); 3450 proc_t p = current_proc(); 3451 struct workqueue *wq = proc_get_wqptr_fast(p); 3452 3453 workq_lock_spin(wq); 3454 3455 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) { 3456 /* 3457 * If the number of threads we have out are able to keep up with the 3458 * demand, then we should avoid sending this creator thread to 3459 * userspace. 3460 */ 3461 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled; 3462 uth->uu_save.uus_workq_park_data.yields++; 3463 workq_unlock(wq); 3464 thread_yield_with_continuation(workq_unpark_continue, NULL); 3465 __builtin_unreachable(); 3466 } 3467 3468 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) { 3469 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE); 3470 __builtin_unreachable(); 3471 } 3472 3473 if (__probable(wr == THREAD_AWAKENED)) { 3474 /* 3475 * We were set running, but for the purposes of dying. 3476 */ 3477 assert(uth->uu_workq_flags & UT_WORKQ_DYING); 3478 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0); 3479 } else { 3480 /* 3481 * workaround for <rdar://problem/38647347>, 3482 * in case we do hit userspace, make sure calling 3483 * workq_thread_terminate() does the right thing here, 3484 * and if we never call it, that workq_exit() will too because it sees 3485 * this thread on the runlist. 
3486 */ 3487 assert(wr == THREAD_INTERRUPTED); 3488 wq->wq_thdying_count++; 3489 uth->uu_workq_flags |= UT_WORKQ_DYING; 3490 } 3491 3492 workq_unpark_for_death_and_unlock(p, wq, uth, 3493 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE); 3494 __builtin_unreachable(); 3495 } 3496 3497 __attribute__((noreturn, noinline)) 3498 static void 3499 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags) 3500 { 3501 thread_t th = uth->uu_thread; 3502 vm_map_t vmap = get_task_map(p->task); 3503 3504 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) { 3505 /* 3506 * For preemption reasons, we want to reset the voucher as late as 3507 * possible, so we do it in two places: 3508 * - Just before parking (i.e. in workq_park_and_unlock()) 3509 * - Prior to doing the setup for the next workitem (i.e. here) 3510 * 3511 * Those two places are sufficient to ensure we always reset it before 3512 * it goes back out to user space, but be careful to not break that 3513 * guarantee. 3514 */ 3515 __assert_only kern_return_t kr; 3516 kr = thread_set_voucher_name(MACH_PORT_NULL); 3517 assert(kr == KERN_SUCCESS); 3518 } 3519 3520 uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags; 3521 if (!(setup_flags & WQ_SETUP_FIRST_USE)) { 3522 upcall_flags |= WQ_FLAG_THREAD_REUSE; 3523 } 3524 3525 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) { 3526 /* 3527 * For threads that have an outside-of-QoS thread priority, indicate 3528 * to userspace that setting QoS should only affect the TSD and not 3529 * change QOS in the kernel. 3530 */ 3531 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS; 3532 } else { 3533 /* 3534 * Put the QoS class value into the lower bits of the reuse_thread 3535 * register, this is where the thread priority used to be stored 3536 * anyway. 3537 */ 3538 upcall_flags |= uth->uu_save.uus_workq_park_data.qos | 3539 WQ_FLAG_THREAD_PRIO_QOS; 3540 } 3541 3542 if (uth->uu_workq_thport == MACH_PORT_NULL) { 3543 /* convert_thread_to_port() consumes a reference */ 3544 thread_reference(th); 3545 ipc_port_t port = convert_thread_to_port(th); 3546 uth->uu_workq_thport = ipc_port_copyout_send(port, get_task_ipcspace(p->task)); 3547 } 3548 3549 /* 3550 * Call out to pthread, this sets up the thread, pulls in kevent structs 3551 * onto the stack, sets up the thread state and then returns to userspace. 3552 */ 3553 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START, 3554 proc_get_wqptr_fast(p), 0, 0, 0, 0); 3555 thread_sched_call(th, workq_sched_callback); 3556 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr, 3557 uth->uu_workq_thport, 0, setup_flags, upcall_flags); 3558 3559 __builtin_unreachable(); 3560 } 3561 3562 #pragma mark misc 3563 3564 int 3565 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo) 3566 { 3567 struct workqueue *wq = proc_get_wqptr(p); 3568 int error = 0; 3569 int activecount; 3570 3571 if (wq == NULL) { 3572 return EINVAL; 3573 } 3574 3575 /* 3576 * This is sometimes called from interrupt context by the kperf sampler. 3577 * In that case, it's not safe to spin trying to take the lock since we 3578 * might already hold it. So, we just try-lock it and error out if it's 3579 * already held. Since this is just a debugging aid, and all our callers 3580 * are able to handle an error, that's fine. 
3581 */ 3582 bool locked = workq_lock_try(wq); 3583 if (!locked) { 3584 return EBUSY; 3585 } 3586 3587 wq_thactive_t act = _wq_thactive(wq); 3588 activecount = _wq_thactive_aggregate_downto_qos(wq, act, 3589 WORKQ_THREAD_QOS_MIN, NULL, NULL); 3590 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) { 3591 activecount++; 3592 } 3593 pwqinfo->pwq_nthreads = wq->wq_nthreads; 3594 pwqinfo->pwq_runthreads = activecount; 3595 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount; 3596 pwqinfo->pwq_state = 0; 3597 3598 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) { 3599 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT; 3600 } 3601 3602 if (wq->wq_nthreads >= wq_max_threads) { 3603 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT; 3604 } 3605 3606 workq_unlock(wq); 3607 return error; 3608 } 3609 3610 boolean_t 3611 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total, 3612 boolean_t *exceeded_constrained) 3613 { 3614 proc_t p = v; 3615 struct proc_workqueueinfo pwqinfo; 3616 int err; 3617 3618 assert(p != NULL); 3619 assert(exceeded_total != NULL); 3620 assert(exceeded_constrained != NULL); 3621 3622 err = fill_procworkqueue(p, &pwqinfo); 3623 if (err) { 3624 return FALSE; 3625 } 3626 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) { 3627 return FALSE; 3628 } 3629 3630 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT); 3631 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT); 3632 3633 return TRUE; 3634 } 3635 3636 uint32_t 3637 workqueue_get_pwq_state_kdp(void * v) 3638 { 3639 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) == 3640 kTaskWqExceededConstrainedThreadLimit); 3641 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) == 3642 kTaskWqExceededTotalThreadLimit); 3643 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable); 3644 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT | 3645 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7); 3646 3647 if (v == NULL) { 3648 return 0; 3649 } 3650 3651 proc_t p = v; 3652 struct workqueue *wq = proc_get_wqptr(p); 3653 3654 if (wq == NULL || workq_lock_spin_is_acquired_kdp(wq)) { 3655 return 0; 3656 } 3657 3658 uint32_t pwq_state = WQ_FLAGS_AVAILABLE; 3659 3660 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) { 3661 pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT; 3662 } 3663 3664 if (wq->wq_nthreads >= wq_max_threads) { 3665 pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT; 3666 } 3667 3668 return pwq_state; 3669 } 3670 3671 void 3672 workq_init(void) 3673 { 3674 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs, 3675 NSEC_PER_USEC, &wq_stalled_window.abstime); 3676 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs, 3677 NSEC_PER_USEC, &wq_reduce_pool_window.abstime); 3678 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs, 3679 NSEC_PER_USEC, &wq_max_timer_interval.abstime); 3680 3681 thread_deallocate_daemon_register_queue(&workq_deallocate_queue, 3682 workq_deallocate_queue_invoke); 3683 } 3684
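
/*
 * Illustrative sketch (excluded from compilation): a simplified, self-contained
 * model of the pacing heuristic implemented by workq_creator_should_yield()
 * above.  The parameter names are hypothetical stand-ins: `fulfilled` and
 * `snapshot` correspond to wq_fulfilled and uus_workq_park_data.fulfilled_snapshot,
 * `scheduled` to the wq_thscheduled_count[] sum that function computes, and
 * `max_parallelism` to wq_max_parallelism[] for the creator's bucket.  The real
 * function additionally never yields at or above THREAD_QOS_USER_INTERACTIVE.
 */
#if 0 /* illustration only */
static bool
creator_should_yield_model(uint32_t fulfilled, uint32_t snapshot,
    uint32_t scheduled, uint32_t max_parallelism)
{
	if (fulfilled == snapshot) {
		/* nothing was fulfilled since the creator was dispatched */
		return false;
	}
	if (fulfilled - snapshot > max_parallelism) {
		/* more than NCPU requests were fulfilled since being dispatched */
		return true;
	}
	if (scheduled >= max_parallelism) {
		/* requests were fulfilled and NCPU threads are already scheduled */
		return true;
	}
	return false;
}
#endif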